消息队列之事务消息,RocketMQ 和 Kafka是如何做的?

责编|郑丽媛
来源|yes的练级攻略(ID:yes_java)
每个时代 , 都不会亏待会学习的人 。
大家好 , 我是yes 。
今天我们来谈一谈消息队列的事务消息 , 一说起事务相信大家都不陌生 , 脑海里蹦出来的就是ACID 。
通常我们理解的事务就是为了一些更新操作要么都成功 , 要么都失败 , 不会有中间状态的产生 , 而ACID是一个严格的事务实现的定义 , 不过在单体系统时候一般都不会严格的遵循ACID的约束来实现事务 , 更别说分布式系统了 。
分布式系统往往只能妥协到最终一致性 , 保证数据最终的完整性和一致性 , 主要原因就是实力不允许...因为可用性为王 。
而且要保证完全版的事务实现代价很大 , 你想想要维护这么多系统的数据 , 不允许有中间状态数据可以被读取 , 所有的操作必须不可分割 , 这意味着一个事务的执行是阻塞的 , 资源是被长时间锁定的 。
在高并发情况下资源被长时间的占用 , 就是致命的伤害 , 举一个有味道的例子 , 如厕高峰期 , 好了懂得都懂 。
准备阶段就是协调者向各参与者发送准备命令 , 这个阶段参与者除了事务的提交啥都做了 , 而提交阶段就是协调者看看各个参与者准备阶段都o不ok , 如果有ok那么就向各个参与者发送提交命令 , 如果有一个不ok那么就发送回滚命令 。
这里的重点就是2PC只适用于数据库层面的事务 , 什么意思呢?就是你想在数据库里面写一条数据同时又要上传一张图片 , 这两个操作2PC无法保证两个操作满足事务的约束 。
而且2PC是一种强一致性的分布式事务 , 它是同步阻塞的 , 即在接收到提交或回滚命令之前 , 所有参与者都是互相等待 , 特别是执行完准备阶段的时候 , 此时的资源都是锁定的状态 , 假如有一个参与者卡了很久 , 其他参与者都得等它 , 产生长时间资源锁定状态下的阻塞 。
总体而言效率低 , 并且存在单点故障问题 , 协调者是就是那个单点 , 并且在极端条件下存在数据不一致的风险 , 例如某个参与者未收到提交命令 , 此时宕机了 , 恢复之后数据是回滚的 , 而其他参与者其实都已经执行了提交事务的命令了 。
TCC分为三个阶段try-confirm-cancel , 简单的说就是每个业务都需要有这三个方法 , 先都执行try方法 , 这一阶段不会做真正的业务操作 , 只是先占个坑 , 什么意思呢?比如打算加10个积分 , 那先在预添加字段加上这10积分 , 这个时候用户账上的积分其实是没有增加的 。
然后如果都try成功了那么就执行confirm方法 , 大家都来做真正的业务操作 , 如果有一个try失败了那么大家都执行cancel操作 , 来撤回刚才的修改 。
可以看到TCC其实对业务的耦合性很大 , 因为业务上需要做一定的改造才能完成这三个方法 , 这其实就是TCC的缺点 , 并且confirm和cancel操作要注意幂等 , 因为到执行这两步的时候没有退路 , 是务必要完成的 , 因此需要有重试机制 , 所以需要保证方法幂等 。
它的目的是为了解决消息生产者与消息消费者的数据一致性问题 。
比如你点外卖 , 我们先选了炸鸡加入购物车 , 又选了瓶可乐 , 然后下单 , 付完款这个流程就结束了 。
我们希望的就是下单成功之后购物车的菜品最终会被删除 , 所以要点就是下单和发消息这两个步骤要么都成功要么都失败 。
RocketMQ的事务消息也可以被认为是一个两阶段提交 , 简单的说就是在事务开始的时候会先发送一个半消息给Broker 。
半消息的意思就是这个消息此时对Consumer是不可见的 , 而且也不是存在真正要发送的队列中 , 而是一个特殊队列 。
发送完半消息之后再执行本地事务 , 再根据本地事务的执行结果来决定是向Broker发送提交消息 , 还是发送回滚消息 。
此时有人说这一步发送提交或者回滚消息失败了怎么办?
影响不大 , Broker会定时的向Producer来反查这个事务是否成功 , 具体的就是Producer需要暴露一个接口 , 通过这个接口Broker可以得知事务到底有没有执行成功 , 没成功就返回未知 , 因为有可能事务还在执行 , 会进行多次查询 。
如果成功那么就将半消息恢复到正常要发送的队列中 , 这样消费者就可以消费这条消息了 。
我们再来简单的看下如何使用 , 我根据官网示例代码简化了下 。
至此RocketMQ事务消息大致的流程已经清晰了 , 我们画一张整体的流程图来过一遍 , 其实到第四步这个消息要么就是正常的消息 , 要么就是抛弃什么都不存在 , 此时这个事务消息已经结束它的生命周期了 。
在Broker的SendMessageProcessor#sendMessage中会处理这个半消息请求 , 因为今天主要分析的是事务消息 , 所以其他流程不做分析 , 我大致的说一下原理 。
简单的说就是sendMessage中查到接受来的消息的属性里面
MessageConst.PROPERTY_TRANSACTION_PREPARED是true , 那么可以得知这个消息是事务消息 , 然后再判断一下这条消息是否超过最大消费次数 , 是否要延迟 , Broker是否接受事务消息等操作后 , 将这条消息真正的topic和队列存入属性中 , 然后重置消息的topic为RMQ_SYS_TRANS_HALF_TOPIC , 并且队列是0的队列中 , 使得消费者无法读取这个消息 。
以上就是整体处理半消息的流程 , 我们来看一下源码 。
Broker处理提交或者回滚消息的处理方法是
EndTransactionProcessor#processRequest, 我们来看一看它做了什么操作 。
那个后台服务就是TransactionalMessageCheckService服务 , 它会定时的扫描半消息队列 , 去请求反查接口看看事务成功了没 , 具体执行的就是
TransactionalMessageServiceImpl#check方法 。
我大致说一下流程 , 这一步骤其实涉及到的代码很多 , 我就不贴代码了 , 有兴趣的同学自行了解 。 不过我相信用语言也是能说清楚的 。
首先取半消息topic即RMQ_SYS_TRANS_HALF_TOPIC下的所有队列 , 如果还记得上面内容的话 , 就知道半消息写入的队列是id是0的这个队列 , 然后取出这个队列对应的half_op主题下的队列 , 即RMQ_SYS_TRANS_OP_HALF_TOPIC主题下的队列 。
这个half_op主要是为了记录这个事务消息已经被处理过 , 也就是说已经得知此事务消息是提交的还是回滚的消息会被记录在half_op中 。
然后调用fillOpRemoveMap方法 , 从half_op取一批已经处理过的消息来去重 , 将那些没有记录在half_op里面的半消息调用putBackHalfMsgQueue又写入了commitlog中 , 然后发送事务反查请求 , 这个反查请求也是oneWay , 即不会等待响应 。 当然此时的半消息队列的消费offset也会推进 。
看到这里相信大家会有一些疑问 , 比如为什么要有个half_op , 为什么半消息处理了还要再写入commitlog中别急听我一一道来 。
首先RocketMQ的设计就是顺序追加写入 , 所以说不会更改已经入盘的消息 , 那事务消息又需要更新反查的次数 , 超过一定反查失败就判定事务回滚 。
因此每一次要反查的时候就将以前的半消息再入盘一次 , 并且往前推进消费进度 。 而half_op又会记录每一次反查的结果 , 不论是提交还是回滚都会记录 , 因此下一次还循环到处理此半消息的时候 , 可以从half_op得知此事务已经结束了 , 因此就被过滤掉不需要处理了 。
如果得到的反查的结果是UNKNOW , 那half_op中也不会记录此结果 , 因此还能再次反查 , 并且更新反查次数 。
到现在整个流程已经清晰了 , 我再画个图总结一下Broker的事务处理流程 。
而Kafka事务消息则是用在一次事务中需要发送多个消息的情况 , 保证多个消息之间的事务约束 , 即多条消息要么都发送成功 , 要么都发送失败 , 就像下面代码所演示的 。
讲到这我就想扯一下了 , 说到这个ExactlyOnce其实不太清楚的同学很容易会误解 。
我们知道消息可靠性有三种 , 分别是最多一次、恰好一次、最少一次 , 之前在消息队列连环问的文章我已经提到了基本上我们都是用最少一次然后配合消费者端的幂等来实现恰好一次 。
消息恰好被消费一次当然我们所有人追求的 , 但是之前文章我已经从各方面已经分析过了 , 基本上难以达到 。
而Kafka竟说它能实现ExactlyOnce?这么牛啤吗?这其实是Kafka的一个噱头 , 你要说他错 , 他还真没错 , 你要说他对但是他实现的ExactlyOnce不是你心中想的那个ExactlyOnce 。
它的恰好一次只能存在一种场景 , 就是从Kafka作为消息源 , 然后做了一番操作之后 , 再写入Kafka中 。
所以说Kafka实现的是在特定场景下的恰好一次 , 不是我们所想的利用Kafka来发送消息 , 那么这条消息只会恰巧被消费一次 。
这其实和Redis说他实现事务了一样 , 也不是我们心想的事务 。
所以开源软件说啥啥特性开发出来了 , 我们一味的相信 , 因此其往往都是残血的或者在特殊的场景下才能满足 , 不要被误导了 , 不能相信表面上的描述 , 还得详细的看看文档或者源码 。
不过从另一个角度看也无可厚非 , 作为一个开源软件肯定是想更多的人用 , 我也没说谎呀 , 我文档上写的很清楚的 , 这标题也没骗人吧?
确实 , 比如你点进震惊xxxx标题的文章 , 人家也没骗你啥 , 他自己确实震惊的呢 。
Kafka的事务有事务协调者角色 , 事务协调者其实就是Broker的一部分 。
在开始事务的时候 , 生产者会向事务协调者发起请求表示事务开启 , 事务协调者会将这个消息记录到特殊的日志-事务日志中 , 然后生产者再发送真正想要发送的消息 , 这里Kafka和RocketMQ处理不一样 , Kafka会像对待正常消息一样处理这些事务消息 , 由消费端来过滤这个消息 。
然后发送完毕之后生产者会向事务协调者发送提交或者回滚请求 , 由事务协调者来进行两阶段提交 , 如果是提交那么会先执行预提交 , 即把事务的状态置为预提交然后写入事务日志 , 然后再向所有事务有关的分区写入一条类似事务结束的消息 , 这样消费端消费到这个消息的时候就知道事务好了 , 可以把消息放出来了 。
最后协调者会向事务日志中再记一条事务结束信息 , 至此Kafka事务就完成了 , 我拿confluent.io上的图来总结一下这个流程 。
需要贴代码的文章其实很难受 , 这贴的多不好 , 贴的少又怕不清晰 , 真的难 , 如果觉得文章不错记得点个在看哟 。


    推荐阅读