附代码实现 彻底弄懂RocketMQ文件存储( 三 )

 
2.读取consumeQueue文件,并根据offset从commitLog读取一条完整的消息
public class ConsumeQueueMessageReadTest {public static MappedByteBuffer mappedByteBuffer = null;private static int MESSAGE_COUNT = 10;public static void main(String[] args) throws IOException {FileChannel fileChannel = FileChannel.open(Paths.get(URI.create("file:/c:/123/consumerQueue.txt")),StandardOpenOption.WRITE, StandardOpenOption.READ);MappedByteBuffer mappedByteBuffer = fileChannel.map(FileChannel.MapMode.READ_WRITE, 0, 409600);fileChannel.close();//根据索引下标读取索引,实际情况是用户消费的最新点位(for循环的i值),//存在在broker的偏移量文件中int index = 0;for (int i = 0; i < MESSAGE_COUNT; i++) {mappedByteBuffer.position(i * 20);long commitlogOffset = mappedByteBuffer.getLong();// System.out.println(commitlogOffset);long msgLen = mappedByteBuffer.getInt();Long tag = mappedByteBuffer.getLong();//System.out.println("======读取到consumerQueue,commitlogOffset:"+commitlogOffset+",msgLen :"+msgLen+"===");//根据偏移量读取CommitLogSystem.out.println("=================commitlog读取第:"+index+"消息,偏移量为" + commitlogOffset + "===================");readCommitLogByOffset(Integer.valueOf(commitlogOffset + ""));index ++;}}public static MappedByteBuffer initFileChannel() throws IOException {if (mappedByteBuffer == null) {FileChannel commitLogfileChannel = FileChannel.open(Paths.get(URI.create("file:/c:/123/commitLog.txt")),StandardOpenOption.WRITE, StandardOpenOption.READ);mappedByteBuffer = commitLogfileChannel.map(FileChannel.MapMode.READ_WRITE, 0, 409600);commitLogfileChannel.close();}return mappedByteBuffer;}/*** 根据偏移量读取commitLog* */public static void readCommitLogByOffset(int offset) throws IOException {/* 存放顺序,读到时候保持顺序一致b.putLong(totalSize);//totalSizeb.put(getBytes(msgId, 64));//msgIdb.put(getBytes(topic, 64));//topic,定长64b.putLong(queueOffset);//索引偏移量b.putLong(msgBody.getBytes(StandardCharsets.UTF_8).length);//bodySizeb.put(msgBody.getBytes(StandardCharsets.UTF_8));//bodyb.put(getBytes(transactionId, 64));b.putLong(commitLogOffset);//commitLogOffset*/MappedByteBuffer mappedByteBuffer = initFileChannel();mappedByteBuffer.position(offset);long totalSize = mappedByteBuffer.getLong();//消息长度byte[] msgIdByte = new byte[64];//uuid 固定是64mappedByteBuffer.get(msgIdByte);byte[] topicByte = new byte[64];// 固定是64mappedByteBuffer.get(topicByte);long queueOffset = mappedByteBuffer.getLong();Long bodySize = mappedByteBuffer.getLong();int bSize = Integer.valueOf(bodySize + "");byte[] bodyByte = new byte[bSize];//bodySize 长度不固定mappedByteBuffer.get(bodyByte);byte[] transactionIdByte = new byte[64];//uuid 固定是64mappedByteBuffer.get(transactionIdByte);long commitLogOffset = mappedByteBuffer.getLong();//偏移量System.out.println("totalSize:" + totalSize);System.out.println("msgId:" + new String(msgIdByte));System.out.println("topic:" + new String(topicByte));System.out.println("queueOffset:" + queueOffset);System.out.println("bodySize:" + bodySize);System.out.println("body:" + new String(bodyByte));System.out.println("transactionId:" + new String(transactionIdByte));System.out.println("commitLogOffset:" + commitLogOffset);}}运行结果:
=================commitlog读取第:0消息,偏移量为0===================totalSize:338msgId:8d8eb486-d94c-4da1-bdfe-f0587161ea05topic:Topic-testqueueOffset:0bodySize:58body:消息内容msgmsgmsgmsgmsgmsgmsgmsgmsgmsgmsgmsgmsgmsgmsgmtransactionId:874605e6-69d2-4301-a65e-01e63de75a4dcommitLogOffset:0=================commitlog读取第:1消息,偏移量为338===================totalSize:338msgId:57c74e53-4ea1-4a8c-9c7f-c50417d8681etopic:Topic-testqueueOffset:1bodySize:58body:消息内容msgmsgmsgmsgmsgmsgmsgmsgmsgmsgmsgmsgmsgmsgmsgmtransactionId:b991a3e9-66fc-4a54-97fc-1492f7f54d3ccommitLogOffset:338=================commitlog读取第:2消息,偏移量为676===================totalSize:296msgId:a0c7c833-9811-4f17-800b-847766aef7ddtopic:Topic-testqueueOffset:2bodySize:16body:消息内容msgmtransactionId:9a836d21-704f-46ae-926c-b7933efe06a5commitLogOffset:676=================commitlog读取第:3消息,偏移量为972===================totalSize:299msgId:050d6330-1f4a-4dff-a650-4f7eaee63356topic:Topic-testqueueOffset:3bodySize:19body:消息内容msgmsgmtransactionId:19506313-c7ae-4282-8bc7-1f5ca7735c44commitLogOffset:972=================commitlog读取第:4消息,偏移量为1271===================totalSize:306msgId:f5c5be5b-2d9d-4dd8-a9e3-1fdcacc8c2c5topic:Topic-testqueueOffset:4bodySize:26body:消息内容msgmsgmsgmsgmstransactionId:09f3b762-159e-4486-8820-0bce0ef7972dcommitLogOffset:1271=================commitlog读取第:5消息,偏移量为1577===================totalSize:313msgId:e76911ad-8d05-4d0b-b735-0b2f487f89f1topic:Topic-testqueueOffset:5bodySize:33body:消息内容msgmsgmsgmsgmsgmsgmsgtransactionId:42dce613-6aaf-466b-b185-02a3f7917579commitLogOffset:1577=================commitlog读取第:6消息,偏移量为1890===================totalSize:321msgId:05be27f8-fb7a-4662-904f-2263e8899086topic:Topic-testqueueOffset:6bodySize:41body:消息内容msgmsgmsgmsgmsgmsgmsgmsgmsgmstransactionId:6c7db927-911c-4d19-a240-a951fad957bdcommitLogOffset:1890=================commitlog读取第:7消息,偏移量为2211===================totalSize:318msgId:9a508d90-30f6-4a25-812f-25d750736afetopic:Topic-testqueueOffset:7bodySize:38body:消息内容msgmsgmsgmsgmsgmsgmsgmsgmstransactionId:0bbc5e92-0a78-4699-a7a4-408e7bd3b897commitLogOffset:2211=================commitlog读取第:8消息,偏移量为2529===================totalSize:305msgId:63249e08-bd0c-4a5b-954b-aea83cb442betopic:Topic-testqueueOffsmsgmtransactionId:22cc0dd6-2036-4423-8e6f-d7043b953724commitLogOffset:2529=================commitlog读取第:9消息,偏移量为2834===================totalSize:329msgId:93c46c53-b097-4dd0-90d7-06d5d877f489topic:Topic-testqueueOffset:9bodySize:49body:消息内容msgmsgmsgmsgmsgmsgmsgmsgmsgmsgmsgmsgmtransactionId:e9078205-15be-42b1-ad7e-55b9f5e229ebcommitLogOffset:2834


推荐阅读