MQ如何保证消息的顺序性呢?
下文笔者讲述MQ保证消息顺序性的方法及示例分享,如下所示
MQ消息顺序性简介
MQ顺序性简介 消息顺序性分为两种类型: -全局顺序: 所有消息按照发送顺序被消费 -局部顺序: 特定业务标识的消息保持顺序(如同一订单的消息) ================================================== MQ保证消息顺序性的方法 1.分区策略:使用业务标识作为分区键,确保相关消息进入同一分区 2.消费者控制:单消费者保证严格顺序,多消费者保证局部顺序 3.内存队列:为每个业务标识维护独立处理队列 4.数据库控制:通过序列号机制验证和控制消息顺序 5.缓冲重排序:缓存乱序消息并按序处理 6.确认机制:只有顺序正确的消息才被确认
Kafka保证顺序性方法
单分区保证全局顺序 @Component public class KafkaGlobalOrderProducer { @Autowired private KafkaTemplate<String, String> kafkaTemplate; // 所有消息发送到同一个分区,保证全局顺序 public void sendMessagesInOrder(list<BusinessEvent> events) { for (BusinessEvent event : events) { // 使用相同的key确保进入同一分区 kafkaTemplate.send("ordered-topic", "global-order-key", JSON.toJSONString(event)); } } } @Component public class KafkaGlobalOrderConsumer { // 单线程消费保证顺序 @KafkaListener(topics = "ordered-topic", groupId = "global-order-group", concurrency = "1") // 只有一个消费者线程 public void consumeOrderedMessages(ConsumerRecord<String, String> record) { BusinessEvent event = JSON.parseObject(record.value(), BusinessEvent.class); processEventInOrder(event); } }
局部顺序保证(推荐)
@Component public class KafkaPartitionedOrderProducer { // 按业务标识分区,保证局部顺序 public void sendOrderEvents(String orderId, List<OrderEvent> events) { for (OrderEvent event : events) { // 使用orderId作为key,确保同一订单的消息进入同一分区 kafkaTemplate.send("order-topic", orderId, JSON.toJSONString(event)); } } } @Component public class KafkaPartitionedOrderConsumer { // 多线程消费,但同一orderId的消息由同一消费者处理 @KafkaListener(topics = "order-topic", groupId = "order-processor", concurrency = "5") // 5个并发消费者 public void consumeOrderEvents(ConsumerRecord<String, String> record) { String orderId = record.key(); // 分区键就是orderId OrderEvent event = JSON.parseObject(record.value(), OrderEvent.class); // 同一orderId的消息总是由同一个消费者线程处理 processOrderEvent(orderId, event); } }
RabbitMQ顺序性保证
单消费者保证顺序 @Component public class RabbitMQOrderedProducer { @Autowired private RabbitTemplate rabbitTemplate; // 按业务标识发送消息 public void sendOrderMessages(String orderId, List<OrderMessage> messages) { for (OrderMessage message : messages) { // 可以添加序列号用于验证顺序 message.setSequence(generateSequence(orderId)); rabbitTemplate.convertAndSend("order.exchange", "order.routing.key", message); } } } @Component public class RabbitMQOrderedConsumer { // 单消费者保证顺序处理 @RabbitListener(queues = "order.queue", concurrency = "1") public void consumeOrderMessages(OrderMessage message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) { try { // 验证消息顺序 if (isMessageInOrder(message)) { processOrderMessage(message); channel.basicAck(deliveryTag, false); } else { // 消息顺序不正确,重新入队等待 log.warn("Message out of order: " + message); channel.basicNack(deliveryTag, false, true); // 重新入队 } } catch (Exception e) { log.error("Failed to process order message", e); try { channel.basicNack(deliveryTag, false, true); } catch (IOException ioException) { log.error("Failed to nack message", ioException); } } } private boolean isMessageInOrder(OrderMessage message) { // 检查数据库中的期望序列号 Long expectedSequence = getOrderExpectedSequence(message.getOrderId()); return message.getSequence() == expectedSequence; } }
内存队列保证局部顺序
@Component public class MemoryQueueOrderedConsumer { // 为每个业务标识维护一个处理队列 private final Map<String, BlockingQueue<OrderEvent>> orderQueues = new ConcurrentHashMap<>(); private final ExecutorService executorService = Executors.newCachedThreadPool(); @RabbitListener(queues = "order.queue") public void receiveOrderEvent(OrderEvent event) { String orderId = event.getOrderId(); // 为每个订单创建独立的处理队列 BlockingQueue<OrderEvent> queue = orderQueues.computeIfAbsent( orderId, k -> new LinkedBlockingQueue<>()); // 将消息加入对应订单的队列 queue.offer(event); // 启动该订单的处理线程(如果尚未启动) executorService.submit(() -> processOrderQueue(orderId, queue)); } private void processOrderQueue(String orderId, BlockingQueue<OrderEvent> queue) { while (true) { try { // 按顺序处理消息 OrderEvent event = queue.take(); processOrderEvent(event); } catch (InterruptedException e) { Thread.currentThread().interrupt(); break; } catch (Exception e) { log.error("Failed to process order event for: " + orderId, e); } } } private void processOrderEvent(OrderEvent event) { // 实际的业务处理逻辑 log.info("Processing order event: " + event); } }
数据库控制顺序
序列号检查机制 @Entity public class MessageSequence { @Id private String businessId; // 业务标识(如orderId) private Long expectedSequence; // 期望的下一个序列号 private Date lastUpdateTime; } @Component public class DatabaseSequenceOrderedConsumer { @Autowired private MessageSequenceRepository sequenceRepository; @RabbitListener(queues = "ordered.queue") public void consumeOrderedMessage(OrderedMessage message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) { try { // 检查消息序列号 MessageSequence sequence = sequenceRepository.findById(message.getBusinessId()).orElse(null); if (sequence == null) { sequence = new MessageSequence(); sequence.setBusinessId(message.getBusinessId()); sequence.setExpectedSequence(1L); } if (message.getSequence() == sequence.getExpectedSequence()) { // 序列号正确,处理消息 processMessage(message); // 更新期望序列号 sequence.setExpectedSequence(sequence.getExpectedSequence() + 1); sequence.setLastUpdateTime(new Date()); sequenceRepository.save(sequence); channel.basicAck(deliveryTag, false); } else if (message.getSequence() < sequence.getExpectedSequence()) { // 重复消息,直接确认 log.warn("Duplicate message received: " + message); channel.basicAck(deliveryTag, false); } else { // 序列号不连续,消息乱序,重新入队 log.warn("Out of order message: " + message); channel.basicNack(deliveryTag, false, true); } } catch (Exception e) { log.error("Failed to process ordered message", e); try { channel.basicNack(deliveryTag, false, true); } catch (IOException ioException) { log.error("Failed to nack message", ioException); } } } }
Redis实现顺序处理
Redis Streams顺序消费 @Component public class RedisStreamsOrderedConsumer { @Autowired private redisTemplate<String, String> redisTemplate; // 创建消费者组 @PostConstruct public void initConsumerGroup() { try { redisTemplate.opsForStream().createGroup("order-stream", "order-group"); } catch (Exception e) { // 组可能已存在,忽略异常 log.debug("Consumer group may already exist", e); } } @Scheduled(fixedDelay = 1000) public void consumeOrderedMessages() { try { // 从消费者组读取消息 List<Map.Entry<String, List<StreamEntry>>> streams = redisTemplate.opsForStream().read(Consumer.from("order-group", "consumer-1"), StreamOffset.create("order-stream", ReadOffset.lastConsumed())); if (!streams.isEmpty()) { for (StreamEntry entry : streams.get(0).getValue()) { String messageId = entry.getId().getValue(); Map<String, String> fields = entry.getFields(); String businessId = fields.get("businessId"); String payload = fields.get("payload"); try { // 处理消息 processMessage(businessId, payload); // 确认消息处理完成 redisTemplate.opsForStream().acknowledge("order-stream", "order-group", messageId); } catch (Exception e) { log.error("Failed to process message: " + messageId, e); // 不确认消息,会重新消费 } } } } catch (Exception e) { log.error("Error consuming messages", e); } } }
顺序性处理最佳实践
消息重排序机制 @Component public class MessageReorderingBuffer { // 缓存未按顺序到达的消息 private final Map<String, TreeMap<Long, OrderedMessage>> buffer = new ConcurrentHashMap<>(); @RabbitListener(queues = "ordered.queue") public void receiveMessage(OrderedMessage message) { String businessId = message.getBusinessId(); Long sequence = message.getSequence(); // 获取该业务标识的消息缓冲区 TreeMap<Long, OrderedMessage> businessBuffer = buffer.computeIfAbsent( businessId, k -> new TreeMap<>()); // 缓存消息 businessBuffer.put(sequence, message); // 尝试处理连续的消息 processAvailableMessages(businessId, businessBuffer); } private void processAvailableMessages(String businessId, TreeMap<Long, OrderedMessage> buffer) { Long expectedSequence = getExpectedSequence(businessId); while (!buffer.isEmpty()) { Map.Entry<Long, OrderedMessage> entry = buffer.firstEntry(); if (entry.getKey().equals(expectedSequence)) { // 序列号匹配,处理消息 OrderedMessage message = entry.getValue(); processMessage(message); // 从缓冲区移除已处理的消息 buffer.pollFirstEntry(); expectedSequence++; // 更新期望序列号 updateExpectedSequence(businessId, expectedSequence); } else if (entry.getKey() < expectedSequence) { // 重复消息,直接移除 buffer.pollFirstEntry(); } else { // 等待缺失的消息,停止处理 break; } } // 清理空的缓冲区 if (buffer.isEmpty()) { this.buffer.remove(businessId); } } }
版权声明
本文仅代表作者观点,不代表本站立场。
本文系作者授权发表,未经许可,不得转载。