MQ的事务性指什么呢?

欢喜 rabbitmq 发布时间:2025-08-06 16:26:10 阅读数:6603 1
下文笔者讲述MQ的事务性相关说明,如下所示

MQ事务性简介

我们将一个MQ消息,需满足以下特性,称之为MQ的事务性
    1.原子性:消息发送/处理
	       要么全部成功
		   要么全部失败
    2.一致性:保证消息状态与业务状态一致
    3.隔离性:多个消息处理之间互不干扰
    4.持久性:消息不会因系统故障而丢失
==============================================
MQ消息的实现方式,如下所示
    -生产者事务:确保消息可靠发送
    -消费者事务:确保消息可靠处理
    -分布式事务:跨多个系统的事务一致性
    -本地事务表:通过本地事务保证消息可靠性  
    -事务隔离:控制消息处理的并发性

2.生产者端事务

RabbitMQ AMQP事务

@Component
public class RabbitMQAMQPTransaction {
    
    @Autowired
    private RabbitTemplate rabbitTemplate;
    
    public void sendMessagesInTransaction(list<MessageData> messages) {
        rabbitTemplate.setChannelTransacted(true);
        
        try {
            // 开启事务
            rabbitTemplate.execute(channel -> {
                // 发送多条消息
                for (MessageData message : messages) {
                    rabbitTemplate.convertAndSend("exchange", "routing.key", message);
                }
                
                // 提交事务
                channel.txCommit();
                log.info("All messages sent successfully");
                return null;
            });
            
        } catch (Exception e) {
            log.error("Transaction failed, rolling back", e);
            
            // 回滚事务
            rabbitTemplate.execute(channel -> {
                try {
                    channel.txRollback();
                } catch (IOException ioException) {
                    log.error("Failed to rollback transaction", ioException);
                }
                return null;
            });
            
            throw new RuntimeException("Message transaction failed", e);
        }
    }
}

RabbitMQ Publisher Confirms(本地事务模式)

@Entity
public class OutboxMessage {
    @Id
    private String id;
    private String exchange;
    private String routingKey;
    private String payload;
    private MessageStatus status; // PENDING, SENT, CONFIRMED, FAILED
    private Date createTime;
    private Date updateTime;
}

@Service
public class OutboxPatternService {
    
    @Autowired
    private OutboxMessageRepository outboxMessageRepository;
    
    @Autowired
    private RabbitTemplate rabbitTemplate;
    
    // 本地事务 + 消息发送
    @Transactional
    public void businessOperationWithMessage(BusinessData data) {
        // 1. 执行业务逻辑
        businessRepository.save(data);
        
        // 2. 插入消息到发件箱(本地事务)
        OutboxMessage message = new OutboxMessage();
        message.setId(UUID.randomUUID().toString());
        message.setExchange("business.exchange");
        message.setRoutingKey("business.key");
        message.setPayload(JSON.toJSONString(data));
        message.setStatus(MessageStatus.PENDING);
        message.setCreateTime(new Date());
        
        outboxMessageRepository.save(message);
        
        // 3. 实际发送消息
        sendMessage(message);
    }
    
    private void sendMessage(OutboxMessage message) {
        CorrelationData correlationData = new CorrelationData(message.getId());
        
        rabbitTemplate.convertAndSend(
            message.getExchange(),
            message.getRoutingKey(),
            message.getPayload(),
            correlationData
        );
    }
    
    // 确认回调处理
    @PostConstruct
    public void setupConfirmCallback() {
        rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
            String messageId = correlationData.getId();
            
            if (ack) {
                // 消息确认成功
                outboxMessageRepository.updateStatus(messageId, MessageStatus.CONFIRMED);
                log.info("Message confirmed: " + messageId);
            } else {
                // 消息发送失败
                outboxMessageRepository.updateStatus(messageId, MessageStatus.FAILED);
                log.error("Message send failed: " + messageId + ", cause: " + cause);
                
                // 触发重试或补偿机制
                triggerRetryOrCompensation(messageId);
            }
        });
    }
}

3.Kafka事务支持

Kafka Producer事务
 
@Component
public class KafkaTransactionalProducer {
    
    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;
    
    // Kafka本地事务
    public void sendMessagesInTransaction(List<BusinessEvent> events) {
        kafkaTemplate.executeInTransaction(operations -> {
            for (BusinessEvent event : events) {
                operations.send("business-topic", event.getKey(), JSON.toJSONString(event));
            }
            return true;
        });
    }
    
    // 跨系统分布式事务
    @Transactional("chainedTransactionManager")
    public void businessOperationWithKafkaMessage(BusinessData data) {
        // 1. 执行数据库操作(在数据库事务中)
        businessRepository.save(data);
        
        // 2. 发送Kafka消息(在同一分布式事务中)
        kafkaTemplate.send("business-topic", data.getId(), JSON.toJSONString(data));
        
        // 如果任何一步失败,整个分布式事务回滚
    }
}

@Configuration
public class KafkaTransactionConfig {
    
    @Bean
    public ChainedKafkaTransactionManager chainedTransactionManager(
            PlatformTransactionManager databaseTransactionManager,
            KafkaTransactionManager kafkaTransactionManager) {
        return new ChainedKafkaTransactionManager(databaseTransactionManager, kafkaTransactionManager);
    }
}

4.消费者端事务

手动确认机制

@Component
public class TransactionalConsumer {
    
