RocketMQ的消息过滤机制
详解RocketMQ Tag与SQL92过滤机制,剖析Broker端Hash筛选原理及不同模式的适用场景。
RocketMQ 的消息过滤机制允许消费者只订阅 Topic 中感兴趣的一部分消息,而不是全部消息。这不仅能简化消费端的业务逻辑,更能显著减少网络传输带宽和客户端的内存压力。
RocketMQ 的过滤机制主要分为两种:Tag 过滤(标签过滤)和 SQL92 过滤(属性过滤)。
以下是详细的机制解析:
1. 过滤发生的时机与位置
RocketMQ 的消息过滤主要发生在 Broker 端,但为了保证准确性,Consumer 端也会进行二次校验。
- Broker 端过滤(核心): 减少无用消息的网络传输,降低 IO 开销。Broker 读取消息索引(ConsumeQueue)或消息体,判断是否符合条件,符合才推送给 Consumer。
- Consumer 端过滤(兜底): 由于 Broker 端采用 Hash 过滤(Tag 模式下)可能存在哈希碰撞,消费者收到消息后会再次比对 Tag 字符串以确保准确。
2. 两种主要的过滤模式
A. Tag 过滤 (Tag Filtering)
这是最常用、最高效的过滤方式。
- 原理: Producer 发送消息时设置一个 Tag。Consumer 订阅时指定订阅哪些 Tag。
- 语法: 支持精确匹配和
||(OR) 运算。- 例如:
TagA || TagB(消费 TagA 或 TagB 的消息)。 *代表订阅所有 Tag。
- 例如:
- 实现细节:
ConsumeQueue文件结构中存储了[CommitLog Offset, Size, TagsCode]。- 这里的
TagsCode是 Tag 字符串的 Hash Code (Java 的hashCode())。 - Broker 过滤: Broker 读取 ConsumeQueue 时,直接比较 Hash Code。如果不匹配,就不去 CommitLog 读完整消息,极大地节省了 IO。
- 哈希冲突: 因为是 Hash 对比,不同的 Tag 可能会有相同的 Hash Code。所以 Broker 可能会传回少量不属于该 Consumer 的消息,Consumer 收到后会对比原始 Tag 字符串进行过滤。
- 优缺点:
- ✅ 优点: 性能极高(基于位运算和定长对比),实现简单。
- ❌ 缺点: 逻辑简单,只支持“包含”关系,不支持复杂的 AND、范围查询等。
B. SQL92 过滤 (Property Filtering)
当 Tag 过滤无法满足需求(例如需要范围查询、多属性组合查询)时使用。
- 原理: Producer 发送消息时通过
putUserProperty设置自定义键值对属性。Consumer 使用 SQL92 语法筛选。 - 语法: 支持标准的 SQL 语法。
- 数值比较:
>,>=,<,<=,BETWEEN,=. - 字符比较:
=,<>,IN. - 逻辑运算:
AND,OR,NOT. - 判空:
IS NULL,IS NOT NULL. - 示例:
a > 10 AND b = 'abc'
- 数值比较:
- 实现细节:
- 消息的 User Properties 存储在 CommitLog 的消息体中。
- Broker 在过滤时,通常需要利用 Bloom Filter(布隆过滤器)来优化,或者在从 CommitLog 读出消息后解析属性进行计算。
- 配置要求:
- 需要在
broker.conf中开启:enablePropertyFilter=true。
- 需要在
- 优缺点:
- ✅ 优点: 非常灵活,支持复杂业务逻辑。
- ❌ 缺点: 性能较 Tag 模式差。Broker 需要解析消息属性并进行表达式计算,CPU 开销大;如果未命中布隆过滤器,可能增加 IO 开销。
3. 详细架构原理(底层实现)
为了理解过滤机制,必须理解 RocketMQ 的存储结构:
- CommitLog: 存储完整的消息物理数据(包含 Body, Tag, Properties 等)。
- ConsumeQueue: 消息的逻辑队列(索引),每个条目 20 字节:
CommitLog Offset(8 byte)Size(4 byte)Message Tag HashCode(8 byte) —— 关键点
过滤流程图解:
- 订阅关系同步: Consumer 启动时,将订阅信息(Tag 哈希值或 SQL 语句)发送给 Broker,Broker 保存在
ConsumerFilterManager中。 - 消息拉取请求: Consumer 向 Broker 发起 Pull 请求。
- Broker 检索 (ConsumeQueue):
- Broker 读取
ConsumeQueue。 - Tag 模式: 拿到存储的
TagsCode,与订阅的 Hash 集合对比。如果 Hash 不匹配,直接跳过(不会去读 CommitLog)。 - SQL 模式: 同样先利用 Bloom Filter (如果有) 或其他预判机制筛选。
- Broker 读取
- 读取消息 (CommitLog):
- 如果索引匹配,Broker 根据 Offset 和 Size 去
CommitLog读取真正的消息数据。
- 如果索引匹配,Broker 根据 Offset 和 Size 去
- 二次计算 (仅 SQL 模式):
- 如果是 SQL 过滤,Broker 此时拥有了消息的完整属性,会执行 SQL 表达式解析和计算。如果不符合,丢弃该消息,继续找下一条。
- 发送消息: 将通过过滤的消息返回给 Consumer。
- Client 端校验:
- Consumer 收到消息后,再次检查 Tag 字符串或 SQL 逻辑,防止 Broker 端的 Hash 碰撞导致的误判。
4. 代码示例
Tag 过滤示例
Producer:
java
Message msg = new Message("TopicTest", "TagA", "Hello RocketMQ".getBytes());
producer.send(msg);
Consumer:
java
// 订阅 TagA 或者 TagB
consumer.subscribe("TopicTest", "TagA || TagB");
SQL92 过滤示例
Producer:
java
Message msg = new Message("TopicTest", "TagA", "Hello RocketMQ".getBytes());
// 设置自定义属性
msg.putUserProperty("a", "10");
msg.putUserProperty("b", "abc");
producer.send(msg);
Consumer:
java
// 使用 MessageSelector 构造 SQL
consumer.subscribe("TopicTest", MessageSelector.bySql("a > 5 AND b = 'abc'"));
5. 总结与建议
| 特性 | Tag 过滤 | SQL92 过滤 |
|---|---|---|
| 性能 | 极高 (直接比对 Hash,无需读取完整消息) | 中等 (需要解析属性,CPU消耗大) |
| 灵活性 | 低 (仅支持 | |
| Broker 压力 | 小 | 大 |
| 使用场景 | 区分业务大类,简单过滤 | 复杂的多维度筛选,数据量不大的场景 |
最佳实践:
- 优先使用 Tag 过滤。它在设计上就是为了高性能而生的。
- 如果业务场景复杂,尽量将筛选逻辑上移到 Topic 拆分(不同的 Topic 代表完全不同的业务)。
- 只有在无法拆分 Topic 且 Tag 无法满足(如需要基于金额范围消费)时,才谨慎使用 SQL92 过滤。