前言

Github:https://github.com/HealerJean

博客:http://blog.healerjean.com

1、分布式事务

1.1、两阶段提交 2PC

1.2、补偿事务TCC

1.3、本地事件表+消息队列

1.4、MQ事务消息

2、MQ本地时间表+消息队列

2.1、理解

2.1.1、场景

以用户注册场景为例,需求是新用户注册之后给该用户新增一条积分记录。 假设有用户和积分两个服务,用户服务使用数据库DBI ,积分服务使用数据库DB2.

服务调用者只需使用新增用户服务,由该服务内部保证既在DBI 新增了用户记录,又在DB2 新增了积分记录。显然这是一个分布式事务的问题。

2.1.2、解决思路

其实这个问题的核心是DBI 中的事务完成之后需要协调通知DB2 执行事务,这可以通过消息队列来实现。比如在用户服务成功保存用户记录之后向消息队列的某个主题中发送一条用户创建消息,积分系统需要监昕该主题, 一旦接收到用户创建的消息,积分系统就在DB2 中为该用户创建一条积分记录

2.1.3、问题

1、用户服务在保存用户记录后还没来得及向消息队列发送消息就岩机了,如何保证新增了用户记录后一定有消息发送到消息队列呢?

答:数据库定时任务一直扫描

2、积分服务接收到消息后还没来得及保存积分记录就告机了,如何保证系统重启后不丢失积分记录呢?

答:接收到消息,保存到数据库的中间,系统挂了,但是消息不签收

2.2、用户服务

2.2.1、数据准备与配置

2.2.1.1、数据库表

use hlj_activemq_trans_user;
drop table if exists t_event;
create table t_event
(
    id          bigint(20) unsigned not null auto_increment comment '主键',
    type        varchar(30)         not null default '' comment '事件的类型:比如新增用户、新增积分',
    process     varchar(30)         not null default '' comment '事件进行到的环节:比如,新建、已发布、已处理',
    content     varchar(255)        not null default '' comment '事件的内容,用于保存事件发生时需要传递的数据',
    create_time datetime            not null default current_timestamp comment '创建时间',
    update_time datetime            not null default current_timestamp on update current_timestamp comment '更新时间',
    primary key (id)
) comment 'mq分布式事务-事件表';


drop table if exists  t_user;
create table t_user
(
    id        bigint(20) unsigned not null auto_increment comment '主键',
    user_name varchar(64)         not null default '' comment '用户名',
    primary key (id)
) comment 'mq分布式事务-用户表';




2.2.1.2、activemq配置

server.port=7777
logging.config=classpath:log4j2.xml

########################################################
### jpa
########################################################
spring.jpa.database=MYSQL
spring.jpa.show-sql=true
spring.jpa.hibernate.ddl-auto =none
spring.jpa.hibernate.naming-strategy=org.hibernate.cfg.ImprovedNamingStrategy
spring.jpa.properties.hibernate.dialect=org.hibernate.dialect.MySQL5Dialect




########################################################
### Active
########################################################
# ActiveMQ通讯地址
spring.activemq.broker-url=tcp://localhost:61616
spring.activemq.user=admin
spring.activemq.password=admin
#最大连接数
spring.activemq.pool.maxConnections=2
#空闲时间
spring.activemq.pool.idleTimeout=30000
#是否启用内存模式(就是不安装MQ,项目启动时同时启动一个MQ实例)
spring.activemq.in-memory=false
##信任所有的包
spring.activemq.packages.trust-al=true
# 是否替换默认的连接池,使用ActiveMQ的连接池需引入的依赖
spring.activemq.pool.enabled=false




hlj.datasource.url=jdbc:mysql://localhost:3306/hlj_activemq_trans_user?serverTimezone=CTT&useUnicode=true&characterEncoding=utf-8&allowMultiQueries=true
hlj.datasource.username=root
hlj.datasource.password=123456
hlj.datasource.driver-class-name=com.mysql.jdbc.Driver


####################################
### mybatis
####################################
mybatis.mapper-locations=classpath*:/mapper/mysql/**/*.xml
mybatis.type-aliases-package=com.hlj.proj.data.pojo


2.2.2、用户Service接口与实现

2.2.2.1、接口

public interface UserService {

     UserDTO addUser(UserDTO userDTO);

}

2.2.2.2、实现

/**
 * @author HealerJean
 * @ClassName UserServiceImpl
 * @date 2019/9/9  14:19.
 */
@Service
public class UserServiceImpl implements UserService {

    @Autowired
    private TuserDao tuserDao;

    @Autowired
    private UserEventService userEventService;

    /**
     * 添加用户
     * 1、添加用户
     * 2、添加事件
     */
    @Override
    public UserDTO addUser(UserDTO userDTO) {

        // 1、添加用户
        Tuser tuser = new Tuser();
        tuser.setUserName(userDTO.getName());
        tuserDao.insertSelective(tuser);

        // 2、添加事件
        EventDTO eventDTO = new EventDTO();
        eventDTO.setType(BusinessEnum.EventType.新增用户.code);
        eventDTO.setProcess(BusinessEnum.EventProcess.新建.code);
        PointDTO pointDTO = new PointDTO();
        pointDTO.setUserId(tuser.getId());
        pointDTO.setPointAmount(userDTO.getPointAmount());
        eventDTO.setContent(JsonUtils.toJsonString(pointDTO));
        userEventService.addEvent(eventDTO);

        userDTO.setUserId(tuser.getId());
        return userDTO;
    }
}

