Redis Stream 高性能消息队列详解
本文讲解Redis 5.0的Stream:一个支持持久化、消费组和ACK机制的强大消息队列,可用于构建类似Kafka的可靠消息系统。
Redis 5.0 推出的 Stream 类型是一个非常重要且强大的新数据结构。你可以把它理解为一个内置在 Redis 中的、功能完备的消息队列(Message Queue)。
它的设计灵感主要来源于 Apache Kafka,旨在解决 Redis 之前方案(如 Pub/Sub 或 Lists)在消息队列场景下的不足。
1. 核心概念:一个可持久化的日志(Append-only Log)
从根本上说,一个 Stream 是一个仅追加(Append-only)的数据结构。你可以把它想象成一个日志文件或者一个账本:
- 只能在末尾追加新条目,不能在中间插入或修改。
- 每个条目(消息)都有一个唯一的、自动递增的 ID。
- 数据是持久化的(遵循 Redis 的持久化策略 RDB/AOF),不像 Pub/Sub 那样发后即忘。
2. Stream 的关键组成部分
一个 Stream 主要由以下几个概念构成:
a. 条目 (Entry/Message)
每个添加到 Stream 中的消息就是一个条目。它不是一个简单的字符串,而是一个包含多个键值对(field-value pairs)的字典,类似于一个小的 Hash。
b. 条目 ID (Entry ID)
这是 Stream 最核心的设计之一。每个条目都有一个唯一的 ID,格式为 <timestamp>-<sequence>,例如 1672531200000-0。
<timestamp>:毫秒级的时间戳,由 Redis 服务器生成。这保证了 ID 是大致按时间排序的。<sequence>:序列号。用于区分同一毫秒内产生的多个条目。
这个 ID 保证了消息的唯一性和有序性,并且客户端也可以自己指定 ID。
c. 消费组 (Consumer Group) - 最强大的特性
这是 Stream 区别于简单列表(List)的关键。一个消费组允许多个消费者协同消费同一个 Stream 里的消息。
- 负载均衡:Stream 会把消息分发给组内的不同消费者,确保一条消息只被组内的一个消费者处理。
- 状态记录:消费组会记录组内每个消费者“读到哪里了”(即最后一条被消费的消息 ID)。
- 故障转移:如果一个消费者宕机,它正在处理但尚未确认(ACK)的消息可以被重新分配给组内的其他消费者进行处理。
d. 待处理条目列表 (Pending Entries List, PEL)
每个消费组都有一个 PEL。当一个消费者读取了一条消息后,这条消息就会被放入 PEL,表示“已被读取,但尚未处理完成”。消费者处理完消息后,需要发送一个 XACK 命令来告知 Redis,然后 Redis 才会将该消息从 PEL 中移除。这实现了消息确认(ACK)机制。
3. 核心命令
写入
XADD key ID field string [field string ...]:向 Stream 中添加一条新消息。ID 可以用*让 Redis 自动生成。
plaintext# 向 a_stream 添加一条消息,内容是 { "name": "alice", "age": "30" } XADD a_stream * name alice age 30读取(独立消费)
XRANGE key start end [COUNT count]:获取指定 ID 范围内的消息列表(类似LRANGE)。XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] ID [ID ...]:从一个或多个 Stream 中读取消息,可以阻塞等待新消息。
消费组相关操作
XGROUP CREATE key groupname ID|$:创建一个消费组。XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] STREAMS key >:从消费组中读取消息。>表示只读取尚未被消费的新消息。这是消费组最常用的命令。XACK key group ID [ID ...]:向消费组确认消息已处理完成。XPENDING key group:查看待处理(未 ACK)的消息列表。XCLAIM ...:当某个消费者长时间未 ACK 消息(可能已宕机),允许其他消费者“认领”这些消息并重新处理。
4. Stream vs. 之前的方案
| 特性 | List (BRPOP) | Pub/Sub | Stream |
|---|---|---|---|
| 消息持久化 | 是 | 否 | 是 |
| 多消费者 | 不支持(一条消息只能被一个消费者弹出) | 支持(广播给所有订阅者) | 支持(通过消费组实现负载均衡) |
| 消息确认(ACK) | 否 | 否 | 是 |
| 历史消息回溯 | 不支持(弹出后就没了) | 否 | 是(可从任意ID开始读取) |
| 数据结构 | 简单字符串列表 | 简单字符串 | 键值对集合 |
总结
Redis Stream 是一个功能强大的数据结构,它将 Redis 从一个缓存和简单数据结构服务器,提升为了一个轻量级的、高性能的、可靠的消息中间件。
它特别适用于以下场景:
- 可靠的消息队列/任务队列:替代 RabbitMQ, Kafka 等更重的中间件。
- 事件溯源(Event Sourcing):记录系统中发生的所有状态变更事件。
- 日志收集:作为中心化的日志收集点。
- 实时数据管道:连接不同的微服务。
如果你需要在 Redis 中实现一个可靠的消息传递系统,那么 Stream 是目前最佳且唯一的选择。