MQ如何保证消息的可靠性的呢?
下文笔者讲述MQ保证消息可靠性的方法分享,如下所示
1.持久化存储:将消息写入磁盘防止丢失 2.生产者确认:确保消息成功发送到MQ服务器 3.消费者确认:确保消息被正确处理 4.事务支持:保证业务操作与消息发送的一致性 5.死信队列:处理异常消息防止影响正常流程 6.幂等性设计:防止消息重复消费造成业务问题 7.集群和复制:通过多节点部署提高可用性 使用以上方式对消息进行处理 即可保证MQ消息的可靠性 ============================================== 具体的策略如下所示:
1.消息持久化
确保消息在系统崩溃后不会丢失 //RabbitMQ示例 - 持久化消息 @Component public class ReliableMessageProducer { @Autowired private RabbitTemplate rabbitTemplate; public void sendReliableMessage(String exchange, String routingKey, Object message) { // 创建持久化消息 MessageProperties messageProperties = new MessageProperties(); messageProperties.setDeliveryMode(MessageDeliveryMode.PERSISTENT); // 消息持久化 Message persistentMessage = new Message( SerializationUtils.serialize(message), messageProperties ); // 发送到持久化交换机和队列 rabbitTemplate.send(exchange, routingKey, persistentMessage); } } //Kafka示例 - 持久化配置 @Configuration public class KafkaProducerConfig { @Bean public Map<String, Object> producerConfigs() { Map<String, Object> props = new HashMap<>(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); // 确保消息持久化到磁盘 props.put(ProducerConfig.ACKS_CONFIG, "all"); // 等待所有副本确认 props.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE); // 无限重试 props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true); // 幂等性 return props; } }
2.生产者确认机制
确保消息成功发送到MQ服务器: RabbitMQ Publisher Confirms @Component public class PublisherConfirmExample { @Autowired private RabbitTemplate rabbitTemplate; public void sendMessageWithConfirm(String message) throws Exception { // 启用发布者确认 rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> { if (ack) { log.info("Message confirmed: " + correlationData.getId()); } else { log.error("Message failed: " + correlationData.getId() + ", cause: " + cause); // 处理失败消息,如重试或记录到死信队列 handleFailedMessage(correlationData.getId(), message); } }); // 设置消息ID用于追踪 CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString()); rabbitTemplate.convertAndSend("exchange", "routing.key", message, correlationData); } } Kafka Producer Acks @Component public class KafkaReliableProducer { @Autowired private KafkaTemplate<String, String> kafkaTemplate; public listenableFuture<SendResult<String, String>> sendReliableMessage( String topic, String key, String message) { // 发送消息并获取结果Future ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(topic, key, message); // 添加回调处理 future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() { @Override public void onSuccess(SendResult<String, String> result) { log.info("Message sent successfully: " + result.getRecordMetadata()); } @Override public void onFailure(Throwable ex) { log.error("Failed to send message: " + message, ex); // 处理发送失败,如重试 handleSendFailure(topic, key, message, ex); } }); return future; } }
3.消费者确认机制
确保消息被正确处理 RabbitMQ Manual Acknowledgment @Component public class ManualAckConsumer { @RabbitListener(queues = "reliable.queue") public void handleMessage(Message message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) { try { // 处理消息 String payload = new String(message.getBody()); processMessage(payload); // 手动确认消息 channel.basicAck(deliveryTag, false); log.info("Message acknowledged: " + payload); } catch (Exception e) { log.error("Failed to process message", e); try { // 拒绝消息并重新入队 channel.basicNack(deliveryTag, false, true); } catch (IOException ioException) { log.error("Failed to nack message", ioException); } } } private void processMessage(String message) throws Exception { // 实际的消息处理逻辑 // 如果抛出异常,消息会重新入队 } } Kafka Consumer Offsets @Component public class KafkaReliableConsumer { @KafkaListener(topics = "reliable-topic") public void listen(ConsumerRecord<String, String> record, Acknowledgment ack) { try { // 处理消息 processMessage(record.value()); // 手动提交偏移量 ack.acknowledge(); log.info("Message processed and acknowledged: " + record.value()); } catch (Exception e) { log.error("Failed to process message: " + record.value(), e); // 不确认消息,下次重启后会重新消费 // 或者发送到死信主题 sendToDeadLetterTopic(record); } } @KafkaListener(topics = "dead-letter-topic") public void handleDeadLetter(ConsumerRecord<String, String> record) { // 处理死信消息 log.warn("Processing dead letter message: " + record.value()); // 可以记录到数据库供人工处理 } }
4.事务支持
保证消息发送与业务操作的一致性: RabbitMQ事务 @Service public class TransactionalMessageService { @Autowired private RabbitTemplate rabbitTemplate; @Transactional public void processBusinessWithMessage(BusinessData data) { // 1. 执行业务操作 businessRepository.save(data); // 2. 发送消息(在同一个事务中) rabbitTemplate.setChannelTransacted(true); rabbitTemplate.convertAndSend("business.exchange", "business.key", data); // 如果业务操作或消息发送失败,整个事务回滚 } } RabbitMQ Publisher Confirms with Local Transaction @Service public class ReliableBusinessService { @Autowired private BusinessRepository businessRepository; @Autowired private RabbitTemplate rabbitTemplate; private final Set<String> pendingMessages = ConcurrentHashMap.newKeySet(); @Transactional public void processBusinessOperation(BusinessData data) { // 1. 执行业务操作 businessRepository.save(data); // 2. 发送消息并记录待确认 String messageId = UUID.randomUUID().toString(); pendingMessages.add(messageId); CorrelationData correlationData = new CorrelationData(messageId); rabbitTemplate.convertAndSend("business.exchange", "business.key", data, correlationData); } // 确认回调 @PostConstruct public void setupConfirmCallback() { rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> { String messageId = correlationData.getId(); if (ack) { // 消息确认成功,从待确认集合中移除 pendingMessages.remove(messageId); log.info("Message confirmed: " + messageId); } else { // 消息发送失败,需要补偿处理 log.error("Message failed: " + messageId + ", cause: " + cause); handleUnconfirmedMessage(messageId); } }); } }
5.死信队列处理
处理无法正常消费的消息: @Configuration public class DeadLetterConfig { // 正常队列 @Bean public Queue businessQueue() { Map<String, Object> args = new HashMap<>(); args.put("x-dead-letter-exchange", "dlx.exchange"); args.put("x-dead-letter-routing-key", "dlx.routing.key"); args.put("x-message-ttl", 60000); // 1分钟TTL args.put("x-max-length", 1000); // 最大长度 return QueueBuilder.durable("business.queue") .withArguments(args) .build(); } // 死信交换机和队列 @Bean public DirectExchange deadLetterExchange() { return new DirectExchange("dlx.exchange"); } @Bean public Queue deadLetterQueue() { return QueueBuilder.durable("dead.letter.queue").build(); } @Bean public Binding dlqBinding() { return BindingBuilder.bind(deadLetterQueue()) .to(deadLetterExchange()) .with("dlx.routing.key"); } } @Component public class DeadLetterConsumer { @RabbitListener(queues = "dead.letter.queue") public void handleDeadLetter(Message message, @Header("x-original-exchange") String originalExchange, @Header("x-original-routing-key") String originalRoutingKey) { log.warn("Received dead letter message from exchange: " + originalExchange + ", routing key: " + originalRoutingKey); // 记录到数据库供人工处理 deadLetterRepository.save(new DeadLetterMessage( message.getMessageProperties().getMessageId(), new String(message.getBody()), originalExchange, originalRoutingKey, new Date() )); } }
6.幂等性保证
防止消息重复消费: @Component public class IdempotentMessageConsumer { @Autowired private MessageLogRepository messageLogRepository; @RabbitListener(queues = "business.queue") public void handleMessage(BusinessMessage message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) { try { // 检查消息是否已处理 if (messageLogRepository.existsByMessageId(message.getId())) { log.info("Message already processed: " + message.getId()); channel.basicAck(deliveryTag, false); return; } // 处理消息 processBusinessLogic(message); // 记录已处理消息 messageLogRepository.save(new MessageLog(message.getId(), new Date())); // 确认消息 channel.basicAck(deliveryTag, false); } catch (Exception e) { log.error("Failed to process message: " + message.getId(), e); try { channel.basicNack(deliveryTag, false, true); } catch (IOException ioException) { log.error("Failed to nack message", ioException); } } } private void processBusinessLogic(BusinessMessage message) { // 实际业务逻辑处理 } }
版权声明
本文仅代表作者观点,不代表本站立场。
本文系作者授权发表,未经许可,不得转载。