2.2.3、事件接口与实现

2.2.3.1、事件接口

public interface UserEventService {

    /**
     * 添加事件
     * @param eventDTO
     */
    void addEvent(EventDTO eventDTO);

    /**
     * 执行事件
     */
    void executeAddUserEvent(EventDTO eventDTO);
}

2.2.3.2、事件实现

/**
 * @author HealerJean
 * @ClassName UserEventService
 * @date 2019/9/9  14:28.
 * @Description
 */
@Service
@Slf4j
public class UserEventServiceImpl implements UserEventService {

    @Autowired
    private TeventManager teventManager;

    @Autowired
    private JMSProducer jmsProducer;

    /**
     * 添加事件
     */
    @Override
    public void addEvent(EventDTO eventDTO) {
        Tevent tevent = new Tevent();
        tevent.setType(eventDTO.getType());
        tevent.setProcess(eventDTO.getProcess());
        tevent.setContent(eventDTO.getContent());
        teventManager.insertSelective(tevent);
    }

    /**
     * 执行事件
     * 1、activemq 发送queue消息
     */
    @Override
    public void executeAddUserEvent(EventDTO eventDTO) {
        // 1、activemq 发送topic消息
        String messageContent = eventDTO.getContent();
        Destination queue = new ActiveMQQueue(JmsConstant.QUEUE_DISTRIBUTE_TRANSACTION);
        jmsProducer.sendMessage(queue, messageContent);
        log.info("发送topic消息:{}", eventDTO);

        // 2、更新事件状态
        Tevent tevent = teventManager.findById(eventDTO.getEventId());
        tevent.setProcess(BusinessEnum.EventProcess.已发布.code);
        teventManager.updateSelective(tevent);
    }


}

2.2.4、常亮与枚举数据

2.2.4.1、枚举

public interface BusinessEnum {

    /**
     * 事件类型
     */
    enum EventType {

        新增用户 ("New_User","新增用户"),
        新增积分 ("New_Point","新增积分"),

        ;

        public  String code ;
        public String desc ;

        EventType(String code, String desc) {
            this.code = code;
            this.desc = desc;
        }
    }


    /**
     * 事件处理过程
     */
    enum EventProcess {

        新建 ("New","新建"),
        已发布 ("Publish","已发布"),
        ;

        public String code ;
        public String desc ;


        EventProcess(String code, String desc) {
            this.code = code;
            this.desc = desc;
        }
    }



}

2.2.4.2、常亮

public class JmsConstant {

    public static final String QUEUE_DISTRIBUTE_TRANSACTION = "DistributeTransaction" ;

}

2.2.5、定时器

@Component
@Slf4j
public class UserSchedule {

    @Autowired
    private UserEventService userEventService;

    @Autowired
    private TeventManager teventManager;

    @Scheduled(cron = "0 */1 * * * ?")
    public void executeEvent() {
        log.info("定时器执行--------新增用户事件处理");
        TeventQuery teventQuery = new TeventQuery();
        teventQuery.setType(BusinessEnum.EventType.新增用户.code);
        teventQuery.setProcess(BusinessEnum.EventProcess.新建.code);
        List<Tevent> tevents = teventManager.queryList(teventQuery);
        if (!EmptyUtil.isEmpty(tevents)) {
            System.out.println("定时器执行--------新增的用户记录总数为" + tevents.size());
            List<EventDTO> eventDTOS = tevents.stream().map(item -> {
                EventDTO eventDTO = new EventDTO();
                eventDTO.setEventId(item.getId());
                eventDTO.setType(item.getType());
                eventDTO.setProcess(item.getProcess());
                eventDTO.setContent(item.getContent());
                return eventDTO;
            }).collect(Collectors.toList());

            for(EventDTO eventDTO : eventDTOS){
                try {
                    userEventService.executeAddUserEvent(eventDTO);
                }catch (Exception e){
                    log.error("定时器执行-----处理事件为{}发送异常,异常信息",e);
                    continue;
                }
                log.info("定时器执行-处理事件:{}处理成功", eventDTO);
            }

        } else {
            log.info("没有新增的用户记录,无需处理");
        }

    }

}

2.2.6、测试Controller

@RestController
@RequestMapping("hlj")
public class UserController {

    @Autowired
    private UserService userService;

    @GetMapping("user/add")
    public ResponseBean addUser(UserDTO userDTO) {
        userService.addUser(userDTO);
        return ResponseBean.buildSuccess(userDTO);
    }
}

GET http://localhost:7777/hlj/user/add?name=HealerJean

2.3、积分服务

2.3.1、数据准备与配置

2.3.1.1、数据表



