基于本文回答

播面 播面

文图音视,全方位拆解八股文
0
评论

Redis消息队列三种实现方法

知识点图片

本文讲解了使用 Redis 实现消息队列的三种方法:从简单的 LIST 到最可靠的 Stream。Stream 因其支持消费组和消息确认(ACK),能保证高可靠性,是生产环境的最佳选择。

使用 Redis 实现消息队列是一个非常常见的应用场景。Redis 提供了多种数据结构,可以用来实现不同复杂度和可靠性的消息队列。

下面我将从最简单最推荐的方式,分三步介绍如何实现。

核心概念

一个消息队列通常包含三个角色:

  1. 生产者 (Producer):负责创建消息并将其放入队列。
  2. 消费者 (Consumer):负责从队列中取出消息并进行处理。
  3. 队列 (Queue):存储消息的中间件,这里就是 Redis。

方法一:最简单的方式 - 使用 LIST

这是最直观、最简单的实现方式。我们使用 Redis 的 LIST 数据结构,生产者从一端推入(LPUSH),消费者从另一端弹出(RPOP)。

  • 生产者命令: LPUSH queue_name message (从列表左侧推入)
  • 消费者命令: RPOP queue_name (从列表右侧弹出)

这种方式是先进先出 (FIFO) 的。

生产者 (Producer) 示例 (Python)

python
# producer.py
import redis
import time

# 连接到 Redis
r = redis.Redis(host='localhost', port=6379, db=0, decode_responses=True)

queue_name = 'my_simple_queue'

print("生产者已启动...")
for i in range(10):
    message = f"message_{i}"
    print(f"发送消息: {message}")
    # 使用 LPUSH 将消息推入列表左侧
    r.lpush(queue_name, message)
    time.sleep(1)

print("所有消息发送完毕。")

消费者 (Consumer) 示例 (Python)

python
# consumer.py
import redis
import time

# 连接到 Redis
r = redis.Redis(host='localhost', port=6379, db=0, decode_responses=True)

queue_name = 'my_simple_queue'

print("消费者已启动,等待消息...")
while True:
    # 使用 RPOP 从列表右侧弹出一个消息
    # 如果列表为空,RPOP 会返回 None
    message = r.rpop(queue_name)
    if message:
        print(f"收到并处理消息: {message}")
        # 在这里执行你的业务逻辑
    else:
        # 如果没有消息,等待一会再试
        print("队列为空,等待1秒...")
        time.sleep(1)

优点:

  • 非常简单,容易理解和实现。

缺点:

  • 效率低下: 消费者需要通过 while True 循环不断地轮询 Redis,即使没有消息也会产生大量空请求,浪费 CPU 和网络资源。
  • 消息丢失风险: 如果消费者在 RPOP 之后、处理消息之前崩溃,那么这条消息就永远丢失了。

方法二:更高效的方式 - 使用 LIST 的阻塞命令

为了解决方法一中消费者轮询的低效问题,Redis 提供了 LIST 的阻塞版本命令。

  • 生产者命令: LPUSH queue_name message (不变)
  • 消费者命令: BRPOP queue_name timeout (阻塞地从右侧弹出)

BRPOP 会在没有消息时阻塞连接,直到有新消息进入或者超时。这极大地提高了效率,避免了空轮询。

消费者 (Consumer) 示例 (使用 BRPOP)

python
# consumer_blocking.py
import redis

# 连接到 Redis
r = redis.Redis(host='localhost', port=6379, db=0, decode_responses=True)

queue_name = 'my_simple_queue'

print("阻塞型消费者已启动,等待消息...")
while True:
    # BRPOP 是一个阻塞操作
    # 它会等待消息,直到有消息可用或超时(0 表示永不超时)
    # 返回的是一个元组 (queue_name, message)
    result = r.brpop(queue_name, timeout=0)

    if result:
        queue, message = result
        print(f"从队列 '{queue}' 收到并处理消息: {message}")
        # 在这里执行你的业务逻辑

优点:

  • 高效: 消费者不再需要空轮询,连接会保持挂起状态,直到有消息到达,非常节省资源。

缺点:

  • 消息丢失风险依然存在: 和 RPOP 一样,如果消费者在收到消息后、处理完成前崩溃,消息同样会丢失。

方法三:【官方推荐】最可靠的方式 - 使用 STREAM

从 Redis 5.0 开始,引入了一个全新的、功能强大的数据结构:Stream。它专门为消息队列场景设计,解决了 LIST 方案的诸多痛点。

