MQ如何保证消息的可靠性的呢?
下文笔者讲述MQ保证消息可靠性的方法分享,如下所示
1.持久化存储:将消息写入磁盘防止丢失
2.生产者确认:确保消息成功发送到MQ服务器
3.消费者确认:确保消息被正确处理
4.事务支持:保证业务操作与消息发送的一致性
5.死信队列:处理异常消息防止影响正常流程
6.幂等性设计:防止消息重复消费造成业务问题
7.集群和复制:通过多节点部署提高可用性
使用以上方式对消息进行处理
即可保证MQ消息的可靠性
==============================================
具体的策略如下所示:
1.消息持久化
确保消息在系统崩溃后不会丢失
//RabbitMQ示例 - 持久化消息
@Component
public class ReliableMessageProducer {
@Autowired
private RabbitTemplate rabbitTemplate;
public void sendReliableMessage(String exchange, String routingKey, Object message) {
// 创建持久化消息
MessageProperties messageProperties = new MessageProperties();
messageProperties.setDeliveryMode(MessageDeliveryMode.PERSISTENT); // 消息持久化
Message persistentMessage = new Message(
SerializationUtils.serialize(message),
messageProperties
);
// 发送到持久化交换机和队列
rabbitTemplate.send(exchange, routingKey, persistentMessage);
}
}
//Kafka示例 - 持久化配置
@Configuration
public class KafkaProducerConfig {
@Bean
public Map<String, Object> producerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
// 确保消息持久化到磁盘
props.put(ProducerConfig.ACKS_CONFIG, "all"); // 等待所有副本确认
props.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE); // 无限重试
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true); // 幂等性
return props;
}
}
2.生产者确认机制
确保消息成功发送到MQ服务器:
RabbitMQ Publisher Confirms
@Component
public class PublisherConfirmExample {
@Autowired
private RabbitTemplate rabbitTemplate;
public void sendMessageWithConfirm(String message) throws Exception {
// 启用发布者确认
rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
if (ack) {
log.info("Message confirmed: " + correlationData.getId());
} else {
log.error("Message failed: " + correlationData.getId() + ", cause: " + cause);
// 处理失败消息,如重试或记录到死信队列
handleFailedMessage(correlationData.getId(), message);
}
});
// 设置消息ID用于追踪
CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
rabbitTemplate.convertAndSend("exchange", "routing.key", message, correlationData);
}
}
Kafka Producer Acks
@Component
public class KafkaReliableProducer {
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
public listenableFuture<SendResult<String, String>> sendReliableMessage(
String topic, String key, String message) {
// 发送消息并获取结果Future
ListenableFuture<SendResult<String, String>> future =
kafkaTemplate.send(topic, key, message);
// 添加回调处理
future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
@Override
public void onSuccess(SendResult<String, String> result) {
log.info("Message sent successfully: " + result.getRecordMetadata());
}
@Override
public void onFailure(Throwable ex) {
log.error("Failed to send message: " + message, ex);
// 处理发送失败,如重试
handleSendFailure(topic, key, message, ex);
}
});
return future;
}
}
3.消费者确认机制
确保消息被正确处理
RabbitMQ Manual Acknowledgment
@Component
public class ManualAckConsumer {
@RabbitListener(queues = "reliable.queue")
public void handleMessage(Message message, Channel channel,
@Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) {
try {
// 处理消息
String payload = new String(message.getBody());
processMessage(payload);
// 手动确认消息
channel.basicAck(deliveryTag, false);
log.info("Message acknowledged: " + payload);
} catch (Exception e) {
log.error("Failed to process message", e);
try {
// 拒绝消息并重新入队
channel.basicNack(deliveryTag, false, true);
} catch (IOException ioException) {
log.error("Failed to nack message", ioException);
}
}
}
private void processMessage(String message) throws Exception {
// 实际的消息处理逻辑
// 如果抛出异常,消息会重新入队
}
}
Kafka Consumer Offsets
@Component
public class KafkaReliableConsumer {
@KafkaListener(topics = "reliable-topic")
public void listen(ConsumerRecord<String, String> record, Acknowledgment ack) {
try {
// 处理消息
processMessage(record.value());
// 手动提交偏移量
ack.acknowledge();
log.info("Message processed and acknowledged: " + record.value());
} catch (Exception e) {
log.error("Failed to process message: " + record.value(), e);
// 不确认消息,下次重启后会重新消费
// 或者发送到死信主题
sendToDeadLetterTopic(record);
}
}
@KafkaListener(topics = "dead-letter-topic")
public void handleDeadLetter(ConsumerRecord<String, String> record) {
// 处理死信消息
log.warn("Processing dead letter message: " + record.value());
// 可以记录到数据库供人工处理
}
}
4.事务支持
保证消息发送与业务操作的一致性:
RabbitMQ事务
@Service
public class TransactionalMessageService {
@Autowired
private RabbitTemplate rabbitTemplate;
@Transactional
public void processBusinessWithMessage(BusinessData data) {
// 1. 执行业务操作
businessRepository.save(data);
// 2. 发送消息(在同一个事务中)
rabbitTemplate.setChannelTransacted(true);
rabbitTemplate.convertAndSend("business.exchange", "business.key", data);
// 如果业务操作或消息发送失败,整个事务回滚
}
}
RabbitMQ Publisher Confirms with Local Transaction
@Service
public class ReliableBusinessService {
@Autowired
private BusinessRepository businessRepository;
@Autowired
private RabbitTemplate rabbitTemplate;
private final Set<String> pendingMessages = ConcurrentHashMap.newKeySet();
@Transactional
public void processBusinessOperation(BusinessData data) {
// 1. 执行业务操作
businessRepository.save(data);
// 2. 发送消息并记录待确认
String messageId = UUID.randomUUID().toString();
pendingMessages.add(messageId);
CorrelationData correlationData = new CorrelationData(messageId);
rabbitTemplate.convertAndSend("business.exchange", "business.key",
data, correlationData);
}
// 确认回调
@PostConstruct
public void setupConfirmCallback() {
rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
String messageId = correlationData.getId();
if (ack) {
// 消息确认成功,从待确认集合中移除
pendingMessages.remove(messageId);
log.info("Message confirmed: " + messageId);
} else {
// 消息发送失败,需要补偿处理
log.error("Message failed: " + messageId + ", cause: " + cause);
handleUnconfirmedMessage(messageId);
}
});
}
}
5.死信队列处理
处理无法正常消费的消息:
@Configuration
public class DeadLetterConfig {
// 正常队列
@Bean
public Queue businessQueue() {
Map<String, Object> args = new HashMap<>();
args.put("x-dead-letter-exchange", "dlx.exchange");
args.put("x-dead-letter-routing-key", "dlx.routing.key");
args.put("x-message-ttl", 60000); // 1分钟TTL
args.put("x-max-length", 1000); // 最大长度
return QueueBuilder.durable("business.queue")
.withArguments(args)
.build();
}
// 死信交换机和队列
@Bean
public DirectExchange deadLetterExchange() {
return new DirectExchange("dlx.exchange");
}
@Bean
public Queue deadLetterQueue() {
return QueueBuilder.durable("dead.letter.queue").build();
}
@Bean
public Binding dlqBinding() {
return BindingBuilder.bind(deadLetterQueue())
.to(deadLetterExchange())
.with("dlx.routing.key");
}
}
@Component
public class DeadLetterConsumer {
@RabbitListener(queues = "dead.letter.queue")
public void handleDeadLetter(Message message,
@Header("x-original-exchange") String originalExchange,
@Header("x-original-routing-key") String originalRoutingKey) {
log.warn("Received dead letter message from exchange: " + originalExchange +
", routing key: " + originalRoutingKey);
// 记录到数据库供人工处理
deadLetterRepository.save(new DeadLetterMessage(
message.getMessageProperties().getMessageId(),
new String(message.getBody()),
originalExchange,
originalRoutingKey,
new Date()
));
}
}
6.幂等性保证
防止消息重复消费:
@Component
public class IdempotentMessageConsumer {
@Autowired
private MessageLogRepository messageLogRepository;
@RabbitListener(queues = "business.queue")
public void handleMessage(BusinessMessage message, Channel channel,
@Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) {
try {
// 检查消息是否已处理
if (messageLogRepository.existsByMessageId(message.getId())) {
log.info("Message already processed: " + message.getId());
channel.basicAck(deliveryTag, false);
return;
}
// 处理消息
processBusinessLogic(message);
// 记录已处理消息
messageLogRepository.save(new MessageLog(message.getId(), new Date()));
// 确认消息
channel.basicAck(deliveryTag, false);
} catch (Exception e) {
log.error("Failed to process message: " + message.getId(), e);
try {
channel.basicNack(deliveryTag, false, true);
} catch (IOException ioException) {
log.error("Failed to nack message", ioException);
}
}
}
private void processBusinessLogic(BusinessMessage message) {
// 实际业务逻辑处理
}
}
版权声明
本文仅代表作者观点,不代表本站立场。
本文系作者授权发表,未经许可,不得转载。


