MQ的事务性指什么呢?
下文笔者讲述MQ的事务性相关说明,如下所示
MQ事务性简介
我们将一个MQ消息,需满足以下特性,称之为MQ的事务性
1.原子性:消息发送/处理
要么全部成功
要么全部失败
2.一致性:保证消息状态与业务状态一致
3.隔离性:多个消息处理之间互不干扰
4.持久性:消息不会因系统故障而丢失
==============================================
MQ消息的实现方式,如下所示
-生产者事务:确保消息可靠发送
-消费者事务:确保消息可靠处理
-分布式事务:跨多个系统的事务一致性
-本地事务表:通过本地事务保证消息可靠性
-事务隔离:控制消息处理的并发性
2.生产者端事务
RabbitMQ AMQP事务
@Component
public class RabbitMQAMQPTransaction {
@Autowired
private RabbitTemplate rabbitTemplate;
public void sendMessagesInTransaction(list<MessageData> messages) {
rabbitTemplate.setChannelTransacted(true);
try {
// 开启事务
rabbitTemplate.execute(channel -> {
// 发送多条消息
for (MessageData message : messages) {
rabbitTemplate.convertAndSend("exchange", "routing.key", message);
}
// 提交事务
channel.txCommit();
log.info("All messages sent successfully");
return null;
});
} catch (Exception e) {
log.error("Transaction failed, rolling back", e);
// 回滚事务
rabbitTemplate.execute(channel -> {
try {
channel.txRollback();
} catch (IOException ioException) {
log.error("Failed to rollback transaction", ioException);
}
return null;
});
throw new RuntimeException("Message transaction failed", e);
}
}
}
RabbitMQ Publisher Confirms(本地事务模式)
@Entity
public class OutboxMessage {
@Id
private String id;
private String exchange;
private String routingKey;
private String payload;
private MessageStatus status; // PENDING, SENT, CONFIRMED, FAILED
private Date createTime;
private Date updateTime;
}
@Service
public class OutboxPatternService {
@Autowired
private OutboxMessageRepository outboxMessageRepository;
@Autowired
private RabbitTemplate rabbitTemplate;
// 本地事务 + 消息发送
@Transactional
public void businessOperationWithMessage(BusinessData data) {
// 1. 执行业务逻辑
businessRepository.save(data);
// 2. 插入消息到发件箱(本地事务)
OutboxMessage message = new OutboxMessage();
message.setId(UUID.randomUUID().toString());
message.setExchange("business.exchange");
message.setRoutingKey("business.key");
message.setPayload(JSON.toJSONString(data));
message.setStatus(MessageStatus.PENDING);
message.setCreateTime(new Date());
outboxMessageRepository.save(message);
// 3. 实际发送消息
sendMessage(message);
}
private void sendMessage(OutboxMessage message) {
CorrelationData correlationData = new CorrelationData(message.getId());
rabbitTemplate.convertAndSend(
message.getExchange(),
message.getRoutingKey(),
message.getPayload(),
correlationData
);
}
// 确认回调处理
@PostConstruct
public void setupConfirmCallback() {
rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
String messageId = correlationData.getId();
if (ack) {
// 消息确认成功
outboxMessageRepository.updateStatus(messageId, MessageStatus.CONFIRMED);
log.info("Message confirmed: " + messageId);
} else {
// 消息发送失败
outboxMessageRepository.updateStatus(messageId, MessageStatus.FAILED);
log.error("Message send failed: " + messageId + ", cause: " + cause);
// 触发重试或补偿机制
triggerRetryOrCompensation(messageId);
}
});
}
}
3.Kafka事务支持
Kafka Producer事务
@Component
public class KafkaTransactionalProducer {
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
// Kafka本地事务
public void sendMessagesInTransaction(List<BusinessEvent> events) {
kafkaTemplate.executeInTransaction(operations -> {
for (BusinessEvent event : events) {
operations.send("business-topic", event.getKey(), JSON.toJSONString(event));
}
return true;
});
}
// 跨系统分布式事务
@Transactional("chainedTransactionManager")
public void businessOperationWithKafkaMessage(BusinessData data) {
// 1. 执行数据库操作(在数据库事务中)
businessRepository.save(data);
// 2. 发送Kafka消息(在同一分布式事务中)
kafkaTemplate.send("business-topic", data.getId(), JSON.toJSONString(data));
// 如果任何一步失败,整个分布式事务回滚
}
}
@Configuration
public class KafkaTransactionConfig {
@Bean
public ChainedKafkaTransactionManager chainedTransactionManager(
PlatformTransactionManager databaseTransactionManager,
KafkaTransactionManager kafkaTransactionManager) {
return new ChainedKafkaTransactionManager(databaseTransactionManager, kafkaTransactionManager);
}
}
4.消费者端事务
手动确认机制
@Component
public class TransactionalConsumer {
@RabbitListener(queues = "business.queue")
public void handleMessage(Message message, Channel channel,
@Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) {
try {
// 1. 解析消息
String payload = new String(message.getBody());
BusinessData data = JSON.parseObject(payload, BusinessData.class);
// 2. 执行业务操作(在数据库事务中)
businessService.processBusinessData(data);
// 3. 确认消息(只有业务成功才确认)
channel.basicAck(deliveryTag, false);
log.info("Message processed and acknowledged: " + payload);
} catch (Exception e) {
log.error("Failed to process message", e);
try {
// 拒绝消息并决定是否重新入队
if (shouldRequeue(e)) {
channel.basicNack(deliveryTag, false, true); // 重新入队
} else {
channel.basicNack(deliveryTag, false, false); // 丢弃
// 发送到死信队列
sendToDeadLetterQueue(message);
}
} catch (IOException ioException) {
log.error("Failed to nack message", ioException);
}
}
}
@Transactional
public void processBusinessData(BusinessData data) {
// 执行业务逻辑,如果抛出异常会回滚数据库事务
businessRepository.save(processData(data));
}
}
5.分布式事务模式
Saga模式
@Component
public class OrderSagaOrchestrator {
@Autowired
private MessageQueue mq;
public void processOrder(Order order) {
// 1. 创建订单
mq.send("order.created", order);
// 等待订单创建确认...
}
@RabbitListener(queues = "inventory.reserved")
public void onInventoryReserved(InventoryEvent event) {
if (event.isSuccess()) {
// 2. 库存预留成功,继续支付
PaymentRequest paymentRequest = new PaymentRequest(event.getOrderId(), event.getAmount());
mq.send("payment.request", paymentRequest);
} else {
// 库存预留失败,取消订单
mq.send("order.cancelled", new CancelOrderEvent(event.getOrderId()));
}
}
@RabbitListener(queues = "payment.completed")
public void onPaymentCompleted(PaymentEvent event) {
if (event.isSuccess()) {
// 3. 支付成功,完成订单
mq.send("order.completed", new CompleteOrderEvent(event.getOrderId()));
} else {
// 支付失败,回滚库存
mq.send("inventory.release", new ReleaseInventoryEvent(event.getOrderId()));
}
}
}
TCC模式(Try-Confirm-Cancel)
public interface AccountService {
// Try阶段:检查并预留资源
void tryDeduct(String accountId, BigDecimal amount);
// Confirm阶段:确认扣款
void confirmDeduct(String accountId, BigDecimal amount);
// Cancel阶段:取消扣款
void cancelDeduct(String accountId, BigDecimal amount);
}
@Component
public class AccountServiceImpl implements AccountService {
@Override
public void tryDeduct(String accountId, BigDecimal amount) {
// 检查余额是否足够,预留资金
Account account = accountRepository.findById(accountId);
if (account.getBalance().compareTo(amount) >= 0) {
account.setFrozenAmount(account.getFrozenAmount().add(amount));
account.setBalance(account.getBalance().subtract(amount));
accountRepository.save(account);
} else {
throw new InsufficientBalanceException();
}
}
@Override
public void confirmDeduct(String accountId, BigDecimal amount) {
// 确认扣款,释放冻结资金
Account account = accountRepository.findById(accountId);
account.setFrozenAmount(account.getFrozenAmount().subtract(amount));
accountRepository.save(account);
}
@Override
public void cancelDeduct(String accountId, BigDecimal amount) {
// 取消扣款,恢复冻结资金
Account account = accountRepository.findById(accountId);
account.setBalance(account.getBalance().add(amount));
account.setFrozenAmount(account.getFrozenAmount().subtract(amount));
accountRepository.save(account);
}
}
6.事务隔离级别
消息可见性控制
@Component
public class MessageIsolationExample {
// 设置消息预取数量,控制并发处理
@RabbitListener(queues = "business.queue",
containerFactory = "prefetchOneContainerFactory")
public void handleMessageWithIsolation(Message message, Channel channel,
@Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) {
try {
// 处理消息(事务性操作)
processMessageInTransaction(message);
// 确认消息
channel.basicAck(deliveryTag, false);
} catch (Exception e) {
log.error("Message processing failed", e);
try {
channel.basicNack(deliveryTag, false, true);
} catch (IOException ioException) {
log.error("Failed to nack message", ioException);
}
}
}
}
@Configuration
public class RabbitMQConfig {
@Bean
public SimpleRabbitListenerContainerFactory prefetchOneContainerFactory() {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory());
factory.setPrefetchCount(1); // 每次只预取一条消息,保证处理顺序
return factory;
}
}
版权声明
本文仅代表作者观点,不代表本站立场。
本文系作者授权发表,未经许可,不得转载。


