【RocketMQ】消息过滤机制介绍及实践(TAG、SQL92、FilterServer)

编程入门 行业动态 更新时间:2024-10-09 05:12:59

【RocketMQ】消息过滤<a href=https://www.elefans.com/category/jswz/34/1771042.html style=机制介绍及实践(TAG、SQL92、FilterServer)"/>

【RocketMQ】消息过滤机制介绍及实践(TAG、SQL92、FilterServer)

文章目录

  • 表达式过滤
    • TAG
      • 使用
      • 原理
    • SQL92
      • 使用
      • 原理
  • 类过滤
    • 使用
    • 原理

RocketMQ支持 表达式过滤类过滤两种模式 。

表达式过滤

其中表达式模式分为 TAGSQL92表达式两种。

TAG

顾名思义,TAG 模式就是简单地为消息定义标签,根据消息的标签进行匹配。

使用

1、在消息发送时,我们可以为每一条消息设置一个TAG标签,消息消费者订阅自己感兴趣的TAG, 一般使用的场景是,对于同一类的功能(如:数据同步)创建一个主题,但对于该主题下的数据,可能不同的系统关心的数据不一样,基础数据各个系统都需要同步,设置标签为ALL,而订单数据只有订单下游子系统关心,其他系统并不关心,则设置标签为ORDER,库存子系统则关注库存相关的数据,设置标签为CAPCITY

2、消费者组订阅相同的主题不同的TAG,多个TAG 用“|”分隔,注意 :同一个消费组订阅的主题, TAG必须相同。

Producer代码示例:

@Slf4j
public class TestTagFilterProducer {public static void main(String[] args) throws MQClientException, InterruptedException {DefaultMQProducer producer = new DefaultMQProducer("zhurunhua", false);producer.setNamesrvAddr("172.20.10.42:9976;172.20.10.43:9976");producer.start();long l = System.currentTimeMillis();try {Message msg = new Message("test_topic","ORDER","OrderID188","Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));SendResult sendResult = producer.send(msg);System.out.printf("cost:%s-->%s%n", (System.currentTimeMillis() - l), sendResult);} catch (Exception e) {log.error("", e);}producer.shutdown();}
}

Consumer代码示例:

public class TestTagFilterConsumer {public static void main(String[] args) throws Exception {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("test-filter");consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);consumer.subscribe("test_topic", "ORDER");consumer.setNamesrvAddr("172.20.10.42:9976;172.20.10.43:9976");consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;});consumer.start();System.out.printf("Consumer Started.%n");}
}

原理

消息发送者在消息发送时如果设置了消息的tags属性,存储在消息属性中,先存储在CommitLog文件中,然后转发到消息消费队列ConsumeQueue,消息消费队列会用 8 个字节存储消息tag的 hashcode,之所以不直接存储tag字符串,是因为将ConumeQueue设计为定长结构,加快消息消费的加载性能。在Broker端拉取消息时,遍历ConsumeQueue,只对比消息tag的hashoode,如果匹配则返回,否则忽略该消息。Consumer在收到消息后,同样需要先对消息进行过滤,只是此时比较的是消息tag的值而不再是hashcode。

为什么过滤要这样做?

  • Message Tag存储Hashcode,是为了在ConsumeQueue定长方式存储,节约空间;

  • 过滤过程中不会访问CommitLog数据,可以保证堆积情况下也能高效过滤;

  • 即使存在Hash冲突,也可以在Consumer端进行修正,保证万无一失;

  • 优点是简单高效,缺点就是在Hash冲突时,并不是消费者订阅的消息,还会向消费者发送 。

流程图

SQL92

TAG模式一个消息只能有一个标签,这对于复杂的场景可能不起作用。在这种情况下,可以使用SQL表达式筛选消息。SQL特性可以通过发送消息时的属性来进行计算,从而过滤出客户端订阅的消息。

使用

1、只有使用push模式的消费者才能用使用SQL92标准的sql语句;

2、使用Filter功能,需要在启动配置文件当中配置以下选项:enablePropertyFilter=true,否则会报错:

3、基本语法:

RocketMQ 仅仅提供了一些基本的语法来支持此特性:

  1. 数值比较: >, >=, <, <=, BETWEEN, =;
  2. 字符比较: =, <>, IN;
  3. IS NULL or IS NOT NULL;
  4. 逻辑: AND, OR, NOT;

常量类型如下:

  1. 数字, 如: 123, 3.1415;
  2. 字符, 如: ‘abc’, 必须是单引号;
  3. NULL, 特殊常量;
  4. 布尔, TRUE or FALSE;

Producer示例:

