MQ如何保证消息的可靠性的呢?

欢喜 rabbitmq 发布时间:2025-08-06 16:25:29 阅读数:3982 1
下文笔者讲述MQ保证消息可靠性的方法分享,如下所示
   1.持久化存储:将消息写入磁盘防止丢失
   2.生产者确认:确保消息成功发送到MQ服务器
   3.消费者确认:确保消息被正确处理
   4.事务支持:保证业务操作与消息发送的一致性
   5.死信队列:处理异常消息防止影响正常流程
   6.幂等性设计:防止消息重复消费造成业务问题
   7.集群和复制:通过多节点部署提高可用性
 使用以上方式对消息进行处理
    即可保证MQ消息的可靠性
==============================================
具体的策略如下所示:

1.消息持久化

确保消息在系统崩溃后不会丢失
 
//RabbitMQ示例 - 持久化消息
@Component
public class ReliableMessageProducer {
    
    @Autowired
    private RabbitTemplate rabbitTemplate;
    
    public void sendReliableMessage(String exchange, String routingKey, Object message) {
        // 创建持久化消息
        MessageProperties messageProperties = new MessageProperties();
        messageProperties.setDeliveryMode(MessageDeliveryMode.PERSISTENT); // 消息持久化
        
        Message persistentMessage = new Message(
            SerializationUtils.serialize(message), 
            messageProperties
        );
        
        // 发送到持久化交换机和队列
        rabbitTemplate.send(exchange, routingKey, persistentMessage);
    }
}

//Kafka示例 - 持久化配置
@Configuration
public class KafkaProducerConfig {
    
    @Bean
    public Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        
        // 确保消息持久化到磁盘
        props.put(ProducerConfig.ACKS_CONFIG, "all"); // 等待所有副本确认
        props.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE); // 无限重试
        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true); // 幂等性
        
        return props;
    }
}

2.生产者确认机制

确保消息成功发送到MQ服务器:

RabbitMQ Publisher Confirms
 
@Component
public class PublisherConfirmExample {
    
    @Autowired
    private RabbitTemplate rabbitTemplate;
    
    public void sendMessageWithConfirm(String message) throws Exception {
        // 启用发布者确认
        rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
            if (ack) {
                log.info("Message confirmed: " + correlationData.getId());
            } else {
                log.error("Message failed: " + correlationData.getId() + ", cause: " + cause);
                // 处理失败消息,如重试或记录到死信队列
                handleFailedMessage(correlationData.getId(), message);
            }
        });
        
        // 设置消息ID用于追踪
        CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
        rabbitTemplate.convertAndSend("exchange", "routing.key", message, correlationData);
    }
}
 

Kafka Producer Acks
 
@Component
public class KafkaReliableProducer {
    
    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;
    
    public listenableFuture<SendResult<String, String>> sendReliableMessage(
            String topic, String key, String message) {
        
        // 发送消息并获取结果Future
        ListenableFuture<SendResult<String, String>> future = 
            kafkaTemplate.send(topic, key, message);
        
        // 添加回调处理
        future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
            @Override
            public void onSuccess(SendResult<String, String> result) {
                log.info("Message sent successfully: " + result.getRecordMetadata());
            }
            
            @Override
            public void onFailure(Throwable ex) {
                log.error("Failed to send message: " + message, ex);
                // 处理发送失败,如重试
                handleSendFailure(topic, key, message, ex);
            }
        });
        
        return future;
    }
}

3.消费者确认机制

确保消息被正确处理

RabbitMQ Manual Acknowledgment

@Component
public class ManualAckConsumer {
    
    @RabbitListener(queues = "reliable.queue")
    public void handleMessage(Message message, Channel channel, 
                             @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) {
        try {
            // 处理消息
            String payload = new String(message.getBody());
            processMessage(payload);
            
            // 手动确认消息
            channel.basicAck(deliveryTag, false);
            log.info("Message acknowledged: " + payload);
            
        } catch (Exception e) {
            log.error("Failed to process message", e);
            try {
                // 拒绝消息并重新入队
                channel.basicNack(deliveryTag, false, true);
            } catch (IOException ioException) {
                log.error("Failed to nack message", ioException);
            }
        }
    }
    
    private void processMessage(String message) throws Exception {
        // 实际的消息处理逻辑
        // 如果抛出异常,消息会重新入队
    }
}


Kafka Consumer Offsets

@Component
public class KafkaReliableConsumer {
    
    @KafkaListener(topics = "reliable-topic")
    public void listen(ConsumerRecord<String, String> record, Acknowledgment ack) {
        try {
            // 处理消息
            processMessage(record.value());
            
            // 手动提交偏移量
            ack.acknowledge();
            log.info("Message processed and acknowledged: " + record.value());
            
        } catch (Exception e) {
            log.error("Failed to process message: " + record.value(), e);
            // 不确认消息,下次重启后会重新消费
            // 或者发送到死信主题
            sendToDeadLetterTopic(record);
        }
    }
    
    @KafkaListener(topics = "dead-letter-topic")
    public void handleDeadLetter(ConsumerRecord<String, String> record) {
        // 处理死信消息
        log.warn("Processing dead letter message: " + record.value());
        // 可以记录到数据库供人工处理
    }
}

