前言RocketMQ作为一款优秀的开源消息中间件,实现了文件的高性能存储和读取,在众多消息中间件中脱颖而出,其文件模块设计思想很值得我们学习和借鉴 。因此很多开发者在使用的时候,也开始研究其文件存储的实现原理,但是在学习过程中,由于自身知识储备不足,往往只能了解其基本原理和整体架构,对于具体是怎么实现是,用到了什么技术,往往是一知半解 。目前网上有很多介绍RocketMQ原理和源码的文章,但是很多都是讲解整体架构,对源代码的分析也仅仅是停留在代码注释层面,导致对整体和细节的把握不能统一,给人一种"不识庐山真面目,只缘身在此山中"的感觉 。
笔者针对开发者在研究RocketMQ的过程中遇到的困惑,基于对RocketMQ的文件存储原理和源码研究,结合JAVA NIO的文件读写,自己动手实现了一个简化版本的RocketMQ文件系统,分享出来,希望能抽丝剥茧,帮助开发者从本质上理解RocketMQ文件存储的原理,起到抛砖引玉,举一反三的作用 。
本文不是一篇介绍RocketMQ存储基本原理的文章,本文假设读者对RocketMQ的CommitLog,ConsumeQueue,IndexFile已经有一定的了解,熟悉java NIO文件读写 。本文适合对RocketMQ的文件存储原理有一定的了解,并且希望进一步了解RocketMQ是如何通过java NIO实现的读者 。
核心原理在向commitLog文件写入消息的时候,需要记录该条消息在commitLog文件的偏移量offset(消息在commitLog的起始字节数),读取的时候根据offset读取 。RocketMQ保存offset的文件为consumeQueue 和indexFile 。
文章插图
RockeetMQ文件读写流程
RocketMQ文件存储示意图
文章插图
RocketMQ文件逻辑存储结构
文章插图
RocketMQ文件offset查找示意图
CommitLog读写
commitLog文件写入的是完整的消息,长度不固定,因此读取的时候只能根据文件存储偏移量offset读取 。实际上offset保存在consumeQueue,indexFile文件中 。
consumeQueue读写
consumeQueue在消费方拉取消息的时候读取,读取原理比较简单 。
consumeQueue每条数据固定长度是20(8:offset+4:msgLen+8:tagCode),顺序写入,每写入一条消息,写入位置postition+20 。读取的时候按消息序号index(第几条消息)读取 。
假设消费方要消费消息序号index=2的消息(第2条消息),过程如下:
1.定位consumeQueue文件,然后读位置postition定位到40(2*20),读取数据 。
2.根据1读取 的数据取到offset值(存储在consumeQueue的偏移量) 。
3.根据2得到的offset值,定位commitLog文件,然后读取commitLog上的整条消息 。
参见RocketMQ文件offset查找示意图
indexFile读写
indexFile由indexHead(长度40),500W个hash槽位(每个槽位长度固定4),2000W个indexData组成 。
indexFile是为了方便通过messageId读取消息而设计的,因此需要将messageId和消息序号index做一层映射,将messageId取模后得到槽位下标(第几个槽位),然后将当前messageId对应的消息index(消息序号)放到对应的槽位,并将数据顺序保存到indexFile的indexData部分 。
写入过程:
1.hash(messageId)%500W得到槽位(slot)的下标slot_index(第几个槽位,槽位长度固定4),
然后将消息序号index存放到对应的槽位(为简化设计,暂不考虑hash冲突的情况) 。
2.存储indexData数据,起始存储位置postition 为
【附代码实现 彻底弄懂RocketMQ文件存储】indexDataOffset = 40(文件头长度) + 500W * 4+(index-1)*20
读取过程:
1.hash(messageId) % 500W定位到槽位的下标slot_index(第几个槽位) 。
2.然后根据槽位下标计算槽位的偏移量slot_offset(每个槽位的固定长度 是4) 。
slot_offset = 40(文件头长度) + slot_index * 4 。
3.然后根据slot_offset获取到槽位上存储的消息的序号index 。
4.根据消息的index计算该条消息存储在indexFile的indexData部分的偏移量indexDataOffset,
indexDataOffset = 40(文件头长度) + 500W * 4+( index - 1 ) * 20
5.根据indexDataOffset读取indexFile的IndexData部分,然后获取commitLog的offset,即可读取到实际的消息 。
参见RocketMQ文件offset查找示意图
代码实现1.手动生成10个消息,并创建commitLog文件,consumeQueue,indexFile文件
public class CommitLogWriteTest {private static Long commitLogOffset = 0L;//8byte(commitlog offset)private static List<ConsumerQueueData> consumerQueueDatas = new ArrayList<>();private static List<IndexFileItemData> indexFileItemDatas = new ArrayList<>();private static int MESSAGE_COUNT = 10;public static void main(String[] args) throws IOException {createCommitLog();createConsumerQueue();createIndexFile();}private static void createCommitLog() throws IOException {System.out.println("");System.out.println("commitLog file create!" );FileChannel fileChannel = FileChannel.open(Paths.get(URI.create("file:/c:/123/commitLog.txt")),StandardOpenOption.WRITE, StandardOpenOption.READ);MAppedByteBuffer mappedByteBuffer = fileChannel.map(FileChannel.MapMode.READ_WRITE, 0, 409600);fileChannel.close();Random random = new Random();int count = 0;for (int i = 0; i < MESSAGE_COUNT; i++) {String topic = "Topic-test";String msgId = UUID.randomUUID().toString();String msgBody = "消息内容" + "msgmsgmsgmsgmsgmsgmsgmsgmsgmsgmsgmsgmsgmsgmsgmsgmsg".substring(0, random.nextInt(48) + 1);//long queueOffset = i;//索引偏移量String transactionId = UUID.randomUUID().toString();/* 数据格式,位置固定int totalSize;//消息长度String msgId;String topic;long queueOffset;//索引偏移量long bodySize;//消息长度byte[] body;//消息内容String transactionId;long commitLogOffset;//从第一个文件开始算的偏移量*/int msgTotalLen = 8 //msgTotalLen field+ 64//msgId field长度+ 64 //topic field长度+ 8 //索引偏移量field长度+ 8 //消息长度field长度+ msgBody.getBytes(StandardCharsets.UTF_8).length //field+ 64//transactionId field长度+ 64//commitLogOffset field长度;;// 定位写入文件的起始位置//如果3个消息长度分别是100,200,350,则偏移量分别是0,100,300mappedByteBuffer.position(Integer.valueOf(commitLogOffset + ""));mappedByteBuffer.putLong(msgTotalLen);//msgTotalLenmappedByteBuffer.put(getBytes(msgId, 64));//msgIdmappedByteBuffer.put(getBytes(topic, 64));//topic,定长64mappedByteBuffer.putLong(queueOffset);//索引偏移量mappedByteBuffer.putLong(msgBody.getBytes(StandardCharsets.UTF_8).length);//bodySizemappedByteBuffer.put(msgBody.getBytes(StandardCharsets.UTF_8));//bodymappedByteBuffer.put(getBytes(transactionId, 64));mappedByteBuffer.putLong(commitLogOffset);//commitLogOffsetSystem.out.println("写入消息,第:" + i + "次");System.out.println("msgTotalLen:" + msgTotalLen);System.out.println("msgId:" + msgId);System.out.println("topic:" + topic);System.out.println("msgBody:" + msgBody);System.out.println("transactionId:" + transactionId);System.out.println("commitLogOffset:" + commitLogOffset);ConsumerQueueData consumerQueueData = https://www.isolves.com/it/sjk/bk/2022-12-13/new ConsumerQueueData();consumerQueueData.setOffset(commitLogOffset);consumerQueueData.setMsgLength(msgTotalLen);consumerQueueData.setTagCode(100L);//准备生成consumeQueue文件consumerQueueDatas.add(consumerQueueData);IndexFileItemData indexFileItemData = new IndexFileItemData();indexFileItemData.setKeyHash(msgId.hashCode());indexFileItemData.setMessageId(msgId);indexFileItemData.setPhyOffset(commitLogOffset);//准备生成indexFile文件indexFileItemDatas.add(indexFileItemData);mappedByteBuffer.force();commitLogOffset = msgTotalLen + commitLogOffset;count++;}System.out.println("commitLog数据保存完成,totalSize:" + count);}public static void createConsumerQueue() throws IOException {System.out.println("");System.out.println("ConsumerQueue file create!" );FileChannel fileChannel = FileChannel.open(Paths.get(URI.create("file:/c:/123/consumerQueue.txt")),StandardOpenOption.WRITE, StandardOpenOption.READ);MappedByteBuffer mappedByteBuffer = fileChannel.map(FileChannel.MapMode.READ_WRITE, 0, 4096);fileChannel.close();int count = 0;for (int i = 0; i
推荐阅读
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
- sql实现题库按字母搜索
- 附和的意思 附和的意思
- 低代码开发平台?ddm是什么意思?
- 毛巾变色是什么菌引起
- 附pe启动教程+获取方式 U盘 PE系统制作教程
- 第一课时 清华附小同上一课:三年级下册第1课《古诗三首》
- 对资产阶级进行和平赎买在我国得以实现,对资产阶级实行和平赎买最早是-
- 宋丹丹|宋丹丹:在儿子家附近买了房,最高卖30万一平,直言很怕他们搬家
- EMUI11新技能,带你实现人生“破圈”
- 教学|30岁提前退休,小伙攒到100万在云南租房实现“躺平”人生