Redis消息队列
2026年2月26日大约 6 分钟
Redis消息队列
消息队列 Message Queue:简单来说就是存放消息的队列。
- 基本消息队列模型包括3个角色:
- 消息队列:存储和管理消息,也被称为消息代理。
- 生产者:发生消息到消息队列。
- 消费者:从消息队列中获取消息并处理。
- Redis提供了三种方式实现消息队列:
- 列表(List)数据结构:生产者将消息推入列表的尾部,消费者从列表的头部弹出消息进行处理。
- 发布/订阅(Pub/Sub)模式:生产者发布消息到一个频道,消费者订阅这个频道来接收消息。
- 流(Stream)数据结构:提供了更复杂的功能,如消息分组、消费者组等,适用于更复杂的消息队列场景。
基于List结构模拟信息队列
提示
了解即可
Redis的List数据结构是一个双向链表,可以用来实现简单的消息队列。生产者使用RPUSH命令将消息推入列表的尾部,消费者使用LPOP命令从列表的头部弹出消息进行处理。
但是这种方式没有消息时会返回nil。
- 因此可以使用
BLPOP命令来实现阻塞式的消息队列,当列表为空时,消费者会被阻塞,直到有新的消息被推入列表。 - 优点:
- 利用Redis存储,不受JVM内存上限影响。
- 基于Redis持久化机制,数据安全可靠。
- 满足有序性。
- 缺点:
- 消息丢失
- 只能支持单个消费者。
基于发布订阅模式实现消息队列
提示
了解即可
发布订阅模式(PubSub)是一种消息通信模式,生产者发布消息到一个频道,消费者订阅这个频道来接收消息。
消费者可以订阅一个或多个频道,当生产者发布消息到这些频道时,所有订阅了这些频道的消费者都会收到消息。
SUBSCRIBE channel:订阅或多个频道PUBLISH channel message:向一个频道发布消息PSUBSCRIBE pattern:订阅与pattern格式匹配的所有频道- 优点:
- 支持多个生产者消费者。
- 缺点:
- 不支持数据持久化。
- 无法避免消息丢失。
- 消息堆积有上限,超出数据丢失。
Stream数据结构实现消息队列-重点
Redis 5.0引入了Stream数据结构,提供了更复杂的功能,如消息分组、消费者组等,适用于更复杂的消息队列场景。
- 发送消息:使用
XADD命令将消息添加到Stream中。- 语法:
XADD key ID field value [field value ...] - 参数说明:
key:Stream的名称。ID:消息ID,可以是*,表示由Redis自动生成一个唯一的ID。格式为时间戳-递增数字,例如1609459200000-0。field value [field value ...]:消息的字段和值,可以有多个字段值对。
- 示例:
XADD mystream * field1 value1 field2 value2:向名为mystream的Stream中添加一条消息,消息ID由Redis自动生成,包含两个字段field1和field2。
- 语法:
- 读取消息:使用
XREAD命令从Stream中读取消息。- 语法:
XREAD COUNT count [BLOCK milliseconds] STREAMS key [key ...] ID [ID ...] - 参数说明:
COUNT count:指定要读取的消息数量。BLOCK milliseconds:指定阻塞时间,单位为毫秒,如果没有消息可读,消费者会被阻塞直到有新消息到来或超时。STREAMS key [key ...]:指定要读取的Stream名称,可以一次读取多个Stream。ID [ID ...]:指定要读取的消息ID,可以是>,表示从上次读取的位置开始读取新消息。0表示从Stream的开始位置读取消息。$表示从Stream的末尾位置读取消息,读最新的消息。>表示从上次读取的位置开始读取新消息。
- 示例:
XREAD COUNT 10 STREAMS mystream >:从名为mystream的Stream中读取最多10条新消息。
- 语法:
- 优点:
- 消息可重新读取:消费者可以根据消息ID重新读取消息,避免了消息丢失的问题。
- 一个消息可以被多个消费者消费:通过消费者组功能,一个消息可以被多个消费者消费,满足了多消费者的需求。
- 可以阻塞读。
- 缺点:
- 有消息漏读风险。
消费者组
消费者组:将多个消费者划分成一个组来消费Stream中的消息。消费者组的主要功能包括:
- 消息分流:队列中的消息会分流给组内不同消费者,而不是重复消费。
- 消息表示:消费者组会维护一个标识,用来记录最后一个被处理的消息,消费者重启后仍然可以通过标识读取消息,避免了消息丢失的问题。
- 消息确认:消费者获取到消息后,消息会处于pending状态,消费者需要通过
XACK命令来确认消息已经被处理,只有被确认的消息才会从pending状态中移除。
创建消费者组
使用XGROUP CREATE命令创建一个消费者组。
- 语法:
XGROUP CREATE key groupname ID [MKSTREAM] - 参数说明:
key:Stream的名称。groupname:消费者组的名称。ID:指定消费者组的起始位置,可以是0,表示从Stream的开始位置读取消息,或者$,表示从Stream的末尾位置读取消息。MKSTREAM:如果Stream不存在,是否创建一个空的Stream。
- 示例:
XGROUP CREATE mystream mygroup 0:在名为mystream的Stream上创建一个名为mygroup的消费者组,起始位置为Stream的开始位置。
从消费者组中读取消息
使用XREADGROUP命令从消费者组中读取消息。
- 语法:
XREADGROUP GROUP groupname consumername COUNT count [BLOCK milliseconds] STREAMS key [key ...] ID [ID ...] - 参数说明:
GROUP groupname consumername:指定消费者组的名称和消费者的名称。COUNT count:指定要读取的消息数量。BLOCK milliseconds:指定阻塞时间,单位为毫秒,如果没有消息可读,消费者会被阻塞直到有新消息到来或超时。STREAMS key [key ...]:指定要读取的Stream名称,可以一次读取多个Stream。ID [ID ...]:指定要读取的消息ID,可以是>,表示从下一个未消费的消息开始读取。>表示从下一个未消费的消息开始读取。——常用- 其他则表示从pending-list中获取已经消费但未确认的消息,适用于消费者重启后重新获取消息。
- 示例:
XREADGROUP GROUP mygroup consumer1 COUNT 10 STREAMS mystream >:从名为mygroup的消费者组中,消费者consumer1读取名为mystream的Stream中最多10条新消息。