基于本文回答

播面 播面

文图音视,全方位拆解八股文
0
评论

RocketMQ的消息过滤机制

知识点图片

详解RocketMQ Tag与SQL92过滤机制,剖析Broker端Hash筛选原理及不同模式的适用场景。

RocketMQ 的消息过滤机制允许消费者只订阅 Topic 中感兴趣的一部分消息,而不是全部消息。这不仅能简化消费端的业务逻辑,更能显著减少网络传输带宽和客户端的内存压力。

RocketMQ 的过滤机制主要分为两种:Tag 过滤(标签过滤)和 SQL92 过滤(属性过滤)。

以下是详细的机制解析:


1. 过滤发生的时机与位置

RocketMQ 的消息过滤主要发生在 Broker 端,但为了保证准确性,Consumer 端也会进行二次校验。

  • Broker 端过滤(核心): 减少无用消息的网络传输,降低 IO 开销。Broker 读取消息索引(ConsumeQueue)或消息体,判断是否符合条件,符合才推送给 Consumer。
  • Consumer 端过滤(兜底): 由于 Broker 端采用 Hash 过滤(Tag 模式下)可能存在哈希碰撞,消费者收到消息后会再次比对 Tag 字符串以确保准确。

2. 两种主要的过滤模式

A. Tag 过滤 (Tag Filtering)

这是最常用、最高效的过滤方式。

  • 原理: Producer 发送消息时设置一个 Tag。Consumer 订阅时指定订阅哪些 Tag。
  • 语法: 支持精确匹配和 || (OR) 运算。
    • 例如:TagA || TagB (消费 TagA 或 TagB 的消息)。
    • * 代表订阅所有 Tag。
  • 实现细节:
    • ConsumeQueue 文件结构中存储了 [CommitLog Offset, Size, TagsCode]
    • 这里的 TagsCode 是 Tag 字符串的 Hash Code (Java 的 hashCode())。
    • Broker 过滤: Broker 读取 ConsumeQueue 时,直接比较 Hash Code。如果不匹配,就不去 CommitLog 读完整消息,极大地节省了 IO。
    • 哈希冲突: 因为是 Hash 对比,不同的 Tag 可能会有相同的 Hash Code。所以 Broker 可能会传回少量不属于该 Consumer 的消息,Consumer 收到后会对比原始 Tag 字符串进行过滤。
  • 优缺点:
    • 优点: 性能极高(基于位运算和定长对比),实现简单。
    • 缺点: 逻辑简单,只支持“包含”关系,不支持复杂的 AND、范围查询等。

B. SQL92 过滤 (Property Filtering)

当 Tag 过滤无法满足需求(例如需要范围查询、多属性组合查询)时使用。

  • 原理: Producer 发送消息时通过 putUserProperty 设置自定义键值对属性。Consumer 使用 SQL92 语法筛选。
  • 语法: 支持标准的 SQL 语法。
    • 数值比较:>,>=, <, <=, BETWEEN, =.
    • 字符比较:=, <>, IN.
    • 逻辑运算:AND, OR, NOT.
    • 判空:IS NULL, IS NOT NULL.
    • 示例: a > 10 AND b = 'abc'
  • 实现细节:
    • 消息的 User Properties 存储在 CommitLog 的消息体中。
    • Broker 在过滤时,通常需要利用 Bloom Filter(布隆过滤器)来优化,或者在从 CommitLog 读出消息后解析属性进行计算。
  • 配置要求:
    • 需要在 broker.conf 中开启:enablePropertyFilter=true
  • 优缺点:
    • 优点: 非常灵活,支持复杂业务逻辑。
    • 缺点: 性能较 Tag 模式差。Broker 需要解析消息属性并进行表达式计算,CPU 开销大;如果未命中布隆过滤器,可能增加 IO 开销。

3. 详细架构原理(底层实现)

为了理解过滤机制,必须理解 RocketMQ 的存储结构:

  1. CommitLog: 存储完整的消息物理数据(包含 Body, Tag, Properties 等)。
  2. ConsumeQueue: 消息的逻辑队列(索引),每个条目 20 字节:
    • CommitLog Offset (8 byte)
    • Size (4 byte)
    • Message Tag HashCode (8 byte) —— 关键点

过滤流程图解:

  1. 订阅关系同步: Consumer 启动时,将订阅信息(Tag 哈希值或 SQL 语句)发送给 Broker,Broker 保存在 ConsumerFilterManager 中。
  2. 消息拉取请求: Consumer 向 Broker 发起 Pull 请求。
  3. Broker 检索 (ConsumeQueue):
    • Broker 读取 ConsumeQueue
    • Tag 模式: 拿到存储的 TagsCode,与订阅的 Hash 集合对比。如果 Hash 不匹配,直接跳过(不会去读 CommitLog)。
    • SQL 模式: 同样先利用 Bloom Filter (如果有) 或其他预判机制筛选。
  4. 读取消息 (CommitLog):
    • 如果索引匹配,Broker 根据 Offset 和 Size 去 CommitLog 读取真正的消息数据。
  5. 二次计算 (仅 SQL 模式):
    • 如果是 SQL 过滤,Broker 此时拥有了消息的完整属性,会执行 SQL 表达式解析和计算。如果不符合,丢弃该消息,继续找下一条。
  6. 发送消息: 将通过过滤的消息返回给 Consumer。
  7. Client 端校验:
    • Consumer 收到消息后,再次检查 Tag 字符串或 SQL 逻辑,防止 Broker 端的 Hash 碰撞导致的误判。

4. 代码示例

Tag 过滤示例

Producer:

java
Message msg = new Message("TopicTest", "TagA", "Hello RocketMQ".getBytes());
producer.send(msg);

Consumer:

java
// 订阅 TagA 或者 TagB
consumer.subscribe("TopicTest", "TagA || TagB");

SQL92 过滤示例

Producer:

java
Message msg = new Message("TopicTest", "TagA", "Hello RocketMQ".getBytes());
// 设置自定义属性
msg.putUserProperty("a", "10");
msg.putUserProperty("b", "abc");
producer.send(msg);

Consumer:

java
// 使用 MessageSelector 构造 SQL
consumer.subscribe("TopicTest", MessageSelector.bySql("a > 5 AND b = 'abc'"));

5. 总结与建议

特性 Tag 过滤 SQL92 过滤
性能 极高 (直接比对 Hash,无需读取完整消息) 中等 (需要解析属性,CPU消耗大)
灵活性 低 (仅支持
Broker 压力
使用场景 区分业务大类,简单过滤 复杂的多维度筛选,数据量不大的场景

最佳实践:

  1. 优先使用 Tag 过滤。它在设计上就是为了高性能而生的。
  2. 如果业务场景复杂,尽量将筛选逻辑上移到 Topic 拆分(不同的 Topic 代表完全不同的业务)。
  3. 只有在无法拆分 Topic 且 Tag 无法满足(如需要基于金额范围消费)时,才谨慎使用 SQL92 过滤。
00:00
00:00