use hlj_activemq_trans_point;
drop table if exists t_event;
create table t_event
(
    id          bigint(20) unsigned not null auto_increment comment '主键',
    type        varchar(30)         not null default '' comment '事件的类型:比如新增用户、新增积分',
    process     varchar(30)         not null default '' comment '事件进行到的环节:比如,新建、已发布、已处理',
    content     varchar(255)        not null default '' comment '事件的内容,用于保存事件发生时需要传递的数据',
    create_time datetime            not null default current_timestamp comment '创建时间',
    update_time datetime            not null default current_timestamp on update current_timestamp comment '更新时间',
    primary key (id)
) comment 'mq分布式事务-事件表';


drop table if exists  t_point;
create table t_point
(
    id      bigint(20) unsigned not null auto_increment comment '主键',
    user_id bigint(20) unsigned not null default 0 comment '关联的用户id',
    amount  decimal(19, 2)      not null default 0 comment '积分金额',
    primary key (id)
)comment 'mq分布式事务-积分表';


2.3.1.2、数据配置

server.port=8888
logging.config=classpath:log4j2.xml

########################################################
### jpa
########################################################
spring.jpa.database=MYSQL
spring.jpa.show-sql=true
spring.jpa.hibernate.ddl-auto =none
spring.jpa.hibernate.naming-strategy=org.hibernate.cfg.ImprovedNamingStrategy
spring.jpa.properties.hibernate.dialect=org.hibernate.dialect.MySQL5Dialect




########################################################
### Active
########################################################
# ActiveMQ通讯地址
spring.activemq.broker-url=tcp://localhost:61616
spring.activemq.user=admin
spring.activemq.password=admin
#最大连接数
spring.activemq.pool.maxConnections=2
#空闲时间
spring.activemq.pool.idleTimeout=30000
#是否启用内存模式(就是不安装MQ,项目启动时同时启动一个MQ实例)
spring.activemq.in-memory=false
##信任所有的包
spring.activemq.packages.trust-al=true
# 是否替换默认的连接池,使用ActiveMQ的连接池需引入的依赖
spring.activemq.pool.enabled=false





hlj.datasource.url=jdbc:mysql://localhost:3306/hlj_activemq_trans_point?serverTimezone=CTT&useUnicode=true&characterEncoding=utf-8&allowMultiQueries=true
hlj.datasource.username=root
hlj.datasource.password=123456
hlj.datasource.driver-class-name=com.mysql.jdbc.Driver


####################################
### mybatis
####################################
mybatis.mapper-locations=classpath*:/mapper/mysql/**/*.xml
mybatis.type-aliases-package=com.hlj.proj.data.pojo


2.3.2、积分接口与实现

2.3.2.1、积分接口

public interface PointService {

    /**
     * 给用户添加积分
     */
    void addPoint(PointDTO pointDTO);

}

2.3.2.2、积分接口实现

@Service
@Slf4j
public class PointServiceImpl implements PointService {

    @Autowired
    private TpointManager tpointManager;

    /**
     * 给用户添加积分
     */
    @Override
    public void addPoint(PointDTO pointDTO) {
        Tpoint tpoint = new Tpoint();
        tpoint.setUserId(pointDTO.getUserId());
        tpoint.setAmount(pointDTO.getPointAmount());
        tpointManager.insertSelective(tpoint);

    }
}

2.3.3、事件接口与实现

2.3.3.1、事件接口


/**
 * @author HealerJean
 * @ClassName PointEventService
 * @date 2019/9/9  15:21.
 * @Description
 */
public interface PointEventService {

    /**
     * 添加事件
     * @param eventDTO
     */
    void addEvent(EventDTO eventDTO);

    /**
     * 执行事件
     */
    void executeAddPointEvent(EventDTO eventDTO);


}

2.3.3.2、事件实现

@Service
@Slf4j
public class PointEventServiceImpl implements PointEventService {

    @Autowired
    private TeventManager teventManager;
    @Autowired
    private PointService  pointService ;


    /**
     * 添加事件
     */
    @Override
    public void addEvent(EventDTO eventDTO) {
        Tevent tevent = new Tevent();
        tevent.setType(eventDTO.getType());
        tevent.setProcess(eventDTO.getProcess());
        tevent.setContent(eventDTO.getContent());
        teventManager.insertSelective(tevent);
    }

    /**
     * 执行事件
     * 1、添加积分记录
     * 2、修改事件状态
     */
    @Override
    public void executeAddPointEvent(EventDTO eventDTO) {
        PointDTO  pointDTO = JsonUtils.jsonToObject(eventDTO.getContent(),PointDTO.class);
        pointService.addPoint(pointDTO);

        Tevent tevent = teventManager.findById(eventDTO.getEventId());
        tevent.setProcess(BusinessEnum.EventProcess.已处理.code);
        teventManager.updateSelective(tevent);
    }

}

2.3.4、常亮与枚举数据

2.3.4.1、枚举

public interface BusinessEnum {

    /**
     * 事件类型
     */
    enum EventType {

        新增积分 ("New_Point","新增积分"),

        ;

        public  String code ;
        public String desc ;

