深扒RocketMQ源码之后,我找出了RocketMQ消息重复消费的7种原因( 五 )


此时由于没有提交消费进度 , RocketMQ服务端告诉消费者3的消费进度就会比实际的低 , 这就造成了消息重复消费的情况 。
清理长时间消费的消息
在RocketMQ中有这么一个机制 , 会定时清理长时间正在消费的消息 。
如图 , 假设有5条消息现在正在被消费者处理 , 这5条消息会被存在一个集合中 , 并且是按照offset的大小排序 , 消息1的offset最小 , 消息5的offset最大 。
RocketMQ消费者启动时会开启一个默认15分钟执行一次的定时任务

深扒RocketMQ源码之后,我找出了RocketMQ消息重复消费的7种原因

文章插图
 
这个定时任务会去检查正在处理的消息的第一条消息 , 也就是图中的消息1 , 一旦发现消息1已经处理了超过15分钟了 , 那么此时就会将消息1从集合中移除 , 之后会隔一定时间再次消费消息1 。
这也会有坑 , 虽然消息1从集合中被移除了 , 但是消息1并没有消失 , 仍然被消费者继续处理 , 但是消息1隔一定时间就会再次被消费 , 就会出现消息1被重复消费的情况 。
这就是清理长时间消费的消息导致重复消费的原因 。
但此时又会引出一个新的疑问 , 为什么要移除这个处理超过15分钟的消息呢?
这就又跟前面提到的消费进度提交有关!
前面说过消息被消费完成之后会提交消费进度 , 提交的消费进度实际会有两种情况:
第一种就是某个线程消费了所有的消息 , 当把所有的消息都消费完成之后 , 就会把消息从集合中全部移除 , 此时提交的消费进度offset就是图中消息5的offset+1
加1的操作是为了保证如果发生重启 , 那么消费者下次消费的起始位置就是消息5后面的消息 , 保证消息5不被重复消费
第二种情况就不太一样了
假设现在有两个线程来处理这5条消息 , 线程1处理前2条 , 线程2处理后3条 , 如图
深扒RocketMQ源码之后,我找出了RocketMQ消息重复消费的7种原因

文章插图
 
现在线程1出现了长时间处理消息的情况 。
此时线程2处理完消息之后 , 移除后面三条消息 , 准备提交offset的时候发现集合中还有元素 , 就是线程1正在处理的前两条消息 , 此时线程2提交的offset并不是消息5对应的offset , 而是消息1的offset , 代码如下
深扒RocketMQ源码之后,我找出了RocketMQ消息重复消费的7种原因

文章插图
 
这么做的主要原因就是保证消息1和消息2至少被消费一次 。
因为一旦提交了消息5对应的offset , 如果消费者重启了 , 下次消费就会接着从消息5的后面开始消费 , 而对于消息1和消息2来说 , 并不知道有没有被消费成功 , 就有可能出现消息丢失的情况 。
所以 , 一旦集合中最前面的消息长时间处理 , 那么就会导致后面被消费的消息进度无法提交 , 那么重启之后就会导致大量消息被重复消费 。
为了解决这个问题 , RocketMQ引入了定时清理的机制 , 定时清理长时间消费的消息 , 这样消费进度就可以提交了 。
最后
总得来说 , RocketMQ中还是存在很多种导致消息重读消费的情况 , 并且官方也说了 , 只是在大多数情况下消息不会重复
深扒RocketMQ源码之后,我找出了RocketMQ消息重复消费的7种原因

文章插图
 
所以如果你的业务场景中需要保证消息不能重复消费 , 那么就需要根据业务场景合理的设计幂等技术方案 。
 
原文:https://mp.weixin.qq.com/s/XtIZbObkDcDzcwttSDslZg 作者:三友
 
如果感觉本文对你有帮助 , 点赞关注支持一下




推荐阅读