Redis消息队列三种实现方法
本文讲解了使用 Redis 实现消息队列的三种方法:从简单的 LIST 到最可靠的 Stream。Stream 因其支持消费组和消息确认(ACK),能保证高可靠性,是生产环境的最佳选择。
使用 Redis 实现消息队列是一个非常常见的应用场景。Redis 提供了多种数据结构,可以用来实现不同复杂度和可靠性的消息队列。
下面我将从最简单到最推荐的方式,分三步介绍如何实现。
核心概念
一个消息队列通常包含三个角色:
- 生产者 (Producer):负责创建消息并将其放入队列。
- 消费者 (Consumer):负责从队列中取出消息并进行处理。
- 队列 (Queue):存储消息的中间件,这里就是 Redis。
方法一:最简单的方式 - 使用 LIST
这是最直观、最简单的实现方式。我们使用 Redis 的 LIST 数据结构,生产者从一端推入(LPUSH),消费者从另一端弹出(RPOP)。
- 生产者命令:
LPUSH queue_name message(从列表左侧推入) - 消费者命令:
RPOP queue_name(从列表右侧弹出)
这种方式是先进先出 (FIFO) 的。
生产者 (Producer) 示例 (Python)
python
# producer.py
import redis
import time
# 连接到 Redis
r = redis.Redis(host='localhost', port=6379, db=0, decode_responses=True)
queue_name = 'my_simple_queue'
print("生产者已启动...")
for i in range(10):
message = f"message_{i}"
print(f"发送消息: {message}")
# 使用 LPUSH 将消息推入列表左侧
r.lpush(queue_name, message)
time.sleep(1)
print("所有消息发送完毕。")
消费者 (Consumer) 示例 (Python)
python
# consumer.py
import redis
import time
# 连接到 Redis
r = redis.Redis(host='localhost', port=6379, db=0, decode_responses=True)
queue_name = 'my_simple_queue'
print("消费者已启动,等待消息...")
while True:
# 使用 RPOP 从列表右侧弹出一个消息
# 如果列表为空,RPOP 会返回 None
message = r.rpop(queue_name)
if message:
print(f"收到并处理消息: {message}")
# 在这里执行你的业务逻辑
else:
# 如果没有消息,等待一会再试
print("队列为空,等待1秒...")
time.sleep(1)
优点:
- 非常简单,容易理解和实现。
缺点:
- 效率低下: 消费者需要通过
while True循环不断地轮询 Redis,即使没有消息也会产生大量空请求,浪费 CPU 和网络资源。 - 消息丢失风险: 如果消费者在
RPOP之后、处理消息之前崩溃,那么这条消息就永远丢失了。
方法二:更高效的方式 - 使用 LIST 的阻塞命令
为了解决方法一中消费者轮询的低效问题,Redis 提供了 LIST 的阻塞版本命令。
- 生产者命令:
LPUSH queue_name message(不变) - 消费者命令:
BRPOP queue_name timeout(阻塞地从右侧弹出)
BRPOP 会在没有消息时阻塞连接,直到有新消息进入或者超时。这极大地提高了效率,避免了空轮询。
消费者 (Consumer) 示例 (使用 BRPOP)
python
# consumer_blocking.py
import redis
# 连接到 Redis
r = redis.Redis(host='localhost', port=6379, db=0, decode_responses=True)
queue_name = 'my_simple_queue'
print("阻塞型消费者已启动,等待消息...")
while True:
# BRPOP 是一个阻塞操作
# 它会等待消息,直到有消息可用或超时(0 表示永不超时)
# 返回的是一个元组 (queue_name, message)
result = r.brpop(queue_name, timeout=0)
if result:
queue, message = result
print(f"从队列 '{queue}' 收到并处理消息: {message}")
# 在这里执行你的业务逻辑
优点:
- 高效: 消费者不再需要空轮询,连接会保持挂起状态,直到有消息到达,非常节省资源。
缺点:
- 消息丢失风险依然存在: 和
RPOP一样,如果消费者在收到消息后、处理完成前崩溃,消息同样会丢失。
方法三:【官方推荐】最可靠的方式 - 使用 STREAM
从 Redis 5.0 开始,引入了一个全新的、功能强大的数据结构:Stream。它专门为消息队列场景设计,解决了 LIST 方案的诸多痛点。
Stream 的核心特性:
- 消息持久化: Stream 是一个追加式的日志结构,消息会一直保留。
- 消费组 (Consumer Groups):允许多个消费者协作消费同一个 Stream,Redis 会确保每个消息只被组内的一个消费者处理。非常适合横向扩展。
- 消息确认 (ACK): 消费者在处理完消息后,需要向 Redis 发送一个
XACK命令,告知 Redis "这条消息我处理好了"。如果消费者崩溃而没有发送 ACK,这条消息会变成 "待处理" (Pending) 状态,可以被重新分配给其他消费者,从而保证了消息的至少一次消费 (At-least-once delivery)。
生产者 (Producer) 示例 (使用 Stream)
python
# producer_stream.py
import redis
import time
r = redis.Redis(host='localhost', port=6379, db=0, decode_responses=True)
stream_name = 'my_stream_queue'
print("Stream 生产者已启动...")
for i in range(10):
message_body = {'id': i, 'content': f'stream_message_{i}'}
# 使用 XADD 添加消息到 Stream
# '*' 表示让 Redis 自动生成消息 ID
message_id = r.xadd(stream_name, message_body)
print(f"发送消息: {message_id} -> {message_body}")
time.sleep(1)
print("所有消息发送完毕。")
消费者 (Consumer) 示例 (使用 Stream 和消费组)
python
# consumer_stream.py
import redis
import time
# 消费者配置
stream_name = 'my_stream_queue'
group_name = 'my_group'
consumer_name = 'consumer_1' # 在实际应用中,每个消费者实例应该有唯一的名字
r = redis.Redis(host='localhost', port=6379, db=0, decode_responses=True)
# 1. 创建消费组 (如果不存在)
try:
# '0' 表示从头开始消费,'$' 表示只消费新消息
r.xgroup_create(stream_name, group_name, id='0', mkstream=True)
print(f"消费组 '{group_name}' 已创建。")
except redis.exceptions.ResponseError as e:
# 如果消费组已存在,会报错,这是正常现象
print(f"消费组 '{group_name}' 已存在。")
print(f"消费者 '{consumer_name}' 已启动,等待消息...")
while True:
# 2. 从消费组读取消息
# '>' 表示只接收从未传递给任何其他消费者的消息
# block=0 表示无限期阻塞等待
# count=1 表示一次只取一条
messages = r.xreadgroup(group_name, consumer_name, {stream_name: '>'}, count=1, block=0)
if messages:
for stream, msg_list in messages:
for msg_id, msg_data in msg_list:
print(f"消费者 '{consumer_name}' 收到并处理消息: {msg_id} -> {msg_data}")
# 模拟处理消息需要一些时间
time.sleep(2)
# 3. 确认消息 (ACK)
# 处理完成后,发送 ACK,将消息从 Pending 列表中移除
r.xack(stream_name, group_name, msg_id)
print(f"消息 {msg_id} 已确认 (ACK)。")
优点:
- 高可靠性: 通过消费组和 ACK 机制,保证了消息不会因为消费者崩溃而丢失。
- 高可用性/可扩展性: 可以轻松添加更多消费者到同一个消费组,共同处理消息,实现负载均衡。
- 消息可追溯: Stream 是一个日志,可以随时查看历史消息,或者让新的消费组从头开始消费。
总结与选择
| 特性 | 方法一 (LIST) | 方法二 (Blocking LIST) | 方法三 (STREAM) |
|---|---|---|---|
| 实现简单性 | ⭐⭐⭐⭐⭐ (非常简单) | ⭐⭐⭐⭐ (简单) | ⭐⭐⭐ (稍复杂) |
| 消费效率 | ⭐ (低,轮询) | ⭐⭐⭐⭐ (高,阻塞) | ⭐⭐⭐⭐⭐ (高,阻塞) |
| 消息可靠性 | ⭐ (低,易丢失) | ⭐ (低,易丢失) | ⭐⭐⭐⭐⭐ (高,有ACK) |
| 多消费者扩展 | 差 (需要自己实现逻辑) | 差 (需要自己实现逻辑) | ⭐⭐⭐⭐⭐ (原生支持消费组) |
| 适用场景 | 临时脚本、简单任务 | 简单的、能容忍消息丢失的场景 | 所有生产环境、可靠性要求高的场景 |
| Redis 版本 | 所有版本 | 所有版本 | 5.0+ |
结论:
- 如果你只是想写个小脚本,或者进行快速原型开发,使用 LIST + BRPOP 是一个不错的选择,它简单且高效。
- 但对于任何生产级别或对数据可靠性有要求的应用,强烈推荐使用 Redis Stream。它是 Redis 官方为消息队列场景提供的标准解决方案,功能强大且设计完善。