        EventType(String code, String desc) {
            this.code = code;
            this.desc = desc;
        }
    }


    /**
     * 事件处理过程
     */
    enum EventProcess {

        已发布 ("Publish","已发布"),
        已处理 ("Processed","已处理"),
        ;

        public String code ;
        public String desc ;


        EventProcess(String code, String desc) {
            this.code = code;
            this.desc = desc;
        }
    }
}

2.3.4.2、常亮

public class JmsConstant {


    public static final String QUEUE_DISTRIBUTE_TRANSACTION = "DistributeTransaction" ;

}

2.3.5、activemq消息监听

@Service
@Slf4j
public class PointMessageListenter {

    @Autowired
    private PointEventService pointEventService ;

    
    @JmsListener(destination = JmsConstant.QUEUE_DISTRIBUTE_TRANSACTION)
    public void listenQueue(String message) {
        log.info("接收到topic消息:" + message);
        EventDTO eventDTO = new EventDTO();
        eventDTO.setType(BusinessEnum.EventType.新增积分.code);
        eventDTO.setProcess(BusinessEnum.EventProcess.已发布.code);
        eventDTO.setContent(message);
        pointEventService.addEvent(eventDTO);
    }

}

2.3.6、数据库处理定时器

@Component
@Slf4j
public class PointSchedule {

    @Autowired
    private TeventManager teventManager ;

    @Autowired
    private PointEventService pointEventService ;

    @Scheduled(cron = "0 */1 * * * ?")
    public void executeEvent(){
        log.info("定时器执行--------新增积分事件处理");
        TeventQuery teventQuery = new TeventQuery();
        teventQuery.setType(BusinessEnum.EventType.新增积分.code);
        teventQuery.setProcess(BusinessEnum.EventProcess.已发布.code);
        List<Tevent> tevents = teventManager.queryList(teventQuery);
        if (!EmptyUtil.isEmpty(tevents)) {
            System.out.println("定时器执行--------新增积分记录总数为" + tevents.size());
            List<EventDTO> eventDTOS = tevents.stream().map(item -> {
                EventDTO eventDTO = new EventDTO();
                eventDTO.setEventId(item.getId());
                eventDTO.setType(item.getType());
                eventDTO.setProcess(item.getProcess());
                eventDTO.setContent(item.getContent());
                return eventDTO;
            }).collect(Collectors.toList());

            for(EventDTO eventDTO : eventDTOS){
                try {
                    pointEventService.executeAddPointEvent(eventDTO);
                }catch (Exception e){
                    log.info("定时器执行-----处理事件为{}发送异常",eventDTO);
                    continue;
                }
                log.info("定时器执行-处理事件:{}处理成功", eventDTO);
            }
        } else {
            log.info("没有新增的用户记录,无需处理");
        }
    }

}

2.4、测试吧

2.4.1、启动用户服务

2.4.1.1、调用接口

GET http://localhost:7777/hlj/user/add?name=HealerJean

1.4.1.2、数据库表

1568104577005

1.4.1.2、定时器出发之后

1568104608221

1.4.1.4、控制台

2019-09-10 16:36:05 INFO  -[                                ]- 发送topic消息:{"content":"{\"userId\":1}","eventId":1,"process":"New","type":"New_User"} com.hlj.proj.service.user.UserEventServiceImpl.executeAddUserEvent[55]
2019-09-10 16:36:05 INFO  -[                                ]- 定时器执行-处理事件:{"content":"{\"userId\":1}","eventId":1,"process":"New","type":"New_User"}处理成功 com.hlj.proj.task.UserSchedule.executeEvent[59]
2019-09-10 16:37:00 INFO  -[                                ]- 定时器执行--------新增用户事件处理 com.hlj.proj.task.UserSchedule.executeEvent[36]
2019-09-10 16:37:00 INFO  -[                                ]- 没有新增的用户记录,无需处理 com.hlj.proj.task.UserSchedule.executeEvent[63]
2019-09-10 16:38:00 INFO  -[                                ]- 定时器执行--------新增用户事件处理 com.hlj.proj.task.UserSchedule.executeEvent[36]
2019-09-10 16:38:00 INFO  -[                                ]- 没有新增的用户记录,无需处理 com.hlj.proj.task.UserSchedule.executeEvent[63]

2.4.2、启动积分服务

1.4.2.1、控制台

2019-09-10 16:38:17 INFO  -[                                ]- 接收到topic消息:{"userId":1} com.hlj.proj.listener.PointMessageListenter.listenQueue[35]
2019-09-10 16:39:00 INFO  -[                                ]- 定时器执行--------新增积分事件处理 com.hlj.proj.task.PointSchedule.executeEvent[36]
定时器执行--------新增积分记录总数为1
2019-09-10 16:39:00 INFO  -[                                ]- 定时器执行-处理事件:{"content":"{\"userId\":1}","eventId":1,"process":"Publish","type":"New_Point"}处理成功 com.hlj.proj.task.PointSchedule.executeEvent[59]


1.4.2.2、观察数据库

1568104770045

1568104788269

2.5、总结

