如何保证RocketMQ的消息不丢失?(从生产、存储、消费角度)
RocketMQ防丢指南:生产端同步发送且重试,Broker端同步刷盘与同步复制,消费端业务成功后再ACK确认。
要保证 RocketMQ 消息不丢失,必须从消息的整个生命周期入手,即:生产阶段(Producer)、存储阶段(Broker)、消费阶段(Consumer)。
这通常是一个“至少一次(At Least Once)”投递的承诺,意味着为了不丢消息,可能会产生重复消息,因此消费端必须做好幂等性处理。
以下是详细的分析和配置建议:
一、 生产阶段(Producer):确保消息成功发出去
在生产阶段,消息丢失通常是因为网络抖动、Broker 宕机或 Producer 自身崩溃导致消息没发出去,或者发出去后没收到确认。
1. 使用同步发送(Sync Send)
RocketMQ 提供了三种发送方式:同步、异步、单向(Oneway)。
- 方案:必须使用同步发送。
- 原理:
producer.send(msg)会阻塞等待 Broker 的响应(ACK)。只有收到SendStatus.SEND_OK,才算发送成功。 - 注意:如果收到
FLUSH_DISK_TIMEOUT(刷盘超时)或FLUSH_SLAVE_TIMEOUT(同步 Slave 超时),虽然消息可能已在内存中,但为了绝对安全,也可以视为失败进行重试。
2. 失败重试机制
如果发送失败(抛出异常或返回值非 OK),Producer 需要进行重试。
- 配置:
producer.setRetryTimesWhenSendFailed(3)(默认是 2)。 - 策略:如果向 Broker A 发送失败,重试时 RocketMQ 会自动尝试投递到 Broker B(如果有集群),从而规避单点故障。
3. 使用事务消息(Transaction Message)
如果业务场景是“本地数据库事务 + 发送消息”必须一致(例如:订单创建成功后必须发送消息给库存系统)。
- 方案:使用 RocketMQ 的事务消息机制。
- 原理:
- 发送 Half 消息(半消息,消费者不可见)。
- 执行本地事务。
- 根据本地事务结果,提交(Commit)或回滚(Rollback)消息。
- 如果第 3 步失败,RocketMQ 会回查本地事务状态。
二、 存储阶段(Broker):确保消息持久化且不单点故障
这是消息不丢失的核心环节。如果 Broker 收到消息在内存中还没写入磁盘就宕机,或者主节点挂了从节点没同步到数据,消息就会丢失。
1. 开启同步刷盘(Sync Flush)
RocketMQ 默认可能是异步刷盘(Async Flush),即消息写入 PageCache 就返回成功,由操作系统决定何时写入物理磁盘。
- 风险:如果机器断电,PageCache 中的数据会丢失。
- 方案:修改
broker.conf,配置flushDiskType = SYNC_FLUSH。 - 效果:消息必须写入物理磁盘后,Broker 才会给 Producer 返回成功 ACK。性能会有所下降,但安全性最高。
2. 开启同步复制(Sync Replication)
在集群模式下(Master-Slave),默认可能是异步复制。Master 收到消息就返回成功,后台异步同步给 Slave。
- 风险:如果 Master 宕机且数据还没同步给 Slave,这部分数据就丢了。
- 方案:修改
broker.conf,配置brokerRole = SYNC_MASTER。 - 效果:Master 收到消息后,必须等待 Slave 也写入成功,才会给 Producer 返回成功。
3. 使用 Dledger(Raft 协议)模式(推荐)
传统的 Master-Slave 在 Master 挂掉后需要人工介入或复杂的切换。RocketMQ 4.5+ 引入了 Dledger。
- 方案:使用 Dledger 模式部署 Broker。
- 原理:基于 Raft 协议,多副本强一致性。消息必须在大多数节点(Quorum)写入成功才算成功。当 Master 宕机,集群会自动选出新 Master,且保证数据一致不丢失。
三、 消费阶段(Consumer):确保消息处理完再确认
在消费阶段,消息丢失通常是因为消费者拿到消息后,还没处理完业务逻辑(比如写库失败、空指针异常),就告诉 Broker “我消费完了”。
1. 业务逻辑执行完再 ACK
RocketMQ 的消费者注册监听器时,会返回一个状态。
- 错误做法:先
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS,再异步处理业务。 - 正确做法:java
consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { for (MessageExt msg : msgs) { try { // 1. 执行核心业务逻辑(如写数据库) doBusinessLogic(msg); } catch (Exception e) { // 2. 发生异常,返回重试状态 return ConsumeConcurrentlyStatus.RECONSUME_LATER; } } // 3. 业务成功,才返回成功状态 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } });
2. 消费重试与死信队列(DLQ)
如果业务逻辑报错,返回 RECONSUME_LATER,Broker 会在一段时间后重新投递该消息。
- 机制:默认重试 16 次(时间间隔逐步递增)。
- 兜底:如果 16 次都失败,消息会进入死信队列(Dead Letter Queue)。虽然此时消费者没处理成功,但消息本身没有“丢失”,依然保存在 MQ 中,可以通过人工介入处理死信队列的数据。
3. 重要前提:幂等性设计
为了保证不丢消息,我们实施了“同步发送”、“失败重试”、“同步复制”等手段,这极大概率会导致消息重复(例如:Broker 写入磁盘成功,但在返回 ACK 给 Producer 时网络断了,Producer 以为失败了又发了一遍)。
- 要求:消费端必须实现幂等性(Idempotency)。
- 手段:利用数据库唯一索引、Redis 分布式锁等方式,确保同一条消息(通过 Message Key 或 Unique ID 标识)不会被重复处理。
总结配置清单
要想达到极高的可靠性(消息 0 丢失),请参照以下配置:
| 环节 | 关键策略 | 关键配置/代码 | 备注 |
|---|---|---|---|
| 生产端 | 同步发送 + 重试 | producer.send(msg) producer.setRetryTimesWhenSendFailed(3) |
性能会有损耗 |
| 存储端 | 同步刷盘 | flushDiskType = SYNC_FLUSH |
TPS 会下降,磁盘 IO 成为瓶颈 |
| 存储端 | 同步复制 | brokerRole = SYNC_MASTER |
需配合 Slave 节点 |
| 消费端 | 手动 ACK | 先执行业务,无异常后再 return CONSUME_SUCCESS |
切忌异步消费后立即返回 |
权衡(Trade-off):
完全保证消息不丢失会严重牺牲吞吐量(TPS)和增加延迟。在实际生产中,很多非金融级核心链路会选择“异步刷盘 + 同步双写”或者“异步刷盘 + 异步复制”来换取高性能,容忍极小概率的消息丢失。只有在涉及资金、核心数据的场景下,才建议开启全链路的强一致性配置。