如何确保RabbitMQ 的幂等性
RabbitMQ 幂等性核心在于全局唯一 ID 配合业务去重。常用方案包括:利用数据库唯一索引拦截、使用 Redis 原子性(SETNX)记录消费状态,或通过乐观锁/状态机确保重复操作不改变数据结果。
为什么需要幂等性?
在消息队列(MQ)环境中,消息重复是不可避免的。主要原因有两个:
- 生产者重试:生产者发送消息给 MQ,MQ 收到但在返回 ACK 时网络中断,生产者误以为失败而重发。
- 消费者重试:消费者处理完业务但在向 MQ 发送 ACK 时失败,MQ 认为消息未消费,重新投递给消费者。
因此,幂等性的目标是:无论消息被消费多少次,最终的业务结果必须与消费一次的结果一致。
解决方案详解
实现幂等性的前提是:每条消息必须携带一个全局唯一的 ID(Message ID/Business ID)。
1. 利用数据库唯一索引(强一致性,适用于新增)
这是最简单且最严谨的方案。利用数据库的主键或唯一约束(Unique Key)特性。
- 原理:将消息的唯一 ID(如订单号)作为数据库表的唯一键。
- 流程:
- 消费者收到消息。
- 尝试向数据库插入数据。
- 如果插入成功,流程结束。
- 如果抛出
DuplicateKeyException(主键冲突异常),说明该消息已被消费过,直接丢弃或记录日志。
sql
INSERT INTO order_table (order_id, status, ...) VALUES ('unique_msg_id', ...);
-- 如果 order_id 已存在,DB 会报错,程序捕获异常即可
2. 使用 Redis 原子操作(高性能,适用于高并发)
利用 Redis 的 SETNX (Set if Not Exists) 命令来实现去重。
- 原理:在处理业务前,先去 Redis 占位。
- 流程:
- 消费者收到消息,获取唯一 ID。
- 执行
SETNX key value(Key 为消息 ID)。 - 返回值判断:
- 成功(返回 1):说明是第一次消费,执行业务逻辑。
- 失败(返回 0):说明已消费过,直接返回 ACK,结束。
- 注意:即使业务执行失败,Redis 中的 Key 也可能需要设置过期时间,防止死锁(即业务失败了但 Key 还在,导致后续重试无法执行)。更好的方式是业务执行成功后再更新 Redis 状态,或者结合数据库事务。
java
// 伪代码示例
Boolean isNotProcessed = redisTemplate.opsForValue().setIfAbsent("msg_id:" + messageId, "1", 10, TimeUnit.MINUTES);
if (!isNotProcessed) {
return; // 已经消费过
}
// 执行业务逻辑...
3. 乐观锁机制(适用于更新操作)
如果消息是用来更新数据的(例如扣减库存、更新状态),可以使用版本号(Version)机制。
- 原理:在数据表中增加一个
version字段。 - 流程:
- 查询当前数据的版本号,假设为
v1。 - 执行更新时,带上版本号条件。
- 如果消息被重复消费,第二次执行时版本号已变,更新将失败(影响行数为 0)。
- 查询当前数据的版本号,假设为
sql
-- 第一次消费
UPDATE account SET balance = balance - 100, version = version + 1
WHERE id = 1 AND version = 1;
-- 成功,影响行数 1
-- 重复消费(此时库里 version 已经是 2 了)
UPDATE account SET balance = balance - 100, version = version + 1
WHERE id = 1 AND version = 1;
-- 失败,影响行数 0,也就是操作无效,保证了幂等
4. 状态机机制(适用于复杂业务流)
对于有明确状态流转的业务(如订单状态:待支付 -> 已支付 -> 发货),利用状态判断来保证幂等。
- 原理:更新前检查当前状态是否允许流转。
- 流程:
- 消息意图:将订单从“待支付”更新为“已支付”。
- SQL 逻辑:只有当前是“待支付”才更新。
sql
UPDATE order_table SET status = 'PAID'
WHERE order_id = 'xxx' AND status = 'UNPAID';
如果消息重复消费,第二次执行时状态已经是 PAID,SQL 也就不会生效。
总结推荐
- 关键业务(如资金类):推荐 数据库唯一索引 或 乐观锁,因为数据库具有 ACID 特性,数据最安全。
- 高并发业务(如日志、统计):推荐 Redis 方案,性能最好,但需处理好 Redis 宕机或数据丢失的极端情况。
- 通用做法:通常是 Redis 去重(挡住大部分流量) + 数据库唯一索引(兜底) 的组合拳。