文章 62
浏览 15135
多线程事务伪命题

多线程事务伪命题

背景

最近发现一个有趣的命题,多线程如何保证事务的一致性,简单的说,如何保证多线程情况下,一个线程操作入库逻辑执行失败,其他线程能感知,一起回滚,一起成功

个人看法

再此之前,我是觉得不太现实,为什么呢?

我们先从理论上去推理一下。

我们都知道事务的特性是什么?

这个不难吧?八股文必背内容之一,ACID 必须张口就来:

  • 原子性(Atomicity)
  • 一致性(Consistency)
  • 隔离性(Isolation)
  • 持久性(Durability)

那么问题又来了,你觉得如果有多线程事务,那么我们破坏了哪个特性?

多线程事务你也别想的多深奥,你就想,两个不同的用户各自发起了一个下单请求,这个请求对应的后台实现逻辑中是有事务存在的。

这不就是多线程事务吗?

这种场景下你没有想过怎么分别去控制两个用户的事务操作吧?

因为这两个操作之间就是完全隔离的,各自拿着各自的链接玩儿。

所以多个事务之间的最基本的原则是什么?

隔离性 。两个事务操作之间不应该相互干扰。

而多线程事务想要实现的是 A 线程异常了。A,B 线程的事务一起回滚。

事务的特性里面就卡的死死的。所以,多线程事务从理论上就是行不通的。

通过理论指导实践,那么多线程事务的代码也就是写不出来的。

Spring 事务保证

前面说到隔离性。那么请问,Spring 的源码里面,对于事务的隔离性是如何保证的呢?

答案就是 ThreadLocal。(刚好最近手写过 Spring TX 源码,有兴趣看看那篇博客)

在事务开启的时候,把当前的链接保存在了 ThreadLocal 里面,从而保证了多线程之间的隔离性:

简单来说就是每个线程都有个 ThreadLocal 上下文,如果是其他线程是无法感知另外线程的 ThreadLocal,本质上 Spring 事务都是再同一个 connection 维持的,也就是再同一个事务,Spring 事务才能保证原子性,所以问题来了,如果我们是多线程,那么 Spring 声明式事务就无法处理了

可以看到,这个 resource 对象是一个 ThreadLocal 对象。

在下面这个方法中进行了赋值操作:

org.springframework.jdbc.datasource.DataSourceTransactionManager#doBegin

既然声明式事务无法实现,那如果编程式事务呢,哎,细想下发现有点搞头

常规背景

假设有这么一个场景,需要上传一个非常大的 Excel 文件,需要解析里面的数据,数据量涉及很大,需要保证,全部插入入库,如果有个数据失败,其他的一起失败

首先我们都会想到单线程同步

单线程循环插入

对于业务系统而言,这 些 条数据,必须全部落库,差一条都不行。要么就是一条都不插入。

在这个过程中,不会去调用其他的外部接口,也不会有其他的流程去操作这个表的数据。

既然说到一条不差了,那么对于大家直观而言,想到的肯定是两个解决方案:

  1. for 循环中一条条的事务插入。
  2. 直接一条语句批量插入。

对于这种需求,开启事务,然后在 for 循环中一条条的插入可以说是非常 low 的解决方案了。

单线程批量插入

通过用批量插入,每次插入 5000 条数据获取一次 io 连接插入,发现性能提升很大

为什么批量插入能有这么大的飞跃呢?

你想啊,之前 for 循环插入,虽然 SpringBoot 2.0 默认使用了 HikariPool,连接池里面默认给你搞 10 个连接。

但是你只需要一个连接,开启一次事务。这个不耗时。

耗时的地方是你 5000 次 IO 呀。

所以,耗时长是必然的。

而批量插入只是一条 SQL 语句,所以只需要一个连接,还不需要开启事务。

既然这样,那我直接批量一次就发送这些数据,不分批发送会怎么样,想到就干,

最后发现出现异常,说包传输太大,需要更改 mysq max_allowed_packet 配置

通过这指令查询

select @@max_allowed_packet;

知道什么问题了,直接改 测试

set global max_allowed_packet = xxxxxx 

运行发现成功执行入库,因为再一个线程事务内,同时入库,需求完成

但是想想还能不能更快呢

如果我们用多个线程一起分批去处理呢,应该会更快,但是多线程无法保证事务

这么一想就是多线程事务问题,既然如此 Spring 的 @Transactional 就无法实现,只能通过编程式事务

由自己控制每个线程的事务,失败就记录回滚,

多线程插入

伪代码:

 AtomicBoolean allSuccess = new AtomicBoolean(true);
        try (ExecutorService executorService = Executors.newFixedThreadPool(5)) {
            List<String> dataList = parseData();
            List<CompletableFuture<Map<Boolean, Runnable>>> completableFutureList = dataList.stream()
                    .map(data -> CompletableFuture.supplyAsync(() -> new Task().processData(data,allSuccess), executorService)).
                    collect(Collectors.toList());

            CompletableFuture.allOf(completableFutureList.toArray(new CompletableFuture[0]))
                    .thenAccept(arg -> {
                        Stream<Map<Boolean, Runnable>> stream = completableFutureList.stream()
                                .map(CompletableFuture::join);// 获取每个 CompletableFuture 的结果
                        if (allSuccess.get()) {
                            stream.forEach(map -> map.get(Boolean.TRUE).run());
                        } else {
                            stream.forEach(map -> map.get(Boolean.FALSE).run());
                        }
                    }).join();
            executorService.shutdown();
        }
