- 主页 > 生活百科 > >
RocketMQ与SpringBoot整合进行生产级二次封装( 四 )
- 这个类是最基础的原始封装类,相当于餐馆提供的点餐服务 。上面提供无业务特性的发送,比如想要发送日志消息或者动态发送消息目的场景
3.2.3 增强RocketMQTemplate
- 以订单处理中心来说,变化点仅仅只是单号等业务数据不一样,发往订单处理中心的消息不管是topic还是tag等等其实完全都一样,那么此时可以根据业务来增加封装
- 增强原始功能需要注意下面两个点 所有父类能出现的地方,子类都能出现:也就是子类拥有功能 >= 父类 ,比如Java的List,只要入参是List的地方,传ArrayList和LinkedList完全可以 增强功能不能改变原始功能的行为:比如父类有一个方法say是说话,结果子类覆写了say改成了行为是吃饭,然后当调用者调用say的时候得到了一个完全预期外的结果
- 就以订单中心消息发送为例,封装OrderMessageTemplate继承自RocketMqTemplate,此时前者就拥有了封装父类的所有基础方法,拥有了所有父类的功能 。然后可以在前者增加自身业务特性的发送方法,比如发送订单处理消息
package com.codecoord.rocketmq.template;import com.codecoord.rocketmq.constant.RocketMqBizConstant;import com.codecoord.rocketmq.domain.RocketMqEntityMessage;import org.apache.rocketmq.client.producer.SendResult;import org.springframework.stereotype.Component;import javax.annotation.Resource;import javax.validation.constraints.NotNull;import java.time.LocalDate;import java.time.LocalDateTime;/** * 订单类发送消息模板工具类 * * @author tianxincode@163.com * @since 2022/6/16 */@Componentpublic class OrderMessageTemplate extends RocketMqTemplate {/// 如果不采用继承也可以直接注入使用/* @Resourceprivate RocketMqTemplate rocketMqTemplate; *//*** 入参只需要传入是哪个订单号和业务体消息即可,其他操作根据需要处理* 这样对于调用者而言,可以更加简化调用*/public SendResult sendOrderPaid(@NotNull String orderId, String body) {RocketMqEntityMessage message = new RocketMqEntityMessage();message.setKey(orderId);message.setSource("订单支付");message.setMessage(body);// 这两个字段只是为了测试message.setBirthday(LocalDate.now());message.setTradeTime(LocalDateTime.now());return send(RocketMqBizConstant.SOURCE_TOPIC, RocketMqBizConstant.ORDER_PAID_TAG, message);}} - 此时对于调用者只需要 orderMessageTemplate.sendOrderPaid("O001", "xxx");就可以把消息发送到订单处理中心
- 封装后的好处,假如现在有10个订单来源,现在需要调整消息发送格式,如果不进行封装那么10个来源发送的地方都需要改;如果进行了二次封装,只需要改sendOrderPaid方法即可,而且还不会出错,此时优势就体现出来了
2.3 RocketMQListener封装
- RocketMQListener是消费消息的核心,同时也涉及到更多的操作,比如:基础日志记录、异常处理、消息重试、警告通知等等等
- 按照抽离变化点,RocketMQListener只应该处理与自身业务相关的,除此之外的其它应该交给父类,子类只需要告诉父类要不要异常处理、要不要重试等等,点餐式服务
- 封装消息消费的抽象类 注意泛型限定为标准基础消息类,这样能到消费者的一定有统一的标准类BaseMqMessage 下面简单封装示例
package com.codecoord.rocketmq.listener;import com.codecoord.rocketmq.constant.RocketMqSysConstant;import com.codecoord.rocketmq.domain.BaseMqMessage;import com.codecoord.rocketmq.template.RocketMqTemplate;import com.codecoord.rocketmq.util.JsonUtil;import org.apache.rocketmq.client.producer.SendResult;import org.apache.rocketmq.client.producer.SendStatus;import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import org.slf4j.MDC;import javax.annotation.Resource;import java.time.Instant;import java.util.Objects;/** * 抽象消息监听器,封装了所有公共处理业务,如 * 1、基础日志记录 * 2、异常处理 * 3、消息重试 * 4、警告通知 * 5、.... * * @author tianxincoord@163.com * @since 2022/4/17 */public abstract class BaseMqMessageListener<T extends BaseMqMessage> {/*** 这里的日志记录器是哪个子类的就会被哪个子类的类进行初始化*/protected final Logger logger = LoggerFactory.getLogger(this.getClass());@Resourceprivate RocketMqTemplate rocketMqTemplate;/*** 消息者名称** @return 消费者名称*/protected abstract String consumerName();/*** 消息处理** @param message 待处理消息* @throws Exception 消费异常*/protected abstract void handleMessage(T message) throws Exception;/*** 超过重试次数消息,需要启用isRetry** @param message 待处理消息*/protected abstract void overMaxRetryTimesMessage(T message);/*** 是否过滤消息,例如某些** @param message 待处理消息* @return true: 本次消息被过滤,false:不过滤*/protected boolean isFilter(T message) {return false;}/*** 是否异常时重复发送** @return true: 消息重试,false:不重试*/protected abstract boolean isRetry();/*** 消费异常时是否抛出异常** @return true: 抛出异常,false:消费异常(如果没有开启重试则消息会被自动ack)*/protected abstract boolean isThrowException();/*** 最大重试次数** @return 最大重试次数,默认10次*/protected int maxRetryTimes() {return 10;}/*** isRetry开启时,重新入队延迟时间** @return -1:立即入队重试*/protected int retryDelayLevel() {return -1;}/*** 由父类来完成基础的日志和调配,下面的只是提供一个思路*/public void dispatchMessage(T message) {MDC.put(RocketMqSysConstant.TRACE_ID, message.getTraceId());// 基础日志记录被父类处理了logger.info("[{}]消费者收到消息[{}]", consumerName(), JsonUtil.toJson(message));if (isFilter(message)) {logger.info("消息不满足消费条件,已过滤");return;}// 超过最大重试次数时调用子类方法处理if (message.getRetryTimes() > maxRetryTimes()) {overMaxRetryTimesMessage(message);return;}try {long start = Instant.now().toEpochMilli();handleMessage(message);long end = Instant.now().toEpochMilli();logger.info("消息消费成功,耗时[{}ms]", (end - start));} catch (Exception e) {logger.error("消息消费异常", e);// 是捕获异常还是抛出,由子类决定if (isThrowException()) {throw new RuntimeException(e);}if (isRetry()) {// 获取子类RocketMQMessageListener注解拿到topic和tagRocketMQMessageListener annotation = this.getClass().getAnnotation(RocketMQMessageListener.class);if (Objects.nonNull(annotation)) {message.setSource(message.getSource() + "消息重试");message.setRetryTimes(message.getRetryTimes() + 1);SendResult sendResult;try {// 如果消息发送不成功,则再次重新发送,如果发送异常则抛出由MQ再次处理(异常时不走延迟消息)// 此处捕获之后,相当于此条消息被消息完成然后重新发送新的消息sendResult = rocketMqTemplate.send(annotation.topic(), annotation.selectorExpression(), message, retryDelayLevel());} catch (Exception ex) {throw new RuntimeException(ex);}// 发送失败的处理就是不进行ACK,由RocketMQ重试if (sendResult.getSendStatus() != SendStatus.SEND_OK) {throw new RuntimeException("重试消息发送失败");}}}}}}
推荐阅读
-
-
过往年少|安兔兔公布6月安卓手机性能榜!骁龙865霸榜高端天玑820中端最强
-
中新经纬|银保监会:前8个月银行业新增人民币贷款投放14.4万亿
-
蒋劲夫近况曝光,“白天收破烂,晚上送外卖”,31岁活得狼狈不堪
-
考古|古墓中出土的五样东西最好不要碰,前三件太邪性!收藏需谨慎
-
唐诗|唐朝最能喝酒的诗人,不是李白,而是王勃爷爷的弟弟,唐诗的先驱
-
海外资金|行情震荡!深受外资青睐的35只大蓝筹(名单)一览,均是龙头
-
「暗物质」反物质、暗物质、暗能量之间是什么关系?来带你重新认识这三者
-
-
-
-
[针织开衫]寒露来袭,这“针织开衫”配半身裙不要太美,上身超有质感
-
三星堆究竟隐藏着什么秘密? 或许考古学家已经接近真相了
-
海外网|菅义伟计划对厚生劳动省进行改革 创设"数码厅"
-
「解救不开心哦」都说像她老公,闺蜜很郁闷,我说那不是更好,闺蜜生了一个儿子
-
硕士 澳洲名校悉尼大学研究生课程设置及申请要求解析
-
搞笑的笑话|家有外甥女七岁,正是淘气的年纪,经典爆笑闺女糗事笑话
-
土豆■几款常见家常菜,烹饪方法全部在此,学会不用叫外卖了!
-
小可讲事|李世民为什么不着急?生死簿中早已说明一切,唐僧取经14年不见人
-
吉林教育电视台|2020年7月四六级成绩28号可以查询!查分方式有了新变化!