RocketMQ 4.x 是如何实现延迟消息的?
在 RocketMQ 4.x 中,延迟消息(定时消息)的实现核心思想是:“偷梁换柱” + 后台定时任务轮询。
需要特别注意的是,RocketMQ 4.x 不支持任意精度的延迟消息,而是只支持 18个固定的延迟级别(Delay Level)。默认级别为:1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h。
下面详细拆解其内部实现原理和流转过程:
1. 核心原理概述
当 Producer 发送一条带有 delayLevel 的消息时,Broker 并不会把它直接存入目标 Topic,而是将其临时存放到一个系统内部的特殊 Topic(SCHEDULE_TOPIC_XXXX)中。Broker 内部有一个定时任务服务(ScheduleMessageService),会不断地扫描这个特殊 Topic 中的消息,一旦发现消息到期了,就会把消息提取出来,恢复成原来的 Topic 和 Queue,再次存入 Broker,此时 Consumer 就能消费到了。
2. 详细运转流程
整个延迟消息的生命周期可以分为以下四个步骤:
第一步:Producer 发送消息
Producer 在发送消息时,通过 message.setDelayTimeLevel(level) 设置延迟级别。
- 如果
level == 0,代表非延迟消息。 - 如果
level > 0,代表延迟消息。
第二步:Broker 拦截并“偷梁换柱”(写入阶段)
当 Broker 接收到消息,在写入 CommitLog 之前,会进行如下判断和篡改:
- 判断级别:检查消息的
delayTimeLevel是否大于 0。 - 备份原信息:将消息原本真实的
Topic和QueueId存入消息的属性(properties)中暂存起来。 - 偷梁换柱:
- 将消息的
Topic强制修改为系统内部 Topic:SCHEDULE_TOPIC_XXXX。 - 将消息的
QueueId修改为delayLevel - 1(因为 QueueId 是从 0 开始的,这就意味着每个延迟级别对应一个独立的 Queue)。
- 将消息的
- 落盘:将篡改后的消息写入
CommitLog,随后异步构建出SCHEDULE_TOPIC_XXXX的ConsumeQueue。
巧妙设计(性能优化):在构建
SCHEDULE_TOPIC_XXXX的ConsumeQueue时,RocketMQ 会把这条消息的 预期投递时间戳(DeliverTimestamp) 存入 ConsumeQueue 索引条目的tagsCode字段中(正常消息这里存的是 Tag 的 Hash 值)。
这样做的好处是:后续定时任务扫描时,不需要去读物理文件 CommitLog,只需扫轻量级的 ConsumeQueue 索引,对比tagsCode和当前时间即可知道是否到期,极大提升了性能。
第三步:ScheduleMessageService 定时调度(等待阶段)
Broker 启动时,会启动一个后台服务 ScheduleMessageService。
- 它会为 每一个延迟级别(即每一个 Queue)启动一个独立的定时 TimerTask(
DeliverDelayedMessageTimerTask)。 - 这些 Task 会不断地去拉取对应 Queue 的
ConsumeQueue索引。 - 读取
tagsCode(即预期投递时间戳)与系统当前时间比对:- 如果 未到期:计算差值,让当前线程
sleep差值时间后再次被唤醒。 - 如果 已到期:根据 offset 去
CommitLog中把完整的消息主体拉取出来,进入第四步。
- 如果 未到期:计算差值,让当前线程
第四步:消息恢复与重新投递(到期阶段)
当定时任务发现消息到期后:
- 清除延迟属性:将消息体内的
delayTimeLevel清除(防止再次被当成延迟消息)。 - 恢复原貌:从消息的
properties中取出第一步备份的、真实的Topic和QueueId,重新设置回消息体中。 - 再次落盘:将恢复后的消息重新当作一条普通的新消息,再次写入
CommitLog。 - 消费:此时,消息会被正常分发到真实 Topic 的
ConsumeQueue中,Consumer 就可以像消费普通消息一样消费它了。
3. 设计的优点与局限性(面试常考)
优点(为什么 4.x 要这么设计?)
- 复用现有存储架构:完全复用了 RocketMQ 原有的
CommitLog+ConsumeQueue机制,没有引入额外的存储引擎。 - 性能极高:相同延迟级别的消息被写入同一个 Queue,保证了严格的按时间先后顺序排列。定时任务只需要顺序扫描即可,不需要对消息进行复杂的排序操作(避免了 O(NlogN) 的性能损耗)。
- 内存占用小:通过复用
tagsCode存储时间戳,判断是否过期无需读 CommitLog,避免了大量随机读 IO。
局限性(缺点)
- 不支持任意时间延迟:业务只能在固定的 18 个级别里选,不够灵活。如果业务需要延迟 13 分钟,只能妥协选 10 分钟或 20 分钟。
- 增加磁盘 IO:一条延迟消息会被写入两次
CommitLog(一次是写入 SCHEDULE_TOPIC,一次是到期后写入真实 Topic),增加了磁盘压力。
补充扩展:RocketMQ 5.x 的进化
正因为 4.x 存在不支持任意时间的痛点,RocketMQ 5.x 引入了基于时间轮(TimerWheel)的任意时间延迟消息。
它通过单独的 TimerLog 和 TimerWheel 文件来管理任意时间戳的消息,彻底解决了 4.x 只能使用固定级别的问题。如果在面试中你能顺带提一下 5.x 的演进,会是非常好的加分项。