redis 如何实现MQ队列呢?

欢喜 Java每日一问 发布时间:2025-08-06 14:47:06 阅读数:7826 1
下文笔者讲述Redis中实现MQ功能的简介说明,如下所示

Redis可使用多种方式实现消息队列(MQ)功能

方式1:
   使用list结构

方式2:
   使用Pub/SUB模式

方式3:
   使用 Streams(推荐) 

方式4:
   使用Sorted Set实现延时队列

1.使用List结构实现队列

Redis的`List`数据结构可以实现简单的消息队列
- 生产者:使用 `LPUSH` 或 `RPUSH` 命令将消息推入队列
- 消费者:使用 `RPOP` 或 `LPOP` 命令从队列中取出消息
- 阻塞式消费:使用 `BRPOP` 或 `BLPOP` 实现阻塞式等待消息
 
//生产者示例
jedis.lpush("queue:messages", message);

//消费者示例(阻塞式)
List<String> result = jedis.brpop(0, "queue:messages");
String message = result.get(1);

2.使用Pub/Sub发布订阅模式

Redis 的发布订阅模式支持一对多的消息分发
	- 发布者:使用 `PUBLISH` 命令发送消息到频道
	- 订阅者:使用 `SUBSCRIBE` 命令订阅频道接收消息
	- 特点:不持久化消息,订阅者只能收到订阅之后的消息
 
// 发布消息
jedis.publish("channel:orders", message);

// 订阅频道
jedis.subscribe(new JedisPubSub() {
    @Override
    public void onMessage(String channel, String message) {
        // 处理消息
    }
}, "channel:orders");

3.使用Streams(笔者强烈推荐此方式)

Redis 5.0+ 引入的 `Streams` 是专门用于消息队列的数据结构:
- 支持消息持久化
- 支持消费者组(Consumer Groups)
- 支持消息确认机制(ACK)
- 支持消息回溯和重复消费
 
// 添加消息到流
jedis.xadd("stream:orders", StreamEntryID.NEW_ENTRY, Map.of("data", message));

// 消费者组消费
jedis.xgroupCreate("stream:orders", "group1", null, false);
List<StreamEntry> entries = jedis.xreadGroup("group1", "consumer1", 
    1, 0, false, new StreamEntryID(), "stream:orders");

4.使用Sorted Set实现延时队列

通过 `Sorted Set` 可以实现延时消息队列:
 - 使用时间戳作为 score
 - 通过 `ZRANGEBYSCORE` 查询到期消息
 - 适用于需要延时处理的场景
 
// 添加延时消息
long deliverTime = System.currentTimeMillis() + delayMillis;
jedis.zadd("delayed:queue", deliverTime, message);

// 获取到期消息
long now = System.currentTimeMillis();
Set<String> messages = jedis.zrangeByScore("delayed:queue", 0, now);
方案 持久化 消费确认 多消费者 延时消息 适用场景
List 需自行实现 支持 不支持 简单队列场景
Pub/Sub 不支持 支持 不支持 实时广播场景
Streams 支持 支持 不直接支持 完整MQ场景
Sorted Set 需自行实现 支持 支持 延时队列场景
版权声明

本文仅代表作者观点,不代表本站立场。
本文系作者授权发表,未经许可,不得转载。

本文链接: https://www.Java265.com/JavaProblem/202508/8503.html

最近发表

热门文章

好文推荐

Java265.com

https://www.java265.com

站长统计|粤ICP备14097017号-3

Powered By Java265.com信息维护小组

使用手机扫描二维码

关注我们看更多资讯

java爱好者