基于本文回答

播面 播面

文图音视,全方位拆解八股文
0
评论

RocketMQ 4.x 是如何实现延迟消息的?

在 RocketMQ 4.x 中,延迟消息(定时消息)的实现核心思想是:“偷梁换柱” + 后台定时任务轮询

需要特别注意的是,RocketMQ 4.x 不支持任意精度的延迟消息,而是只支持 18个固定的延迟级别(Delay Level)。默认级别为:1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h

下面详细拆解其内部实现原理和流转过程:

1. 核心原理概述

当 Producer 发送一条带有 delayLevel 的消息时,Broker 并不会把它直接存入目标 Topic,而是将其临时存放到一个系统内部的特殊 Topic(SCHEDULE_TOPIC_XXXX)中。Broker 内部有一个定时任务服务(ScheduleMessageService),会不断地扫描这个特殊 Topic 中的消息,一旦发现消息到期了,就会把消息提取出来,恢复成原来的 Topic 和 Queue,再次存入 Broker,此时 Consumer 就能消费到了。

2. 详细运转流程

整个延迟消息的生命周期可以分为以下四个步骤:

第一步:Producer 发送消息

Producer 在发送消息时,通过 message.setDelayTimeLevel(level) 设置延迟级别。

  • 如果 level == 0,代表非延迟消息。
  • 如果 level > 0,代表延迟消息。

第二步:Broker 拦截并“偷梁换柱”(写入阶段)

当 Broker 接收到消息,在写入 CommitLog 之前,会进行如下判断和篡改:

  1. 判断级别:检查消息的 delayTimeLevel 是否大于 0。
  2. 备份原信息:将消息原本真实的 TopicQueueId 存入消息的属性(properties)中暂存起来。
  3. 偷梁换柱
    • 将消息的 Topic 强制修改为系统内部 Topic:SCHEDULE_TOPIC_XXXX
    • 将消息的 QueueId 修改为 delayLevel - 1(因为 QueueId 是从 0 开始的,这就意味着每个延迟级别对应一个独立的 Queue)。
  4. 落盘:将篡改后的消息写入 CommitLog,随后异步构建出 SCHEDULE_TOPIC_XXXXConsumeQueue

巧妙设计(性能优化):在构建 SCHEDULE_TOPIC_XXXXConsumeQueue 时,RocketMQ 会把这条消息的 预期投递时间戳(DeliverTimestamp) 存入 ConsumeQueue 索引条目的 tagsCode 字段中(正常消息这里存的是 Tag 的 Hash 值)。
这样做的好处是:后续定时任务扫描时,不需要去读物理文件 CommitLog,只需扫轻量级的 ConsumeQueue 索引,对比 tagsCode 和当前时间即可知道是否到期,极大提升了性能。

第三步:ScheduleMessageService 定时调度(等待阶段)

Broker 启动时,会启动一个后台服务 ScheduleMessageService

  1. 它会为 每一个延迟级别(即每一个 Queue)启动一个独立的定时 TimerTask(DeliverDelayedMessageTimerTask
  2. 这些 Task 会不断地去拉取对应 Queue 的 ConsumeQueue 索引。
  3. 读取 tagsCode(即预期投递时间戳)与系统当前时间比对:
    • 如果 未到期:计算差值,让当前线程 sleep 差值时间后再次被唤醒。
    • 如果 已到期:根据 offset 去 CommitLog 中把完整的消息主体拉取出来,进入第四步。

第四步:消息恢复与重新投递(到期阶段)

当定时任务发现消息到期后:

  1. 清除延迟属性:将消息体内的 delayTimeLevel 清除(防止再次被当成延迟消息)。
  2. 恢复原貌:从消息的 properties 中取出第一步备份的、真实的 TopicQueueId,重新设置回消息体中。
  3. 再次落盘:将恢复后的消息重新当作一条普通的新消息,再次写入 CommitLog
  4. 消费:此时,消息会被正常分发到真实 Topic 的 ConsumeQueue 中,Consumer 就可以像消费普通消息一样消费它了。

3. 设计的优点与局限性(面试常考)

优点(为什么 4.x 要这么设计?)

  1. 复用现有存储架构:完全复用了 RocketMQ 原有的 CommitLog + ConsumeQueue 机制,没有引入额外的存储引擎。
  2. 性能极高:相同延迟级别的消息被写入同一个 Queue,保证了严格的按时间先后顺序排列。定时任务只需要顺序扫描即可,不需要对消息进行复杂的排序操作(避免了 O(NlogN) 的性能损耗)。
  3. 内存占用小:通过复用 tagsCode 存储时间戳,判断是否过期无需读 CommitLog,避免了大量随机读 IO。

局限性(缺点)

  1. 不支持任意时间延迟:业务只能在固定的 18 个级别里选,不够灵活。如果业务需要延迟 13 分钟,只能妥协选 10 分钟或 20 分钟。
  2. 增加磁盘 IO:一条延迟消息会被写入两次 CommitLog(一次是写入 SCHEDULE_TOPIC,一次是到期后写入真实 Topic),增加了磁盘压力。

补充扩展:RocketMQ 5.x 的进化

正因为 4.x 存在不支持任意时间的痛点,RocketMQ 5.x 引入了基于时间轮(TimerWheel)的任意时间延迟消息
它通过单独的 TimerLogTimerWheel 文件来管理任意时间戳的消息,彻底解决了 4.x 只能使用固定级别的问题。如果在面试中你能顺带提一下 5.x 的演进,会是非常好的加分项。

00:00
00:00