发布时间:2023-02-20 文章分类:编程知识 投稿人:李佳 字号: 默认 | | 超大 打印

分布式事务

一、分布式事务基础

什么是事务?

事务指的就是一个操作单元,在这个操作单元中的所有操作最终要保持一致的行为,要么所有操作都成功,要么所有的操作都被撤销。简单地说,事务提供一种“要么什么都不做,要么做全套”机制

本地事物

本地事物其实可以认为是数据库提供的事务机制。说到数据库事务就不得不说,数据库事务中的四

大特性:

数据库事务在实现时会将一次事务涉及的所有操作全部纳入到一个不可分割的执行单元,该执行单元中的所有操作要么都成功,要么都失败,只要其中任一操作执行失败,都将导致整个事务的回滚

分布式事务

分布式事务指事务的参与者、支持事务的服务器、资源服务器以及事务管理器分别位于不同的分布式系统的不同节点之上。

简单的说,就是一次大的操作由不同的小操作组成,这些小的操作分布在不同的服务器上,且属于不同的应用,分布式事务需要保证这些小操作要么全部成功,要么全部失败。

本质上来说,分布式事务就是为了保证不同数据库的数据一致性。

分布式事务的场景

一个服务需要调用多个数据库实例完成数据的增删改操作

分布式事务

多个服务需要调用一个数据库实例完成数据的增删改操作
分布式事务

多个服务需要调用一个数据库实例完成数据的增删改操作

分布式事务

二、分布式事务解决方案

全局事务

全局事务基于DTP模型实现。DTP是由X/Open组织提出的一种分布式事务模型——X/Open Distributed Transaction Processing Reference Model。它规定了要实现分布式事务,需要三种⻆色:

整个事务分成两个阶段:

分布式事务

优点

缺点

可靠消息服务

基于可靠消息服务的方案是通过消息中间件保证上、下游应用数据操作的一致性。假设有A和B两个系统,分别可以处理任务A和任务B。此时存在一个业务流程,需要将任务A和任务B在同一个事务中处理。就可以使用消息中间件来实现这种分布式事务。

RocketMQ事务消息流程图

分布式事务

1)事务消息发送及提交

(1) 发送消息(half消息)
(2) 服务端响应消息写入结果
(3) 根据发送结果执行本地事务(如果写入失败,此时half消息对业务不可⻅,本地逻辑不执行)
(4) 根据本地事务状态执行Commit或者Rollback(Commit操作生产消息索引,消息对消费者可⻅)

2) 事务补偿

(1) 对没有Commit/Rollback的事务消息(pending状态的消息),从服务端发起一次“回查”
(2) Producer收到回查消息,检查回查消息对于的本地事务的状态
(3) 根据本地事务状态,重新Commit或者Rollback
其中,补偿阶段用户解决消息Commit或者Rollback发生超时或者失效的情况

3) 事务消息状态

事务消息共有三种状态,提交状态,回查状态,中间状态:

消息生产者实现

发送代码如下:

