前言

Github:https://github.com/HealerJean

博客:http://blog.healerjean.com

官方:https://liteflow.yomahub.com/pages/9bf839/

常用流程编排框架

  liteflow asyncTool JDEasyFlow disruptor
开发团队 由阿里巴巴开发,是一个开源的流程编排框架。 由阿里巴巴开发,是一个开源的异步流程处理框架。 Jdon 公司开发,是一个基于Java的流程编排框架。 LMAX Disruptor公司开发,是一个开源的高性能线程间消息传递框架。
介绍 LiteFlow 是一个非常强大的现代化的规则引擎框架,融合了编排特性和规则引擎的所有特性。如果你要对复杂业务逻辑进行新写或者重构,用LiteFlow 最合适不过。它是一个编排式的规则引擎框架,组件编排,帮助解耦业务代码,让每一个业务片段都是一个组件。 asyncTool提供了一种异步的流程处理方式,可以将耗时的操作异步化处理,提高系统的吞吐量和响应速度。它提供了一系列的异步任务节点,如异步 Java 任务、异步Shell任务、异步HTTP任务等,以及异步线程池和异步回调机制。 JDEasyFlow 提供了一系列的流程模型和任务节点,支持顺序流、条件流、循环流和并行流等。它提供了一个基于XML的流程定义语言,以及可视化的流程设计工具,还支持多种任务节点和事件处理机制。 disruptor是一个用于实现高性能和低延迟应用的消息传递框架,旨在提供快速的事件处理能力。它提供了一种高效的环形缓冲区数据结构,支持多生产者和多消费者模式,并且具有可预测的性能表现。
优点 轻量级、易用性好、灵活性强、支持多种流程模型和任务节点,同时具有良好的性能和稳定性。复杂业务流程编排、社区成熟, 异步处理能力强、性能高、灵活性强、可以适用于高并发和高吞吐量的场景。 简单、灵活、易扩展 高性能、低延迟、可预测的性能表现、灵活的事件处理机制。
缺点 开源框架较重,有一定学习成本,社区活跃度相对较低,文档和示例相对较少 相对较窄的应用场景,主要适用于处理耗时操作。 相对于其他流程编排框架,JDEasyFlow可能较为复杂,需要更多的配置和维护工作。 主要适用于线程间消息传递和事件处理的场景,可能不太适合用于流程编排。
社区活跃度 尽管社区活跃度相对较低,但该框架仍然有一个相对稳定的用户群体。 asyncTool 的社区相对较小,但仍然有一个稳定的用户群体。 JDEasyFlow 的社区相对较小,但仍然有一定的用户群体。 disruptor 是一个广受欢迎的开源框架,拥有一个活跃的社区和大量的用户群体。

一、接入 SpringBoot

1、pom.xml

<!--liteflow-->
<dependency>
    <groupId>com.yomahub</groupId>
    <artifactId>liteflow-spring-boot-starter</artifactId>
    <version>2.10.5</version>
</dependency>

2、流程节点

1)ACmp

/**
 * ACmp
 *
 * @author zhangyujin
 * @date 2023/7/24
 */
@LiteflowComponent("流程A")
@Slf4j
public class ACmp extends NodeComponent {

    @Override
    public void process() {
        log.info("[ACmp#]process");
    }
}

2)BCmp

package com.healerjean.proj.liteflow.flow;

import com.yomahub.liteflow.annotation.LiteflowComponent;
import com.yomahub.liteflow.core.NodeComponent;
import lombok.extern.slf4j.Slf4j;

/**
 * BCmp
 *
 * @author zhangyujin
 * @date 2023/7/24
 */
@Slf4j
@LiteflowComponent("流程B")
public class BCmp extends NodeComponent {

    @Override
    public void process() {
        log.info("[BCmp#]process");
    }
}

3)CCmp

package com.healerjean.proj.liteflow.flow;

import com.yomahub.liteflow.annotation.LiteflowComponent;
import com.yomahub.liteflow.core.NodeComponent;
import lombok.extern.slf4j.Slf4j;

/**
 * CCmp
 *
 * @author zhangyujin
 * @date 2023/7/24
 */
@Slf4j
@LiteflowComponent("流程C")
public class CCmp extends NodeComponent {

    @Override
    public void process() {
        log.info("[CCmp#]process");
    }
}

3、yml 文件

liteflow:
  #规则文件路径
  rule-source: config/flow.el.xml

4、flow.el.xml

<?xml version="1.0" encoding="UTF-8"?>
<flow>
    <chain name="chain1">
        THEN(流程A, 流程B, 流程C);
    </chain>
</flow>

4、单元测试

package com.healerjean.proj.liteflow;

