RocketMQ 是如何保证消息发送和消费的全局顺序与分区顺序的?
RocketMQ 支持两种级别的顺序消息:全局顺序(Global Order)和分区顺序(Partition Order,也称局部顺序)。
在分布式系统中,保证顺序需要发送端(Producer)、存储端(Broker)、消费端(Consumer)三个环节的共同配合。以下是 RocketMQ 实现这两种顺序消息的详细机制:
一、 概念区分
- 全局顺序:整个 Topic 下的所有消息严格按照发送顺序进行消费。
- 分区顺序(最常用):在同一个 Topic 下,根据消息的特定路由键(如订单 ID、用户 ID),将具有相同键的消息路由到同一个 MessageQueue(消息队列)中,保证这部分消息的严格顺序。不同键的消息在不同队列中可以并发消费。
二、 全局顺序的实现
全局顺序的实现非常简单粗暴,但代价是牺牲了分布式系统的高可用和高吞吐。
- 实现方式:将 Topic 的读写队列数(
readQueueNums和writeQueueNums)全部设置为 1。 - 原理:因为只有一个队列,所有的生产者只能向这一个队列发送消息,所有的消费者也只能从这一个队列拉取消息。配合发送端的同步发送和消费端的顺序消费机制,自然就实现了全局顺序。
- 适用场景:极少使用。仅适用于对吞吐量要求极低,且必须要求所有消息严格排序的场景(如某些配置的全局变更)。
三、 分区顺序的实现(核心重点)
分区顺序是 RocketMQ 解决顺序消息的核心方案。它需要发送、存储、消费三端的配合:
1. 发送端(Producer):保证消息按序进入同一个队列
要想保证顺序,首先要保证相关联的消息(例如同一个订单的 创建 -> 付款 -> 发货)被发送到同一个 MessageQueue。
- 队列选择器(MessageQueueSelector):
发送消息时,必须实现并传入MessageQueueSelector。通常的做法是将业务主键(如订单 ID)进行 Hash 取模:hash(orderId) % queueList.size()。这样相同业务主键的消息一定会落入固定的队列中。 - 同步发送(Sync Send):
必须使用同步发送模式。如果使用异步发送,由于网络延迟或重试机制,先发送的消息可能会晚于后发送的消息到达 Broker,从而破坏顺序。
// 发送端示例
SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
Integer orderId = (Integer) arg;
int index = orderId % mqs.size(); // 按订单ID取模选择队列
return mqs.get(index);
}
}, orderId);
2. 存储端(Broker):保证队列内部有序
Broker 端不需要做复杂的特殊处理,因为 RocketMQ 本身的存储机制就是天然有序的:
- 消息到达 Broker 后,会按照到达的先后顺序追加到
CommitLog中。 - 随后,Broker 会按照严格的顺序构建
ConsumeQueue(逻辑消费队列)。每个MessageQueue对应的ConsumeQueue中的位点(Offset)是严格递增的。
3. 消费端(Consumer):保证单线程顺序拉取和处理
这是顺序消息最复杂的环节。因为默认情况下,RocketMQ 的消费者是多线程并发消费的(MessageListenerConcurrently),这会导致即使消息在队列中是有序的,处理完毕的顺序也是错乱的。
为了保证顺序消费,消费者必须使用 MessageListenerOrderly 监听器。RocketMQ 在消费端通过两把锁和重试机制来保证顺序:
第一把锁:Broker 端的分布式锁(MessageQueue 锁)
在集群消费模式下,同一个MessageQueue同一时刻只能被同一个 Consumer 实例消费。
Consumer 在拉取顺序消息前,会定时向 Broker 发送请求,申请对目标MessageQueue加锁。只有成功拿到 Broker 端锁的 Consumer 实例,才能从该队列拉取消息。
解决的问题:防止发生 Rebalance 时,两个不同的 Consumer 实例同时消费同一个队列导致乱序。第二把锁:Consumer 端的本地锁(线程锁)
拿到 Broker 端的锁并拉取到消息后,Consumer 内部的线程池由于有多个线程,仍可能并发处理同一个队列的消息。
因此,Consumer 在将消息交给MessageListenerOrderly处理之前,会对当前的MessageQueue加上本地对象锁(synchronized或ReentrantLock)。
解决的问题:保证 Consumer 实例内部,同一时刻只有一个线程在处理特定MessageQueue的消息。处理失败与重试阻塞(SUSPEND)
如果一条消息消费失败了,在并发消费模式下,通常是把消息发回 Broker 的重试队列,然后继续消费下一条。
但在顺序消费模式下绝对不能跳过,否则顺序就乱了。如果消费失败,监听器会返回SUSPEND_CURRENT_QUEUE_A_MOMENT(挂起当前队列片刻)。
此时,Consumer 会阻塞该队列的后续消费,并在一段时间后不断尝试重新消费这条失败的消息,直到消费成功,或者达到最大重试次数(进入死信队列),才会继续消费下一条消息。
四、 顺序消息的痛点与注意事项
使用 RocketMQ 的顺序消息虽然解决了业务问题,但也带来了一些需要特别注意的坑:
- 并发度降低,容易产生消息积压:
分区顺序消息的并发度取决于 Topic 的队列数(MessageQueue 数量)。如果某个队列里的一条消息消费失败,会导致整个队列阻塞(SUSPEND),产生严重的排头阻塞(Head-of-line blocking)问题,引发消息积压。 - 动态扩缩容(队列数量发生变化):
如果在运行期间改变了 Topic 的读写队列数量,会导致发送端的 Hash 取模结果发生变化。旧订单的新消息可能会被路由到新的队列,导致顺序被破坏。
解决方案:一般不建议动态改变顺序 Topic 的队列数。如果必须扩容,通常需要通过停机扩容,或者在业务侧引入旧队列消费完再启用新队列的过渡机制。 - 热点数据倾斜:
如果某个 Hash Key(如某个大客户的订单)的数据量特别大,会导致某个 MessageQueue 的数据量激增,而其他队列空闲,造成消费者节点的负载极度不均衡。
总结
RocketMQ 保证顺序消息的核心公式:
单队列 + 路由 Hash + 同步发送 + Broker 分布式锁 + 消费者本地锁 + 失败阻塞重试。