RocketMQ 的消息优先级处理方案
在 Apache RocketMQ 中,由于其底层采用了基于 CommitLog 追加写入的流式存储模型,原生设计上是以高吞吐、FIFO(先进先出)为导向的。然而,在实际业务场景中,我们经常需要优先处理核心业务(如订单支付、高价值客户请求)。
目前在 RocketMQ 中,实现消息优先级处理主要分为最新原生方案和经典架构替代方案两大类。
一、 最新原生方案:RIP-80 优先级消息(适用于 RocketMQ 5.4.0+ / 阿里云 5.0 版)
随着 RocketMQ 5.4.0(引入 RIP-80 提案)以及部分云厂商商业版 5.0 的推出,RocketMQ 已经开始支持原生的优先级消息。
- 实现原理:
- 发送端:生产者发送消息时,可以为消息设置一个优先级数值属性(通常支持
0 ~ 9的整数,数值越大,优先级越高)。 - 服务端存储:系统根据消息的优先级将其路由并写入对应 Topic 的不同队列中,每个队列承担特定的优先级。
- 消费端(POP 消费):消费者从 Broker 获取消息时,服务端会优先从高优先级队列拉取消息进行消费。
- 发送端:生产者发送消息时,可以为消息设置一个优先级数值属性(通常支持
- 适用场景与限制:
- 仅在消息堆积时生效:只有在消费端跟不上生产端、消息发生排队(堆积)时,高优先级才会优先投递。若系统没有拥堵,所有消息仍会即时投递。
- 单 Broker 严格有序:高优先级消息在单个 Broker 节点内是严格优先投递的,跨 Broker 节点只能做到“尽可能全局优先”。
- 重试消息失效问题:消息如果消费失败进入重试队列(Retry Topic),可能会丢失原本的优先级,需要结合特定配置(如开启独立的重试队列
useSeparateRetryQueue)来解决。
二、 经典架构替代方案(适用于旧版本或开源通用场景)
如果您的 RocketMQ 处于较低版本(如 4.x、5.x 早期版本),或者出于架构解耦、稳定性考虑,可以采用以下几种行业内主流的替代设计。
方案 1:多 Topic 分离法(行业最常用、最稳定的推荐方案)
既然单个 Topic 无法动态排序,最直接的思路就是将不同优先级的消息分配到不同的 Topic 中。
- 设计方式:
- 定义三个 Topic:
TOPIC_HIGH(高)、TOPIC_NORMAL(中)、TOPIC_LOW(低)。 - 消费端资源倾斜(分配不同线程/实例数):
- 为高优先级 Topic 创建独立的 Consumer 实例,并为其分配更多的消费线程池资源(如 20 个线程)。
- 为低优先级 Topic 仅分配较少的线程(如 2 个线程)。
- 消费端主动权重控制(Pull / LitePull 模式):
- 使用
LitePullConsumer主动拉取消息,在消费线程中设计循环逻辑。例如每次循环:拉取 3 次TOPIC_HIGH的消息,拉取 2 次TOPIC_NORMAL的消息,拉取 1 次TOPIC_LOW的消息。
- 使用
- 定义三个 Topic:
- 优缺点:
- 优点:物理隔离性极强。低优先级消息哪怕积压千万条,也完全不会阻塞或延迟高优先级消息的处理。
- 缺点:当优先级划分极多时(比如超过 10 级),Topic 的数量会随之激增,管理维护成本较高。
方案 2:客户端本地内存缓冲重排序(Local Priority Queue)
这种方案将优先级的排序工作下沉到消费者客户端的本地内存中进行。
- 设计方式:
- 消费者启动后,在本地维护一个有界的高并发优先级队列(例如 Java 中的
PriorityBlockingQueue)。 - 消费者通过 RocketMQ 正常拉取消息(不区分优先级),但拉取到本地后不立刻执行业务逻辑,而是先将消息放入本地的
PriorityBlockingQueue中。 - 本地启动独立的业务工作线程池,不断从该
PriorityBlockingQueue中取出消息(根据自定义的优先级字段进行排序,如 Order ID 或 Priority Tag)并执行真正的业务处理。
- 消费者启动后,在本地维护一个有界的高并发优先级队列(例如 Java 中的
- 优缺点:
- 优点:实现简单,不需要对 RocketMQ 做任何改造,支持任意粒度的细微优先级划分。
- 缺点:
- 局部有效性:只能保证“已经拉取到本地内存的消息”之间具有优先级。如果 Broker 端由于低优先级消息暴增导致网络或拉取受阻,高优先级消息可能根本没有被拉取到本地。
- 内存溢出风险:如果消息拉取过快而消费过慢,会导致本地内存堆积,有 OOM(内存溢出)的风险。
方案 3:单 Topic 区分 Queue 消费法(队列级隔离)
通过自定义消息的路由规则,将不同优先级的消息分流到不同的 MessageQueue(分区队列)中。
- 设计方式:
- 创建一个拥有较多 Queue 的 Topic(如 16 个 Queue)。
- 发送端:通过实现
MessageQueueSelector,将高优先级消息发送到指定的几个 Queue(如 Queue 0-3),将低优先级消息发送到其余 Queue(如 Queue 4-15)。 - 消费端:使用拉模式(LitePullConsumer)来分配这些 Queue。消费者专门控制部分线程优先拉取和处理高优先级的 Queue(Queue 0-3)。
- 优缺点:
- 优点:避免了 Topic 泛滥的问题,在同一个 Topic 内部完成了物理隔离。
- 缺点:需要自己实现较复杂的 Queue 分配与消费调度逻辑。
方案 4:延时消息降级方案(主动避让)
对于某些实时性要求不高的低优先级任务,可以通过延迟消息来为主业务“腾挪”资源。
- 设计方式:
- 高优先级消息直接发送(普通消息),即时投递消费。
- 低优先级消息在发送时,设置一定的延迟时间(例如延迟 10 分钟或更久,RocketMQ 5.x 支持任意秒级定时消息)。
- 优缺点:
- 优点:极大地平滑了瞬时高峰,保证了当前的 CPU 与算力资源优先服务于最核心、最即时的任务。
- 缺点:不适合对低优先级消息有一定时间敏感要求的场景。
三、 总结与选型建议
| 方案 | 适用场景 | 开发复杂度 | 推荐指数 |
|---|---|---|---|
| 原生优先级消息 (5.4.0+) | 使用较新版本 RocketMQ,存在大流量排队,需要轻量优先级标签控制 | 低(直接配置消息属性) | ★★★★☆ |
| 多 Topic 分离法 | 生产环境中最成熟的经典方案,高低优先级业务差异明显,需要彻底隔离 | 中(需要维护多个 Topic 和 Consumer) | ★★★★★ |
| 客户端本地重排序 | 局部消息需要排序,客户端算力和内存相对充裕 | 低(通过内存队列排序) | ★★★☆☆ |
| 延时降级方案 | 低优先级任务时间敏感度极低,属于定时/背景执行任务 | 低 | ★★★☆☆ |
右滑查看面试常问