RocketMQ源码分析之过滤器ExpressionMessageFilter( 二 )


// 一个topic的各个消费组的过滤数据
public static class FilterDataMapByTopic {
    // 一个topic,针对它订阅的各个消费组的过滤数据映射关系(key 为消费组名称)
    private ConcurrentMap<String, ConsumerFilterData> groupFilterData = new ConcurrentHashMap<String, ConsumerFilterData>();
    private String topic;
}

public class ConsumerFilterData {
    private String consumerGroup;
    private String topic;
    private String expression;
    private String expressionType;
    private transient Expression compiledExpression;
    private long bornTime;
    private long deadTime = 0;
    private BloomFilterData bloomFilterData;
    private long clientVersion;
}

3、消费组批量注册过滤数据对象
场景:当一个消费组里的某个消费者客户端与broker建立连接并完成消费者注册之后,需要批量注册布隆过滤器:DefaultConsumerIdsChangeListener监听器的handle方法会调用ConsumerFilterManager的register方法完成批量注册(此前分析ConsumerManager消费者管理组件时已经介绍过);
// 注册消费组里的最新的一波订阅和过滤public void register(final String consumerGroup,final Collection subList) {// 对订阅数据进行遍历for (SubscriptionData subscriptionData : subList) {// 注册register(subscriptionData.getTopic(),consumerGroup,subscriptionData.getSubString(),subscriptionData.getexpressionType(),subscriptionData.getSubVersion()// make illegal topic dead.// 对我的一个消费组拿到我对各个topic的过滤数据Collection groupFilterData = https://www.isolves.com/it/rj/jy/2022-11-27/getByGroup(consumerGroup);Iterator iterator = groupFilterData.iterator();while (iterator.hasNext()) {ConsumerFilterData filterData = iterator.next();boolean exist = false;for (SubscriptionData subscriptionData : subList) {// 判断是否存在if (subscriptionData.getTopic().equals(filterData.getTopic())) {exist = true;break;if (!exist && !filterData.isDead()) {filterData.setDeadTime(System.currentTimeMillis());log.info("Consumer filter changed: {}, make illegal topic dead:{}", consumerGroup, filterData);public boolean register(// topicfinal String topic,// 消费组final String consumerGroup,// 过滤表达式final String expression,// 过滤类型final String type,// 客户端版本号final long clientVersion) {if (ExpressionType.isTagType(type)) {return false;if (expression == null || expression.length() == 0) {return false;// 根据topic获取topic的各个消费组的过滤数据FilterDataMapByTopic filterDataMapByTopic = this.filterDataByTopic.get(topic);if (filterDataMapByTopic == null) {FilterDataMapByTopic temp = new FilterDataMapByTopic(topic);FilterDataMapByTopic prev = this.filterDataByTopic.putIfAbsent(topic, temp);filterDataMapByTopic = prev != null ? prev : temp;// 生成了一个消费组对一个topic的布隆过滤器BloomFilterData bloomFilterData = https://www.isolves.com/it/rj/jy/2022-11-27/bloomFilter.generate(consumerGroup + "#" + topic);return filterDataMapByTopic.register(consumerGroup, expression, type, bloomFilterData, clientVersion);
再调用FilterDataMapByTopic的register方法
public boolean register(// 消费组String consumerGroup,// 过滤表达式String expression,// 过滤类型String type,// 布隆过滤器数据BloomFilterData bloomFilterData,// 客户端版本long clientVersion) {// 获取到这个消费组的过滤器数据ConsumerFilterData old = this.groupFilterData.get(consumerGroup);if (old == null) {ConsumerFilterData consumerFilterData = https://www.isolves.com/it/rj/jy/2022-11-27/build(topic, consumerGroup, expression, type, clientVersion);if (consumerFilterData == null) {return false;// 给这个过滤数据里设置进去一个布隆过滤器consumerFilterData.setBloomFilterData(bloomFilterData);old = this.groupFilterData.putIfAbsent(consumerGroup, consumerFilterData);if (old == null) {log.info("New consumer filter registered: {}", consumerFilterData);return true;} else {// 如果存在则比较一下版本是否一致if (clientVersion <= old.getClientVersion()) {if (!type.equals(old.getExpressionType()) || !expression.equals(old.getExpression())) {log.warn("Ignore consumer({} : {}) filter(concurrent), because of version {} <= {}, but maybe info changed!old={}:{}, ignored={}:{}",consumerGroup, topic,clientVersion, old.getClientVersion(),old.getExpressionType(), old.getExpression(),type, expression);// 如果数据已存在,并且数据状态为死亡,则重新激活if (clientVersion == old.getClientVersion() && old.isDead()) {// 重新激活reAlive(old);return true;return false;} else {this.groupFilterData.put(consumerGroup, consumerFilterData);log.info("New consumer filter registered(concurrent): {}, old: {}", consumerFilterData, old);return true;} else {if (clientVersion <= old.getClientVersion()) {if (!type.equals(old.getExpressionType()) || !expression.equals(old.getExpression())) {log.info("Ignore consumer({}:{}) filter, because of version {} <= {}, but maybe info changed!old={}:{}, ignored={}:{}",consumerGroup, topic,clientVersion, old.getClientVersion(),old.getExpressionType(), old.getExpression(),type, expression);if (clientVersion == old.getClientVersion() && old.isDead()) {reAlive(old);return true;return false;boolean change = !old.getExpression().equals(expression) || 
!old.getExpressionType().equals(type);if (old.getBloomFilterData() == null && bloomFilterData != null) {change = true;if (old.getBloomFilterData() != null && !old.getBloomFilterData().equals(bloomFilterData)) {change = true;// if subscribe data is changed, or consumer is died too long.if (change) {ConsumerFilterData consumerFilterData = build(topic, consumerGroup, expression, type, clientVersion);if (consumerFilterData == null) {// new expression compile error, remove old, let client report error.this.groupFilterData.remove(consumerGroup);return false;consumerFilterData.setBloomFilterData(bloomFilterData);this.groupFilterData.put(consumerGroup, consumerFilterData);log.info("Consumer filter info change, old: {}, new: {}, change: {}",old, consumerFilterData, change);return true;} else {old.setClientVersion(clientVersion);if (old.isDead()) {reAlive(old);return true;


推荐阅读