彻底搞透分布式一致性( 六 )

@Retryable(value = https://www.isolves.com/it/cxkf/jiagou/2023-11-06/{Exception.class}, maxAttempts = 3, backoff = @Backoff(delay = 1000))public void doSomething() throws Exception {// 业务逻辑代码}

  • 1.
  • 2.
  • 3.
  • 4.
该实现会在方法调用失败时进行最多3次的重试,每次重试之间会等待1秒的时间 。如果超过3次重试仍然失败,则抛出异常 。
  • 基于自定义异常处理的重试实现
@Retryable(value = https://www.isolves.com/it/cxkf/jiagou/2023-11-06/{Exception.class}, maxAttempts = 3, backoff = @Backoff(delay = 1000), fallback = @Fallback(fallbackMethod = "doDefault"))public void doSomething() throws Exception {// 业务逻辑代码}private String doDefault(Exception e) {// 当出现指定异常时,执行该方法进行重试处理}
  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.
该实现会在方法调用失败时进行最多3次的重试,每次重试之间会等待1秒的时间 。如果超过3次重试仍然失败,则会执行 doDefault 方法来进行重试处理 。在该方法中,我们可以自定义处理方式来处理异常情况 。
@Retry 仍旧是一个内存解决方案,在极端场景下可能出现任务丢失的情况 。因此在实际工作中,很少用于可重试性事务这种场景 。
3.3.2. MQMQ(消息队列)消费者重试机制是指在消费消息时,如果消费者无法成功消费消息(比如网络异常、服务器故障等原因),会自动重试一定次数或间隔一定时间后再次尝试消费消息,以保证消息的可靠性和可用性 。
如下图所示:
彻底搞透分布式一致性

文章插图
im
具有MQ的可重试性事务,需要以下保障:
  • 保障业务操作与消费发送之间的一致性:业务操作成功,消息必须发送成功;业务操作失败 , 消息不能发送;
  • 保障消息投递和消费消费之间的一致性:对于消费失败的消息,MQ 会自动进行重试,直至消费成功;
一般情况下会采用多次投递的方式来实现消息投递和消息消费之间的一致性,所以消息消费者需要保障幂等性,避免多次投递造成的业务问题 。
3.3.2.1. 半消息RocketMQ事务消息是一种支持分布式事务的消息模型,将消息生产和消费与业务逻辑绑定在一起,确保消息发送和事务执行的原子性,保证消息的可靠性 。
事务消息分为两个阶段:发送消息和确认消息,确认消息分为提交和回滚两个操作 。在提交操作执行完毕后,消息才会被消费端消费,而在回滚操作执行完毕后,消息会被删除,从而达到了事务的一致性和可靠性 。
事务消息的发生流程如下:
彻底搞透分布式一致性

文章插图
图片
  • 生产者发送prepare消息到RocketMQ服务端,RocketMQ将消息存储到本地并返回结果;
  • 生产者开始执行本地事务,并根据本地事务的结果将状态信息提交给RocketMQ服务端;
  • 如果本地事务执行成功,生产者向RocketMQ服务端发送commit消息;
  • 如果本地事务执行失败,生产者向RocketMQ服务端发送rollback消息;
  • RocketMQ接收到commit或rollback消息后,对消息进行投放或删除;
如果生成者发送 prepare 消息后,未在规定时间内发送 commit 或 rollback 消息,RocketMQ 将进入恢复流程,具体如下:
彻底搞透分布式一致性

文章插图
图片
  • 如果在回查的时间之前没有收到相应的 commit 或 rollback 消息,则 RocketMQ 会将对该 prepare 消息进行回查;
  • 应用程序接收到回查指令,从业务库中获取数据,并根据业务逻辑进行判断,最终是 commit 还是 rollback;
  • RocketMQ 接收到 commit 或 rollback 回复后,进行相应动作,从而实现业务操作和消息发送的一致性;
使用 RocketMQ 的事务消息代码示例如下:
// 编写事务监听器类public class TransactionListenerImpl implements TransactionListener {private AtomicInteger transactionIndex = new AtomicInteger(0);// 执行本地事务public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {int value = https://www.isolves.com/it/cxkf/jiagou/2023-11-06/transactionIndex.getAndIncrement();System.out.println("executeLocalTransaction " + value);// TODO 执行本地事务,并返回事务状态// 本例假定 index 为偶数的消息执行成功,奇数的消息执行失败if (value % 2 == 0) {return LocalTransactionState.COMMIT_MESSAGE;}return LocalTransactionState.ROLLBACK_MESSAGE;}// 检查本地事务状态public LocalTransactionState checkLocalTransaction(MessageExt msg) {System.out.println("checkLocalTransaction " + msg.getTransactionId());// 模拟检查本地事务状态,返回事务状态boolean committed = prepare(true);if (committed) {return LocalTransactionState.COMMIT_MESSAGE;}return LocalTransactionState.UNKNOW;}// 模拟操作预处理逻辑private boolean prepare(boolean commit) {System.out.println("prepare " + (commit ? "commit" : "rollback"));return commit;}}// 编写发送消息的代码public class Producer {private static final String NAME_SERVER_ADDR = "localhost:9876";public static void main(String[] args) throws Exception {TransactionMQProducer producer = new TransactionMQProducer("MyGroup");producer.setNamesrvAddr(NAME_SERVER_ADDR);// 注册事务监听器producer.setTransactionListener(new TransactionListenerImpl());producer.start();// 发送事务消息String[] tags = {"TagA", "TagB", "TagC"};for (int i = 0; i


推荐阅读