Message<OperateIntergralVo> message =
MessageBuilder.withPayload(vo).setHeader("orderNo",orderNo).build();
TransactionSendResult sendResult =
rocketMQTemplate.sendMessageInTransaction("tx_group", "tx_topic", message,
orderNo);
String sendStatus = sendResult.getSendStatus().name();
String localTXState = sendResult.getLocalTransactionState().name();
og.info(">>>> send status={},localTransactionState={}
<<<<",sendStatus,localTXState);
if(sendResult.getLocalTransactionState().equals(LocalTransactionState.COMMIT_ME
SSAGE)){
return "退款成功";
}else{
return "退款失败";
}

创建事务消息生产者端的消息监听器,注意是生产者,不是消费者,我们需要实现的是RocketMQLocalTransactionListener接口,代码如下:

@RocketMQTransactionListener(txProducerGroup = "tx_group")
@Slf4j
public class OrderTXMsgListener implements RocketMQLocalTransactionListener {
@Autowired
private IOrderInfoService orderInfoService;
@Override
public RocketMQLocalTransactionState executeLocalTransaction(Message msg,
Object arg) {
log.info("执行本地事务");
RocketMQLocalTransactionState result =
RocketMQLocalTransactionState.COMMIT;
try {
String orderNo = (String) arg;
orderInfoService.changeOrderStatusToRefund(orderNo);
} catch (Exception e) {
result = RocketMQLocalTransactionState.ROLLBACK;
}
return result;
}
@Override
public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
String orderNo = (String) msg.getHeaders().get("orderNo");
if(!StringUtils.isEmpty(orderNo)){
OrderInfo orderInfo = orderInfoService.getOrderStatus(orderNo);
if(OrderInfo.STATUS_REFUND.equals(orderInfo.getStatus())){
return RocketMQLocalTransactionState.COMMIT;
}
}

TCC事务

TCC即为Try Confifirm Cancel,它属于补偿型分布式事务。TCC实现分布式事务一共有三个步骤:

这个过程并未执行业务,只是完成所有业务的一致性检查,并预留好执行所需的全部资源

确认执行业务操作,不做任何业务检查, 只使用Try阶段预留的业务资源。通常情况下,采用TCC则认为 Confifirm阶段是不会出错的。即:只要Try成功,Confifirm一定成功。若Confifirm阶段真的出错了,需引入重试机制或人工处理。

取消Try阶段预留的业务资源。通常情况下,采用TCC则认为Cancel阶段也是一定成功的。若Cancel阶段真的出错了,需引入重试机制或人工处理。

分布式事务

return RocketMQLocalTransactionState.ROLLBACK;
}
}

TCC两阶段提交与XA两阶段提交的区别是:

TCC事务的优缺点:

三、Seata分布式事务解决方案

2019 年 1 月,阿里巴巴中间件团队发起了开源项目 Fescar (Fast & EaSy Commit And Rollback),其愿景是让分布式事务的使用像本地事务的使用一样,简单和高效,并逐步解决开发者们遇到的分布式事务方面的所有难题。后来更名为 Seata ,意为:Simple Extensible Autonomous Transaction Architecture,是一套分布式事务解决方案。
Seata的设计目标是对业务无侵入,因此从业务无侵入的2PC方案着手,在传统2PC的基础上演进。
它把一个分布式事务理解成一个包含了若干分支事务的全局事务。全局事务的职责是协调其下管辖的分
支事务达成一致,要么一起成功提交,要么一起失败回滚。此外,通常分支事务本身就是一个关系数据
库的本地事务。

3.1 Seata-At模式

Seata主要由三个重要组件组成:

分布式事务

Seata-AT模式的执行流程如下:

  1. A服务的TM向TC申请开启一个全局事务,TC就会创建一个全局事务并返回一个唯一的XID

  2. A服务的RM向TC注册分支事务,并及其纳入XID对应全局事务的管辖

  3. A服务执行分支事务,向数据库做操作4. A服务开始远程调用B服务,此时XID会在微服务的调用链上传播

  4. B服务的RM向TC注册分支事务,并将其纳入XID对应的全局事务的管辖

  5. B服务执行分支事务,向数据库做操作

  6. 全局事务调用链处理完毕,TM根据有无异常向TC发起全局事务的提交或者回滚

  7. TC协调其管辖之下的所有分支事务, 决定是否回滚

Seata-AT模式实现2PC与传统2PC的差别

  1. 架构层次方面,传统2PC方案的 RM 实际上是在数据库层,RM本质上就是数据库自身,通过XA协议实现,而 Seata的RM是以jar包的形式作为中间件层部署在应用程序这一侧的。

  2. 两阶段提交方面,传统2PC无论第二阶段的决议是commit还是rollback,事务性资源的锁都要保持到Phase2完成才释放。而Seata的做法是在Phase1 就将本地事务提交,这样就可以省去Phase持锁的时间,整体提高效率。

3.2 秒杀项目集成Seata

启动Seata-server

1.上传,将seata-server-1.3.0.zip上传到/usr/local/software目录下

2.解压文件到指定目录

unzip /usr/local/software/seata-server-1.3.0.zip -d /usr/local

3.修改日志配置文件,否则启动控制台乱码(如果是window的情况需要修改如下配置)

vi /usr/local/seata/conf/logback.xml

原配置如下:

<property name="CONSOLE_LOG_PATTERN" value="%clr(%d{yyyy-MM-dd HH:mm:ss.SSS}){faint} %clr(%5p) %clr(---){faint} %clr([%15.15t]){faint} %clr(%-40.40logger{39}){cyan} %clr(:){faint} %m%n%wEx"/>