这里主要是用到了定时器和消息队列,这种情况使用,肯定是传递的积分没问题,而且一定能够顺利执行的数据

2、XA事务: 两阶段提交2PC

对数据库分布式事务有了解的同学一定知道数据库支持的2PC,又叫做 XA Transactions

XA 是一个两阶段提交协议,该协议分为以下两个阶段:

第一阶段:事务协调器要求每个涉及到事务的数据库预提交(precommit)此操作,并反映是否可以提交.

第二阶段:事务协调器要求每个数据库提交数据。

1585734802011

其中,如果有任何一个数据库否决此次提交,那么所有数据库都会被要求回滚它们在此事务中的那部分信息。这样做的缺陷是什么呢?

首先需要了解一个定理 :CAP定理

分布式有一个定理:CAP原则又称CAP定理,指的是在一个分布式系统中,一致性(Consistency)、可用性(Availability)、分区容错性(Partition tolerance)。CAP 原则指的是,这三个要素最多只能同时实现两点,不可能三者兼顾,到底要什么根据情况看

CAP理论就是说在分布式存储系统中,最多只能实现上面的两点。而由于网络硬件肯定会出现延迟丢包等问题,所以分区容错性是我们必须需要实现的。所以我们只能在一致性和可用性之间进行权衡

一致性和可用性,为什么不可能同时成立?答案很简单,因为可能通信失败(即出现分区容错)。

咋看之下我们可以在数据库分区之间获得一致性。但是仔细想想,如果数据库都特别多,这种方案就是牺牲了一定的可用性换取一致性 ,如果说系统的可用性代表的是执行某项操作相关所有组件的可用性的和。那么在两阶段提交的过程中,可用性就代表了涉及到的每一个数据库中可用性的和

我们假设两阶段提交的过程中每一个数据库都具有99.9%的可用性,那么如果两阶段提交涉及到两个数据库,这个结果就是99.8%。根据系统可用性计算公式,假设每个月43200分钟,99.9%的可用性就是43157分钟, 99.8%的可用性就是43114分钟,相当于每个月的宕机时间增加了43分钟。

在分布式系统中,我们往往追求的是可用性,它的重要性比一致性要高(不一定哦,我的是金融,必须一致性高),那么如何实现高可用性呢?前人已经给我们提出来了另外一个理论,就是BASE理论,具体看下面的Base

2.1、特性

1、支持数据分片后的跨库XA事务

2、两阶段提交保证操作的原子性和数据的强一致性

3、服务宕机重启后,提交/回滚中的事务可自动恢复

4、SPI机制整合主流的XA事务管理器,默认Atomikos,可以选择使用Narayana和Bitronix

5、同时支持XA和非XA的连接池

6、提供spring-boot和namespace的接入端

优点:实现比较简单,尽量保证了数据的强一致,适合对数据强一致要求很高的关键领域,比如我们金融业务。(其实也不能100%保证强一致)

缺点: 牺牲了可用性,对性能影响较大,不适合高并发高性能场景,如果分布式系统跨接口调用,在事务执行过程中,所有的资源都是被锁定的,这种情况只适合执行时间确定的短事务。 而且因为2PC的协议成本比较高,又有全局锁的问题,性能会比较差。 现在大家基本上不会采用这种强一致解决方案。

2.2、Apache分库分表

2.2.1、配置XA:2阶段事务

<!-- 分表分库 ShardingShpere -->
<dependency>
    <groupId>org.apache.shardingsphere</groupId>
    <artifactId>sharding-jdbc-spring-boot-starter</artifactId>
    <version>4.0.0-RC2</version>
</dependency>

<!--XA事务必须配置如下,否则如下报错-->
<!--Caused by: java.lang.NullPointerException: Cannot find transaction manager of [XA]-->
<dependency>
    <groupId>org.apache.shardingsphere</groupId>
    <artifactId>sharding-transaction-xa-core</artifactId>
    <version>4.0.0-RC2</version>
</dependency>

配置方法(配合@Transactional注解使用)

方式一:注解

@ShardingTransactionType(value = TransactionType.XA)

非常抱歉的是,我使用注解没有成功,所以我选择了第二种方式,在进入这个事务方法的时候,用代码控制

@Transactional(rollbackFor = Exception.class)
@ShardingTransactionType(value = TransactionType.XA)
@Override
public void dbTransactional(UserDTO userDTO, CompanyDTO companyDTO) {
    System.out.println("----------------开始进入事务");
    userService.insert(userDTO);
    companyService.insert(companyDTO);
}

方式二:Java代码

当然可以自己自定义一个注解,用来实现下面的

TransactionTypeHolder.set(TransactionType.XA);

2.2.1.1、正常流程

@Transactional(rollbackFor = Exception.class)
@ShardingTransactionType(value = TransactionType.XA)
@Override
public void dbTransactional(UserDTO userDTO, CompanyDTO companyDTO) {
    System.out.println("----------------开始进入事务");
    userService.insert(userDTO);
    companyService.insert(companyDTO);
}

事务方法,刚进入开启事务

