MQ如何保证消息的顺序性呢?

欢喜 rabbitmq 发布时间:2025-08-06 16:46:21 阅读数:13863 1
下文笔者讲述MQ保证消息顺序性的方法及示例分享,如下所示

MQ消息顺序性简介

 
MQ顺序性简介
 消息顺序性分为两种类型:
  -全局顺序:
     所有消息按照发送顺序被消费
  -局部顺序:
     特定业务标识的消息保持顺序(如同一订单的消息)
==================================================
MQ保证消息顺序性的方法
	1.分区策略:使用业务标识作为分区键,确保相关消息进入同一分区
	2.消费者控制:单消费者保证严格顺序,多消费者保证局部顺序
	3.内存队列:为每个业务标识维护独立处理队列
	4.数据库控制:通过序列号机制验证和控制消息顺序
	5.缓冲重排序:缓存乱序消息并按序处理
	6.确认机制:只有顺序正确的消息才被确认

Kafka保证顺序性方法

 单分区保证全局顺序

@Component
public class KafkaGlobalOrderProducer {
    
    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;
    
    // 所有消息发送到同一个分区,保证全局顺序
    public void sendMessagesInOrder(list<BusinessEvent> events) {
        for (BusinessEvent event : events) {
            // 使用相同的key确保进入同一分区
            kafkaTemplate.send("ordered-topic", "global-order-key", 
                             JSON.toJSONString(event));
        }
    }
}

@Component
public class KafkaGlobalOrderConsumer {
    
    // 单线程消费保证顺序
    @KafkaListener(topics = "ordered-topic", 
                   groupId = "global-order-group",
                   concurrency = "1") // 只有一个消费者线程
    public void consumeOrderedMessages(ConsumerRecord<String, String> record) {
        BusinessEvent event = JSON.parseObject(record.value(), BusinessEvent.class);
        processEventInOrder(event);
    }
}

局部顺序保证(推荐)


@Component
public class KafkaPartitionedOrderProducer {
    
    // 按业务标识分区,保证局部顺序
    public void sendOrderEvents(String orderId, List<OrderEvent> events) {
        for (OrderEvent event : events) {
            // 使用orderId作为key,确保同一订单的消息进入同一分区
            kafkaTemplate.send("order-topic", orderId, JSON.toJSONString(event));
        }
    }
}

@Component
public class KafkaPartitionedOrderConsumer {
    
    // 多线程消费,但同一orderId的消息由同一消费者处理
    @KafkaListener(topics = "order-topic", 
                   groupId = "order-processor",
                   concurrency = "5") // 5个并发消费者
    public void consumeOrderEvents(ConsumerRecord<String, String> record) {
        String orderId = record.key(); // 分区键就是orderId
        OrderEvent event = JSON.parseObject(record.value(), OrderEvent.class);
        
        // 同一orderId的消息总是由同一个消费者线程处理
        processOrderEvent(orderId, event);
    }
}

RabbitMQ顺序性保证

单消费者保证顺序

@Component
public class RabbitMQOrderedProducer {
    
    @Autowired
    private RabbitTemplate rabbitTemplate;
    
    // 按业务标识发送消息
    public void sendOrderMessages(String orderId, List<OrderMessage> messages) {
        for (OrderMessage message : messages) {
            // 可以添加序列号用于验证顺序
            message.setSequence(generateSequence(orderId));
            rabbitTemplate.convertAndSend("order.exchange", "order.routing.key", message);
        }
    }
}

@Component
public class RabbitMQOrderedConsumer {
    