import com.healerjean.proj.base.BaseJunit5SpringTest;
import com.healerjean.proj.liteflow.context.DemoContext;
import com.healerjean.proj.utils.JsonUtils;
import com.yomahub.liteflow.core.FlowExecutor;
import com.yomahub.liteflow.flow.LiteflowResponse;
import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Test;

import javax.annotation.Resource;

/**
* LiteFlowTest
*
* @author zhangyujin
* @date 2023/7/24
*/
@DisplayName("LiteFlowTest")
@Slf4j
public class LiteFlowTest extends BaseJunit5SpringTest {

  @Resource
  private FlowExecutor flowExecutor;

  @DisplayName("flowExecutor.execute2Resp")
  @Test
  public void test(){
      DemoContext demoContext = new DemoContext();
      demoContext.setBusinessType("junitTestType");
      LiteflowResponse response = flowExecutor.execute2Resp("chain1", demoContext);
      log.info("[LiteFlowTest#flowExecutor.execute2Resp] res:{}", JsonUtils.toString(response));
  }
}

二、常规组件

1、普通组件

普通组件节点需要继承 NodeComponent,可用于 THENWHEN 关键字中。需要实现process方法

1)可以覆盖的方法

方法 说明
isAccess 推荐实现isAccess方法,表示是否进入该节点,可以用于业务参数的预先判断
isContinueOnError 表示出错是否继续往下执行下一个组件,默认为false
isEnd 如果覆盖后,返回 true,则表示在这个组件执行完之后立马终止整个流程。对于这种方式,由于是用户主动结束的流程,属于正常结束,所以最终的 isSuccess 是为 true 的。需要注意的是,如果isContinueOnError true的情况下,调用了this.setIsEnd(true),那么依旧会终止。response 里的isSuccess还是true
beforeProcessafterProcess 流程的前置和后置处理器,其中前置处理器,在isAccess 之后执行。
onSuccess 流程的成功事件回调
onError 流程的失败事件回调。 1、onError 方法执行后,因为主方法抛出异常,所以整个流程依旧是失败状态。response对象里依旧是主方法抛出的错。2、如果 onError 方法本身抛错,那么最终抛到最外面的错,是主方法里的错,而 onError 方法所产生的异常会被打出堆栈

2)This 关键字可以调用的方法

方法 说明
getNodeId 获取组件 ID
getName 获取组件别名
getChainId 获取当前执行的流程 Id
getRequestData 获取流程的初始参数
setIsEnd 表示是否立即结束整个流程 ,用法为this.setIsEnd(true)。对于这种方式,由于是用户主动结束的流程,属于正常结束,所以最终的 isSuccess 是为 true 的。 需要注意的是,如果 isContinueOnErrortrue 的情况下,调用了 this.setIsEnd(true),那么依旧会终止。response 里的 isSuccess 还是 true
getTag 获取这个组件的标签信息,关于标签的定义和使用,请参照组件标签。

2、选择组件

选择节点需要继承 NodeSwitchComponent。需要实现方法processSwitch方法。

1)根据 NodeId 进行选择

a、switchChain2

<!--选择组件-根据NodeId进行选择-->
<chain name="switchChain1">
    SWITCH(SwitchACmp).to(NormalBCmp, NormalCCmp);
</chain>

b、SwitchACmp

/**
 * SwitchACmp
 *
 * @author zhangyujin
 * @date 2023/7/24
 */
@Slf4j
@LiteflowComponent("SwitchACmp")
public class SwitchACmp extends NodeSwitchComponent {

    @Override
    public String processSwitch() throws Exception {
        log.info("SwitchACmp executed!");
        // 1、根据nodeId进行选择
        return "NormalCCmp";
    }
}

c、单元测试

@DisplayName("选择组件-根据NodeId进行选择")
@Test
public void test2(){
    DemoContext demoContext = new DemoContext();
    demoContext.setBusinessType("选择组件");
    LiteflowResponse response = flowExecutor.execute2Resp("switchChain1", demoContext);
    log.info("[LiteFlowTest#选择组件] res:{}", JsonUtils.toString(response));
}

2)根据表达式的 id进行选择

a、switchChain2

<!--选择组件-根据表达式的id进行选择-->
<chain name="switchChain2">
    SWITCH(SwitchACmp).to(NormalBCmp, WHEN(NormalBCmp, NormalCCmp).id("1Id"));
</chain>

b、SwitchACmp

/**
 * SwitchACmp
 *
 * @author zhangyujin
 * @date 2023/7/24
 */
@Slf4j
@LiteflowComponent("SwitchACmp")
public class SwitchACmp extends NodeSwitchComponent {

    @Override
    public String processSwitch() throws Exception {
        log.info("SwitchACmp executed!");
        // 2、根据表达式的id进行选择
        return "1Id";
    }
}

c、单元测试

