redis 如何实现MQ队列呢?
下文笔者讲述Redis中实现MQ功能的简介说明,如下所示
Redis可使用多种方式实现消息队列(MQ)功能
方式1: 使用list结构 方式2: 使用Pub/SUB模式 方式3: 使用 Streams(推荐) 方式4: 使用Sorted Set实现延时队列
1.使用List结构实现队列
Redis的`List`数据结构可以实现简单的消息队列 - 生产者:使用 `LPUSH` 或 `RPUSH` 命令将消息推入队列 - 消费者:使用 `RPOP` 或 `LPOP` 命令从队列中取出消息 - 阻塞式消费:使用 `BRPOP` 或 `BLPOP` 实现阻塞式等待消息例
//生产者示例 jedis.lpush("queue:messages", message); //消费者示例(阻塞式) List<String> result = jedis.brpop(0, "queue:messages"); String message = result.get(1);
2.使用Pub/Sub发布订阅模式
Redis 的发布订阅模式支持一对多的消息分发 - 发布者:使用 `PUBLISH` 命令发送消息到频道 - 订阅者:使用 `SUBSCRIBE` 命令订阅频道接收消息 - 特点:不持久化消息,订阅者只能收到订阅之后的消息例
// 发布消息 jedis.publish("channel:orders", message); // 订阅频道 jedis.subscribe(new JedisPubSub() { @Override public void onMessage(String channel, String message) { // 处理消息 } }, "channel:orders");
3.使用Streams(笔者强烈推荐此方式)
Redis 5.0+ 引入的 `Streams` 是专门用于消息队列的数据结构: - 支持消息持久化 - 支持消费者组(Consumer Groups) - 支持消息确认机制(ACK) - 支持消息回溯和重复消费例
// 添加消息到流 jedis.xadd("stream:orders", StreamEntryID.NEW_ENTRY, Map.of("data", message)); // 消费者组消费 jedis.xgroupCreate("stream:orders", "group1", null, false); List<StreamEntry> entries = jedis.xreadGroup("group1", "consumer1", 1, 0, false, new StreamEntryID(), "stream:orders");
4.使用Sorted Set实现延时队列
通过 `Sorted Set` 可以实现延时消息队列: - 使用时间戳作为 score - 通过 `ZRANGEBYSCORE` 查询到期消息 - 适用于需要延时处理的场景例
// 添加延时消息 long deliverTime = System.currentTimeMillis() + delayMillis; jedis.zadd("delayed:queue", deliverTime, message); // 获取到期消息 long now = System.currentTimeMillis(); Set<String> messages = jedis.zrangeByScore("delayed:queue", 0, now);
方案 | 持久化 | 消费确认 | 多消费者 | 延时消息 | 适用场景 |
List | 是 | 需自行实现 | 支持 | 不支持 | 简单队列场景 |
Pub/Sub | 否 | 不支持 | 支持 | 不支持 | 实时广播场景 |
Streams | 是 | 支持 | 支持 | 不直接支持 | 完整MQ场景 |
Sorted Set | 是 | 需自行实现 | 支持 | 支持 | 延时队列场景 |
版权声明
本文仅代表作者观点,不代表本站立场。
本文系作者授权发表,未经许可,不得转载。