    // 单消费者保证顺序处理
    @RabbitListener(queues = "order.queue", concurrency = "1")
    public void consumeOrderMessages(OrderMessage message, Channel channel,
                                    @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) {
        try {
            // 验证消息顺序
            if (isMessageInOrder(message)) {
                processOrderMessage(message);
                channel.basicAck(deliveryTag, false);
            } else {
                // 消息顺序不正确,重新入队等待
                log.warn("Message out of order: " + message);
                channel.basicNack(deliveryTag, false, true); // 重新入队
            }
        } catch (Exception e) {
            log.error("Failed to process order message", e);
            try {
                channel.basicNack(deliveryTag, false, true);
            } catch (IOException ioException) {
                log.error("Failed to nack message", ioException);
            }
        }
    }
    
    private boolean isMessageInOrder(OrderMessage message) {
        // 检查数据库中的期望序列号
        Long expectedSequence = getOrderExpectedSequence(message.getOrderId());
        return message.getSequence() == expectedSequence;
    }
}

内存队列保证局部顺序

@Component
public class MemoryQueueOrderedConsumer {
    
    // 为每个业务标识维护一个处理队列
    private final Map<String, BlockingQueue<OrderEvent>> orderQueues = new ConcurrentHashMap<>();
    private final ExecutorService executorService = Executors.newCachedThreadPool();
    
    @RabbitListener(queues = "order.queue")
    public void receiveOrderEvent(OrderEvent event) {
        String orderId = event.getOrderId();
        
        // 为每个订单创建独立的处理队列
        BlockingQueue<OrderEvent> queue = orderQueues.computeIfAbsent(
            orderId, k -> new LinkedBlockingQueue<>());
        
        // 将消息加入对应订单的队列
        queue.offer(event);
        
        // 启动该订单的处理线程(如果尚未启动)
        executorService.submit(() -> processOrderQueue(orderId, queue));
    }
    
    private void processOrderQueue(String orderId, BlockingQueue<OrderEvent> queue) {
        while (true) {
            try {
                // 按顺序处理消息
                OrderEvent event = queue.take();
                processOrderEvent(event);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                break;
            } catch (Exception e) {
                log.error("Failed to process order event for: " + orderId, e);
            }
        }
    }
    
    private void processOrderEvent(OrderEvent event) {
        // 实际的业务处理逻辑
        log.info("Processing order event: " + event);
    }
}

数据库控制顺序

 序列号检查机制

@Entity
public class MessageSequence {
    @Id
    private String businessId; // 业务标识(如orderId)
    private Long expectedSequence; // 期望的下一个序列号
    private Date lastUpdateTime;
}

@Component
public class DatabaseSequenceOrderedConsumer {
    
    @Autowired
    private MessageSequenceRepository sequenceRepository;
    
    @RabbitListener(queues = "ordered.queue")
    public void consumeOrderedMessage(OrderedMessage message, Channel channel,
                                     @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) {
        try {
            // 检查消息序列号
            MessageSequence sequence = sequenceRepository.findById(message.getBusinessId()).orElse(null);
            
            if (sequence == null) {
                sequence = new MessageSequence();
                sequence.setBusinessId(message.getBusinessId());
                sequence.setExpectedSequence(1L);
            }
            
            if (message.getSequence() == sequence.getExpectedSequence()) {
                // 序列号正确,处理消息
                processMessage(message);
                
                // 更新期望序列号
                sequence.setExpectedSequence(sequence.getExpectedSequence() + 1);
                sequence.setLastUpdateTime(new Date());
                sequenceRepository.save(sequence);
                
                channel.basicAck(deliveryTag, false);
                
            } else if (message.getSequence() < sequence.getExpectedSequence()) {
                // 重复消息,直接确认
                log.warn("Duplicate message received: " + message);
                channel.basicAck(deliveryTag, false);
                
            } else {
                // 序列号不连续,消息乱序,重新入队
                log.warn("Out of order message: " + message);
                channel.basicNack(deliveryTag, false, true);
            }
            
        } catch (Exception e) {
            log.error("Failed to process ordered message", e);
            try {
                channel.basicNack(deliveryTag, false, true);
            } catch (IOException ioException) {
                log.error("Failed to nack message", ioException);
            }
        }
    }
}