@DisplayName("选择组件-根据表达式的id进行选择")
@Test
public void test3(){
    DemoContext demoContext = new DemoContext();
    demoContext.setBusinessType("选择组件");
    LiteflowResponse response = flowExecutor.execute2Resp("switchChain2", demoContext);
    log.info("[LiteFlowTest#选择组件] res:{}", JsonUtils.toString(response));
}

3)根据 tag进行选择

a、switchChain3

<!--选择组件-根据tag进行选择-->
<chain name="switchChain3">
    SWITCH(SwitchACmp).to(NormalBCmp.tag("TagB"), NormalCCmp.tag("TagC"));
</chain>

b、SwitchACmp

/**
 * SwitchACmp
 *
 * @author zhangyujin
 * @date 2023/7/24
 */
@Slf4j
@LiteflowComponent("SwitchACmp")
public class SwitchACmp extends NodeSwitchComponent {

    @Override
    public String processSwitch() throws Exception {
        log.info("SwitchACmp executed!");
        // 3、根据tag进行选择
        return "tag:TagB";
    }
}

c、单元测试

@DisplayName("选择组件-根据tag进行选择")
@Test
public void test4(){
    DemoContext demoContext = new DemoContext();
    demoContext.setBusinessType("选择组件");
    LiteflowResponse response = flowExecutor.execute2Resp("switchChain3", demoContext);
    log.info("[LiteFlowTest#选择组件] res:{}", JsonUtils.toString(response));
}

4)表达式 tag的选择

a、switchChain4

<!--选择组件-表达式tag的选择-->
<chain name="switchChain4">
    SWITCH(SwitchACmp).to(NormalBCmp, WHEN(NormalBCmp, NormalCCmp).tag("TagC"));
</chain>

b、SwitchACmp

/**
 * SwitchACmp
 *
 * @author zhangyujin
 * @date 2023/7/24
 */
@Slf4j
@LiteflowComponent("SwitchACmp")
public class SwitchACmp extends NodeSwitchComponent {

    @Override
    public String processSwitch() throws Exception {
        log.info("SwitchACmp executed!");
        // 4、表达式tag的选择
        return "tag:TagC";
    }
}

c、单元测试

@DisplayName("选择组件-表达式tag的选择")
@Test
public void test5(){
    DemoContext demoContext = new DemoContext();
    demoContext.setBusinessType("选择组件");
    LiteflowResponse response = flowExecutor.execute2Resp("switchChain4", demoContext);
    log.info("[LiteFlowTest#选择组件] res:{}", JsonUtils.toString(response));
}

5)链路 tag 的选择

a、switchChain5

<!--选择组件-链路tag的选择-->
<chain name="switchChain5">
    SWITCH(SwitchACmp).to(NormalBCmp, switchChain5Sub.tag("TagC"));
</chain>
<chain name="switchChain5Sub">
    THEN(NormalBCmp, NormalCCmp);
</chain>

b、SwitchACmp

无论返回 switchChain5Sub 还是 tag:TagC 都能选择到后面的链路。

/**
 * SwitchACmp
 *
 * @author zhangyujin
 * @date 2023/7/24
 */
@Slf4j
@LiteflowComponent("SwitchACmp")
public class SwitchACmp extends NodeSwitchComponent {

    @Override
    public String processSwitch() throws Exception {
        log.info("SwitchACmp executed!");
        // 5、链路tag的选择
        return "tag:TagC";
    }
}

c、单元测试

    @DisplayName("选择组件-链路tag的选择")
    @Test
    public void test6(){
        DemoContext demoContext = new DemoContext();
        demoContext.setBusinessType("选择组件");
        LiteflowResponse response = flowExecutor.execute2Resp("switchChain5", demoContext);
        log.info("[LiteFlowTest#选择组件] res:{}", JsonUtils.toString(response));
    }
}

3、条件组件

需要继承 NodeIfComponent

1)ifChain1

<!--条件组件-->
<chain name="ifChain1">
    <!-- XIfCmd 就是IF组件,为真,执行 IfNormalBCmp,为假,执行 IfNormalCCmp:-->
    IF(XIfCmd, IfNormalBCmp, IfNormalCCmp);
</chain>

2)XIfCmd

/**
 * XIFCmd
 *
 * @author zhangyujin
 * @date 2023/7/24
 */
@LiteflowComponent("XIfCmd")
@Slf4j
public class XIfCmd extends NodeIfComponent {


    @Override
    public boolean processIf() throws Exception {
        //do your biz
        return true;
    }
}

3)单元测试

@DisplayName("条件组件")
@Test
public void test7(){
    DemoContext demoContext = new DemoContext();
    demoContext.setBusinessType("条件组件");
    LiteflowResponse response = flowExecutor.execute2Resp("ifChain1", demoContext);
    log.info("[LiteFlowTest#条件组件] res:{}", JsonUtils.toString(response));
}

三、EL 规则

1、串行编排

