在众多关于MQ的面试八股文中有这么一道题 , “如何保证MQ消息消费的幂等性” 。
为什么需要保证幂等性呢?是因为消息会重复消费 。
为什么消息会重复消费?
明明已经消费了 , 为什么消息会被再次被消费呢?
不同的MQ产生的原因可能不一样
本文就以RocketMQ为例 , 来扒一扒RocketMQ中会导致消息重复消息的原因 , 最终你会发现 , 其实消息重复消费算是RocketMQ无奈的“bug” 。
文章插图
消息发送异常时重复发送
首先 , 我们来瞅瞅RocketMQ发送消息和消费消息的基本原理 。
文章插图
如图 , 简单说一下上图中的概念:
- Broker , 就是RocketMQ的服务端 , 如上图就有两个服务实例
- Topic就是一类消息集合的名字
- Queue就是Topic的对应的队列 , 消息都存在Queue上 , 每个Topic都会有自己的几个Queue
所以 , 整个消息发送和消费过程大致如下:
- 生产者在发送消息之前根据负载均衡策略(默认是轮询)选择一个Queue , 然后跟这个Queue所在的机器建立连接 , 把消息发送到这个Queue上
- 消费者只要消费这个Queue , 那么就能消费到消息
在正常情况下 , 生产者的确是按照这个方式来发送消息的
但是当出现了异常时 , 这种异常包括消息发送超时、响应超时等等 , RocketMQ为了保证消息成功发送 , 会进行消息发送的重试操作 , 默认情况下会最多会重试两次
文章插图
重试操作比较简单 , 就是选择另一台机器的Queue来发送 。
虽然重试操作可以很大程度保证消息能够发送成功 , 但是同时也会带来消息重复发送的问题 。
举个例子 , 假设生产者向A机器发送消息 , 发生了异常 , 响应超时了 , 但是就一定代表消息没发成功么?
不一定 , 有可能会出现服务端的确接收到并处理了消息 , 但是由于网络波动等等 , 导致生产者接收不到服务端响应的情况 , 此时消息处理成功了 , 但是生成者还是以为发生了异常
此时如果发生重试操作 , 那么势必会导致消息被发送了两次甚至更多次 , 导致服务端存了多条相同的消息 , 那么就一定会导致消费者重复消费消息 。
消费消息抛出异常
在RocketMQ的并发消费消息的模式下 , 需要用户实现MessageListenerConcurrently接口来处理消息
文章插图
当消费者获取到消息之后会调用MessageListenerConcurrently的实现 , 传入需要消费的消息集合msgs , 这里提到的msgs很重要
文章插图
如上代码 , 当消息消费出现异常的时候 , status就会为null , 后面就会将status设置成为RECONSUME_LATER 。
RECONSUME_LATER翻译成功中文就是稍后重新消费的意思
所以从这可以看出 , 一旦抛出异常 , 那么消息之后就可以被重复消息 。
到这其实可能有小伙伴觉得消息消费失败重新消费很正常 , 保证消息尽可能消费成功 。
对 , 这句话不错 , 的确可以在一定程度上保证消费异常的消息可以消费成功 。
推荐阅读
- 一文看懂Java中的ThreadLocal源码和注意事项
- 一文看懂Redisson分布式锁的Watchdog机制源码实现
- SpringBoot整合RocketMQ,老鸟们都是这么玩的!
- 张钰|?深扒2003年“张钰事件”:20盘录像带,30个导演,1场桃色罗生门!
- ChatGPT 开源了第一款插件,都来学习一下源码吧!
- 周杰伦为什么娶了昆凌(天涯深扒昆凌)
- 学会这20个库,让你快速看懂 vue3 和 vite3 源码
- 迪丽热巴|孙红雷也移民了?深扒他的资产,为钱掉了名誉何必呢
- SpringBoot 与RabbitMQ、RocketMQ高可靠、高性能、分布式应用实践
- 头狼涨停战法通达信选股公式副图源码?有谁知道追涨停技术?