- 主页 > 生活百科 > >
RocketMQ与SpringBoot整合进行生产级二次封装( 四 )
- 这个类是最基础的原始封装类,相当于餐馆提供的点餐服务 。上面提供无业务特性的发送,比如想要发送日志消息或者动态发送消息目的场景
3.2.3 增强RocketMQTemplate
- 以订单处理中心来说,变化点仅仅只是单号等业务数据不一样,发往订单处理中心的消息不管是topic还是tag等等其实完全都一样,那么此时可以根据业务来增加封装
- 增强原始功能需要注意下面两个点 所有父类能出现的地方,子类都能出现:也就是子类拥有功能 >= 父类 ,比如Java的List,只要入参是List的地方,传ArrayList和LinkedList完全可以 增强功能不能改变原始功能的行为:比如父类有一个方法say是说话,结果子类覆写了say改成了行为是吃饭,然后当调用者调用say的时候得到了一个完全预期外的结果
- 就以订单中心消息发送为例,封装OrderMessageTemplate继承自RocketMqTemplate,此时前者就拥有了封装父类的所有基础方法,拥有了所有父类的功能 。然后可以在前者增加自身业务特性的发送方法,比如发送订单处理消息
package com.codecoord.rocketmq.template;import com.codecoord.rocketmq.constant.RocketMqBizConstant;import com.codecoord.rocketmq.domain.RocketMqEntityMessage;import org.apache.rocketmq.client.producer.SendResult;import org.springframework.stereotype.Component;import javax.annotation.Resource;import javax.validation.constraints.NotNull;import java.time.LocalDate;import java.time.LocalDateTime;/** * 订单类发送消息模板工具类 * * @author tianxincode@163.com * @since 2022/6/16 */@Componentpublic class OrderMessageTemplate extends RocketMqTemplate {/// 如果不采用继承也可以直接注入使用/* @Resourceprivate RocketMqTemplate rocketMqTemplate; *//*** 入参只需要传入是哪个订单号和业务体消息即可,其他操作根据需要处理* 这样对于调用者而言,可以更加简化调用*/public SendResult sendOrderPaid(@NotNull String orderId, String body) {RocketMqEntityMessage message = new RocketMqEntityMessage();message.setKey(orderId);message.setSource("订单支付");message.setMessage(body);// 这两个字段只是为了测试message.setBirthday(LocalDate.now());message.setTradeTime(LocalDateTime.now());return send(RocketMqBizConstant.SOURCE_TOPIC, RocketMqBizConstant.ORDER_PAID_TAG, message);}} - 此时对于调用者只需要 orderMessageTemplate.sendOrderPaid("O001", "xxx");就可以把消息发送到订单处理中心
- 封装后的好处,假如现在有10个订单来源,现在需要调整消息发送格式,如果不进行封装那么10个来源发送的地方都需要改;如果进行了二次封装,只需要改sendOrderPaid方法即可,而且还不会出错,此时优势就体现出来了
2.3 RocketMQListener封装
- RocketMQListener是消费消息的核心,同时也涉及到更多的操作,比如:基础日志记录、异常处理、消息重试、警告通知等等等
- 按照抽离变化点,RocketMQListener只应该处理与自身业务相关的,除此之外的其它应该交给父类,子类只需要告诉父类要不要异常处理、要不要重试等等,点餐式服务
- 封装消息消费的抽象类 注意泛型限定为标准基础消息类,这样能到消费者的一定有统一的标准类BaseMqMessage 下面简单封装示例
package com.codecoord.rocketmq.listener;import com.codecoord.rocketmq.constant.RocketMqSysConstant;import com.codecoord.rocketmq.domain.BaseMqMessage;import com.codecoord.rocketmq.template.RocketMqTemplate;import com.codecoord.rocketmq.util.JsonUtil;import org.apache.rocketmq.client.producer.SendResult;import org.apache.rocketmq.client.producer.SendStatus;import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import org.slf4j.MDC;import javax.annotation.Resource;import java.time.Instant;import java.util.Objects;/** * 抽象消息监听器,封装了所有公共处理业务,如 * 1、基础日志记录 * 2、异常处理 * 3、消息重试 * 4、警告通知 * 5、.... * * @author tianxincoord@163.com * @since 2022/4/17 */public abstract class BaseMqMessageListener<T extends BaseMqMessage> {/*** 这里的日志记录器是哪个子类的就会被哪个子类的类进行初始化*/protected final Logger logger = LoggerFactory.getLogger(this.getClass());@Resourceprivate RocketMqTemplate rocketMqTemplate;/*** 消息者名称** @return 消费者名称*/protected abstract String consumerName();/*** 消息处理** @param message 待处理消息* @throws Exception 消费异常*/protected abstract void handleMessage(T message) throws Exception;/*** 超过重试次数消息,需要启用isRetry** @param message 待处理消息*/protected abstract void overMaxRetryTimesMessage(T message);/*** 是否过滤消息,例如某些** @param message 待处理消息* @return true: 本次消息被过滤,false:不过滤*/protected boolean isFilter(T message) {return false;}/*** 是否异常时重复发送** @return true: 消息重试,false:不重试*/protected abstract boolean isRetry();/*** 消费异常时是否抛出异常** @return true: 抛出异常,false:消费异常(如果没有开启重试则消息会被自动ack)*/protected abstract boolean isThrowException();/*** 最大重试次数** @return 最大重试次数,默认10次*/protected int maxRetryTimes() {return 10;}/*** isRetry开启时,重新入队延迟时间** @return -1:立即入队重试*/protected int retryDelayLevel() {return -1;}/*** 由父类来完成基础的日志和调配,下面的只是提供一个思路*/public void dispatchMessage(T message) {MDC.put(RocketMqSysConstant.TRACE_ID, message.getTraceId());// 基础日志记录被父类处理了logger.info("[{}]消费者收到消息[{}]", consumerName(), JsonUtil.toJson(message));if (isFilter(message)) {logger.info("消息不满足消费条件,已过滤");return;}// 超过最大重试次数时调用子类方法处理if (message.getRetryTimes() > maxRetryTimes()) {overMaxRetryTimesMessage(message);return;}try {long start = Instant.now().toEpochMilli();handleMessage(message);long end = Instant.now().toEpochMilli();logger.info("消息消费成功,耗时[{}ms]", (end - start));} catch (Exception e) {logger.error("消息消费异常", e);// 是捕获异常还是抛出,由子类决定if (isThrowException()) {throw new RuntimeException(e);}if (isRetry()) {// 获取子类RocketMQMessageListener注解拿到topic和tagRocketMQMessageListener annotation = this.getClass().getAnnotation(RocketMQMessageListener.class);if (Objects.nonNull(annotation)) {message.setSource(message.getSource() + "消息重试");message.setRetryTimes(message.getRetryTimes() + 1);SendResult sendResult;try {// 如果消息发送不成功,则再次重新发送,如果发送异常则抛出由MQ再次处理(异常时不走延迟消息)// 此处捕获之后,相当于此条消息被消息完成然后重新发送新的消息sendResult = rocketMqTemplate.send(annotation.topic(), annotation.selectorExpression(), message, retryDelayLevel());} catch (Exception ex) {throw new RuntimeException(ex);}// 发送失败的处理就是不进行ACK,由RocketMQ重试if (sendResult.getSendStatus() != SendStatus.SEND_OK) {throw new RuntimeException("重试消息发送失败");}}}}}}
推荐阅读
-
央视网|国家邮政局:上半年邮政行业业务收入超5000亿元
-
货车侧翻超8万罐啤酒散落地面 市民们的举动令货车司机寒心
-
-
杭州河道中捞起一名11岁女孩遗体,原本周日要回老家读书
-
-
[人民网]农业农村部:鼓励各地对蔬菜规模经营主体用工给予适当补
-
快资讯|这部电视剧播出33年还让人感动:钞票点烟成经典,张国主角荣变客串
-
-
美容|面霜你选对了吗?这几款保湿面霜,让肌肤嫩白透亮,紧致又光滑
-
电竞趣聊|一个比一个好看,有你喜欢的吗?,知名主播居然是女装大佬
-
美好,一直在身边■从云课堂谈到新冠肺炎疫情与科技发展的双向影响
-
-
醒醒爱读书|创始人却乐开花? 懂行人: 人家赚嗨了!,30万把共享雨伞有借无还!
-
-
-
旅行社|张家界上半年亏损六千万,旅行社服务收入同比下降88%
-
cnBeta[图]Windows 10 Build 20197发布:磁盘管理迁移到设置应用中
-
-
2013年俄罗斯红场胜利日阅兵,2015俄罗斯红场阅兵仪式-
-