如果你要依次执行 a , b , c , d 四个组件,你可以用 THEN 关键字,需要注意的是,THEN 必须大写。

<chain name="chain1">
    THEN(a, b, c, d);
</chain>

image-20230724130034056

2、并行编排

1)最基本的例子

如果你要并行执行a,b,c三个组件,你可以用WHEN关键字,需要注意的是,WHEN必须大写。

<chain name="chain1">
    WHEN(a, b, c);
</chain>

2)和串行嵌套起来(一)

接下来,让我们把 THENWHEN 结合起来用,看一个示例:

<chain name="chain1">
    THEN(
        a,
        WHEN(b, c, d),
        e
    );
</chain>

image-20230724130225690

3)和串行嵌套起来(二)

<chain name="chain1">
    THEN(
        a,
        WHEN(b, THEN(c, d)),
        e
    );
</chain>

image-20230724130300072

4)忽略错误

WHEN 关键字提供了一个子关键字 ignoreError (默认为 false)来提供忽略错误的特性,用法如下:假设 b, c , d 中任一一个节点有异常,那么最终 e 仍旧会被执行。

<chain name="chain1">
    THEN(
        a,
        WHEN(b, c, d).ignoreError(true),
        e
    );
</chain>

image-20230724130414450

5)任一节点先执行完则忽略其他

WHEN 关键字提供了一个子关键字 any (默认为 false )用来提供并行流程中,任一条分支先执行完即忽略其他分支,继续执行的特性。用法如下 (假设 e 节点先执行完,那么不管其他分支是否执行完,会立马执行节点 f

<chain name="chain1">
    THEN(
        a,
        WHEN(b, THEN(c, d), e).any(true),
        f
    );
</chain>

image-20230724130504800

6)关于组的概念

EL 表达式,其实你写 2 个不同的 WHEN 就是2个组。比如:(a b 是一个并行组,而 c d 是另一个并行组。)

<chain name="chain1">
    THEN(
        WHEN(a, b),
        WHEN(c, d)
    );
</chain>

3、选择编排

我们在写业务逻辑的时候,通常会碰到选择性问题,即,如果返回结果1,则进入 A 流程,如果返回结果2,则进入 B 流程,如果返回结果3,则进入 C 流程。在有些流程定义中也被定义为排他网关。

1)最基本的例子

如果,根据组件a,来选择执行b,c,d中的一个,你可以如下声明:

<chain name="chain1">
    SWITCH(a).to(b, c, d);
</chain>

image-20230724130816361

2)DEFAULT 关键字

用法为SWITCHTODEFAULT。如表达式的 x 如果返回非 a , b , c 中的一个,则默认选择到y。当然 DEFAULT 里面也可以是一个表达式。

<chain name="chain1">
    SWITCH(x).TO(a, b, c).DEFAULT(y);
</chain>

3)和 THEN,WHEN嵌套起来

<chain name="chain1">
    THEN(
        a,
        WHEN(
            b,
            SWITCH(c).to(d,e)
        ),
        f
    );
</chain>

image-20230724131035223

4)选择编排中的 id 语法

LiteFlow 中规定,每个表达式都可以有一个 id 值,你可以设置 id 值来设置一个表达式的 id 值。然后在选择组件里返回这个id即可。用法如下:

<chain name="chain1">
    THEN(
        a,
        SWITCH(b).to(
            c, 
            THEN(d, e).id("t1")
        ),
        f
    );
</chain>

image-20230724131230924

5)选择编排中的 tag 语法

事实上,除了给表达式赋值 id属性之外,你还可以给表达式赋值 tag 属性。用法如下:

<chain name="chain1">
    THEN(
        a,
        SWITCH(b).to(
            c, 
            THEN(d, e).tag("t1")
        ),
        f
    );
</chain>

4、条件编排

1) IF 的二元表达式

其中 x 为条件节点,为真的情况下,执行链路就为 x -> a ->b,为假链路就为 x ->b

<chain name="chain1">
    THEN(
        IF(x, a),
        b
    );
</chain>

image-20230724131457832

2) IF 的三元表达式

其中 x 为条件节点,为真的情况下,执行链路就为 x - > a -> c ,为假链路就为 x -> b -> c

<chain name="chain1">
    THEN(
        IF(x, a, b),
        c
    );
</chain>

image-20230724131525313

3)ELSE 表达式

LiteFlow 也提供了ELSE表达式,IF的二元表达式 + ELSE 表达式等同于 IF 三元表达式,比如:

<chain name="chain1">
    IF(x, a).ELSE(b);
</chain>

<chain name="chain1">
    IF(x, a, b);
</chain>

4)ELIF 表达式

ELIF 关键字的用法其实和 java 语言的 else if 类似,可以跟多个,和 IF 二元表达式参数一样,一般最后还会跟个 ELSE,用于多重条件的判断:

