背景
最近发现一个有趣的命题,多线程如何保证事务的一致性,简单的说,如何保证多线程情况下,一个线程操作入库逻辑执行失败,其他线程能感知,一起回滚,一起成功
个人看法
再此之前,我是觉得不太现实,为什么呢?
我们先从理论上去推理一下。
我们都知道事务的特性是什么?
这个不难吧?八股文必背内容之一,ACID 必须张口就来:
- 原子性(Atomicity)
- 一致性(Consistency)
- 隔离性(Isolation)
- 持久性(Durability)
那么问题又来了,你觉得如果有多线程事务,那么我们破坏了哪个特性?
多线程事务你也别想的多深奥,你就想,两个不同的用户各自发起了一个下单请求,这个请求对应的后台实现逻辑中是有事务存在的。
这不就是多线程事务吗?
这种场景下你没有想过怎么分别去控制两个用户的事务操作吧?
因为这两个操作之间就是完全隔离的,各自拿着各自的链接玩儿。
所以多个事务之间的最基本的原则是什么?
隔离性 。两个事务操作之间不应该相互干扰。
而多线程事务想要实现的是 A 线程异常了。A,B 线程的事务一起回滚。
事务的特性里面就卡的死死的。所以,多线程事务从理论上就是行不通的。
通过理论指导实践,那么多线程事务的代码也就是写不出来的。
Spring 事务保证
前面说到隔离性。那么请问,Spring 的源码里面,对于事务的隔离性是如何保证的呢?
答案就是 ThreadLocal。(刚好最近手写过 Spring TX 源码,有兴趣看看那篇博客)
在事务开启的时候,把当前的链接保存在了 ThreadLocal 里面,从而保证了多线程之间的隔离性:
简单来说就是每个线程都有个 ThreadLocal 上下文,如果是其他线程是无法感知另外线程的 ThreadLocal,本质上 Spring 事务都是再同一个 connection 维持的,也就是再同一个事务,Spring 事务才能保证原子性,所以问题来了,如果我们是多线程,那么 Spring 声明式事务就无法处理了
可以看到,这个 resource 对象是一个 ThreadLocal 对象。
在下面这个方法中进行了赋值操作:
org.springframework.jdbc.datasource.DataSourceTransactionManager#doBegin
既然声明式事务无法实现,那如果编程式事务呢,哎,细想下发现有点搞头
常规背景
假设有这么一个场景,需要上传一个非常大的 Excel 文件,需要解析里面的数据,数据量涉及很大,需要保证,全部插入入库,如果有个数据失败,其他的一起失败
首先我们都会想到单线程同步
单线程循环插入
对于业务系统而言,这 些 条数据,必须全部落库,差一条都不行。要么就是一条都不插入。
在这个过程中,不会去调用其他的外部接口,也不会有其他的流程去操作这个表的数据。
既然说到一条不差了,那么对于大家直观而言,想到的肯定是两个解决方案:
- for 循环中一条条的事务插入。
- 直接一条语句批量插入。
对于这种需求,开启事务,然后在 for 循环中一条条的插入可以说是非常 low 的解决方案了。
单线程批量插入
通过用批量插入,每次插入 5000 条数据获取一次 io 连接插入,发现性能提升很大
为什么批量插入能有这么大的飞跃呢?
你想啊,之前 for 循环插入,虽然 SpringBoot 2.0 默认使用了 HikariPool,连接池里面默认给你搞 10 个连接。
但是你只需要一个连接,开启一次事务。这个不耗时。
耗时的地方是你 5000 次 IO 呀。
所以,耗时长是必然的。
而批量插入只是一条 SQL 语句,所以只需要一个连接,还不需要开启事务。
既然这样,那我直接批量一次就发送这些数据,不分批发送会怎么样,想到就干,
最后发现出现异常,说包传输太大,需要更改 mysq max_allowed_packet
配置
通过这指令查询
select @@max_allowed_packet;
知道什么问题了,直接改 测试
set global max_allowed_packet = xxxxxx
运行发现成功执行入库,因为再一个线程事务内,同时入库,需求完成
但是想想还能不能更快呢
如果我们用多个线程一起分批去处理呢,应该会更快,但是多线程无法保证事务
这么一想就是多线程事务问题,既然如此 Spring 的 @Transactional 就无法实现,只能通过编程式事务
由自己控制每个线程的事务,失败就记录回滚,
多线程插入
伪代码:
AtomicBoolean allSuccess = new AtomicBoolean(true);
try (ExecutorService executorService = Executors.newFixedThreadPool(5)) {
List<String> dataList = parseData();
List<CompletableFuture<Map<Boolean, Runnable>>> completableFutureList = dataList.stream()
.map(data -> CompletableFuture.supplyAsync(() -> new Task().processData(data,allSuccess), executorService)).
collect(Collectors.toList());
CompletableFuture.allOf(completableFutureList.toArray(new CompletableFuture[0]))
.thenAccept(arg -> {
Stream<Map<Boolean, Runnable>> stream = completableFutureList.stream()
.map(CompletableFuture::join);// 获取每个 CompletableFuture 的结果
if (allSuccess.get()) {
stream.forEach(map -> map.get(Boolean.TRUE).run());
} else {
stream.forEach(map -> map.get(Boolean.FALSE).run());
}
}).join();
executorService.shutdown();
}
static class Task {
private PlatformTransactionManager platformTransactionManager;
private Map<Boolean, Runnable> processData(String data,AtomicBoolean atomicBoolean) {
TransactionStatus status = platformTransactionManager.getTransaction(new DefaultTransactionDefinition());
Map<Boolean, Runnable> result = Maps.newHashMap();
try {
//todo 保存数据入数据库
} catch (Exception e) {
//不能通过atomicBoolean 判断为true,就直接提交事务,
// 可能存在线程通信延迟的情况,就是说,另外的线程还没运行完成,当前线程看到为true,就直接提交事务了
//除非收集需要回滚的数据,后续再删除数据,这里直接才有会lambda回调处理
atomicBoolean.compareAndSet(true,false);
// todo 打印日志
} finally {
result.put(true, () -> platformTransactionManager.commit(status));
result.put(false, () -> platformTransactionManager.rollback(status));
}
return result;
}
}
这里的思路就是,通过 CompletableFutureure 线程对象分批去执行数据,维护一个变量 AtomicBoolean,异步线程共享这个变量,只要有一个线程失败,将其 cas 设置为 false 标记位,但不会立马将自己的事务提交或者回滚,而是有另外的独立的线程去协调回滚或者提交事务,哎,是不是很熟悉,这不就是很像两阶段提交(2PC)的一致性协议。
两阶段提交协议
两阶段提交协议,简称 2PC(2 Prepare Commit),是比较常用的解决分布式事务问题的方式,要么所有参与进程都提交事务,要么都取消事务,即实现 ACID 中的原子性(A)的常用手段。
缺陷
通过代码和流程图能了解到,这个协议的缺陷也很明显,就是长时间会占用事务,长时间事务未释放,或者频繁回滚或者提交,最主要式最后无法保证所有事务都提交成功,什么意思呢?
就是说,最后协调者线程收集到所有子线程 Feature 返回的 lambda 事务方法,最后一起提交事务,有可能会出现有些事务提交成功,但是中途宕机或者与数据库出现网络波动,连接数过大等等情况,导致后续的事务就无法提交,也不能完整保证一致性
结论
多线程事务换个角度想,可以理解为分布式事务。可以借助这个案例去了解分布式事务。但是解决分布式事务的最好的方法就是:不要有分布式事务!
而解决分布式事务的绝大部分落地方案都是:最终一致性
一般主流就是通过消息中间件 mq 来实现
消息队列(Message Queue, MQ)是实现分布式系统数据最终一致性的常用手段。它们能够解耦系统组件、提高系统的响应性和可扩展性。在分布式事务处理中,因为直接保证强一致性可能导致性能瓶颈,所以很多系统采用最终一致性模型。
最终一致性的含义是,在没有新的更新请求的情况下,数据的复制品最终都将达到一致的状态。消息队列在这个过程中起到中间人的作用,确保即使某些操作暂时失败,也能够重新尝试,直到所有系统中的数据达成一致。
实现数据最终一致性的一般步骤如下:
- 操作日志记录:
当一个操作在主系统(比如订单服务)上执行时,该操作及其结果被记录到操作日志中。 - 发送消息:
主系统执行完操作并记录操作日志后,将一个消息发送到消息队列。这个消息包含了足够的信息以允许其他系统(如库存服务、支付服务等)执行相应的操作。 - 消息处理:
其他系统监听消息队列,接收到消息后执行必要的操作。如果操作成功,相应的服务会更新自己的状态,并将成功处理的确认信息发送回消息队列。 - 失败处理和重试机制:
如果消息的消费者在处理消息时失败,它通常会将消息重新入队或记录到死信队列中。一个好的实践是实现退避策略和重试机制,即在消息处理失败时,等待一段时间后再次重试,直到成功或达到重试次数上限。 - 消息确认和幂等性:
消费者处理消息后,必须向消息队列发送确认信息,以避免消息被重复处理。此外,操作必须是幂等的,即重复执行相同的操作不会改变系统状态,从而确保即使同一个消息被多次处理,系统状态也能保持一致。 - 事务补偿机制:
对于已经成功的操作,如果后续步骤失败,可能需要执行补偿事务来撤销之前的操作,保证数据一致性。
举一个简单的例子:
假设有一个电子商务应用,一个用户下了一个订单,系统需要更新订单服务、库存服务和会计服务。
- 用户提交订单后,订单服务会创建订单,并将订单信息发送到消息队列。
- 库存服务监听消息队列,收到订单信息后减少库存量,并返回确认消息。
- 会计服务也监听消息队列,收到订单信息后更新账户余额,并返回确认消息。
- 如果库存服务或会计服务在处理时失败,它们会将消息放回队列或记录到错误队列,用于后续的处理。
通过上述步骤,即使系统某部分暂时不可用或处理较慢,通过消息重试和补偿机制,数据也会在一段时间后达到最终的一致性状态。
重要的是要注意,实现最终一致性的系统设计必须能够容忍短暂的数据不一致,如果业务场景不能接受这一点,那么这种方法可能不适合。此外,设计时还需要考虑各种异常情况,确保系统的健壮性。