分布式场景下的事务机制

事务消息是RocketMQ的一个非常特色的高级特性 , 它的基础诉求是通过RocketMQ的事务机制,来保证上下游的数据?致性 。
我们在单机版本下面只需要在业务方法上加上对应的事务就可以达到效果,但是分布式的场景下 , 多个系统之间的协调配合,你无法知道到底是那个先执行那个后执行,当然在微服务里面存在Seate框架来保证事务,但是这事务的保证始终是心头大患,只能用一句话形容鱼和熊掌不可兼得 。
而RocketMq的事务消息能够在提升性能的情况下满足要求,其主要实现是支持分布式情况下保障消息生产和本地事务的最终一致性,消息生产我们可以使用顺序消息去执行 , 这样我们只需要满足这两个的事务即可 。
 
 实现过程

分布式场景下的事务机制

文章插图
图片
准备阶段:生产者将消息发送到Broker,Broker向生产者发送ack表示消息发送成功,但是此时的消息为一个等待状态 , 不会被消费者去消费 。(生产者继续执行接下来的代码)
确认阶段:当我们执行完所有的代码后,本地事务要么回滚要么提交,此时当我们了解本地事务的状态后,将结果推送给Broker做二次确认结果,如果为Commit则将修改激活准备推送给消费者,如果为Rollback则将消息进行回滚 。
补偿机制:当出现异常情况没有发生二次确认,此时我们在固定时间后将会进行回查,检查回查消息对应的本地事务的状态,重写Commit或者Rollback 。
 涉及状态以及注意点事务消息存在三种状态:
CommitTransaction:提交事务状态 , 此状态下允许消费者消费 。
RollbackTransaction:回滚事务状态,此状态下消息会被删除 。
Unknown:中间状态 , 此状态下会等待本地事务处理结果进行对应操作 。
注意点:
本消息状态是一种对消费者不可见的状态,将消息的内容放到系统Topic的RMQ_SYS_TRANS_HALF_TOPIC队列里面去 。
事务消息中的相关参数可以进行设置,比如:本地事务回查次数transactionCheckMax默认15次,本地事务回查的间隙transactionCheckInterval默认60s , 超出后会直接将消息丢弃 。
RocketMQ的事务消息是指应用本地事务和发送消息操作可以定义到全局事务中,要么同时成功 , 要么同时失败,通过RocketMQ的事务信息可以实现可靠消息的最终一致性方案 。
 源码解析Producer端通过构建TransactionMQProducer对象绑定事务监听 。
TransactionListener transactionListener = new TransactionListener() {@Overridepublic LocalTransactionState executeLocalTransaction(Message msg, Object arg) {return LocalTransactionState.COMMIT_MESSAGE;}@Overridepublic LocalTransactionState checkLocalTransaction(MessageExt msg) {return LocalTransactionState.COMMIT_MESSAGE;}};TransactionMQProducer producer = new TransactionMQProducer(producerGroupTemp);producer.setTransactionListener(transactionListener);producer.setNamesrvAddr("127.0.0.1:9876");product.start();SendResult result = producer.sendMessageInTransaction(message, arg);执行sendMessageInTransaction方法来发送消息 。
public TransactionSendResult sendMessageInTransaction(final Message msg,final LocalTransactionExecuter localTransactionExecuter, final Object arg)throws MQClientException {// 检查TransactionListener是否存在,如果不存在就直接抛异常TransactionListener transactionListener = getCheckListener();if (null == localTransactionExecuter && null == transactionListener) {throw new MQClientException("tranExecutor is null", null);}// 事务消息不支持延迟等特性if (msg.getDelayTimeLevel() != 0) {MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_DELAY_TIME_LEVEL);}Validators.checkMessage(msg, this.defaultMQProducer);SendResult sendResult = null;// 设置half属性 , 表明是事务属性MessageAccessor.putProperty(msg, MessageConst.PROPERTY_TRANSACTION_PREPARED, "true");// 设置所属生成者组// broker向生产者发送回查事务请求根据这个producergroup找到指定的channel// 生产者能找到所有在同一个组的机器实例从而检查事务状态MessageAccessor.putProperty(msg, MessageConst.PROPERTY_PRODUCER_GROUP, this.defaultMQProducer.getProducerGroup());try {// 同步发送sendResult = this.send(msg);} catch (Exception e) {throw new MQClientException("send message Exception", e);}LocalTransactionState localTransactionState = LocalTransactionState.UNKNOW;Throwable localException = null;// 消息返回信息switch (sendResult.getSendStatus()) {// 第一阶段消息发送成功case SEND_OK: {try {if (sendResult.getTransactionId() != null) {// 设置事务ID属性msg.putUserProperty("__transactionId__", sendResult.getTransactionId());}String transactionId = msg.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);if (null != transactionId && !"".equals(transactionId)) {msg.setTransactionId(transactionId);}if (null != localTransactionExecuter) {// 执行本地事务localTransactionState = localTransactionExecuter.executeLocalTransactionBranch(msg, arg);} else if (transactionListener != null) {log.debug("Used new transaction API");// 发送消息成功后,执行本地操作localTransactionState = transactionListener.executeLocalTransaction(msg, arg);}if (null == localTransactionState) {localTransactionState = LocalTransactionState.UNKNOW;}if (localTransactionState != LocalTransactionState.COMMIT_MESSAGE) {log.info("executeLocalTransactionBranch return {}", localTransactionState);log.info(msg.toString());}} catch (Throwable e) {log.info("executeLocalTransactionBranch exception", e);log.info(msg.toString());localException = e;}}break;case FLUSH_DISK_TIMEOUT:case FLUSH_SLAVE_TIMEOUT:case SLAVE_NOT_AVAILABLE:localTransactionState = LocalTransactionState.ROLLBACK_MESSAGE;break;default:break;}try {// 本地事务执行完毕向broker提交事务或回滚事务this.endTransaction(msg, sendResult, localTransactionState, localException);} catch (Exception e) {log.warn("local transaction execute " + localTransactionState + ", but end broker transaction failed", e);}TransactionSendResult transactionSendResult = new TransactionSendResult();transactionSendResult.setSendStatus(sendResult.getSendStatus());transactionSendResult.setMessageQueue(sendResult.getMessageQueue());transactionSendResult.setMsgId(sendResult.getMsgId());transactionSendResult.setQueueOffset(sendResult.getQueueOffset());transactionSendResult.setTransactionId(sendResult.getTransactionId());transactionSendResult.setLocalTransactionState(localTransactionState);return transactionSendResult;}


推荐阅读