4.事务支持

保证消息发送与业务操作的一致性:

RabbitMQ事务
 
@Service
public class TransactionalMessageService {
    
    @Autowired
    private RabbitTemplate rabbitTemplate;
    
    @Transactional
    public void processBusinessWithMessage(BusinessData data) {
        // 1. 执行业务操作
        businessRepository.save(data);
        
        // 2. 发送消息(在同一个事务中)
        rabbitTemplate.setChannelTransacted(true);
        rabbitTemplate.convertAndSend("business.exchange", "business.key", data);
        
        // 如果业务操作或消息发送失败,整个事务回滚
    }
}
 
RabbitMQ Publisher Confirms with Local Transaction

@Service
public class ReliableBusinessService {
    
    @Autowired
    private BusinessRepository businessRepository;
    
    @Autowired
    private RabbitTemplate rabbitTemplate;
    
    private final Set<String> pendingMessages = ConcurrentHashMap.newKeySet();
    
    @Transactional
    public void processBusinessOperation(BusinessData data) {
        // 1. 执行业务操作
        businessRepository.save(data);
        
        // 2. 发送消息并记录待确认
        String messageId = UUID.randomUUID().toString();
        pendingMessages.add(messageId);
        
        CorrelationData correlationData = new CorrelationData(messageId);
        rabbitTemplate.convertAndSend("business.exchange", "business.key", 
                                    data, correlationData);
    }
    
    // 确认回调
    @PostConstruct
    public void setupConfirmCallback() {
        rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
            String messageId = correlationData.getId();
            if (ack) {
                // 消息确认成功,从待确认集合中移除
                pendingMessages.remove(messageId);
                log.info("Message confirmed: " + messageId);
            } else {
                // 消息发送失败,需要补偿处理
                log.error("Message failed: " + messageId + ", cause: " + cause);
                handleUnconfirmedMessage(messageId);
            }
        });
    }
}

5.死信队列处理

处理无法正常消费的消息:
 
@Configuration
public class DeadLetterConfig {
    
    // 正常队列
    @Bean
    public Queue businessQueue() {
        Map<String, Object> args = new HashMap<>();
        args.put("x-dead-letter-exchange", "dlx.exchange");
        args.put("x-dead-letter-routing-key", "dlx.routing.key");
        args.put("x-message-ttl", 60000); // 1分钟TTL
        args.put("x-max-length", 1000); // 最大长度
        
        return QueueBuilder.durable("business.queue")
                .withArguments(args)
                .build();
    }
    
    // 死信交换机和队列
    @Bean
    public DirectExchange deadLetterExchange() {
        return new DirectExchange("dlx.exchange");
    }
    
    @Bean
    public Queue deadLetterQueue() {
        return QueueBuilder.durable("dead.letter.queue").build();
    }
    
    @Bean
    public Binding dlqBinding() {
        return BindingBuilder.bind(deadLetterQueue())
                .to(deadLetterExchange())
                .with("dlx.routing.key");
    }
}

@Component
public class DeadLetterConsumer {
    
    @RabbitListener(queues = "dead.letter.queue")
    public void handleDeadLetter(Message message, 
                                @Header("x-original-exchange") String originalExchange,
                                @Header("x-original-routing-key") String originalRoutingKey) {
        log.warn("Received dead letter message from exchange: " + originalExchange + 
                ", routing key: " + originalRoutingKey);
        
        // 记录到数据库供人工处理
        deadLetterRepository.save(new DeadLetterMessage(
            message.getMessageProperties().getMessageId(),
            new String(message.getBody()),
            originalExchange,
            originalRoutingKey,
            new Date()
        ));
    }
}

6.幂等性保证

防止消息重复消费:

@Component
public class IdempotentMessageConsumer {
    
    @Autowired
    private MessageLogRepository messageLogRepository;
    
    @RabbitListener(queues = "business.queue")
    public void handleMessage(BusinessMessage message, Channel channel, 
                             @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) {
        try {
            // 检查消息是否已处理
            if (messageLogRepository.existsByMessageId(message.getId())) {
                log.info("Message already processed: " + message.getId());
                channel.basicAck(deliveryTag, false);
                return;
            }
            
            // 处理消息
            processBusinessLogic(message);
            
            // 记录已处理消息
            messageLogRepository.save(new MessageLog(message.getId(), new Date()));
            
            // 确认消息
            channel.basicAck(deliveryTag, false);
            
        } catch (Exception e) {
            log.error("Failed to process message: " + message.getId(), e);
            try {
                channel.basicNack(deliveryTag, false, true);
            } catch (IOException ioException) {
                log.error("Failed to nack message", ioException);
            }
        }
    }
    
    private void processBusinessLogic(BusinessMessage message) {
        // 实际业务逻辑处理
    }
}
版权声明

本文仅代表作者观点,不代表本站立场。
本文系作者授权发表,未经许可,不得转载。

本文链接: https://www.Java265.com/rabbitmq/2025/8505.html

最近发表

热门文章

好文推荐

Java265.com

https://www.java265.com

站长统计|粤ICP备14097017号-3

Powered By Java265.com信息维护小组

使用手机扫描二维码

关注我们看更多资讯

java爱好者