基于本文回答
0
评论

如何保证RocketMQ的消息不丢失?(从生产、存储、消费角度)

知识点图片

RocketMQ防丢指南:生产端同步发送且重试,Broker端同步刷盘与同步复制,消费端业务成功后再ACK确认。

要保证 RocketMQ 消息不丢失,必须从消息的整个生命周期入手,即:生产阶段(Producer)存储阶段(Broker)消费阶段(Consumer)

这通常是一个“至少一次(At Least Once)”投递的承诺,意味着为了不丢消息,可能会产生重复消息,因此消费端必须做好幂等性处理。

以下是详细的分析和配置建议:


一、 生产阶段(Producer):确保消息成功发出去

在生产阶段,消息丢失通常是因为网络抖动、Broker 宕机或 Producer 自身崩溃导致消息没发出去,或者发出去后没收到确认。

1. 使用同步发送(Sync Send)

RocketMQ 提供了三种发送方式:同步、异步、单向(Oneway)。

  • 方案必须使用同步发送
  • 原理producer.send(msg) 会阻塞等待 Broker 的响应(ACK)。只有收到 SendStatus.SEND_OK,才算发送成功。
  • 注意:如果收到 FLUSH_DISK_TIMEOUT(刷盘超时)或 FLUSH_SLAVE_TIMEOUT(同步 Slave 超时),虽然消息可能已在内存中,但为了绝对安全,也可以视为失败进行重试。

2. 失败重试机制

如果发送失败(抛出异常或返回值非 OK),Producer 需要进行重试。

  • 配置producer.setRetryTimesWhenSendFailed(3)(默认是 2)。
  • 策略:如果向 Broker A 发送失败,重试时 RocketMQ 会自动尝试投递到 Broker B(如果有集群),从而规避单点故障。

3. 使用事务消息(Transaction Message)

如果业务场景是“本地数据库事务 + 发送消息”必须一致(例如:订单创建成功后必须发送消息给库存系统)。

  • 方案:使用 RocketMQ 的事务消息机制。
  • 原理
    1. 发送 Half 消息(半消息,消费者不可见)。
    2. 执行本地事务。
    3. 根据本地事务结果,提交(Commit)或回滚(Rollback)消息。
    4. 如果第 3 步失败,RocketMQ 会回查本地事务状态。

二、 存储阶段(Broker):确保消息持久化且不单点故障

这是消息不丢失的核心环节。如果 Broker 收到消息在内存中还没写入磁盘就宕机,或者主节点挂了从节点没同步到数据,消息就会丢失。

1. 开启同步刷盘(Sync Flush)

RocketMQ 默认可能是异步刷盘(Async Flush),即消息写入 PageCache 就返回成功,由操作系统决定何时写入物理磁盘。

  • 风险:如果机器断电,PageCache 中的数据会丢失。
  • 方案:修改 broker.conf,配置 flushDiskType = SYNC_FLUSH
  • 效果:消息必须写入物理磁盘后,Broker 才会给 Producer 返回成功 ACK。性能会有所下降,但安全性最高。

2. 开启同步复制(Sync Replication)

在集群模式下(Master-Slave),默认可能是异步复制。Master 收到消息就返回成功,后台异步同步给 Slave。

  • 风险:如果 Master 宕机且数据还没同步给 Slave,这部分数据就丢了。
  • 方案:修改 broker.conf,配置 brokerRole = SYNC_MASTER
  • 效果:Master 收到消息后,必须等待 Slave 也写入成功,才会给 Producer 返回成功。

3. 使用 Dledger(Raft 协议)模式(推荐)

传统的 Master-Slave 在 Master 挂掉后需要人工介入或复杂的切换。RocketMQ 4.5+ 引入了 Dledger。

  • 方案:使用 Dledger 模式部署 Broker。
  • 原理:基于 Raft 协议,多副本强一致性。消息必须在大多数节点(Quorum)写入成功才算成功。当 Master 宕机,集群会自动选出新 Master,且保证数据一致不丢失。

三、 消费阶段(Consumer):确保消息处理完再确认

在消费阶段,消息丢失通常是因为消费者拿到消息后,还没处理完业务逻辑(比如写库失败、空指针异常),就告诉 Broker “我消费完了”。

1. 业务逻辑执行完再 ACK

RocketMQ 的消费者注册监听器时,会返回一个状态。

  • 错误做法:先 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS,再异步处理业务。
  • 正确做法
    java
    consumer.registerMessageListener(new MessageListenerConcurrently() {
        @Override
        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
            for (MessageExt msg : msgs) {
                try {
                    // 1. 执行核心业务逻辑(如写数据库)
                    doBusinessLogic(msg);
                } catch (Exception e) {
                    // 2. 发生异常,返回重试状态
                    return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                }
            }
            // 3. 业务成功,才返回成功状态
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
    });

2. 消费重试与死信队列(DLQ)

如果业务逻辑报错,返回 RECONSUME_LATER,Broker 会在一段时间后重新投递该消息。

  • 机制:默认重试 16 次(时间间隔逐步递增)。
  • 兜底:如果 16 次都失败,消息会进入死信队列(Dead Letter Queue)。虽然此时消费者没处理成功,但消息本身没有“丢失”,依然保存在 MQ 中,可以通过人工介入处理死信队列的数据。

3. 重要前提:幂等性设计

为了保证不丢消息,我们实施了“同步发送”、“失败重试”、“同步复制”等手段,这极大概率会导致消息重复(例如:Broker 写入磁盘成功,但在返回 ACK 给 Producer 时网络断了,Producer 以为失败了又发了一遍)。

  • 要求:消费端必须实现幂等性(Idempotency)。
  • 手段:利用数据库唯一索引、Redis 分布式锁等方式,确保同一条消息(通过 Message Key 或 Unique ID 标识)不会被重复处理。

总结配置清单

要想达到极高的可靠性(消息 0 丢失),请参照以下配置:

环节 关键策略 关键配置/代码 备注
生产端 同步发送 + 重试 producer.send(msg)
producer.setRetryTimesWhenSendFailed(3)
性能会有损耗
存储端 同步刷盘 flushDiskType = SYNC_FLUSH TPS 会下降,磁盘 IO 成为瓶颈
存储端 同步复制 brokerRole = SYNC_MASTER 需配合 Slave 节点
消费端 手动 ACK 先执行业务,无异常后再 return CONSUME_SUCCESS 切忌异步消费后立即返回

权衡(Trade-off):
完全保证消息不丢失会严重牺牲吞吐量(TPS)和增加延迟。在实际生产中,很多非金融级核心链路会选择“异步刷盘 + 同步双写”或者“异步刷盘 + 异步复制”来换取高性能,容忍极小概率的消息丢失。只有在涉及资金、核心数据的场景下,才建议开启全链路的强一致性配置。

右滑查看面试常问