<chain name="chain1">
    IF(x1, a).ELIF(x2, b).ELIF(x3, c).ELIF(x4, d).ELSE(THEN(m, n));
</chain>

image-20230724131827498

4、捕获异常表达式

1)基本用法

语法表示,如果a组件出现异常并抛出,则不会执行b组件,会直接执行c组件, 在 c组件中,可以通过this.getSlot().getException() 来获取异常。

同时,当用了CATCH表达式之后,即便在 CATCH 包裹的组件有异常抛出,整个流程返回的 LiteflowResponse 中的 isSuccess方法仍然为true getCause 中也没有任何的 Exception。如果你写过 java 程序,应该会对这样的机制很容易理解。因为异常已经被你自己处理掉了。

<chain name="chain1">
    CATCH(
        THEN(a,b)
    ).DO(c)
</chain>

5、与或非表达式

1)AND

如果 xy 都为 true,则为真,会执行组件 a,如果 xy 有一个为 false,则执行 b

<chain name="chain1">
    IF(AND(x,y), a, b);
</chain>

2)OR

只要 xy 中的一个为 true,则为真,否则为假。

<chain name="chain1">
    IF(OR(x,y), a, b);
</chain>

3)NOT

NOT 就是非的意思,比如:如果 x 返回 true,则经过非运算后,为假,执行 b ,如果 x 返回 false,则经过非运算后,为真,执行a。

<chain name="chain1">
    IF(NOT(x), a, b);
</chain>

6、使用子流程

<chain name="chain4">
    THEN(
        A, B,
        WHEN(
            THEN(C, WHEN(J, K)),
            D,
            THEN(H, I)
        ),
        SWITCH(X).to(
            M,
            N,
            WHEN(Q, THEN(P, R)).id("w01")
        ),
        Z
    );
</chain>

image-20230724132651376

<chain name="mainChain">
    THEN(
    	A, B,
    	WHEN(chain1, D, chain2),
    	SWITCH(X).to(M, N, chain3),
    	z
    );
</chain>

<chain name="chain1">
  	THEN(C, WHEN(J, K));
</chain>

<chain name="chain2">
  	THEN(H, I);
</chain>

<chain name="chain3">
  	WHEN(Q, THEN(P, R)).id("w01");
</chain>

7、使用子变量

image-20230724132754965

<chain>
    t1 = THEN(C, WHEN(J, K));
    w1 = WHEN(Q, THEN(P, R)).id("w01");
    t2 = THEN(H, I);
    
    THEN(
        A, B,
        WHEN(t1, D, t2),
        SWITCH(X).to(M, N, w1),
        Z
    );
</chain>

8、验证规则

public void yourMethod() {
    boolean isValid = LiteFlowChainELBuilder.validate("THEN(a, b, h)");
    ...
}

四、数据上下文

对于数据上下文而言,初始化动作是由框架来处理的。也就是说,在你执行第一个组件时,上下文对象里面是没有用户数据的。而你的流程入参是用 this.getRequestData() 获取的,这部分不包含在上下文里面

1)this.getRequestData()


@DisplayName("条件组件")
@Test
public void test7(){
DemoContext demoContext = new DemoContext();
demoContext.setBusinessType("条件组件");
LiteflowResponse response = flowExecutor.execute2Resp("ifChain1", demoContext);
log.info("[LiteFlowTest#条件组件] res:{}", JsonUtils.toString(response));
}


// 4、获取流程的初始参数。。
Object requestData = this.getRequestData();
log.info("[BCmp#process] this.getRequestData():{}", JSON.toJSONString(requestData));

2)this.getContextBean()

传入之后, LiteFlow 会在调用时进行初始化,给这个上下文分配唯一的实例。你在组件之中可以这样去获得这个上下文实例:

@DisplayName("高级特性-组件化参数")
@Test
public void testFeat001(){
    DemoContext demoContext = new DemoContext();
    demoContext.setBusinessType("高级特性");

    LiteflowResponse response = flowExecutor.execute2Resp("featChain1", demoContext, 
                                                          DemoContext.class, 
                                                          UserContext.class);
    log.info("[LiteFlowTest#高级特性] res:{}", JsonUtils.toString(response));
}



@Override
public void process() {
  log.info("[FeatBCmp#]process");
  // 1、高级特性,组件参数
  NodePramsBO nodePrams = this.getCmpData(NodePramsBO.class);
  log.info("[FeatBCmp#process] this.getCmpData():{}", nodePrams);

  DemoContext requestData = this.getRequestData();
  log.info("[FeatBCmp#process] requestData:{}", JsonUtils.toString(requestData));

  DemoContext demoContext = this.getContextBean(DemoContext.class);
  log.info("[FeatBCmp#process] demoContext:{}", JsonUtils.toString(demoContext));

  UserContext userContext = this.getContextBean(UserContext.class);
  log.info("[FeatBCmp#process] userContext:{}", JsonUtils.toString(userContext));
}