public abstract class AbstractConnectionAdapter 
    extends AbstractUnsupportedOperationConnection {

    public final void setAutoCommit(final boolean autoCommit) throws SQLException {
        this.autoCommit = autoCommit;
        if (TransactionType.LOCAL == transactionType || isOnlyLocalTransactionValid()) {
            setAutoCommitForLocalTransaction(autoCommit);
        } else if (!autoCommit) {
            shardingTransactionManager.begin();//事务管理器开始
        }
    }
}

public final class XAShardingTransactionManager implements ShardingTransactionManager {
    @SneakyThrows
    @Override
    public void begin() {
        xaTransactionManager.getTransactionManager().begin();
    }
}

事务方法结束的时候,事务管理器提交事务,清除XAResource

public final class XAShardingTransactionManager implements ShardingTransactionManager {

    @SneakyThrows
    @Override
    public void commit() {
        try {
            xaTransactionManager.getTransactionManager().commit();//事务管理器提交,实现类为AtomikosTransactionManager
        } finally {
            enlistedXAResource.remove();
        }
    }
}
public final class AtomikosTransactionManager implements XATransactionManager {

}

2.2.1.2、异常流程

@Transactional(rollbackFor = Exception.class)
@Override
public void dbTransactional(UserDTO userDTO, CompanyDTO companyDTO) {
    System.out.println("----------------开始进入事务");
    userService.insert(userDTO);
    companyService.insert(companyDTO);
    int i = 1 / 0;
}

事务开启和上面正常流程一样,如果发了异常情况,就会回滚,具体执行操作,看下文

public final class ShardingConnection extends AbstractConnectionAdapter {
    
    @Override
    public void rollback() throws SQLException {
        if (TransactionType.LOCAL == transactionType) {
            super.rollback();
        } else {
            shardingTransactionManager.rollback();
        }
    }
   
}

    public void rollback() {
        try {
            try {
                this.xaTransactionManager.getTransactionManager().rollback();
            } finally {
                this.enlistedXAResource.remove();
            }

        } catch (Throwable var5) {
            throw var5;
        }
    }

2.2.2、原理分析

1、Begin(开启XA全局事务)

通常收到接入端的set autoCommit=0时,XAShardingTransactionManager会调用具体的XA事务管理器开启XA的全局事务,通常以XID的形式进行标记。

2、执行物理SQL

ShardingSphere进行解析/优化/路由后,会生成逻辑SQL的分片SQLUnit,执行引擎为每个物理SQL创建连接的同时,物理连接所对应的XAResource也会被注册到当前XA事务中,事务管理器会在此阶段发送XAResource.start命令给数据库,数据库在收到XAResource.end命令(个人可以理解为连接试探)之前的所有SQL操作,会被标记为XA事务

XAResource1.start             ## Enlist阶段执行
statement.execute("sql1");    ## 模拟执行一个分片SQL1
statement.execute("sql2");    ## 模拟执行一个分片SQL2
XAResource1.end    

这里sql1和sql2将会被标记为XA事务

