java中如何保证RabbitMq的消息不丢失呢?
下文笔者讲述RabbitMq消息不丢失的方法分享,如下所示
RabbitMQ消息持久化的实现思路
如果你想保证RabbitMQ消息不丢失
这里面则涉及很多方面
如:消息持久化、确认机制、队列和交换机的配置等
具体的操作方式,如下所示
1.消息持久化
确保消息在发送到RabbitMQ时
被标记为持久化
即使 RabbitMQ 服务器重启,消息也不会丢失。
生产者端配置
在生产者端
使用 `delivery_mode=2` 来标记消息为持久化
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class Producer {
private final static String QUEUE_NAME = "durable_queue";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
// 声明队列为持久化队列
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
String message = "Hello World!";
// 发送持久化消息
channel.basicPublish("", QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes("UTF-8"));
System.out.println(" [x] Sent '" + message + "'");
}
}
}
队列持久化
确保队列在声明时被标记为持久化
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
交换机持久化
如果使用交换机,确保交换机也被声明为持久化。
channel.exchangeDeclare(EXCHANGE_NAME, "direct", true);
消费者确认机制
使用消费者确认机制(acknowledgments)来确保消息被正确处理
消费者在处理完消息后发送确认信号给RabbitMQ
RabbitMQ收到确认后才会删除消息
消费者端配置
在消费者端
启用手动确认模式
import com.rabbitmq.client.*;
public class Consumer {
private final static String QUEUE_NAME = "durable_queue";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [x] Received '" + message + "'");
// 处理消息
// 发送确认
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
};
// 启用手动确认模式
boolean autoAck = false;
channel.basicConsume(QUEUE_NAME, autoAck, deliverCallback, consumerTag -> { });
}
}
高可用性配置
配置RabbitMQ高可用性(HA)集群
以确保在某个节点故障时,消息不会丢失
配置HA集群
1.启用HA插件
rabbitmq-plugins enable rabbitmq_ha_policy
2.设置HA策略:
rabbitmqctl set_policy ha-all "^" '{"ha-mode":"all"}'
监控和日志
定期监控RabbitMQ状态和日志
及时发现和解决问题。
查看RabbitMQ状态
rabbitmqctl status
查看RabbitMQ日志
RabbitMQ日志通常位于/var/log/rabbitmq/目录下
使用以上步骤
可显著提高RabbitMQ消息可靠性
减少消息丢失风险
例:
生产者
消息确定机制
import com.rabbitmq.client.*;
public class PersistentProducer {
private final static String QUEUE_NAME = "persistent_queue";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
// 声明一个持久化队列
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
// 启用生产者确认
channel.confirmSelect();
String message = "Persistent message with producer confirm!";
channel.basicPublish("", QUEUE_NAME,
MessageProperties.PERSISTENT_TEXT_PLAIN,
message.getBytes());
// 检查消息是否成功发送
if (channel.waitForConfirms()) {
System.out.println("Message sent successfully!");
} else {
System.out.println("Message failed to send!");
}
}
}
}
channel.queueDeclare(QUEUE_NAME, true, false, false, null):队列是持久化的,确保 RabbitMQ 重启后队列不会丢失。 MessageProperties.PERSISTENT_TEXT_PLAIN:确保消息持久化存储。 channel.confirmSelect():启用生产者确认,确保消息成功送达 RabbitMQ。 channel.waitForConfirms():等待确认,如果生产者发送消息时发生失败,会捕获错误。
交换机
选择合适交换机类型:
常见的交换机类型包括direct、fanout、topic 和 headers
选择正确类型可确保消息路由正确
使用死信队列(Dead Letter Exchange, DLX):
当消息因某些原因无法被消费
可将消息转发到死信队列进行进一步处理
import com.rabbitmq.client.*;
public class DirectExchangeProducer {
private final static String EXCHANGE_NAME = "direct_logs";
private final static String QUEUE_NAME = "persistent_queue";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
// 声明交换机和队列
channel.exchangeDeclare(EXCHANGE_NAME, "direct");
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "info");
String message = "Hello, Direct Exchange!";
channel.basicPublish(EXCHANGE_NAME, "info",
MessageProperties.PERSISTENT_TEXT_PLAIN,
message.getBytes());
System.out.println("Sent: " + message);
}
}
}
public class DeadLetterConsumer {
private final static String DLX_QUEUE = "dlx_queue";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
// 声明死信队列
channel.queueDeclare(DLX_QUEUE, true, false, false, null);
// 创建消费者回调
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println("Dead Letter Queue Received: " + message);
};
// 设置死信队列消费者
channel.basicConsume(DLX_QUEUE, true, deliverCallback, consumerTag -> {});
}
}
}
import com.rabbitmq.client.*;
public class AckConsumer {
private final static String QUEUE_NAME = "persistent_queue";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
// 声明一个持久化队列
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
// 创建一个消费者回调
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println("Received: " + message);
try {
// 模拟消息处理
if (message.contains("error")) {
throw new Exception("Error while processing message");
}
// 手动确认消息
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
System.out.println("Message processed and acknowledged");
} catch (Exception e) {
// 如果消息处理失败,可以将消息重新放回队列
System.out.println("Error processing message, requeueing: " + e.getMessage());
channel.basicNack(delivery.getEnvelope().getDeliveryTag(), false, true);
}
};
// 设置手动确认
channel.basicConsume(QUEUE_NAME, false, deliverCallback, consumerTag -> {});
}
}
}
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false)
消息处理成功后确认消息
channel.basicNack(delivery.getEnvelope().getDeliveryTag(), false, true)
当消费失败
重新将消息投递到队列中
供其他消费者处理
版权声明
本文仅代表作者观点,不代表本站立场。
本文系作者授权发表,未经许可,不得转载。