this.getCmpData():{"age":20,"name":"r***"}
requestData:{"businessType":"高级特性"}
demoContext:{}
userContext:{}

五、执行器

1、返回类型: LiteflowResponse

//第一个参数为流程ID,第二个参数为流程入参,后面可以传入多个上下文class
public LiteflowResponse execute2Resp(String chainId, Object param, Class<?>... contextBeanClazzArray)
  
//第一个参数为流程ID,第二个参数为流程入参,后面可以传入多个上下文的Bean
public LiteflowResponse execute2Resp(String chainId, Object param, Object... contextBeanArray)

1)流程执行是否成功

boolean isSuccess = response.isSuccess();

2)获取异常信息

if (!response.isSuccess()){
  Exception e = response.getCause();
}

3)获得执行步骤详细信息

Map<String, CmpStep> stepMap = response.getExecuteSteps();


Queue<CmpStep> stepQueue = response.getExecuteStepQueue();

4)上下文数据

CustomContext context = response.getContextBean(CustomContext.class);

5)获得步骤字符串信息

String stepStr = response.getExecuteStepStrWithTime();

六、用代码动态构造规则

值得提一下的是,由于用构造模式是一个链路一个链路的添加,如果你用了子流程,如果chain1依赖chain2,那么chain2要先构建。否则会报错。

LiteFlowChainELBuilder.createChain().setChainName("chain2").setEL(
  "THEN(a, b, WHEN(c, d))"
).build();

七、高级特性

1、本地规则文件监听

LiteFlow 的配置liteflow.rule-source中,不光可以配置项目内的规则文件,其实还可以配置本地磁盘上的文件的

只需要配置 liteflow.enable-monitor-file=true,即可开启自动对文件的监听特性。文件改动,你的项目无需做任何事,立马自动刷新整个规则。非常方便。

liteflow.rule-source=/Users/bryan31/liteflow/test/flow.xml

2、组件参数

1)featChain1

<!--高级特性-组件参数-->
<chain name="featChain1">
    cmpBData = '{"name":"rose","age":20}';
    THEN(FeatACmp, FeatBCmp.data(cmpBData), FeatCCmp);
</chain>

2)NodePramsBO

/**
 * NodePramsBO
 *
 * @author zhangyujin
 * @date 2023/7/24
 */
@Accessors(chain = true)
@Data
public class NodePramsBO {

    /**
     * name
     */
    private String name;

    /**
     * age
     */
    private Integer age;

}

3)FeatBCmp

/**
 * FeatBCmp
 *
 * @author zhangyujin
 * @date 2023 /7/24
 */
@Slf4j
@LiteflowComponent("FeatBCmp")
public class FeatBCmp extends NodeComponent {

    /**
     * 流程执行
     */
    @Override
    public void process() {
        log.info("[FeatBCmp#]process");
        // 1、高级特性,组件参数
        NodePramsBO nodePrams = this.getCmpData(NodePramsBO.class);
        log.info("[FeatBCmp#process] this.getCmpData():{}", nodePrams);
    }


}

4)单元测试

@DisplayName("高级特性-组件化参数")
@Test
public void testFeat001(){
    DemoContext demoContext = new DemoContext();
    demoContext.setBusinessType("高级特性");
    LiteflowResponse response = flowExecutor.execute2Resp("featChain1", demoContext);
    log.info("[LiteFlowTest#高级特性] res:{}", JsonUtils.toString(response));
}

3、组件别名

1)Springboot

@LiteflowComponent(id = "a", name = "组件A")
public class ACmp extends NodeComponent {

	@Override
	public void process() {
		System.out.println("ACmp executed!");
	}
}

2)规则文件方式定义组件

<node id="a" name="组件A" class="com.yomahub.liteflow.test.config.cmp.ACmp"/>
<node id="b" name="组件B" class="com.yomahub.liteflow.test.config.cmp.BCmp"/>

4、组件重试

LiteFLow 支持组件的重试,其中又分全局重试和单个组件重试,下面一一说明

1)全局重试

如果需要全局重试,你需要做如下配置:

流程继续

1、如果组件里覆盖了isContinueOnError,设为true的话,那流程会继续。

2、异步流程的话,如果 WHEN 上配置了 ignoreError true 的 话(默为 false ),则下一个不同并行组会继续。关于ignoreError的定义和使用

liteflow.retry-count=3

2)单个组件重试

如果这个组件抛出的异常是NullPointerException或者IllegalArgumentException(或者是这两个Exception类的子类),则会进行最多5次的尝试,最后一遍再不成功,那就会真正抛出异常。

