public static ConsumerFilterData build(// topicfinal String topic,// 消费组final String consumerGroup,// 过滤表达式final String expression,// 过滤类型final String type,// 客户端版本final long clientVersion) {if (ExpressionType.isTagType(type)) {return null;ConsumerFilterData consumerFilterData = https://www.isolves.com/it/rj/jy/2022-11-27/new ConsumerFilterData();consumerFilterData.setTopic(topic);consumerFilterData.setConsumerGroup(consumerGroup);consumerFilterData.setBornTime(System.currentTimeMillis());consumerFilterData.setDeadTime(0);consumerFilterData.setExpression(expression);consumerFilterData.setExpressionType(type);consumerFilterData.setClientVersion(clientVersion);try {consumerFilterData.setCompiledExpression(FilterFactory.INSTANCE.get(type).compile(expression)} catch (Throwable e) {log.error("parse error: expr={}, topic={}, group={}, error={}", expression, topic, consumerGroup, e.getMessage());return null;return consumerFilterData;
重新激活
这里重新激活就是将死亡时间设置为0,判断是否死亡就是死亡时间deadTime是否大于出生时间bornTime;
protected void reAlive(ConsumerFilterData filterData) {long oldDeadTime = filterData.getDeadTime();filterData.setDeadTime(0);log.info("Re alive consumer filter: {}, oldDeadTime: {}", filterData, oldDeadTime);
4、根据消费组取消注册public void unRegister(final String consumerGroup) {for (Entry entry : filterDataByTopic.entrySet()) {entry.getValue().unRegister(consumerGroup);
org.Apache.rocketmq.broker.filter.ConsumerFilterManager.FilterDataMapByTopic#unRegister
public void unRegister(String consumerGroup) {if (!this.groupFilterData.containsKey(consumerGroup)) {return;// 获取消费组对应的过滤数据ConsumerFilterData data = https://www.isolves.com/it/rj/jy/2022-11-27/this.groupFilterData.get(consumerGroup);// 如果为空,或者已经死亡或者说已不可用了则直接返回if (data == null || data.isDead()) {return;long now = System.currentTimeMillis();log.info("Unregister consumer filter: {}, deadTime: {}", data, now);// 设置数据死亡时间data.setDeadTime(now);
5、判断数据是否死亡
org.apache.rocketmq.broker.filter.ConsumerFilterData#isDead
public boolean isDead() {return this.deadTime >= this.bornTime;
四、过滤原理
客户端向 Broker 端拉取消息时,Broker 从 commitlog、consumequeue 文件中拿到数据,接着会进行过滤,判断是否满足指定的条件;所以,过滤的工作是在于 DefaultMessageStore 对象中的 getMessage 方法,该方法入参中有这样的对象 MessageFilter;ExpressionMessageFilter实现其接口;
我们先看一下其接口的暴露的两个方法:
public interface MessageFilter {boolean isMatchedByConsumeQueue(final Long tagsCode,final ConsumeQueueExt.CqExtUnit cqExtUnit);boolean isMatchedByCommitLog(final ByteBuffer msgBuffer,final Map properties);
从上面的两个方法中,不难猜测出其是对 consumequeue 以及 commitlog 进行过滤;
1、TAG 过滤
tag 过滤只针对 consumequeue 的,所以在 MessageFilter 接口的 isMatchedByCommitLog 是默认返回 true;
2、SQL92
在 isMatchedByConsumeQueue 方法中,并没有 SQL92 进行过滤,而是用 BloomFilter 进行过滤,可以理解为 BloomFilter 是 SQL92 的缓存过滤器 。先通过 consumequeue 先过滤不符合的消息,然后在 isMatchedByCommitLog 严格过滤;
然而要使用这个布隆过滤器,需要打开相关的 RocketMQ 配置项才可以生效:
- enableConsumeQueueExt 是否启用 ConsumeQueue 拓展属性,默认为 false,这样子的话 isMatchedByConsumeQueue 方法,永远都会返回 true;
- enableCalcFilterBitMap 需要设置为 true,否则永远都不会被命中;因为设置为 true 时,CommitLogDispatcherCalcBitMap 才会去设置 ConsumerFilterData 对象中的 bloomFilterData 数组中的对应的位置为 1;如果不设置为 false,则 bloomFilterData 永远都是为 0;
需要设置 enableConsumeQueueExt 为 true 开启拓展属性,这样子才能使用 BloomFilter 进行过滤;
话不多说,上代码,我们重点看 isMatchedByConsumeQueue 方法的 tag 模式过滤;
public class ExpressionMessageFilter implements MessageFilter {protected static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.FILTER_LOGGER_NAME);protected final SubscriptionData subscriptionData;protected final ConsumerFilterData consumerFilterData;protected final ConsumerFilterManager consumerFilterManager;protected final boolean bloomDataValid;public ExpressionMessageFilter(SubscriptionData subscriptionData, ConsumerFilterData consumerFilterData,ConsumerFilterManager consumerFilterManager) {this.subscriptionData = https://www.isolves.com/it/rj/jy/2022-11-27/subscriptionData;this.consumerFilterData = consumerFilterData;this.consumerFilterManager = consumerFilterManager;if (consumerFilterData == null) {bloomDataValid = false;return;BloomFilter bloomFilter = this.consumerFilterManager.getBloomFilter();if (bloomFilter != null && bloomFilter.isValid(consumerFilterData.getBloomFilterData())) {bloomDataValid = true;} else {bloomDataValid = false;@Overridepublic boolean isMatchedByConsumeQueue(Long tagsCode, ConsumeQueueExt.CqExtUnit cqExtUnit) {if (null == subscriptionData) {return true;if (subscriptionData.isClassFilterMode()) {return true;// by tags code.if (ExpressionType.isTagType(subscriptionData.getExpressionType())) {// 如果consumequeue中没有tag,则返回true 。能消费该消息if (tagsCode == null) {return true;// 如果订阅组中的subString等于*,则说明订阅组是不需要过滤的,返回true,能消费该消息;if (subscriptionData.getSubString().equals(SubscriptionData.SUB_ALL)) {return true;// 如果consumequeue中的tag在订阅组的codeSet中,则说明订阅组是能消费该消息的,返回true;否则返回false;return subscriptionData.getCodeSet().contains(tagsCode.intValue());} else {// no expression or no bloomif (consumerFilterData == null || consumerFilterData.getExpression() == null|| consumerFilterData.getCompiledExpression() == null || consumerFilterData.getBloomFilterData() == null) {return true;// message is before consumerif (cqExtUnit == null || !consumerFilterData.isMsgInLive(cqExtUnit.getMsgStoreTime())) {log.debug("Pull matched because not in live: {}, {}", consumerFilterData, cqExtUnit);return true;byte[] filterBitMap = cqExtUnit.getFilterBitMap();BloomFilter bloomFilter = this.consumerFilterManager.getBloomFilter();if (filterBitMap == null || !this.bloomDataValid|| filterBitMap.length * Byte.SIZE != consumerFilterData.getBloomFilterData().getBitNum()) {return true;BitsArray bitsArray = null;try {bitsArray = BitsArray.create(filterBitMap);boolean ret = bloomFilter.isHit(consumerFilterData.getBloomFilterData(), bitsArray);log.debug("Pull {} by bit map:{}, {}, {}", ret, consumerFilterData, bitsArray, cqExtUnit);return ret;} catch (Throwable e) {log.error("bloom filter error, sub=" + subscriptionData+ ", filter=" + consumerFilterData + ", bitMap=" + bitsArray, e);return true;@Overridepublic boolean isMatchedByCommitLog(ByteBuffer msgBuffer, Map properties) {if (subscriptionData == null) {return true;if (subscriptionData.isClassFilterMode()) {return true;if (ExpressionType.isTagType(subscriptionData.getExpressionType())) {return true;ConsumerFilterData realFilterData = this.consumerFilterData;Map tempProperties = properties;// no expressionif (realFilterData == null || realFilterData.getExpression() == null|| realFilterData.getCompiledExpression() == null) {return true;if (tempProperties == null && msgBuffer != null) {tempProperties = MessageDecoder.decodeProperties(msgBuffer);Object ret = null;try {MessageEvaluationContext context = new MessageEvaluationContext(tempProperties);ret = realFilterData.getCompiledExpression().evaluate(context);} catch (Throwable e) {log.error("Message Filter error, " + realFilterData + ", " + tempProperties, e);log.debug("Pull eval result: {}, {}, {}", ret, realFilterData, tempProperties);if (ret == null || !(ret instanceof Boolean)) {return false;return (Boolean) ret;
推荐阅读
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
- 7张图大总结:SQL 数据分析常用语句
- 道恩·强森|外媒分析《黑亚当》票房扑街的原因:巨石强森不是DC需要的救世主
- 为什么要做专利分析?专利分析的价值有哪些?
- 招聘|34岁0基础想转行数据分析师晚吗?
- 商品房买卖合同常见纠纷问题分析 房屋买卖合同纠纷
- 店霸 电霸:电商大数据分析软件好用吗?拼多多数据准确吗?
- 华东五校实力优势分析 华东五校
- 9高一物理期中考试总结分析 高一期中考试总结
- 层次分析法(什么是层次分析法?)
- |学会用数据分析汇报工作,升职加薪指日可待