修改成如下格式:

<property name="CONSOLE_LOG_PATTERN" value="%d{yyyy-MM-dd HH:mm:ss.SSS} %5p --- %[%15.15t] %-40.40logger{39} : %m%n%wEx"/>

此问题是因为开发者为seata1.3.0添加字体颜色,而在window中的shell脚本内不显示发生的乱码错误

4.修改registry.config文件

vi /usr/local/seata/conf/registry.conf

修改内容如下:[注意需要把下面nacos的IP地址修改成实际地址]

registry {
  # file 、nacos 、eureka、redis、zk、consul、etcd3、sofa
  type = "nacos"
  nacos {
    application = "seata-server"
    serverAddr = "nacos的IP地址:8848"
    group = "SEATA_GROUP"
    namespace = ""
    cluster = "default"
    username = ""
    password = ""
  }
}
config {
  # file、nacos 、apollo、zk、consul、etcd3
  type = "nacos"
  nacos {
    serverAddr = "nacos的IP地址:8848"
    namespace = ""
    group = "SEATA_GROUP"
    username = ""
    password = ""
  }
}

5.启动seata-server

nohup /usr/local/seata/bin/seata-server.sh -h 目前所在服务器ip地址 -p 7000 >log.out 2>1 &

项目集成seata配置

1.启动seata-server,详情请看部署文档

2.在项目中添加依赖

 <dependency>
	<groupId>com.alibaba.cloud</groupId>
	<artifactId>spring-cloud-starter-alibaba-seata</artifactId>
	<version>2.2.2.RELEASE</version>
	<exclusions>
		<exclusion>
            <groupId>io.seata</groupId>
			<artifactId>seata-spring-boot-starter</artifactId>
		</exclusion>
	</exclusions>
</dependency>
<dependency>
	<groupId>io.seata</groupId>
	<artifactId>seata-spring-boot-starter</artifactId>
	<version>1.3.0</version>
</dependency>

在配置文件中添加如下配置

seata:
  tx-service-group: seckill-service-group
  registry:
    type: nacos
    nacos:
      server-addr: ${spring.cloud.nacos.config.server-addr}
      group: SEATA_GROUP
      application: seata-server
  service:
    vgroup-mapping:
      seckill-service-group: default

AT模式代码实现

分布式事务发起方只需要贴@GlobalTransactional注解即可

分支分布式事务贴上@Transactional即可

3.3 Seata-TCC深度解析

TCC模型图

分布式事务

模型设计

业务场景

1.账户支付,用户账户金额减少

2.账户退款,用户账户金额增加

表设计