Stream 的核心特性:

  • 消息持久化: Stream 是一个追加式的日志结构,消息会一直保留。
  • 消费组 (Consumer Groups):允许多个消费者协作消费同一个 Stream,Redis 会确保每个消息只被组内的一个消费者处理。非常适合横向扩展。
  • 消息确认 (ACK): 消费者在处理完消息后,需要向 Redis 发送一个 XACK 命令,告知 Redis "这条消息我处理好了"。如果消费者崩溃而没有发送 ACK,这条消息会变成 "待处理" (Pending) 状态,可以被重新分配给其他消费者,从而保证了消息的至少一次消费 (At-least-once delivery)

生产者 (Producer) 示例 (使用 Stream)

python
# producer_stream.py
import redis
import time

r = redis.Redis(host='localhost', port=6379, db=0, decode_responses=True)

stream_name = 'my_stream_queue'

print("Stream 生产者已启动...")
for i in range(10):
    message_body = {'id': i, 'content': f'stream_message_{i}'}
    # 使用 XADD 添加消息到 Stream
    # '*' 表示让 Redis 自动生成消息 ID
    message_id = r.xadd(stream_name, message_body)
    print(f"发送消息: {message_id} -> {message_body}")
    time.sleep(1)

print("所有消息发送完毕。")

消费者 (Consumer) 示例 (使用 Stream 和消费组)

python
# consumer_stream.py
import redis
import time

# 消费者配置
stream_name = 'my_stream_queue'
group_name = 'my_group'
consumer_name = 'consumer_1' # 在实际应用中,每个消费者实例应该有唯一的名字

r = redis.Redis(host='localhost', port=6379, db=0, decode_responses=True)

# 1. 创建消费组 (如果不存在)
try:
    # '0' 表示从头开始消费,'$' 表示只消费新消息
    r.xgroup_create(stream_name, group_name, id='0', mkstream=True)
    print(f"消费组 '{group_name}' 已创建。")
except redis.exceptions.ResponseError as e:
    # 如果消费组已存在,会报错,这是正常现象
    print(f"消费组 '{group_name}' 已存在。")


print(f"消费者 '{consumer_name}' 已启动,等待消息...")
while True:
    # 2. 从消费组读取消息
    # '>' 表示只接收从未传递给任何其他消费者的消息
    # block=0 表示无限期阻塞等待
    # count=1 表示一次只取一条
    messages = r.xreadgroup(group_name, consumer_name, {stream_name: '>'}, count=1, block=0)

    if messages:
        for stream, msg_list in messages:
            for msg_id, msg_data in msg_list:
                print(f"消费者 '{consumer_name}' 收到并处理消息: {msg_id} -> {msg_data}")

                # 模拟处理消息需要一些时间
                time.sleep(2)

                # 3. 确认消息 (ACK)
                # 处理完成后,发送 ACK,将消息从 Pending 列表中移除
                r.xack(stream_name, group_name, msg_id)
                print(f"消息 {msg_id} 已确认 (ACK)。")

优点:

  • 高可靠性: 通过消费组和 ACK 机制,保证了消息不会因为消费者崩溃而丢失。
  • 高可用性/可扩展性: 可以轻松添加更多消费者到同一个消费组,共同处理消息,实现负载均衡。
  • 消息可追溯: Stream 是一个日志,可以随时查看历史消息,或者让新的消费组从头开始消费。

总结与选择

特性 方法一 (LIST) 方法二 (Blocking LIST) 方法三 (STREAM)
实现简单性 ⭐⭐⭐⭐⭐ (非常简单) ⭐⭐⭐⭐ (简单) ⭐⭐⭐ (稍复杂)
消费效率 ⭐ (低,轮询) ⭐⭐⭐⭐ (高,阻塞) ⭐⭐⭐⭐⭐ (高,阻塞)
消息可靠性 ⭐ (低,易丢失) ⭐ (低,易丢失) ⭐⭐⭐⭐⭐ (高,有ACK)
多消费者扩展 差 (需要自己实现逻辑) 差 (需要自己实现逻辑) ⭐⭐⭐⭐⭐ (原生支持消费组)
适用场景 临时脚本、简单任务 简单的、能容忍消息丢失的场景 所有生产环境、可靠性要求高的场景
Redis 版本 所有版本 所有版本 5.0+

结论:

  • 如果你只是想写个小脚本,或者进行快速原型开发,使用 LIST + BRPOP 是一个不错的选择,它简单且高效。
  • 但对于任何生产级别或对数据可靠性有要求的应用,强烈推荐使用 Redis Stream。它是 Redis 官方为消息队列场景提供的标准解决方案,功能强大且设计完善。
00:00
00:00