MQ的事务性指什么呢?
下文笔者讲述MQ的事务性相关说明,如下所示
MQ事务性简介
我们将一个MQ消息,需满足以下特性,称之为MQ的事务性 1.原子性:消息发送/处理 要么全部成功 要么全部失败 2.一致性:保证消息状态与业务状态一致 3.隔离性:多个消息处理之间互不干扰 4.持久性:消息不会因系统故障而丢失 ============================================== MQ消息的实现方式,如下所示 -生产者事务:确保消息可靠发送 -消费者事务:确保消息可靠处理 -分布式事务:跨多个系统的事务一致性 -本地事务表:通过本地事务保证消息可靠性 -事务隔离:控制消息处理的并发性
2.生产者端事务
RabbitMQ AMQP事务 @Component public class RabbitMQAMQPTransaction { @Autowired private RabbitTemplate rabbitTemplate; public void sendMessagesInTransaction(list<MessageData> messages) { rabbitTemplate.setChannelTransacted(true); try { // 开启事务 rabbitTemplate.execute(channel -> { // 发送多条消息 for (MessageData message : messages) { rabbitTemplate.convertAndSend("exchange", "routing.key", message); } // 提交事务 channel.txCommit(); log.info("All messages sent successfully"); return null; }); } catch (Exception e) { log.error("Transaction failed, rolling back", e); // 回滚事务 rabbitTemplate.execute(channel -> { try { channel.txRollback(); } catch (IOException ioException) { log.error("Failed to rollback transaction", ioException); } return null; }); throw new RuntimeException("Message transaction failed", e); } } } RabbitMQ Publisher Confirms(本地事务模式) @Entity public class OutboxMessage { @Id private String id; private String exchange; private String routingKey; private String payload; private MessageStatus status; // PENDING, SENT, CONFIRMED, FAILED private Date createTime; private Date updateTime; } @Service public class OutboxPatternService { @Autowired private OutboxMessageRepository outboxMessageRepository; @Autowired private RabbitTemplate rabbitTemplate; // 本地事务 + 消息发送 @Transactional public void businessOperationWithMessage(BusinessData data) { // 1. 执行业务逻辑 businessRepository.save(data); // 2. 插入消息到发件箱(本地事务) OutboxMessage message = new OutboxMessage(); message.setId(UUID.randomUUID().toString()); message.setExchange("business.exchange"); message.setRoutingKey("business.key"); message.setPayload(JSON.toJSONString(data)); message.setStatus(MessageStatus.PENDING); message.setCreateTime(new Date()); outboxMessageRepository.save(message); // 3. 实际发送消息 sendMessage(message); } private void sendMessage(OutboxMessage message) { CorrelationData correlationData = new CorrelationData(message.getId()); rabbitTemplate.convertAndSend( message.getExchange(), message.getRoutingKey(), message.getPayload(), correlationData ); } // 确认回调处理 @PostConstruct public void setupConfirmCallback() { rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> { String messageId = correlationData.getId(); if (ack) { // 消息确认成功 outboxMessageRepository.updateStatus(messageId, MessageStatus.CONFIRMED); log.info("Message confirmed: " + messageId); } else { // 消息发送失败 outboxMessageRepository.updateStatus(messageId, MessageStatus.FAILED); log.error("Message send failed: " + messageId + ", cause: " + cause); // 触发重试或补偿机制 triggerRetryOrCompensation(messageId); } }); } }
3.Kafka事务支持
Kafka Producer事务 @Component public class KafkaTransactionalProducer { @Autowired private KafkaTemplate<String, String> kafkaTemplate; // Kafka本地事务 public void sendMessagesInTransaction(List<BusinessEvent> events) { kafkaTemplate.executeInTransaction(operations -> { for (BusinessEvent event : events) { operations.send("business-topic", event.getKey(), JSON.toJSONString(event)); } return true; }); } // 跨系统分布式事务 @Transactional("chainedTransactionManager") public void businessOperationWithKafkaMessage(BusinessData data) { // 1. 执行数据库操作(在数据库事务中) businessRepository.save(data); // 2. 发送Kafka消息(在同一分布式事务中) kafkaTemplate.send("business-topic", data.getId(), JSON.toJSONString(data)); // 如果任何一步失败,整个分布式事务回滚 } } @Configuration public class KafkaTransactionConfig { @Bean public ChainedKafkaTransactionManager chainedTransactionManager( PlatformTransactionManager databaseTransactionManager, KafkaTransactionManager kafkaTransactionManager) { return new ChainedKafkaTransactionManager(databaseTransactionManager, kafkaTransactionManager); } }
4.消费者端事务
手动确认机制 @Component public class TransactionalConsumer { @RabbitListener(queues = "business.queue") public void handleMessage(Message message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) { try { // 1. 解析消息 String payload = new String(message.getBody()); BusinessData data = JSON.parseObject(payload, BusinessData.class); // 2. 执行业务操作(在数据库事务中) businessService.processBusinessData(data); // 3. 确认消息(只有业务成功才确认) channel.basicAck(deliveryTag, false); log.info("Message processed and acknowledged: " + payload); } catch (Exception e) { log.error("Failed to process message", e); try { // 拒绝消息并决定是否重新入队 if (shouldRequeue(e)) { channel.basicNack(deliveryTag, false, true); // 重新入队 } else { channel.basicNack(deliveryTag, false, false); // 丢弃 // 发送到死信队列 sendToDeadLetterQueue(message); } } catch (IOException ioException) { log.error("Failed to nack message", ioException); } } } @Transactional public void processBusinessData(BusinessData data) { // 执行业务逻辑,如果抛出异常会回滚数据库事务 businessRepository.save(processData(data)); } }
5.分布式事务模式
Saga模式 @Component public class OrderSagaOrchestrator { @Autowired private MessageQueue mq; public void processOrder(Order order) { // 1. 创建订单 mq.send("order.created", order); // 等待订单创建确认... } @RabbitListener(queues = "inventory.reserved") public void onInventoryReserved(InventoryEvent event) { if (event.isSuccess()) { // 2. 库存预留成功,继续支付 PaymentRequest paymentRequest = new PaymentRequest(event.getOrderId(), event.getAmount()); mq.send("payment.request", paymentRequest); } else { // 库存预留失败,取消订单 mq.send("order.cancelled", new CancelOrderEvent(event.getOrderId())); } } @RabbitListener(queues = "payment.completed") public void onPaymentCompleted(PaymentEvent event) { if (event.isSuccess()) { // 3. 支付成功,完成订单 mq.send("order.completed", new CompleteOrderEvent(event.getOrderId())); } else { // 支付失败,回滚库存 mq.send("inventory.release", new ReleaseInventoryEvent(event.getOrderId())); } } } TCC模式(Try-Confirm-Cancel) public interface AccountService { // Try阶段:检查并预留资源 void tryDeduct(String accountId, BigDecimal amount); // Confirm阶段:确认扣款 void confirmDeduct(String accountId, BigDecimal amount); // Cancel阶段:取消扣款 void cancelDeduct(String accountId, BigDecimal amount); } @Component public class AccountServiceImpl implements AccountService { @Override public void tryDeduct(String accountId, BigDecimal amount) { // 检查余额是否足够,预留资金 Account account = accountRepository.findById(accountId); if (account.getBalance().compareTo(amount) >= 0) { account.setFrozenAmount(account.getFrozenAmount().add(amount)); account.setBalance(account.getBalance().subtract(amount)); accountRepository.save(account); } else { throw new InsufficientBalanceException(); } } @Override public void confirmDeduct(String accountId, BigDecimal amount) { // 确认扣款,释放冻结资金 Account account = accountRepository.findById(accountId); account.setFrozenAmount(account.getFrozenAmount().subtract(amount)); accountRepository.save(account); } @Override public void cancelDeduct(String accountId, BigDecimal amount) { // 取消扣款,恢复冻结资金 Account account = accountRepository.findById(accountId); account.setBalance(account.getBalance().add(amount)); account.setFrozenAmount(account.getFrozenAmount().subtract(amount)); accountRepository.save(account); } }
6.事务隔离级别
消息可见性控制 @Component public class MessageIsolationExample { // 设置消息预取数量,控制并发处理 @RabbitListener(queues = "business.queue", containerFactory = "prefetchOneContainerFactory") public void handleMessageWithIsolation(Message message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) { try { // 处理消息(事务性操作) processMessageInTransaction(message); // 确认消息 channel.basicAck(deliveryTag, false); } catch (Exception e) { log.error("Message processing failed", e); try { channel.basicNack(deliveryTag, false, true); } catch (IOException ioException) { log.error("Failed to nack message", ioException); } } } } @Configuration public class RabbitMQConfig { @Bean public SimpleRabbitListenerContainerFactory prefetchOneContainerFactory() { SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory(); factory.setConnectionFactory(connectionFactory()); factory.setPrefetchCount(1); // 每次只预取一条消息,保证处理顺序 return factory; } }
版权声明
本文仅代表作者观点,不代表本站立场。
本文系作者授权发表,未经许可,不得转载。