@Slf4j
public class TestSQL92FilterProducer {public static void main(String[] args) throws MQClientException, InterruptedException {DefaultMQProducer producer = new DefaultMQProducer("zhurunhua", false);producer.setNamesrvAddr("172.20.10.42:9976;172.20.10.43:9976");producer.start();long l = System.currentTimeMillis();try {for (int i = 1; i <= 10; i++) {Message msg = new Message("test_topic","Apple","iPhone","测试过滤消息:SQL92".getBytes(StandardCharsets.UTF_8));//设置属性(模拟10台不同序列号的苹果12手机)msg.putUserProperty("name", "IPhone");msg.putUserProperty("serial", "12");msg.putUserProperty("color", "blue");msg.putUserProperty("sequence", String.valueOf(i));SendResult sendResult = producer.send(msg);System.out.printf("cost:%s-->%s%n", (System.currentTimeMillis() - l), sendResult);}} catch (Exception e) {log.error("", e);}producer.shutdown();}
}

Consumer示例:

public class TestSQL92FilterConsumer {public static void main(String[] args) throws Exception {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("test-filter");consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);consumer.setNamesrvAddr("172.20.10.42:9976;172.20.10.43:9976");//过滤蓝色 序列号大于5的String sql = "color = 'blue' and sequence > 5";consumer.subscribe("test_topic", MessageSelector.bySql(sql));consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;});consumer.start();System.out.printf("Consumer Started.%n");}
}

原理

功能设计图:

1、Broker通过心跳请求收集Consumer的SQL表达式,并保存在ConsumerFilterManager中;

2、当使用者拉取消息时,Broker将构造一个带有已编译好的表达式和订阅数据的MessageFilter(接口),用以在CommitLog中选择匹配的消息;

但是这样拉取消息时做过滤性能差,所以采用BloomFilter和预计算的方法进行优化:

1、在Broker理注册时,每个Consumer都被分配了BloomFilter的某个bit上;

2、当消息写入CommitLog后,Broker会构建ConsumeQueue,此时会计算消费者对应的过滤结果,所有的结果会保存到ConsumeQueueExt的bit数组中;

3、ConsumeQueueExt是链接到ConsumeQueue的存储文件,ConsumeQueue会根据tagsCode找到数据,tagsCode存的是ConsumeQueueExt生成的地址信息;

4、ExpressionMessageFilter使用bit数组检查消息是否匹配。由于BloomFilter的冲突,它还需要解码消息属性来计算匹配的消息;

消费流程图

类过滤

类过滤模式,其实就相当于是启动一个FilterServer做消息中转,FilterServer本身就是一个Consumer,在拉取到消息后,经过用户自定义的过滤器,返回给消费者过滤好的数据,这个模式在2019年之后的版本中已经去掉了,大概是因为太麻烦,且对性能影响大。

使用

1、部署FilterServer

一般部署在Broker所在的机器,减少网络延迟,一个Broker最好部署多个FilterServer,

启动脚本为{ROCKETMQ_HOME}/bin/startfsrv.sh,需要在{ROCKETMQ_HOME}/conf下新增filtersrv.properites文件:

#nameServer 地址 分号分割
namesrvAddr=127.0.0.1:9876 
connectWhichBroker=127.0.0.1:10911

之后执行启动脚本即可,启动成功日志:

load config properties file OK, d:/rocketmq/conf/filtersrv.properties 
The Filter Server boot success, 192.168.1.3:62832

2、编写自定义FIlter类:

public class IphoneColorFIlter implements MessageFilter {@Overridepublic boolean match(MessageExt messageExt, FilterContext filterContext) {String color = messageExt.getUserProperty("color");if ("blue".equals(color)) {return true;}return false;}
}

3、Consumer代码:

public class TestClassFilterConsumer {public static void main(String[] args) throws Exception {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("test-filter");consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);consumer.setNamesrvAddr("172.20.10.42:9976;172.20.10.43:9976");//加载过滤器文件ClassLoader classLoader = Thread.currentThread().getContextClassLoader();File classFile = new File(classLoader.getResource("IphoneColorFilter.java").getFile());String file2String = MixAll.file2String(classFile);consumer.subscribe("test_topic", file2String);consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;});consumer.start();System.out.printf("Consumer Started.%n");}
}

原理

1、Broker所在的服务器会启动多个FilterServer进程;

2、消费者在订阅消息主题时会上传一个自定义的消息过滤实现类,FilterServer加载并实例化;

3、Consumer向FilterServer发送消息拉取请求,FilterServer接收到消费者消息拉取请求后,FilterServer将消息拉取请求转发给Broker, Broker返回消息后,在FilterServer端执行消息过滤逻辑,然后返回符合订阅信息的消息给消息消费者进行消费;

通常消息消费者是直接向Broker订阅主题然后从Broker上拉取消息,类过滤模式的一个特别之处在于消息消费者是从FilterServer拉取消息。

流程图

参考:《RocketMQ技术内幕》

更多推荐

【RocketMQ】消息过滤机制介绍及实践(TAG、SQL92、FilterServer)

本文发布于:2024-02-06 14:47:37,感谢您对本站的认可!
本文链接:https://www.elefans.com/category/jswz/34/1749935.html
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。
本文标签:机制   消息   RocketMQ   FilterServer   TAG

发布评论

评论列表 (有 0 条评论)
草根站长

>www.elefans.com

编程频道|电子爱好者 - 技术资讯及电子产品介绍!