CREATE TABLE `user_account` (
`user_id` varchar( 100 ) NOT NULL COMMENT '用户UID',
`gmt_create` datetime NOT NULL COMMENT '创建时间',
`gmt_modified` datetime NOT NULL COMMENT '修改时间',
`amount` bigint( 20 ) NOT NULL COMMENT '用户余额',
PRIMARY KEY (`user_id`),
KEY `idx_gmt_create` (`gmt_create`),
KEY `idx_gmt_modified` (`gmt_modified`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
扣钱业务逻辑

场景: 账户A上有 100 元,要扣除其中的 30 元

Try: 检查余额,扣除其中 30 元;
分布式事务

Confirm: 空提交

分布式事务

Cancel: 返还扣除的 30 元

分布式事务

加钱业务逻辑

Try: 空操作;

分布式事务

Confirm: 增加可用金额 30 元

分布式事务

Cancel: 空操作

分布式事务

业务模型总结

分布式事务

并发控制

账户A上有 100 元,事务T1要扣除其中 30 元,事务T2也要扣除 30 元,出现并发

Try: 检查余额,扣除其中 30 元

分布式事务

T2 Confirm: 空提交

分布式事务

T1 Cancel: 释放T1预留的 30 元

分布式事务

业务模型优化

表增加冻结金额字段

CREATE TABLE `user_account` (
`user_id` varchar( 100 ) NOT NULL COMMENT '用户UID',
`gmt_create` datetime NOT NULL COMMENT '创建时间',
`gmt_modified` datetime NOT NULL COMMENT '修改时间',
`amount` bigint( 20 ) NOT NULL COMMENT '用户余额',
`freezed_amount` bigint( 20 ) unsigned DEFAULT '0',
PRIMARY KEY (`user_id`),
KEY `idx_gmt_create` (`gmt_create`),
KEY `idx_gmt_modified` (`gmt_modified`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

扣钱为例: 账户上有 100 元,要扣除其中 30 元(此时里面的可用金额=amount-freezed_amount)
Try: 检查余额,扣除其中 30 元(freezed_amount=freezed_amount+30)

分布式事务

Confirm: 扣除 30 元( amount=amount-30 freezed_amount=freezed_amount-30)

分布式事务

Cancel: 释放预留的 30 元(freezed_amount=freezed_amount-30)

分布式事务

加钱为例: 账户上有 100 元,要加 30 元(此时里面的可用金额=amount-freezed_amount)
Try: 空操作

分布式事务

Confirm: 增加 30 元( amount=amount+30)

分布式事务

Cancel: 空操作

分布式事务

异常处理

空回滚

Try方法为执行,Cancel执行了

出现原因:

  1. Try超时
  2. 分布式事务回滚,触发Cancel
  3. 未收到Try,收到Cancel

解决方案: Cancel方法需要识别出是否执行Try方法,如果执行了就正常执行Cancel,如果没有就直接结束增加事务日志表来实现这个功能.

CREATE TABLE `account_transaction` (
`tx_id` varchar( 100 ) NOT NULL COMMENT '事务Txid',
`action_id` varchar( 100 ) NOT NULL COMMENT '分支事务id',
`gmt_create` datetime NOT NULL COMMENT '创建时间',
`gmt_modified` datetime NOT NULL COMMENT '修改时间',
`user_id` varchar( 100 ) NOT NULL COMMENT '账户Uid',
`amount` varchar( 100 ) NOT NULL COMMENT '变动金额',
`type` varchar( 100 ) NOT NULL DEFAULT '' COMMENT '变动类型',
PRIMARY KEY (`tx_id`,`action_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
幂等

多次调用二阶段方法

出现原因:

解决方案: 做幂等性处理

CREATE TABLE `account_transaction` (
`tx_id` varchar( 100 ) NOT NULL COMMENT '事务Txid',
`action_id` varchar( 100 ) NOT NULL COMMENT '分支事务id',
`gmt_create` datetime NOT NULL COMMENT '创建时间',
`gmt_modified` datetime NOT NULL COMMENT '修改时间',
`user_id` varchar( 100 ) NOT NULL COMMENT '账户Uid',
`amount` varchar( 100 ) NOT NULL COMMENT '变动金额',
`type` varchar( 100 ) NOT NULL DEFAULT '' COMMENT '变动类型',
`state` smallint( 4 ) NOT NULL COMMENT '状态: 1.初始化 2.已提交 3.已回滚',
PRIMARY KEY (`tx_id`,`action_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
防悬挂

Cancel比Try先执行

出现原因:

  1. Try超时(拥堵)
  2. 分布式事务回滚触发Cancel
  3. 拥堵的Try到达

要允许空回滚,但是要拒绝空回滚之后的Try方法.

解决方案: 在Try方法中, 如果根据全局事务ID能查询出数据出来,说明在try方法之前执行了空回滚,此时
就不能进行try方法。否则就正常执行try方法.

异常处理流程图

Try方法

分布式事务

Comfirm方法

分布式事务

Cancel方法

分布式事务

TCC模式代码实现

分布式事务发起方只需要贴@GlobalTransactional注解即可

分支事务需要完成下面步骤:

1.在接口上贴上@LocalTCC和@TwoPhaseBusinessAction注解,具体配置如下:

@LocalTCC
public interface IUsableIntegralService {
/**
* 增加积分
*/
@TwoPhaseBusinessAction(name = "incrIntergralTry", commitMethod =
"incrIntergralCommit", rollbackMethod = "incrIntergralRollback")
void incrIntergralTry(@BusinessActionContextParameter(paramName =
"operateIntergralVo") OperateIntergralVo operateIntergralVo,
BusinessActionContext context);
void incrIntergralCommit(BusinessActionContext context);
void incrIntergralRollback(BusinessActionContext context);
}

2.添加实现类,实现try,confirm,cancel方法逻辑即可