3、Commit/rollback(提交XA事务

XAShardingTransactionManager收到接入端的提交命令后,会委托实际的XA事务管理进行提交动作,这时事务管理器会收集当前线程里所有注册的XAResource,首先发送XAResource.end指令,

用以标记此XA事务的边界。 接着会依次发送prepare指令,收集所有参与XAResource投票,如果所有XAResource的反馈结果都是OK,则会再次调用commit指令进行最终提交,

如果有一个XAResource的反馈结果为No,则会调用rollback指令进行回滚。 在事务管理器发出提交指令后,任何`XAResource`产生的异常都会通过recovery日志进行重试,来保证提交阶段的操作原子性,和数据强一致性。

XAResource1.prepare           ## ack: yes
XAResource2.prepare           ## ack: yes
XAResource1.commit
XAResource2.commit

     
XAResource1.prepare           ## ack: yes
XAResource2.prepare           ## ack: no
XAResource1.rollback
XAResource2.rollback

ShardingSphere默认的XA事务管理器为Atomikos,在项目的logs目录中会生成xa_tx.log, 这是XA崩溃恢复时所需的日志,请勿删除。

3、TCC分布式事务(补偿事务)

国内开源的 ByteTCC、Himly、TCC-transaction。 TCC 分布式事务方案来保证各个接口的调用,要么一起成功,要么一起回滚,是比较合适的。

TCC属于应用层的一种补偿方式,所以需要程序员在实现的时候多写很多补偿的代码,在一些场景中,一些业务流程可能用TCC不太好定义及处理。

3.1、场景

咱们先来看看业务场景,假设你现在有一个电商系统,里面有一个支付订单的场景。那对一个订单支付之后,我们需要做下面的步骤:

1、更改订单的状态为“已支付”

2、扣减商品库存

3、给会员增加积分

4、创建销售出库单通知仓库发货

image-20201208150226136

业务场景有了,现在我们要更进一步,实现一个 TCC 分布式事务的效果。

什么意思呢?也就是说,[1] 订单服务-修改订单状态,[2] 库存服务-扣减库存,[3] 积分服务-增加积分,[4] 仓储服务-创建销售出库单。

上述这几个步骤,要么一起成功,要么一起失败,必须是一个整体性的事务。

举个例子,现在订单的状态都修改为“已支付”了,结果库存服务扣减库存失败。那个商品的库存原来是 100 件,现在卖掉了 2 件,本来应该是 98 件了。结果呢?由于库存服务操作数据库异常,导致库存数量还是 100。这不是在坑人么,当然不能允许这种情况发生了!

image-20201208150410406

上面那几个步骤,要么全部成功,如果任何一个服务的操作失败了,就全部一起回滚,撤销已经完成的操作。比如说库存服务要是扣减库存失败了,那么订单服务就得撤销那个修改订单状态的操作,然后得停止执行增加积分和通知出库两个操作

image-20201208150449255

3.2、实现TCC

3.2.1、TCC 实现阶段一:Try

订单服务那儿,它的代码大致来说应该是这样子的

public class OrderService {

    // 库存服务
    @Autowired
    private InventoryService inventoryService;

    // 积分服务
    @Autowired
    private CreditService creditService;

    // 仓储服务
    @Autowired
    private WmsService wmsService;

    // 对这个订单完成支付
    public void pay(){
        //对本地的的订单数据库修改订单状态为"已支付"
        orderDAO.updateStatus(OrderStatus.PAYED);

        //调用库存服务扣减库存
        inventoryService.reduceStock();

        //调用积分服务增加积分
        creditService.addCredit();

        //调用仓储服务通知发货
        wmsService.saleDelivery();
    }
} 

1、首先,上面那个订单服务先把自己的状态修改为:OrderStatus.UPDATING。这是啥意思呢?也就是说,在 pay() 那个方法里,你别直接把订单状态修改为已支付啊!你先把订单状态修改为 UPDATING,也就是修改中的意思。 这个状态是个没有任何含义的这么一个状态,代表有人正在修改这个状态罢了

2、然后呢,库存服务直接提供的那个 reduceStock() 接口里,也别直接扣减库存啊,你可以是冻结掉库存。举个例子,本来你的库存数量是 100,你别直接 100 - 2 = 98,扣减这个库存!你可以把可销售的库存:100 - 2 = 98,设置为 98 没问题,然后在一个单独的冻结库存的字段里,设置一个 2。也就是说,有 2 个库存是给冻结了

3、积分服务的 addCredit() 接口也是同理,别直接给用户增加会员积分。你可以先在积分表里的一个预增加积分字段加入积分。比如:用户积分原本是 1190,现在要增加 10 个积分,别直接 1190 + 10 = 1200 个积分啊!你可以保持积分为 1190 不变,在一个预增加字段里,比如说 prepare_add_credit 字段,设置一个 10,表示有 10 个积分准备增加

4、仓储服务的 saleDelivery() 接口也是同理啊,你可以先创建一个销售出库单,但是这个销售出库单的状态是“UNKNOWN”。也就是说,刚刚创建这个销售出库单,此时还不确定它的状态是什么呢!

总结上述过程,如果你要实现一个 TCC 分布式事务,首先你的业务的主流程以及各个接口提供的业务含义,不是说直接完成那个业务操作,而是完成一个 Try 的操作。其实就是所谓的 TCC 分布式事务中的第一个 T 字母代表的阶段,也就是 Try 阶段

这个操作,一般都是锁定某个资源,设置一个预备类的状态,冻结部分数据,等等,大概都是这类操作

2.2.2、TCC 实现阶段二:Confirm

然后就分成两种情况了,第一种情况是比较理想的,那就是各个服务执行自己的那个 Try 操作,都执行成功了,Bingo!

这个时候,就需要依靠 TCC 分布式事务框架来推动后续的执行了。这里简单提一句,如果你要玩儿 TCC 分布式事务,必须引入一款 TCC 分布式事务框架,比如国内开源的 ByteTCC、Himly、TCC-transaction。

否则的话,感知各个阶段的执行情况以及推进执行下一个阶段的这些事情,不太可能自己手写实现,太复杂了。如果你在各个服务里引入了一个 TCC 分布式事务的框架,订单服务里内嵌的那个 TCC 分布式事务框架可以感知到,各个服务的 Try 操作都成功了。

1、为了实现这个阶段,你需要在各个服务里再加入一些代码。比如说,订单服务里,你可以加入一个 Confirm 的逻辑,就是正式把订单的状态设置为“已支付”了,大概是类似下面这样子:

public class OrderServiceConfirm {

    public void pay(){
        orderDao.updateStatus(OrderStatus.PAYED);
    }
}

2、库存服务也是类似的,你可以有一个 InventoryServiceConfirm 类,里面提供一个 reduceStock() 接口的 Confirm 逻辑,这里就是将之前冻结库存字段的 2 个库存扣掉变为 0。这样的话,可销售库存之前就已经变为 98 了,现在冻结的 2 个库存也没了,那就正式完成了库存的扣减。

3、积分服务也是类似的,可以在积分服务里提供一个 CreditServiceConfirm 类,里面有一个 addCredit() 接口的 Confirm 逻辑,就是将预增加字段的 10 个积分扣掉,然后加入实际的会员积分字段中,从 1190 变为 1120。

4、仓储服务也是类似,可以在仓储服务中提供一个 WmsServiceConfirm 类,提供一个 saleDelivery() 接口的 Confirm 逻辑,将销售出库单的状态正式修改为“已创建”,可以供仓储管理人员查看和使用,而不是停留在之前的中间状态“UNKNOWN”了。

好了,上面各种服务的 Confirm 的逻辑都实现好了,一旦订单服务里面的 TCC 分布式事务框架感知到各个服务的 Try 阶段都成功了以后,就会执行各个服务的 Confirm 逻辑。订单服务内的 TCC 事务框架会负责跟其他各个服务内的 TCC 事务框架进行通信,依次调用各个服务的 Confirm 逻辑。然后,正式完成各个服务的所有业务逻辑的执行。,顺着图一起来看看整个过程:

image-20201208151619671

3.2.3、TCC 实现阶段三:Cancel

举个例子:在 Try 阶段,比如积分服务吧,它执行出错了,此时会怎么样?

那订单服务内的 TCC 事务框架是可以感知到的,然后它会决定对整个 TCC 分布式事务进行回滚。也就是说,会执行各个服务的第二个 C 阶段,Cancel 阶段。同样,为了实现这个 Cancel 阶段,各个服务还得加一些代码。

1、首先订单服务,它得提供一个 OrderServiceCancel 的类,在里面有一个 pay() 接口的 Cancel 逻辑,就是可以将订单的状态设置为“CANCELED”,也就是这个订单的状态是已取消。

2、库存服务也是同理,可以提供 reduceStock() 的 Cancel 逻辑,就是将冻结库存扣减掉 2取消,冻结库存变成0。

3、积分服务也需要提供 addCredit() 接口的 Cancel 逻辑,将预增加积分字段的 10 个积分扣减掉,预增变成0。

4、仓储服务也需要提供一个 saleDelivery() 接口的 Cancel 逻辑,将销售出库单的状态修改为“CANCELED”设置为已取消。

然后这个时候,订单服务的 TCC 分布式事务框架只要感知到了任何一个服务的 Try 逻辑失败了,就会跟各个服务内的 TCC 分布式事务框架进行通信,然后调用各个服务的 Cancel 逻辑。

image-20201208152039344

3.3、总结和思考

总结一下,你要玩儿 TCC 分布式事务的话:首先需要选择某种 TCC 分布式事务框架,各个服务里就会有这个 TCC 分布式事务框架在运行。

3.3.1、Try-Confirm-Cancel:

你原本的一个接口,要改造为 3 个逻辑,

1、先是服务调用链路依次执行 Try 逻辑。

2、如果都正常的话,TCC 分布式事务框架推进执行 Confirm 逻辑,完成整个事务。

3、如果某个服务的 Try 逻辑有问题,TCC 分布式事务框架感知到之后就会推进执行各个服务的 Cancel 逻辑,撤销之前执行的各种操作。

3.3.2、核心思想

TCC 分布式事务的核心思想,说白了,就是当遇到下面这些情况时,

1、先来 Try 一下,不要把业务逻辑完成,先试试看,看各个服务能不能基本正常运转,能不能先冻结我需要的资源。如果 Try 都 OK,也就是说,底层的数据库、Redis、Elasticsearch、MQ 都是可以写入数据的,并且你保留好了需要使用的一些资源(比如冻结了一部分库存)。

2、接着,再执行各个服务的 Confirm 逻辑,基本上 Confirm 就可以很大概率保证一个分布式事务的完成了

3、那如果 Try 阶段某个服务就失败了,比如说底层的数据库挂了,或者 Redis 挂了,等等。此时就自动执行各个服务的 Cancel 逻辑,把之前的 Try 逻辑都回滚,所有服务都不要执行任何设计的业务逻辑。保证大家要么一起成功,要么一起失败。

1、某个服务的数据库宕机了。

2、某个服务自己挂了。

3、那个服务的 Redis、Elasticsearch、MQ 等基础设施故障了。

4、某些资源不足了,比如说库存不够这些。

问题1:等一等,你有没有想到一个问题?如果有一些意外的情况发生了,比如说订单服务突然挂了,然后再次重启,TCC 分布式事务框架是如何保证之前没执行完的分布式事务继续执行的呢?

答:TCC 事务框架都是要记录一些分布式事务的活动日志的,可以在磁盘上的日志文件里记录,也可以在数据库里记录。保存下来分布式事务运行的各个阶段和状态

问题2:万一某个服务的 Cancel 或者 Confirm 逻辑执行一直失败怎么办呢?

答:那也很简单,TCC 事务框架会通过活动日志记录各个服务的状态。举个例子,比如发现某个服务的 Cancel 或者 Confirm 一直没成功,会不停的重试调用它的 Cancel 或者 Confirm 逻辑,务必要它成功!

最后,再给大家来一张图,来看看给我们的业务,加上分布式事务之后的整个执行流程:

image-20201208152718745

4、可靠消息最终一致性方案

ContactAuthor