@LiteflowComponent("c")
// @LiteflowRetry(5)
@LiteflowRetry(retry = 5, forExceptions = {NullPointerException.class,IllegalArgumentException.class})
public class CCmp extends NodeComponent {
	@Override
	public void process() {
		//do your biz
	}
}

3)全局重试和单个组件重试都定义的情况下

如果在2者都定义的情况下,优先取单个组件的重试配置。没有的情况下,再取全局配置。

5、平滑热刷新

1)主动调用代码刷新

如果你使用了数据库作为规则文件的存储方式,或是你自己实现了自定义配置源,那么LiteFlow还提供了一种基于代码刷新的方式。

flowExecutor.reloadRule();

1、这样刷新是全量刷新,不过各位同学不用担心其性能,经测试,LiteFlow 框架一秒可以刷新 1000 条规则左右,这都是一些cpu级别的操作,如果你规则没有上大几千,几w条,那么推荐这种方式。

2、如果你的应用是多节点部署的,必须在每个节点上都要刷新,因为规则是存储在 jvm 内存里的。这就意味着,如果你把刷新规则做成一个rpc接口(诸如 dubbo 接口之类的),那么 rpc 接口只会调用到其中一个节点,也就是说,只会有一个节点的规则会刷新。

正确的做法是:利用 mq 发一个消息,让各个节点去监听到,进行刷新。

2)单独刷新某一个规则

如果你的规则比较多,成千上万条,又或者你就是不想全量刷新。希望单独刷新某个改动的规则。

LiteFlowChainELBuilder.createChain().setChainName("chain2").setEL(
  "THEN(a, b, WHEN(c, d))"
).build();

1、既然是指定刷新,那么必须你要获取到改动的 EL 内容,然后再利用动态代码构建重新 build下就可以了,这种方式会自动替换缓存中已有的规则。这种方式不用在 build 之前销毁流程。

2、如果是多服务节点部署的情况下,还是要遵循每个节点要都刷新,上面已经说明具体建议的方式。这里不再赘述

6、组件切面

1)全局切面

全局切面是针对于所有的组件,进行切面。你只需要做如下实现即可:

@Component
public class CmpAspect implements ICmpAroundAspect {
    @Override
    public void beforeProcess(String nodeId, Slot slot) {
        YourContextBean context = slot.getContextBean(YourContextBean.class);
        //before business
    }

    @Override
    public void afterProcess(String nodeId, Slot slot) {
        YourContextBean context = slot.getContextBean(YourContextBean.class);
        //after business
    }
}

2)Aspect 的切面

LiteFlow 同时也支持了 Spring Aspect 的切面,你可以用 @Aspect 标注对任意包,任意规则内的组件进行切面

@Aspect
public class CustomAspect {

    @Pointcut("execution(* com.yomahub.liteflow.test.aop.cmp1.*.process())")
    public void cut() {
    }

    @Around("cut()")
    public Object around(ProceedingJoinPoint jp) throws Throwable {
        //do before business
        Object returnObj = jp.proceed();
        //do after business
        return returnObj;
    }
}

7、步骤信息

LiteflowResponse response = flowExecutor.execute2Resp("chain1", "初始参数", CustomContext.class);
Map<String, CmpStep> stepMap = response.getExecuteSteps();
//或者你也可以通过以下的语句来获得一个步骤队列
Queue<CmpStep> stepQueue = response.getExecuteStepQueue();
参数 说明
isSuccess 此组件是否执行成功
getNodeId 获得组件Id
getNodeName 获得组件名称
getTag 获得组件标签值
getTimeSpent 获得组件的耗时,单位为毫秒
getException 获取此组件抛出的异常,如果 isSuccess为false的话。但是这里要注意下:有 exceptionsuccess 一定为false,但是 successfalse ,不一定有 exception,因为有可能没执行到,或者没执行结束 (any的情况)。

8、异常

如果你的业务中有获取异常Code的需求,则你自定义的异常需要实现 LiteFlow 提供的 LiteFlowException 接口:

public class YourException extends LiteFlowException {
	public YourException(String code, String message) {
		super(code, message);
	}
}
LiteflowResponse response = flowExecutor.execute2Resp("chain1", 初始参数, CustomContext.class);
if (!response.isSuccess()){
  Exception e = response.getCause();
  String code = response.getCode();
  String message = response.getMessage();
}

9、打印信息详解

1)流程执行中打印

[ea1af4810cc849d58948d091d858b29a]:[O]start component[ACmp] execution
[ea1af4810cc849d58948d091d858b29a]:[O]start component[BCmp] execution
[ea1af4810cc849d58948d091d858b29a]:[X]start component[CCmp] execution
[ea1af4810cc849d58948d091d858b29a]:[O]start component[DCmp] execution

1、其中最前面的一串序号,代表这个请求的请求ID,一个请求无论经历了多少个组件,他们的请求ID都是一致的,你可以根据这个ID在日志中进行快速定位进行排查。

