redis 如何实现MQ队列呢?
下文笔者讲述Redis中实现MQ功能的简介说明,如下所示
Redis可使用多种方式实现消息队列(MQ)功能
方式1: 使用list结构 方式2: 使用Pub/SUB模式 方式3: 使用 Streams(推荐) 方式4: 使用Sorted Set实现延时队列
1.使用List结构实现队列
Redis的`List`数据结构可以实现简单的消息队列 - 生产者:使用 `LPUSH` 或 `RPUSH` 命令将消息推入队列 - 消费者:使用 `RPOP` 或 `LPOP` 命令从队列中取出消息 - 阻塞式消费:使用 `BRPOP` 或 `BLPOP` 实现阻塞式等待消息例
//生产者示例
jedis.lpush("queue:messages", message);
//消费者示例(阻塞式)
List<String> result = jedis.brpop(0, "queue:messages");
String message = result.get(1);
2.使用Pub/Sub发布订阅模式
Redis 的发布订阅模式支持一对多的消息分发 - 发布者:使用 `PUBLISH` 命令发送消息到频道 - 订阅者:使用 `SUBSCRIBE` 命令订阅频道接收消息 - 特点:不持久化消息,订阅者只能收到订阅之后的消息例
// 发布消息
jedis.publish("channel:orders", message);
// 订阅频道
jedis.subscribe(new JedisPubSub() {
@Override
public void onMessage(String channel, String message) {
// 处理消息
}
}, "channel:orders");
3.使用Streams(笔者强烈推荐此方式)
Redis 5.0+ 引入的 `Streams` 是专门用于消息队列的数据结构: - 支持消息持久化 - 支持消费者组(Consumer Groups) - 支持消息确认机制(ACK) - 支持消息回溯和重复消费例
// 添加消息到流
jedis.xadd("stream:orders", StreamEntryID.NEW_ENTRY, Map.of("data", message));
// 消费者组消费
jedis.xgroupCreate("stream:orders", "group1", null, false);
List<StreamEntry> entries = jedis.xreadGroup("group1", "consumer1",
1, 0, false, new StreamEntryID(), "stream:orders");
4.使用Sorted Set实现延时队列
通过 `Sorted Set` 可以实现延时消息队列: - 使用时间戳作为 score - 通过 `ZRANGEBYSCORE` 查询到期消息 - 适用于需要延时处理的场景例
// 添加延时消息
long deliverTime = System.currentTimeMillis() + delayMillis;
jedis.zadd("delayed:queue", deliverTime, message);
// 获取到期消息
long now = System.currentTimeMillis();
Set<String> messages = jedis.zrangeByScore("delayed:queue", 0, now);
| 方案 | 持久化 | 消费确认 | 多消费者 | 延时消息 | 适用场景 |
| List | 是 | 需自行实现 | 支持 | 不支持 | 简单队列场景 |
| Pub/Sub | 否 | 不支持 | 支持 | 不支持 | 实时广播场景 |
| Streams | 是 | 支持 | 支持 | 不直接支持 | 完整MQ场景 |
| Sorted Set | 是 | 需自行实现 | 支持 | 支持 | 延时队列场景 |
版权声明
本文仅代表作者观点,不代表本站立场。
本文系作者授权发表,未经许可,不得转载。