static class Task {
        private PlatformTransactionManager platformTransactionManager;

        private Map<Boolean, Runnable> processData(String data,AtomicBoolean atomicBoolean) {
            TransactionStatus status = platformTransactionManager.getTransaction(new DefaultTransactionDefinition());
            Map<Boolean, Runnable> result = Maps.newHashMap();

            try {
                //todo 保存数据入数据库
            } catch (Exception e) {
                //不能通过atomicBoolean 判断为true,就直接提交事务,
                // 可能存在线程通信延迟的情况,就是说,另外的线程还没运行完成,当前线程看到为true,就直接提交事务了
                //除非收集需要回滚的数据,后续再删除数据,这里直接才有会lambda回调处理
                atomicBoolean.compareAndSet(true,false);
                // todo 打印日志
            } finally {
                result.put(true, () -> platformTransactionManager.commit(status));
                result.put(false, () -> platformTransactionManager.rollback(status));
            }
            return result;
        }
    }

这里的思路就是,通过 CompletableFutureure 线程对象分批去执行数据,维护一个变量 AtomicBoolean,异步线程共享这个变量,只要有一个线程失败,将其 cas 设置为 false 标记位,但不会立马将自己的事务提交或者回滚,而是有另外的独立的线程去协调回滚或者提交事务,哎,是不是很熟悉,这不就是很像两阶段提交(2PC)的一致性协议。

两阶段提交协议

两阶段提交协议,简称 2PC(2 Prepare Commit),是比较常用的解决分布式事务问题的方式,要么所有参与进程都提交事务,要么都取消事务,即实现 ACID 中的原子性(A)的常用手段。

缺陷

通过代码和流程图能了解到,这个协议的缺陷也很明显,就是长时间会占用事务,长时间事务未释放,或者频繁回滚或者提交,最主要式最后无法保证所有事务都提交成功,什么意思呢?

就是说,最后协调者线程收集到所有子线程 Feature 返回的 lambda 事务方法,最后一起提交事务,有可能会出现有些事务提交成功,但是中途宕机或者与数据库出现网络波动,连接数过大等等情况,导致后续的事务就无法提交,也不能完整保证一致性

结论

多线程事务换个角度想,可以理解为分布式事务。可以借助这个案例去了解分布式事务。但是解决分布式事务的最好的方法就是:不要有分布式事务!

而解决分布式事务的绝大部分落地方案都是:最终一致性

一般主流就是通过消息中间件 mq 来实现

消息队列(Message Queue, MQ)是实现分布式系统数据最终一致性的常用手段。它们能够解耦系统组件、提高系统的响应性和可扩展性。在分布式事务处理中,因为直接保证强一致性可能导致性能瓶颈,所以很多系统采用最终一致性模型。

最终一致性的含义是,在没有新的更新请求的情况下,数据的复制品最终都将达到一致的状态。消息队列在这个过程中起到中间人的作用,确保即使某些操作暂时失败,也能够重新尝试,直到所有系统中的数据达成一致。

实现数据最终一致性的一般步骤如下:

  1. 操作日志记录
    当一个操作在主系统(比如订单服务)上执行时,该操作及其结果被记录到操作日志中。
  2. 发送消息
    主系统执行完操作并记录操作日志后,将一个消息发送到消息队列。这个消息包含了足够的信息以允许其他系统(如库存服务、支付服务等)执行相应的操作。
  3. 消息处理
    其他系统监听消息队列,接收到消息后执行必要的操作。如果操作成功,相应的服务会更新自己的状态,并将成功处理的确认信息发送回消息队列。
  4. 失败处理和重试机制
    如果消息的消费者在处理消息时失败,它通常会将消息重新入队或记录到死信队列中。一个好的实践是实现退避策略和重试机制,即在消息处理失败时,等待一段时间后再次重试,直到成功或达到重试次数上限。
  5. 消息确认和幂等性
    消费者处理消息后,必须向消息队列发送确认信息,以避免消息被重复处理。此外,操作必须是幂等的,即重复执行相同的操作不会改变系统状态,从而确保即使同一个消息被多次处理,系统状态也能保持一致。
  6. 事务补偿机制
    对于已经成功的操作,如果后续步骤失败,可能需要执行补偿事务来撤销之前的操作,保证数据一致性。

举一个简单的例子:

假设有一个电子商务应用,一个用户下了一个订单,系统需要更新订单服务、库存服务和会计服务。

  1. 用户提交订单后,订单服务会创建订单,并将订单信息发送到消息队列。
  2. 库存服务监听消息队列,收到订单信息后减少库存量,并返回确认消息。
  3. 会计服务也监听消息队列,收到订单信息后更新账户余额,并返回确认消息。
  4. 如果库存服务或会计服务在处理时失败,它们会将消息放回队列或记录到错误队列,用于后续的处理。

通过上述步骤,即使系统某部分暂时不可用或处理较慢,通过消息重试和补偿机制,数据也会在一段时间后达到最终的一致性状态。

重要的是要注意,实现最终一致性的系统设计必须能够容忍短暂的数据不一致,如果业务场景不能接受这一点,那么这种方法可能不适合。此外,设计时还需要考虑各种异常情况,确保系统的健壮性。


标题:多线程事务伪命题
作者:xiaohugg
地址:https://xiaohugg.top/articles/2023/12/28/1703754492338.html

人民有信仰 民族有希望 国家有力量