2、在后面会跟着一个[O]或者[X],[O]代表了执行了这个组件的主要逻辑,[X]代表因为isAccess()返回了false所以没进入这个组件的主要逻辑。

3、如果你不希望打印这种中间执行信息,LiteFlow 提供了配置项,你需要作如下设置:

liteflow.print-execution-log=false

##

2)打印步骤信息

在执行完一个链路之后,框架会自动打出这一条流程的执行步骤顺序,如下所示: (这里的表达形式为:组件ID<耗时毫秒>

a<100>==>c<10>==>m<0>==>q<200>==>p<300>==>p1<0>==>g<305>

如果你希望在打印流程链的时候增加别名描述,那你需要在定义组件的时候设置name属性,具体请参照[组件别名]

a[组件A]<100>==>b[组件B]<0>==>m[组件M]<256>

10、自定义请求 Id

其中日志主体中最前面的就是 RequestId,一个请求中的 requestId 都是相同的,方便你进行日志查找。

2022-07-03 11:15:00.196  INFO 71275 --- [           main] com.yomahub.liteflow.flow.element.Node   : [067a0baa6d434de3a8ccafa4b1506562]:[O]start component[a] execution
2022-07-03 11:15:00.204  INFO 71275 --- [           main] com.yomahub.liteflow.flow.element.Node   : [067a0baa6d434de3a8ccafa4b1506562]:[O]start component[b] execution
2022-07-03 11:15:00.218  INFO 71275 --- [lf-when-thead-0] com.yomahub.liteflow.flow.element.Node   : [067a0baa6d434de3a8ccafa4b1506562]:[O]start component[c] execution
2022-07-03 11:15:00.220  INFO 71275 --- [lf-when-thead-1] com.yomahub.liteflow.flow.element.Node   : [067a0baa6d434de3a8ccafa4b1506562]:[O]start component[d] execution
2022-07-03 11:15:00.220  INFO 71275 --- [           main] com.yomahub.liteflow.slot.Slot           : [067a0baa6d434de3a8ccafa4b1506562]:CHAIN_NAME[chain1]
a<1>==>b<0>==>c<0>==>d<0>
2022-07-03 11:15:00.221  INFO 71275 --- [           main] com.yomahub.liteflow.slot.DataBus        : [067a0baa6d434de3a8ccafa4b1506562]:slot[0] released

1)按照自己的规则生成

你只需要要声明一个类,然后实现 RequestIdGenerator 接口即可:

public class CustomRequestIdGenerator implements RequestIdGenerator {

    @Override
    public String generate() {
        return System.nanoTime();
    }
}

然后在 LiteFlow 的配置文件里声明下你这个类即可:

liteflow.request-id-generator-class=com.yomahub.liteflow.test.requestId.config.CustomRequestIdGenerator

2)传入已有的 requestId / traceId

LiteflowResponse response = flowExecutor.execute2RespWithRid("chain1", arg, "T001234", YourContext.class);

11、异步线程池自定义

LiteFlow 自己默认有全局线程池,并且线程池的大小等参数可以通过设置以下参数来进行设置:

liteflow.when-max-wait-seconds=15
liteflow.when-max-workers=16
liteflow.when-queue-limit=512

1)自定义全局线程池

但是如果你要对线程池有特殊化的要求,LiteFlow 也支持自定义线程池的设置。需要注意的是,自定义线程池只适用于并行组件,这个参数对于同步组件来说并无作用。而且一旦设置了你自定义的线程池,那么以上参数将不会再有用。对于线程池的所有参数的定义,都取决于你自己了。

public class CustomThreadBuilder implements ExecutorBuilder {
    @Override
    public ExecutorService buildExecutor() {
        return Executors.newCachedThreadPool();
    }
}
liteflow.thread-executor-class=com.yomahub.liteflow.test.customThreadPool.CustomThreadBuilder

2)WHEN级别的单独线程池

<chain name="chain1">
    WHEN(a, b).threadPool("com.yomahub.liteflow.test.customWhenThreadPool.CustomThreadExecutor1");
</chain>
<chain name="chain2">
    WHEN(c, d).threadPool("com.yomahub.liteflow.test.customWhenThreadPool.CustomThreadExecutor2");
</chain>

12、简单监控

LiteFlow 提供了简单的监控,目前只统计一个指标:每个组件的平均耗时。默认每5分钟会打印一次(可以自己调整),并且是根据耗时时长倒序排的。

#是否启用监控
liteflow.monitor.enable-log=false
#监控队列的大小
liteflow.monitor.queue-limit=200
#监控延迟多少毫秒打印
liteflow.monitor.delay=300000
#监控每隔多少毫秒打印
liteflow.monitor.period=300000

ContactAuthor