MQ如何保证消息的顺序性呢?
下文笔者讲述MQ保证消息顺序性的方法及示例分享,如下所示
MQ消息顺序性简介
MQ顺序性简介
消息顺序性分为两种类型:
-全局顺序:
所有消息按照发送顺序被消费
-局部顺序:
特定业务标识的消息保持顺序(如同一订单的消息)
==================================================
MQ保证消息顺序性的方法
1.分区策略:使用业务标识作为分区键,确保相关消息进入同一分区
2.消费者控制:单消费者保证严格顺序,多消费者保证局部顺序
3.内存队列:为每个业务标识维护独立处理队列
4.数据库控制:通过序列号机制验证和控制消息顺序
5.缓冲重排序:缓存乱序消息并按序处理
6.确认机制:只有顺序正确的消息才被确认
Kafka保证顺序性方法
单分区保证全局顺序
@Component
public class KafkaGlobalOrderProducer {
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
// 所有消息发送到同一个分区,保证全局顺序
public void sendMessagesInOrder(list<BusinessEvent> events) {
for (BusinessEvent event : events) {
// 使用相同的key确保进入同一分区
kafkaTemplate.send("ordered-topic", "global-order-key",
JSON.toJSONString(event));
}
}
}
@Component
public class KafkaGlobalOrderConsumer {
// 单线程消费保证顺序
@KafkaListener(topics = "ordered-topic",
groupId = "global-order-group",
concurrency = "1") // 只有一个消费者线程
public void consumeOrderedMessages(ConsumerRecord<String, String> record) {
BusinessEvent event = JSON.parseObject(record.value(), BusinessEvent.class);
processEventInOrder(event);
}
}
局部顺序保证(推荐)
@Component
public class KafkaPartitionedOrderProducer {
// 按业务标识分区,保证局部顺序
public void sendOrderEvents(String orderId, List<OrderEvent> events) {
for (OrderEvent event : events) {
// 使用orderId作为key,确保同一订单的消息进入同一分区
kafkaTemplate.send("order-topic", orderId, JSON.toJSONString(event));
}
}
}
@Component
public class KafkaPartitionedOrderConsumer {
// 多线程消费,但同一orderId的消息由同一消费者处理
@KafkaListener(topics = "order-topic",
groupId = "order-processor",
concurrency = "5") // 5个并发消费者
public void consumeOrderEvents(ConsumerRecord<String, String> record) {
String orderId = record.key(); // 分区键就是orderId
OrderEvent event = JSON.parseObject(record.value(), OrderEvent.class);
// 同一orderId的消息总是由同一个消费者线程处理
processOrderEvent(orderId, event);
}
}
RabbitMQ顺序性保证
单消费者保证顺序
@Component
public class RabbitMQOrderedProducer {
@Autowired
private RabbitTemplate rabbitTemplate;
// 按业务标识发送消息
public void sendOrderMessages(String orderId, List<OrderMessage> messages) {
for (OrderMessage message : messages) {
// 可以添加序列号用于验证顺序
message.setSequence(generateSequence(orderId));
rabbitTemplate.convertAndSend("order.exchange", "order.routing.key", message);
}
}
}
@Component
public class RabbitMQOrderedConsumer {
// 单消费者保证顺序处理
@RabbitListener(queues = "order.queue", concurrency = "1")
public void consumeOrderMessages(OrderMessage message, Channel channel,
@Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) {
try {
// 验证消息顺序
if (isMessageInOrder(message)) {
processOrderMessage(message);
channel.basicAck(deliveryTag, false);
} else {
// 消息顺序不正确,重新入队等待
log.warn("Message out of order: " + message);
channel.basicNack(deliveryTag, false, true); // 重新入队
}
} catch (Exception e) {
log.error("Failed to process order message", e);
try {
channel.basicNack(deliveryTag, false, true);
} catch (IOException ioException) {
log.error("Failed to nack message", ioException);
}
}
}
private boolean isMessageInOrder(OrderMessage message) {
// 检查数据库中的期望序列号
Long expectedSequence = getOrderExpectedSequence(message.getOrderId());
return message.getSequence() == expectedSequence;
}
}
内存队列保证局部顺序
@Component
public class MemoryQueueOrderedConsumer {
// 为每个业务标识维护一个处理队列
private final Map<String, BlockingQueue<OrderEvent>> orderQueues = new ConcurrentHashMap<>();
private final ExecutorService executorService = Executors.newCachedThreadPool();
@RabbitListener(queues = "order.queue")
public void receiveOrderEvent(OrderEvent event) {
String orderId = event.getOrderId();
// 为每个订单创建独立的处理队列
BlockingQueue<OrderEvent> queue = orderQueues.computeIfAbsent(
orderId, k -> new LinkedBlockingQueue<>());
// 将消息加入对应订单的队列
queue.offer(event);
// 启动该订单的处理线程(如果尚未启动)
executorService.submit(() -> processOrderQueue(orderId, queue));
}
private void processOrderQueue(String orderId, BlockingQueue<OrderEvent> queue) {
while (true) {
try {
// 按顺序处理消息
OrderEvent event = queue.take();
processOrderEvent(event);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
break;
} catch (Exception e) {
log.error("Failed to process order event for: " + orderId, e);
}
}
}
private void processOrderEvent(OrderEvent event) {
// 实际的业务处理逻辑
log.info("Processing order event: " + event);
}
}
数据库控制顺序
序列号检查机制
@Entity
public class MessageSequence {
@Id
private String businessId; // 业务标识(如orderId)
private Long expectedSequence; // 期望的下一个序列号
private Date lastUpdateTime;
}
@Component
public class DatabaseSequenceOrderedConsumer {
@Autowired
private MessageSequenceRepository sequenceRepository;
@RabbitListener(queues = "ordered.queue")
public void consumeOrderedMessage(OrderedMessage message, Channel channel,
@Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) {
try {
// 检查消息序列号
MessageSequence sequence = sequenceRepository.findById(message.getBusinessId()).orElse(null);
if (sequence == null) {
sequence = new MessageSequence();
sequence.setBusinessId(message.getBusinessId());
sequence.setExpectedSequence(1L);
}
if (message.getSequence() == sequence.getExpectedSequence()) {
// 序列号正确,处理消息
processMessage(message);
// 更新期望序列号
sequence.setExpectedSequence(sequence.getExpectedSequence() + 1);
sequence.setLastUpdateTime(new Date());
sequenceRepository.save(sequence);
channel.basicAck(deliveryTag, false);
} else if (message.getSequence() < sequence.getExpectedSequence()) {
// 重复消息,直接确认
log.warn("Duplicate message received: " + message);
channel.basicAck(deliveryTag, false);
} else {
// 序列号不连续,消息乱序,重新入队
log.warn("Out of order message: " + message);
channel.basicNack(deliveryTag, false, true);
}
} catch (Exception e) {
log.error("Failed to process ordered message", e);
try {
channel.basicNack(deliveryTag, false, true);
} catch (IOException ioException) {
log.error("Failed to nack message", ioException);
}
}
}
}
Redis实现顺序处理
Redis Streams顺序消费
@Component
public class RedisStreamsOrderedConsumer {
@Autowired
private redisTemplate<String, String> redisTemplate;
// 创建消费者组
@PostConstruct
public void initConsumerGroup() {
try {
redisTemplate.opsForStream().createGroup("order-stream", "order-group");
} catch (Exception e) {
// 组可能已存在,忽略异常
log.debug("Consumer group may already exist", e);
}
}
@Scheduled(fixedDelay = 1000)
public void consumeOrderedMessages() {
try {
// 从消费者组读取消息
List<Map.Entry<String, List<StreamEntry>>> streams =
redisTemplate.opsForStream().read(Consumer.from("order-group", "consumer-1"),
StreamOffset.create("order-stream", ReadOffset.lastConsumed()));
if (!streams.isEmpty()) {
for (StreamEntry entry : streams.get(0).getValue()) {
String messageId = entry.getId().getValue();
Map<String, String> fields = entry.getFields();
String businessId = fields.get("businessId");
String payload = fields.get("payload");
try {
// 处理消息
processMessage(businessId, payload);
// 确认消息处理完成
redisTemplate.opsForStream().acknowledge("order-stream", "order-group", messageId);
} catch (Exception e) {
log.error("Failed to process message: " + messageId, e);
// 不确认消息,会重新消费
}
}
}
} catch (Exception e) {
log.error("Error consuming messages", e);
}
}
}
顺序性处理最佳实践
消息重排序机制
@Component
public class MessageReorderingBuffer {
// 缓存未按顺序到达的消息
private final Map<String, TreeMap<Long, OrderedMessage>> buffer = new ConcurrentHashMap<>();
@RabbitListener(queues = "ordered.queue")
public void receiveMessage(OrderedMessage message) {
String businessId = message.getBusinessId();
Long sequence = message.getSequence();
// 获取该业务标识的消息缓冲区
TreeMap<Long, OrderedMessage> businessBuffer = buffer.computeIfAbsent(
businessId, k -> new TreeMap<>());
// 缓存消息
businessBuffer.put(sequence, message);
// 尝试处理连续的消息
processAvailableMessages(businessId, businessBuffer);
}
private void processAvailableMessages(String businessId, TreeMap<Long, OrderedMessage> buffer) {
Long expectedSequence = getExpectedSequence(businessId);
while (!buffer.isEmpty()) {
Map.Entry<Long, OrderedMessage> entry = buffer.firstEntry();
if (entry.getKey().equals(expectedSequence)) {
// 序列号匹配,处理消息
OrderedMessage message = entry.getValue();
processMessage(message);
// 从缓冲区移除已处理的消息
buffer.pollFirstEntry();
expectedSequence++;
// 更新期望序列号
updateExpectedSequence(businessId, expectedSequence);
} else if (entry.getKey() < expectedSequence) {
// 重复消息,直接移除
buffer.pollFirstEntry();
} else {
// 等待缺失的消息,停止处理
break;
}
}
// 清理空的缓冲区
if (buffer.isEmpty()) {
this.buffer.remove(businessId);
}
}
}
版权声明
本文仅代表作者观点,不代表本站立场。
本文系作者授权发表,未经许可,不得转载。


