流程编排之LiteFlow
前言
Github:https://github.com/HealerJean
官方:https://liteflow.yomahub.com/pages/9bf839/
常用流程编排框架
liteflow |
asyncTool |
JDEasyFlow |
disruptor |
|
---|---|---|---|---|
开发团队 | 由阿里巴巴开发,是一个开源的流程编排框架。 | 由阿里巴巴开发,是一个开源的异步流程处理框架。 | 由Jdon 公司开发,是一个基于Java的流程编排框架。 |
由 LMAX Disruptor 公司开发,是一个开源的高性能线程间消息传递框架。 |
介绍 | LiteFlow 是一个非常强大的现代化的规则引擎框架,融合了编排特性和规则引擎的所有特性。如果你要对复杂业务逻辑进行新写或者重构,用LiteFlow 最合适不过。它是一个编排式的规则引擎框架,组件编排,帮助解耦业务代码,让每一个业务片段都是一个组件。 |
asyncTool 提供了一种异步的流程处理方式,可以将耗时的操作异步化处理,提高系统的吞吐量和响应速度。它提供了一系列的异步任务节点,如异步 Java 任务、异步Shell任务、异步HTTP任务等,以及异步线程池和异步回调机制。 |
JDEasyFlow 提供了一系列的流程模型和任务节点,支持顺序流、条件流、循环流和并行流等。它提供了一个基于XML的流程定义语言,以及可视化的流程设计工具,还支持多种任务节点和事件处理机制。 |
disruptor 是一个用于实现高性能和低延迟应用的消息传递框架,旨在提供快速的事件处理能力。它提供了一种高效的环形缓冲区数据结构,支持多生产者和多消费者模式,并且具有可预测的性能表现。 |
优点 | 轻量级、易用性好、灵活性强、支持多种流程模型和任务节点,同时具有良好的性能和稳定性。复杂业务流程编排、社区成熟, | 异步处理能力强、性能高、灵活性强、可以适用于高并发和高吞吐量的场景。 | 简单、灵活、易扩展 | 高性能、低延迟、可预测的性能表现、灵活的事件处理机制。 |
缺点 | 开源框架较重,有一定学习成本,社区活跃度相对较低,文档和示例相对较少 | 相对较窄的应用场景,主要适用于处理耗时操作。 | 相对于其他流程编排框架,JDEasyFlow 可能较为复杂,需要更多的配置和维护工作。 |
主要适用于线程间消息传递和事件处理的场景,可能不太适合用于流程编排。 |
社区活跃度 | 尽管社区活跃度相对较低,但该框架仍然有一个相对稳定的用户群体。 | asyncTool 的社区相对较小,但仍然有一个稳定的用户群体。 |
JDEasyFlow 的社区相对较小,但仍然有一定的用户群体。 |
disruptor 是一个广受欢迎的开源框架,拥有一个活跃的社区和大量的用户群体。 |
一、接入 SpringBoot
1、pom.xml
<!--liteflow-->
<dependency>
<groupId>com.yomahub</groupId>
<artifactId>liteflow-spring-boot-starter</artifactId>
<version>2.10.5</version>
</dependency>
2、流程节点
1)ACmp
/**
* ACmp
*
* @author zhangyujin
* @date 2023/7/24
*/
@LiteflowComponent("流程A")
@Slf4j
public class ACmp extends NodeComponent {
@Override
public void process() {
log.info("[ACmp#]process");
}
}
2)BCmp
package com.healerjean.proj.liteflow.flow;
import com.yomahub.liteflow.annotation.LiteflowComponent;
import com.yomahub.liteflow.core.NodeComponent;
import lombok.extern.slf4j.Slf4j;
/**
* BCmp
*
* @author zhangyujin
* @date 2023/7/24
*/
@Slf4j
@LiteflowComponent("流程B")
public class BCmp extends NodeComponent {
@Override
public void process() {
log.info("[BCmp#]process");
}
}
3)CCmp
package com.healerjean.proj.liteflow.flow;
import com.yomahub.liteflow.annotation.LiteflowComponent;
import com.yomahub.liteflow.core.NodeComponent;
import lombok.extern.slf4j.Slf4j;
/**
* CCmp
*
* @author zhangyujin
* @date 2023/7/24
*/
@Slf4j
@LiteflowComponent("流程C")
public class CCmp extends NodeComponent {
@Override
public void process() {
log.info("[CCmp#]process");
}
}
3、yml
文件
liteflow:
#规则文件路径
rule-source: config/flow.el.xml
4、flow.el.xml
<?xml version="1.0" encoding="UTF-8"?>
<flow>
<chain name="chain1">
THEN(流程A, 流程B, 流程C);
</chain>
</flow>
4、单元测试
package com.healerjean.proj.liteflow;
import com.healerjean.proj.base.BaseJunit5SpringTest;
import com.healerjean.proj.liteflow.context.DemoContext;
import com.healerjean.proj.utils.JsonUtils;
import com.yomahub.liteflow.core.FlowExecutor;
import com.yomahub.liteflow.flow.LiteflowResponse;
import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Test;
import javax.annotation.Resource;
/**
* LiteFlowTest
*
* @author zhangyujin
* @date 2023/7/24
*/
@DisplayName("LiteFlowTest")
@Slf4j
public class LiteFlowTest extends BaseJunit5SpringTest {
@Resource
private FlowExecutor flowExecutor;
@DisplayName("flowExecutor.execute2Resp")
@Test
public void test(){
DemoContext demoContext = new DemoContext();
demoContext.setBusinessType("junitTestType");
LiteflowResponse response = flowExecutor.execute2Resp("chain1", demoContext);
log.info("[LiteFlowTest#flowExecutor.execute2Resp] res:{}", JsonUtils.toString(response));
}
}
二、常规组件
1、普通组件
普通组件节点需要继承
NodeComponent
,可用于THEN
和WHEN
关键字中。需要实现process
方法
1)可以覆盖的方法
方法 | 说明 |
---|---|
isAccess |
推荐实现isAccess 方法,表示是否进入该节点,可以用于业务参数的预先判断 |
isContinueOnError |
表示出错是否继续往下执行下一个组件,默认为false |
isEnd |
如果覆盖后,返回 true ,则表示在这个组件执行完之后立马终止整个流程。对于这种方式,由于是用户主动结束的流程,属于正常结束,所以最终的 isSuccess 是为 true 的。需要注意的是,如果isContinueOnError 为 true 的情况下,调用了this.setIsEnd(true) ,那么依旧会终止。response 里的isSuccess 还是true 。 |
beforeProcess 和afterProcess |
流程的前置和后置处理器,其中前置处理器,在isAccess 之后执行。 |
onSuccess |
流程的成功事件回调 |
onError |
流程的失败事件回调。 1、onError 方法执行后,因为主方法抛出异常,所以整个流程依旧是失败状态。response对象里依旧是主方法抛出的错。2、如果 onError 方法本身抛错,那么最终抛到最外面的错,是主方法里的错,而 onError 方法所产生的异常会被打出堆栈 |
2)This
关键字可以调用的方法
方法 | 说明 |
---|---|
getNodeId |
获取组件 ID |
getName |
获取组件别名 |
getChainId |
获取当前执行的流程 Id |
getRequestData |
获取流程的初始参数 |
setIsEnd |
表示是否立即结束整个流程 ,用法为this.setIsEnd(true) 。对于这种方式,由于是用户主动结束的流程,属于正常结束,所以最终的 isSuccess 是为 true 的。 需要注意的是,如果 isContinueOnError 为 true 的情况下,调用了 this.setIsEnd(true) ,那么依旧会终止。response 里的 isSuccess 还是 true |
getTag |
获取这个组件的标签信息,关于标签的定义和使用,请参照组件标签。 |
2、选择组件
选择节点需要继承
NodeSwitchComponent
。需要实现方法processSwitch
方法。
1)根据 NodeId
进行选择
a、switchChain2
<!--选择组件-根据NodeId进行选择-->
<chain name="switchChain1">
SWITCH(SwitchACmp).to(NormalBCmp, NormalCCmp);
</chain>
b、SwitchACmp
/**
* SwitchACmp
*
* @author zhangyujin
* @date 2023/7/24
*/
@Slf4j
@LiteflowComponent("SwitchACmp")
public class SwitchACmp extends NodeSwitchComponent {
@Override
public String processSwitch() throws Exception {
log.info("SwitchACmp executed!");
// 1、根据nodeId进行选择
return "NormalCCmp";
}
}
c、单元测试
@DisplayName("选择组件-根据NodeId进行选择")
@Test
public void test2(){
DemoContext demoContext = new DemoContext();
demoContext.setBusinessType("选择组件");
LiteflowResponse response = flowExecutor.execute2Resp("switchChain1", demoContext);
log.info("[LiteFlowTest#选择组件] res:{}", JsonUtils.toString(response));
}
2)根据表达式的 id
进行选择
a、switchChain2
<!--选择组件-根据表达式的id进行选择-->
<chain name="switchChain2">
SWITCH(SwitchACmp).to(NormalBCmp, WHEN(NormalBCmp, NormalCCmp).id("1Id"));
</chain>
b、SwitchACmp
/**
* SwitchACmp
*
* @author zhangyujin
* @date 2023/7/24
*/
@Slf4j
@LiteflowComponent("SwitchACmp")
public class SwitchACmp extends NodeSwitchComponent {
@Override
public String processSwitch() throws Exception {
log.info("SwitchACmp executed!");
// 2、根据表达式的id进行选择
return "1Id";
}
}
c、单元测试
@DisplayName("选择组件-根据表达式的id进行选择")
@Test
public void test3(){
DemoContext demoContext = new DemoContext();
demoContext.setBusinessType("选择组件");
LiteflowResponse response = flowExecutor.execute2Resp("switchChain2", demoContext);
log.info("[LiteFlowTest#选择组件] res:{}", JsonUtils.toString(response));
}
3)根据 tag
进行选择
a、switchChain3
<!--选择组件-根据tag进行选择-->
<chain name="switchChain3">
SWITCH(SwitchACmp).to(NormalBCmp.tag("TagB"), NormalCCmp.tag("TagC"));
</chain>
b、SwitchACmp
/**
* SwitchACmp
*
* @author zhangyujin
* @date 2023/7/24
*/
@Slf4j
@LiteflowComponent("SwitchACmp")
public class SwitchACmp extends NodeSwitchComponent {
@Override
public String processSwitch() throws Exception {
log.info("SwitchACmp executed!");
// 3、根据tag进行选择
return "tag:TagB";
}
}
c、单元测试
@DisplayName("选择组件-根据tag进行选择")
@Test
public void test4(){
DemoContext demoContext = new DemoContext();
demoContext.setBusinessType("选择组件");
LiteflowResponse response = flowExecutor.execute2Resp("switchChain3", demoContext);
log.info("[LiteFlowTest#选择组件] res:{}", JsonUtils.toString(response));
}
4)表达式 tag
的选择
a、switchChain4
<!--选择组件-表达式tag的选择-->
<chain name="switchChain4">
SWITCH(SwitchACmp).to(NormalBCmp, WHEN(NormalBCmp, NormalCCmp).tag("TagC"));
</chain>
b、SwitchACmp
/**
* SwitchACmp
*
* @author zhangyujin
* @date 2023/7/24
*/
@Slf4j
@LiteflowComponent("SwitchACmp")
public class SwitchACmp extends NodeSwitchComponent {
@Override
public String processSwitch() throws Exception {
log.info("SwitchACmp executed!");
// 4、表达式tag的选择
return "tag:TagC";
}
}
c、单元测试
@DisplayName("选择组件-表达式tag的选择")
@Test
public void test5(){
DemoContext demoContext = new DemoContext();
demoContext.setBusinessType("选择组件");
LiteflowResponse response = flowExecutor.execute2Resp("switchChain4", demoContext);
log.info("[LiteFlowTest#选择组件] res:{}", JsonUtils.toString(response));
}
5)链路 tag
的选择
a、switchChain5
<!--选择组件-链路tag的选择-->
<chain name="switchChain5">
SWITCH(SwitchACmp).to(NormalBCmp, switchChain5Sub.tag("TagC"));
</chain>
<chain name="switchChain5Sub">
THEN(NormalBCmp, NormalCCmp);
</chain>
b、SwitchACmp
无论返回
switchChain5Sub
还是tag:TagC
都能选择到后面的链路。
/**
* SwitchACmp
*
* @author zhangyujin
* @date 2023/7/24
*/
@Slf4j
@LiteflowComponent("SwitchACmp")
public class SwitchACmp extends NodeSwitchComponent {
@Override
public String processSwitch() throws Exception {
log.info("SwitchACmp executed!");
// 5、链路tag的选择
return "tag:TagC";
}
}
c、单元测试
@DisplayName("选择组件-链路tag的选择")
@Test
public void test6(){
DemoContext demoContext = new DemoContext();
demoContext.setBusinessType("选择组件");
LiteflowResponse response = flowExecutor.execute2Resp("switchChain5", demoContext);
log.info("[LiteFlowTest#选择组件] res:{}", JsonUtils.toString(response));
}
}
3、条件组件
需要继承
NodeIfComponent
:
1)ifChain1
<!--条件组件-->
<chain name="ifChain1">
<!-- XIfCmd 就是IF组件,为真,执行 IfNormalBCmp,为假,执行 IfNormalCCmp:-->
IF(XIfCmd, IfNormalBCmp, IfNormalCCmp);
</chain>
2)XIfCmd
/**
* XIFCmd
*
* @author zhangyujin
* @date 2023/7/24
*/
@LiteflowComponent("XIfCmd")
@Slf4j
public class XIfCmd extends NodeIfComponent {
@Override
public boolean processIf() throws Exception {
//do your biz
return true;
}
}
3)单元测试
@DisplayName("条件组件")
@Test
public void test7(){
DemoContext demoContext = new DemoContext();
demoContext.setBusinessType("条件组件");
LiteflowResponse response = flowExecutor.execute2Resp("ifChain1", demoContext);
log.info("[LiteFlowTest#条件组件] res:{}", JsonUtils.toString(response));
}
三、EL
规则
1、串行编排
如果你要依次执行
a
,b
,c
,d
四个组件,你可以用THEN
关键字,需要注意的是,THEN
必须大写。
<chain name="chain1">
THEN(a, b, c, d);
</chain>
2、并行编排
1)最基本的例子
如果你要并行执行a,b,c三个组件,你可以用
WHEN
关键字,需要注意的是,WHEN
必须大写。
<chain name="chain1">
WHEN(a, b, c);
</chain>
2)和串行嵌套起来(一)
接下来,让我们把
THEN
和WHEN
结合起来用,看一个示例:
<chain name="chain1">
THEN(
a,
WHEN(b, c, d),
e
);
</chain>
3)和串行嵌套起来(二)
<chain name="chain1">
THEN(
a,
WHEN(b, THEN(c, d)),
e
);
</chain>
4)忽略错误
WHEN
关键字提供了一个子关键字ignoreError
(默认为false
)来提供忽略错误的特性,用法如下:假设b
,c
,d
中任一一个节点有异常,那么最终e
仍旧会被执行。
<chain name="chain1">
THEN(
a,
WHEN(b, c, d).ignoreError(true),
e
);
</chain>
5)任一节点先执行完则忽略其他
WHEN
关键字提供了一个子关键字any
(默认为false
)用来提供并行流程中,任一条分支先执行完即忽略其他分支,继续执行的特性。用法如下 (假设e
节点先执行完,那么不管其他分支是否执行完,会立马执行节点f
)
<chain name="chain1">
THEN(
a,
WHEN(b, THEN(c, d), e).any(true),
f
);
</chain>
6)关于组的概念
用
EL
表达式,其实你写2
个不同的WHEN
就是2个组。比如:(a
b
是一个并行组,而c
d
是另一个并行组。)
<chain name="chain1">
THEN(
WHEN(a, b),
WHEN(c, d)
);
</chain>
3、选择编排
我们在写业务逻辑的时候,通常会碰到选择性问题,即,如果返回结果1,则进入
A
流程,如果返回结果2,则进入B
流程,如果返回结果3,则进入C
流程。在有些流程定义中也被定义为排他网关。
1)最基本的例子
如果,根据组件a,来选择执行b,c,d中的一个,你可以如下声明:
<chain name="chain1">
SWITCH(a).to(b, c, d);
</chain>
2)DEFAULT
关键字
用法为
SWITCH
…TO
…DEFAULT
。如表达式的x
如果返回非a
,b
,c
中的一个,则默认选择到y
。当然DEFAULT
里面也可以是一个表达式。
<chain name="chain1">
SWITCH(x).TO(a, b, c).DEFAULT(y);
</chain>
3)和 THEN
,WHEN
嵌套起来
<chain name="chain1">
THEN(
a,
WHEN(
b,
SWITCH(c).to(d,e)
),
f
);
</chain>
4)选择编排中的 id
语法
LiteFlow
中规定,每个表达式都可以有一个id
值,你可以设置id
值来设置一个表达式的id
值。然后在选择组件里返回这个id即可。用法如下:
<chain name="chain1">
THEN(
a,
SWITCH(b).to(
c,
THEN(d, e).id("t1")
),
f
);
</chain>
5)选择编排中的 tag
语法
事实上,除了给表达式赋值
id
属性之外,你还可以给表达式赋值tag
属性。用法如下:
<chain name="chain1">
THEN(
a,
SWITCH(b).to(
c,
THEN(d, e).tag("t1")
),
f
);
</chain>
4、条件编排
1) IF
的二元表达式
其中
x
为条件节点,为真的情况下,执行链路就为x
->a
->b
,为假链路就为x
->b
。
<chain name="chain1">
THEN(
IF(x, a),
b
);
</chain>
2) IF
的三元表达式
其中
x
为条件节点,为真的情况下,执行链路就为x
- >a
->c
,为假链路就为x
->b
->c
。
<chain name="chain1">
THEN(
IF(x, a, b),
c
);
</chain>
3)ELSE
表达式
LiteFlow
也提供了ELSE
表达式,IF
的二元表达式 +ELSE
表达式等同于IF
三元表达式,比如:
<chain name="chain1">
IF(x, a).ELSE(b);
</chain>
<chain name="chain1">
IF(x, a, b);
</chain>
4)ELIF
表达式
ELIF
关键字的用法其实和java
语言的else
if
类似,可以跟多个,和IF
二元表达式参数一样,一般最后还会跟个ELSE
,用于多重条件的判断:
<chain name="chain1">
IF(x1, a).ELIF(x2, b).ELIF(x3, c).ELIF(x4, d).ELSE(THEN(m, n));
</chain>
4、捕获异常表达式
1)基本用法
语法表示,如果a组件出现异常并抛出,则不会执行b组件,会直接执行c组件, 在
c
组件中,可以通过this.getSlot().getException()
来获取异常。同时,当用了
CATCH
表达式之后,即便在CATCH
包裹的组件有异常抛出,整个流程返回的LiteflowResponse
中的isSuccess
方法仍然为true
,getCause
中也没有任何的Exception
。如果你写过java
程序,应该会对这样的机制很容易理解。因为异常已经被你自己处理掉了。
<chain name="chain1">
CATCH(
THEN(a,b)
).DO(c)
</chain>
5、与或非表达式
1)AND
如果
x
和y
都为true
,则为真,会执行组件a
,如果x
和y
有一个为false
,则执行b
。
<chain name="chain1">
IF(AND(x,y), a, b);
</chain>
2)OR
只要
x
和y
中的一个为true
,则为真,否则为假。
<chain name="chain1">
IF(OR(x,y), a, b);
</chain>
3)NOT
NOT
就是非的意思,比如:如果x
返回true
,则经过非运算后,为假,执行b
,如果 x 返回false
,则经过非运算后,为真,执行a。
<chain name="chain1">
IF(NOT(x), a, b);
</chain>
6、使用子流程
<chain name="chain4">
THEN(
A, B,
WHEN(
THEN(C, WHEN(J, K)),
D,
THEN(H, I)
),
SWITCH(X).to(
M,
N,
WHEN(Q, THEN(P, R)).id("w01")
),
Z
);
</chain>
<chain name="mainChain">
THEN(
A, B,
WHEN(chain1, D, chain2),
SWITCH(X).to(M, N, chain3),
z
);
</chain>
<chain name="chain1">
THEN(C, WHEN(J, K));
</chain>
<chain name="chain2">
THEN(H, I);
</chain>
<chain name="chain3">
WHEN(Q, THEN(P, R)).id("w01");
</chain>
7、使用子变量
<chain>
t1 = THEN(C, WHEN(J, K));
w1 = WHEN(Q, THEN(P, R)).id("w01");
t2 = THEN(H, I);
THEN(
A, B,
WHEN(t1, D, t2),
SWITCH(X).to(M, N, w1),
Z
);
</chain>
8、验证规则
public void yourMethod() {
boolean isValid = LiteFlowChainELBuilder.validate("THEN(a, b, h)");
...
}
四、数据上下文
对于数据上下文而言,初始化动作是由框架来处理的。也就是说,在你执行第一个组件时,上下文对象里面是没有用户数据的。而你的流程入参是用
this.getRequestData()
获取的,这部分不包含在上下文里面。
1)this.getRequestData()
@DisplayName("条件组件")
@Test
public void test7(){
DemoContext demoContext = new DemoContext();
demoContext.setBusinessType("条件组件");
LiteflowResponse response = flowExecutor.execute2Resp("ifChain1", demoContext);
log.info("[LiteFlowTest#条件组件] res:{}", JsonUtils.toString(response));
}
// 4、获取流程的初始参数。。
Object requestData = this.getRequestData();
log.info("[BCmp#process] this.getRequestData():{}", JSON.toJSONString(requestData));
2)this.getContextBean()
传入之后,
LiteFlow
会在调用时进行初始化,给这个上下文分配唯一的实例。你在组件之中可以这样去获得这个上下文实例:
@DisplayName("高级特性-组件化参数")
@Test
public void testFeat001(){
DemoContext demoContext = new DemoContext();
demoContext.setBusinessType("高级特性");
LiteflowResponse response = flowExecutor.execute2Resp("featChain1", demoContext,
DemoContext.class,
UserContext.class);
log.info("[LiteFlowTest#高级特性] res:{}", JsonUtils.toString(response));
}
@Override
public void process() {
log.info("[FeatBCmp#]process");
// 1、高级特性,组件参数
NodePramsBO nodePrams = this.getCmpData(NodePramsBO.class);
log.info("[FeatBCmp#process] this.getCmpData():{}", nodePrams);
DemoContext requestData = this.getRequestData();
log.info("[FeatBCmp#process] requestData:{}", JsonUtils.toString(requestData));
DemoContext demoContext = this.getContextBean(DemoContext.class);
log.info("[FeatBCmp#process] demoContext:{}", JsonUtils.toString(demoContext));
UserContext userContext = this.getContextBean(UserContext.class);
log.info("[FeatBCmp#process] userContext:{}", JsonUtils.toString(userContext));
}
this.getCmpData():{"age":20,"name":"r***"}
requestData:{"businessType":"高级特性"}
demoContext:{}
userContext:{}
五、执行器
1、返回类型: LiteflowResponse
//第一个参数为流程ID,第二个参数为流程入参,后面可以传入多个上下文class
public LiteflowResponse execute2Resp(String chainId, Object param, Class<?>... contextBeanClazzArray)
//第一个参数为流程ID,第二个参数为流程入参,后面可以传入多个上下文的Bean
public LiteflowResponse execute2Resp(String chainId, Object param, Object... contextBeanArray)
1)流程执行是否成功
boolean isSuccess = response.isSuccess();
2)获取异常信息
if (!response.isSuccess()){
Exception e = response.getCause();
}
3)获得执行步骤详细信息
Map<String, CmpStep> stepMap = response.getExecuteSteps();
Queue<CmpStep> stepQueue = response.getExecuteStepQueue();
4)上下文数据
CustomContext context = response.getContextBean(CustomContext.class);
5)获得步骤字符串信息
String stepStr = response.getExecuteStepStrWithTime();
六、用代码动态构造规则
值得提一下的是,由于用构造模式是一个链路一个链路的添加,如果你用了子流程,如果chain1依赖chain2,那么chain2要先构建。否则会报错。
LiteFlowChainELBuilder.createChain().setChainName("chain2").setEL(
"THEN(a, b, WHEN(c, d))"
).build();
七、高级特性
1、本地规则文件监听
在
LiteFlow
的配置liteflow.rule-source
中,不光可以配置项目内的规则文件,其实还可以配置本地磁盘上的文件的只需要配置
liteflow.enable-monitor-file=true
,即可开启自动对文件的监听特性。文件改动,你的项目无需做任何事,立马自动刷新整个规则。非常方便。
liteflow.rule-source=/Users/bryan31/liteflow/test/flow.xml
2、组件参数
1)featChain1
<!--高级特性-组件参数-->
<chain name="featChain1">
cmpBData = '{"name":"rose","age":20}';
THEN(FeatACmp, FeatBCmp.data(cmpBData), FeatCCmp);
</chain>
2)NodePramsBO
/**
* NodePramsBO
*
* @author zhangyujin
* @date 2023/7/24
*/
@Accessors(chain = true)
@Data
public class NodePramsBO {
/**
* name
*/
private String name;
/**
* age
*/
private Integer age;
}
3)FeatBCmp
/**
* FeatBCmp
*
* @author zhangyujin
* @date 2023 /7/24
*/
@Slf4j
@LiteflowComponent("FeatBCmp")
public class FeatBCmp extends NodeComponent {
/**
* 流程执行
*/
@Override
public void process() {
log.info("[FeatBCmp#]process");
// 1、高级特性,组件参数
NodePramsBO nodePrams = this.getCmpData(NodePramsBO.class);
log.info("[FeatBCmp#process] this.getCmpData():{}", nodePrams);
}
}
4)单元测试
@DisplayName("高级特性-组件化参数")
@Test
public void testFeat001(){
DemoContext demoContext = new DemoContext();
demoContext.setBusinessType("高级特性");
LiteflowResponse response = flowExecutor.execute2Resp("featChain1", demoContext);
log.info("[LiteFlowTest#高级特性] res:{}", JsonUtils.toString(response));
}
3、组件别名
1)Springboot
@LiteflowComponent(id = "a", name = "组件A")
public class ACmp extends NodeComponent {
@Override
public void process() {
System.out.println("ACmp executed!");
}
}
2)规则文件方式定义组件
<node id="a" name="组件A" class="com.yomahub.liteflow.test.config.cmp.ACmp"/>
<node id="b" name="组件B" class="com.yomahub.liteflow.test.config.cmp.BCmp"/>
4、组件重试
LiteFLow
支持组件的重试,其中又分全局重试和单个组件重试,下面一一说明
1)全局重试
如果需要全局重试,你需要做如下配置:
流程继续
1、如果组件里覆盖了
isContinueOnError
,设为true的话,那流程会继续。2、异步流程的话,如果
WHEN
上配置了ignoreError
为true
的 话(默为false
),则下一个不同并行组会继续。关于ignoreError
的定义和使用
liteflow.retry-count=3
2)单个组件重试
如果这个组件抛出的异常是
NullPointerException
或者IllegalArgumentException
(或者是这两个Exception类的子类),则会进行最多5次的尝试,最后一遍再不成功,那就会真正抛出异常。
@LiteflowComponent("c")
// @LiteflowRetry(5)
@LiteflowRetry(retry = 5, forExceptions = {NullPointerException.class,IllegalArgumentException.class})
public class CCmp extends NodeComponent {
@Override
public void process() {
//do your biz
}
}
3)全局重试和单个组件重试都定义的情况下
如果在2者都定义的情况下,优先取单个组件的重试配置。没有的情况下,再取全局配置。
5、平滑热刷新
1)主动调用代码刷新
如果你使用了数据库作为规则文件的存储方式,或是你自己实现了自定义配置源,那么LiteFlow还提供了一种基于代码刷新的方式。
flowExecutor.reloadRule();
1、这样刷新是全量刷新,不过各位同学不用担心其性能,经测试,LiteFlow
框架一秒可以刷新 1000
条规则左右,这都是一些cpu
级别的操作,如果你规则没有上大几千,几w条,那么推荐这种方式。
2、如果你的应用是多节点部署的,必须在每个节点上都要刷新,因为规则是存储在 jvm
内存里的。这就意味着,如果你把刷新规则做成一个rpc接口(诸如 dubbo
接口之类的),那么 rpc
接口只会调用到其中一个节点,也就是说,只会有一个节点的规则会刷新。
正确的做法是:利用 mq
发一个消息,让各个节点去监听到,进行刷新。
2)单独刷新某一个规则
如果你的规则比较多,成千上万条,又或者你就是不想全量刷新。希望单独刷新某个改动的规则。
LiteFlowChainELBuilder.createChain().setChainName("chain2").setEL(
"THEN(a, b, WHEN(c, d))"
).build();
1、既然是指定刷新,那么必须你要获取到改动的 EL
内容,然后再利用动态代码构建重新 build
下就可以了,这种方式会自动替换缓存中已有的规则。这种方式不用在 build
之前销毁流程。
2、如果是多服务节点部署的情况下,还是要遵循每个节点要都刷新,上面已经说明具体建议的方式。这里不再赘述
6、组件切面
1)全局切面
全局切面是针对于所有的组件,进行切面。你只需要做如下实现即可:
@Component
public class CmpAspect implements ICmpAroundAspect {
@Override
public void beforeProcess(String nodeId, Slot slot) {
YourContextBean context = slot.getContextBean(YourContextBean.class);
//before business
}
@Override
public void afterProcess(String nodeId, Slot slot) {
YourContextBean context = slot.getContextBean(YourContextBean.class);
//after business
}
}
2)Aspect
的切面
LiteFlow
同时也支持了Spring
Aspect
的切面,你可以用@Aspect
标注对任意包,任意规则内的组件进行切面
@Aspect
public class CustomAspect {
@Pointcut("execution(* com.yomahub.liteflow.test.aop.cmp1.*.process())")
public void cut() {
}
@Around("cut()")
public Object around(ProceedingJoinPoint jp) throws Throwable {
//do before business
Object returnObj = jp.proceed();
//do after business
return returnObj;
}
}
7、步骤信息
LiteflowResponse response = flowExecutor.execute2Resp("chain1", "初始参数", CustomContext.class);
Map<String, CmpStep> stepMap = response.getExecuteSteps();
//或者你也可以通过以下的语句来获得一个步骤队列
Queue<CmpStep> stepQueue = response.getExecuteStepQueue();
参数 | 说明 |
---|---|
isSuccess |
此组件是否执行成功 |
getNodeId : |
获得组件Id |
getNodeName |
获得组件名称 |
getTag |
获得组件标签值 |
getTimeSpent |
获得组件的耗时,单位为毫秒 |
getException |
获取此组件抛出的异常,如果 isSuccess 为false的话。但是这里要注意下:有 exception ,success 一定为false ,但是 success 为 false ,不一定有 exception ,因为有可能没执行到,或者没执行结束 (any 的情况)。 |
8、异常
如果你的业务中有获取异常Code的需求,则你自定义的异常需要实现
LiteFlow
提供的LiteFlowException
接口:
public class YourException extends LiteFlowException {
public YourException(String code, String message) {
super(code, message);
}
}
LiteflowResponse response = flowExecutor.execute2Resp("chain1", 初始参数, CustomContext.class);
if (!response.isSuccess()){
Exception e = response.getCause();
String code = response.getCode();
String message = response.getMessage();
}
9、打印信息详解
1)流程执行中打印
[ea1af4810cc849d58948d091d858b29a]:[O]start component[ACmp] execution
[ea1af4810cc849d58948d091d858b29a]:[O]start component[BCmp] execution
[ea1af4810cc849d58948d091d858b29a]:[X]start component[CCmp] execution
[ea1af4810cc849d58948d091d858b29a]:[O]start component[DCmp] execution
1、其中最前面的一串序号,代表这个请求的请求ID,一个请求无论经历了多少个组件,他们的请求ID都是一致的,你可以根据这个ID在日志中进行快速定位进行排查。
2、在后面会跟着一个[O
]或者[X
],[O
]代表了执行了这个组件的主要逻辑,[X
]代表因为isAccess()
返回了false所以没进入这个组件的主要逻辑。
3、如果你不希望打印这种中间执行信息,LiteFlow
提供了配置项,你需要作如下设置:
liteflow.print-execution-log=false
##
2)打印步骤信息
在执行完一个链路之后,框架会自动打出这一条流程的执行步骤顺序,如下所示: (这里的表达形式为:
组件ID<耗时毫秒>
)
a<100>==>c<10>==>m<0>==>q<200>==>p<300>==>p1<0>==>g<305>
如果你希望在打印流程链的时候增加别名描述,那你需要在定义组件的时候设置name属性,具体请参照[组件别名]
a[组件A]<100>==>b[组件B]<0>==>m[组件M]<256>
10、自定义请求 Id
其中日志主体中最前面的就是
RequestId
,一个请求中的requestId
都是相同的,方便你进行日志查找。
2022-07-03 11:15:00.196 INFO 71275 --- [ main] com.yomahub.liteflow.flow.element.Node : [067a0baa6d434de3a8ccafa4b1506562]:[O]start component[a] execution
2022-07-03 11:15:00.204 INFO 71275 --- [ main] com.yomahub.liteflow.flow.element.Node : [067a0baa6d434de3a8ccafa4b1506562]:[O]start component[b] execution
2022-07-03 11:15:00.218 INFO 71275 --- [lf-when-thead-0] com.yomahub.liteflow.flow.element.Node : [067a0baa6d434de3a8ccafa4b1506562]:[O]start component[c] execution
2022-07-03 11:15:00.220 INFO 71275 --- [lf-when-thead-1] com.yomahub.liteflow.flow.element.Node : [067a0baa6d434de3a8ccafa4b1506562]:[O]start component[d] execution
2022-07-03 11:15:00.220 INFO 71275 --- [ main] com.yomahub.liteflow.slot.Slot : [067a0baa6d434de3a8ccafa4b1506562]:CHAIN_NAME[chain1]
a<1>==>b<0>==>c<0>==>d<0>
2022-07-03 11:15:00.221 INFO 71275 --- [ main] com.yomahub.liteflow.slot.DataBus : [067a0baa6d434de3a8ccafa4b1506562]:slot[0] released
1)按照自己的规则生成
你只需要要声明一个类,然后实现
RequestIdGenerator
接口即可:
public class CustomRequestIdGenerator implements RequestIdGenerator {
@Override
public String generate() {
return System.nanoTime();
}
}
然后在 LiteFlow
的配置文件里声明下你这个类即可:
liteflow.request-id-generator-class=com.yomahub.liteflow.test.requestId.config.CustomRequestIdGenerator
2)传入已有的 requestId
/ traceId
LiteflowResponse response = flowExecutor.execute2RespWithRid("chain1", arg, "T001234", YourContext.class);
11、异步线程池自定义
LiteFlow
自己默认有全局线程池,并且线程池的大小等参数可以通过设置以下参数来进行设置:
liteflow.when-max-wait-seconds=15
liteflow.when-max-workers=16
liteflow.when-queue-limit=512
1)自定义全局线程池
但是如果你要对线程池有特殊化的要求,
LiteFlow
也支持自定义线程池的设置。需要注意的是,自定义线程池只适用于并行组件,这个参数对于同步组件来说并无作用。而且一旦设置了你自定义的线程池,那么以上参数将不会再有用。对于线程池的所有参数的定义,都取决于你自己了。
public class CustomThreadBuilder implements ExecutorBuilder {
@Override
public ExecutorService buildExecutor() {
return Executors.newCachedThreadPool();
}
}
liteflow.thread-executor-class=com.yomahub.liteflow.test.customThreadPool.CustomThreadBuilder
2)WHEN
级别的单独线程池
<chain name="chain1">
WHEN(a, b).threadPool("com.yomahub.liteflow.test.customWhenThreadPool.CustomThreadExecutor1");
</chain>
<chain name="chain2">
WHEN(c, d).threadPool("com.yomahub.liteflow.test.customWhenThreadPool.CustomThreadExecutor2");
</chain>
12、简单监控
LiteFlow
提供了简单的监控,目前只统计一个指标:每个组件的平均耗时。默认每5分钟会打印一次(可以自己调整),并且是根据耗时时长倒序排的。
#是否启用监控
liteflow.monitor.enable-log=false
#监控队列的大小
liteflow.monitor.queue-limit=200
#监控延迟多少毫秒打印
liteflow.monitor.delay=300000
#监控每隔多少毫秒打印
liteflow.monitor.period=300000