    @RabbitListener(queues = "business.queue")
    public void handleMessage(Message message, Channel channel,
                             @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) {
        try {
            // 1. 解析消息
            String payload = new String(message.getBody());
            BusinessData data = JSON.parseObject(payload, BusinessData.class);
            
            // 2. 执行业务操作(在数据库事务中)
            businessService.processBusinessData(data);
            
            // 3. 确认消息(只有业务成功才确认)
            channel.basicAck(deliveryTag, false);
            log.info("Message processed and acknowledged: " + payload);
            
        } catch (Exception e) {
            log.error("Failed to process message", e);
            try {
                // 拒绝消息并决定是否重新入队
                if (shouldRequeue(e)) {
                    channel.basicNack(deliveryTag, false, true); // 重新入队
                } else {
                    channel.basicNack(deliveryTag, false, false); // 丢弃
                    // 发送到死信队列
                    sendToDeadLetterQueue(message);
                }
            } catch (IOException ioException) {
                log.error("Failed to nack message", ioException);
            }
        }
    }
    
    @Transactional
    public void processBusinessData(BusinessData data) {
        // 执行业务逻辑,如果抛出异常会回滚数据库事务
        businessRepository.save(processData(data));
    }
}

5.分布式事务模式

Saga模式

@Component
public class OrderSagaOrchestrator {
    
    @Autowired
    private MessageQueue mq;
    
    public void processOrder(Order order) {
        // 1. 创建订单
        mq.send("order.created", order);
        // 等待订单创建确认...
    }
    
    @RabbitListener(queues = "inventory.reserved")
    public void onInventoryReserved(InventoryEvent event) {
        if (event.isSuccess()) {
            // 2. 库存预留成功,继续支付
            PaymentRequest paymentRequest = new PaymentRequest(event.getOrderId(), event.getAmount());
            mq.send("payment.request", paymentRequest);
        } else {
            // 库存预留失败,取消订单
            mq.send("order.cancelled", new CancelOrderEvent(event.getOrderId()));
        }
    }
    
    @RabbitListener(queues = "payment.completed")
    public void onPaymentCompleted(PaymentEvent event) {
        if (event.isSuccess()) {
            // 3. 支付成功,完成订单
            mq.send("order.completed", new CompleteOrderEvent(event.getOrderId()));
        } else {
            // 支付失败,回滚库存
            mq.send("inventory.release", new ReleaseInventoryEvent(event.getOrderId()));
        }
    }
}


 TCC模式(Try-Confirm-Cancel)
 
public interface AccountService {
    // Try阶段:检查并预留资源
    void tryDeduct(String accountId, BigDecimal amount);
    
    // Confirm阶段:确认扣款
    void confirmDeduct(String accountId, BigDecimal amount);
    
    // Cancel阶段:取消扣款
    void cancelDeduct(String accountId, BigDecimal amount);
}

@Component
public class AccountServiceImpl implements AccountService {
    
    @Override
    public void tryDeduct(String accountId, BigDecimal amount) {
        // 检查余额是否足够,预留资金
        Account account = accountRepository.findById(accountId);
        if (account.getBalance().compareTo(amount) >= 0) {
            account.setFrozenAmount(account.getFrozenAmount().add(amount));
            account.setBalance(account.getBalance().subtract(amount));
            accountRepository.save(account);
        } else {
            throw new InsufficientBalanceException();
        }
    }
    
    @Override
    public void confirmDeduct(String accountId, BigDecimal amount) {
        // 确认扣款,释放冻结资金
        Account account = accountRepository.findById(accountId);
        account.setFrozenAmount(account.getFrozenAmount().subtract(amount));
        accountRepository.save(account);
    }
    
    @Override
    public void cancelDeduct(String accountId, BigDecimal amount) {
        // 取消扣款,恢复冻结资金
        Account account = accountRepository.findById(accountId);
        account.setBalance(account.getBalance().add(amount));
        account.setFrozenAmount(account.getFrozenAmount().subtract(amount));
        accountRepository.save(account);
    }
}

6.事务隔离级别

 消息可见性控制
 
@Component
public class MessageIsolationExample {
    
    // 设置消息预取数量,控制并发处理
    @RabbitListener(queues = "business.queue", 
                    containerFactory = "prefetchOneContainerFactory")
    public void handleMessageWithIsolation(Message message, Channel channel,
                                          @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) {
        try {
            // 处理消息(事务性操作)
            processMessageInTransaction(message);
            
            // 确认消息
            channel.basicAck(deliveryTag, false);
        } catch (Exception e) {
            log.error("Message processing failed", e);
            try {
                channel.basicNack(deliveryTag, false, true);
            } catch (IOException ioException) {
                log.error("Failed to nack message", ioException);
            }
        }
    }
}

@Configuration
public class RabbitMQConfig {
    
    @Bean
    public SimpleRabbitListenerContainerFactory prefetchOneContainerFactory() {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory());
        factory.setPrefetchCount(1); // 每次只预取一条消息,保证处理顺序
        return factory;
    }
}
版权声明

本文仅代表作者观点,不代表本站立场。
本文系作者授权发表,未经许可,不得转载。

本文链接: https://www.Java265.com/rabbitmq/2025/8506.html

最近发表

热门文章

好文推荐

Java265.com

https://www.java265.com

站长统计|粤ICP备14097017号-3

Powered By Java265.com信息维护小组

使用手机扫描二维码

关注我们看更多资讯

java爱好者