Redis实现顺序处理

 Redis Streams顺序消费

@Component
public class RedisStreamsOrderedConsumer {
    
    @Autowired
    private redisTemplate<String, String> redisTemplate;
    
    // 创建消费者组
    @PostConstruct
    public void initConsumerGroup() {
        try {
            redisTemplate.opsForStream().createGroup("order-stream", "order-group");
        } catch (Exception e) {
            // 组可能已存在,忽略异常
            log.debug("Consumer group may already exist", e);
        }
    }
    
    @Scheduled(fixedDelay = 1000)
    public void consumeOrderedMessages() {
        try {
            // 从消费者组读取消息
            List<Map.Entry<String, List<StreamEntry>>> streams = 
                redisTemplate.opsForStream().read(Consumer.from("order-group", "consumer-1"),
                    StreamOffset.create("order-stream", ReadOffset.lastConsumed()));
            
            if (!streams.isEmpty()) {
                for (StreamEntry entry : streams.get(0).getValue()) {
                    String messageId = entry.getId().getValue();
                    Map<String, String> fields = entry.getFields();
                    
                    String businessId = fields.get("businessId");
                    String payload = fields.get("payload");
                    
                    try {
                        // 处理消息
                        processMessage(businessId, payload);
                        
                        // 确认消息处理完成
                        redisTemplate.opsForStream().acknowledge("order-stream", "order-group", messageId);
                        
                    } catch (Exception e) {
                        log.error("Failed to process message: " + messageId, e);
                        // 不确认消息,会重新消费
                    }
                }
            }
        } catch (Exception e) {
            log.error("Error consuming messages", e);
        }
    }
}

顺序性处理最佳实践

 消息重排序机制

@Component
public class MessageReorderingBuffer {
    
    // 缓存未按顺序到达的消息
    private final Map<String, TreeMap<Long, OrderedMessage>> buffer = new ConcurrentHashMap<>();
    
    @RabbitListener(queues = "ordered.queue")
    public void receiveMessage(OrderedMessage message) {
        String businessId = message.getBusinessId();
        Long sequence = message.getSequence();
        
        // 获取该业务标识的消息缓冲区
        TreeMap<Long, OrderedMessage> businessBuffer = buffer.computeIfAbsent(
            businessId, k -> new TreeMap<>());
        
        // 缓存消息
        businessBuffer.put(sequence, message);
        
        // 尝试处理连续的消息
        processAvailableMessages(businessId, businessBuffer);
    }
    
    private void processAvailableMessages(String businessId, TreeMap<Long, OrderedMessage> buffer) {
        Long expectedSequence = getExpectedSequence(businessId);
        
        while (!buffer.isEmpty()) {
            Map.Entry<Long, OrderedMessage> entry = buffer.firstEntry();
            
            if (entry.getKey().equals(expectedSequence)) {
                // 序列号匹配,处理消息
                OrderedMessage message = entry.getValue();
                processMessage(message);
                
                // 从缓冲区移除已处理的消息
                buffer.pollFirstEntry();
                expectedSequence++;
                
                // 更新期望序列号
                updateExpectedSequence(businessId, expectedSequence);
            } else if (entry.getKey() < expectedSequence) {
                // 重复消息,直接移除
                buffer.pollFirstEntry();
            } else {
                // 等待缺失的消息,停止处理
                break;
            }
        }
        
        // 清理空的缓冲区
        if (buffer.isEmpty()) {
            this.buffer.remove(businessId);
        }
    }
}
版权声明

本文仅代表作者观点,不代表本站立场。
本文系作者授权发表,未经许可,不得转载。

本文链接: https://www.Java265.com/rabbitmq/2025/8507.html

最近发表

热门文章

好文推荐

Java265.com

https://www.java265.com

站长统计|粤ICP备14097017号-3

Powered By Java265.com信息维护小组

使用手机扫描二维码

关注我们看更多资讯

java爱好者