Appearance
RabbitMQ 消息可靠性
本文介绍如何保证消息可靠性,即尽可能地保证消息从生产者发出,经过消息队列,最终被消费者成功处理,不丢失、不重复、不遗漏。
1. 面临的问题及解决思路
在消息可靠性工程中,面临的问题如下:
生产者:有可能由于网络等原因,生产者没有将消息发送到消息队列,然后生产者认为自己成功发送了消息,最终导致消息丢失。
解决思路:增加确认机制,即生产者发送消息后,等待消息队列的确认(ACK)。一旦收到确认,表明消息已成功写入队列;如果收到 NACK 或超时,则表明消息可能丢失,需要重试。确认过程可以通过回调函数异步确认。
队列:当队列接收到消息后,突发故障崩溃下线,会丢失消息;
解决思路:可以通过队列持久化机制,将消息保存到硬盘;也可以集群化部署消息队列,保证高可用。
消费者:
如果某个消费者取走了消息,但是由于消费者所在服务器崩溃等原因,导致消费者未成功处理完消息就下线了,就会导致该条消息丢失;
解决思路:增加消费者确认机制,即消费者消费消息后,消息队列并不会立即删除该条消息,而是将其标记为未确认状态,当收到消费者ACK确认信号,才会将这条消息删除,如果未收到信号或收到NACK信号,则继续保留这条消息;
如果增加了消费者确认机制,那么有可能导致一条消息被重复消费,为了解决重复消费的问题,需要保证消费者接口(业务功能)是幂等性的。所谓幂等性,即多次处理同一条消息,其结果与处理一次是相同的,不会造成数据不一致或业务逻辑错误。
为了防止无限重试或阻塞队列,通常会引入死信队列。当消息达到最大重试次数或无法被消费时,会被发送到死信队列。死信队列中的消息可以被人工干预、分析错误原因,或者重新投递(经过修复后)。
2. 生产者确认
2.1 两个回调函数
我们可以设置回调函数,分别处理消息发送到交换机和消息从交换机发送到队列失败的情况。
要启用回调函数,需要在application.properties开启以下参数:
properties
spring.rabbitmq.publisher-confirm-type=correlated
spring.rabbitmq.publisher-returns=true然后在在RabbitTemplate实体中,设置以下两个回调函数:
rabbitTemplate.setConfirmCallback():用于处理消息发送到交换机的情况,其接收一个函数式接口作为参数,该函数式接口中的函数有三个参数:CorrelationData correlationData:该消息相关的数据,在发送消息时由生产者设置,如果没有设置则为null;该参数对于实现可靠消息投递(如消息重试、状态更新)至关重要。
boolean ack:表示消息是否成功到达 RabbitMQ 交换机 (Exchange)。如果
ack为true:表示消息已成功被 RabbitMQ 交换机接收并处理(至少已进入交换机,可以被路由或等待路由)。如果
ack为false:表示消息未成功到达 RabbitMQ 交换机。这通常发生在网络问题、RabbitMQ 服务不可用或交换机不存在等情况下。String cause:如果ack为false(即消息未被确认),cause参数会提供一个字符串描述,说明失败的原因。如果ack为true,则cause通常为null。
rabbitTemplate.setReturnsCallback():设置消息从交换机发送到队列失败后的回调,注意:这种回调只有在消息无法路由到任何队列时才触发。该方法接收一个函数式接口作为参数,该函数式接口有一个参数:ReturnMessage returnMessage:返回的消息,其中包含replyCode、replyText、exchange、routingKey和Message,可以通过这些参数来确定失败原因以及交换机等信息。
下面通过配置类来设置RabbitTemplate的回调函数:
java
@Configuration
@Slf4j
public class RabbitMQConfirmConfig {
@Resource
private RabbitTemplate rabbitTemplate;
@PostConstruct
public void setRabbitTemplateCallback() {
setConfirmCallback();
setReturnCallback();
}
private void setConfirmCallback(){
this.rabbitTemplate.setConfirmCallback(((correlationData, ack, cause) -> {
log.info("correlationData:{}", correlationData);
log.info("ack:{}", ack);
log.info("cause:{}", cause);
}));
}
private void setReturnCallback(){
this.rabbitTemplate.setReturnsCallback((returnedMessage)->{
log.info("replyCode:{}", returnedMessage.getReplyCode());
log.info("replyText:{}", returnedMessage.getReplyText());
log.info("message:{}", new String(returnedMessage.getMessage().getBody()));
log.info("exchange:{}", returnedMessage.getExchange());
log.info("routeKey:{}", returnedMessage.getRoutingKey());
});
}
}
@PostConstruct用于标记一个方法,指示该方法在依赖注入完成之后、初始化方法被调用之前执行。一个类中可以有多个
@PostConstruct方法,但它们的执行顺序不保证。如果顺序很重要,应该将逻辑合并到一个方法中。
首先执行正常的发送消息:


可以看到ConfirmCallback正常执行。
然后执行异常的发送消息,路由键设置错误:


可以看到消息正常到达交换机(红色线条标注,ack:true),但是,消息并没有被正常路由到队列(蓝色线条标注),原因是NO_ROUTE,即没有匹配的路由键。
2.2 发送消息重试
在上一节中,我们只是了解到RabbitTemplate可以设置异步回调函数,该设置是针对所有消息的,无法精细配置,这一节介绍如何在回调函数中,根据消息配置进行重试,即如果该消息需要进行生产者确认,并且发送消息失败,如何进行重试或保存”消息发送失败“的记录写到数据库。
首先,自定义CorrelationData,增加三个参数用于重试或记录:
boolean requiresConfirmation:用于指示该消息是否需要确认回调;MQSendMessage originalMessage:原始消息,便于调试或重发;int retryNumber:重试次数;
java
@Data
public class MyCorrelationData extends CorrelationData {
// 最大重试次数
private static final int MAX_RETRY_NUMBER = 3;
// 用于指示该消息是否需要确认回调
private boolean requiresConfirmation;
// 原始消息,便于调试或重发
private MQSendMessage originalMessage;
// 重试次数
private int retryNumber;
public MyCorrelationData(boolean requiresConfirmation, MQSendMessage originalMessage){
super();
this.requiresConfirmation = requiresConfirmation;
this.originalMessage = originalMessage;
this.retryNumber = 0;
}
public boolean canRetry(){
return this.retryNumber < MAX_RETRY_NUMBER;
}
public void retry(){
this.retryNumber++;
}
}java
@Data
public class MQSendMessage {
private String exchange;
private String routeKey;
private String message;
}然后,在发送消息时,如果重要消息需要确保消息可靠性,可以同时发送相关消息:
java
@RestController
public class RabbitMQController {
@Resource
private RabbitTemplate rabbitTemplate;
@PostMapping("/sendMessage")
public String sendMessage(@RequestBody MQSendMessage message){
MyCorrelationData myCorrelationData = new MyCorrelationData(true, message);
rabbitTemplate.convertAndSend(message.getExchange(),
message.getRouteKey(),
message.getMessage(),
myCorrelationData
);
return "ok";
}
}TIP
注意,使用RabbitTemplate对象
最后,在回调函数中,通过CorrelationData来判断消息是否需要可靠性保证:
java
@Configuration
@Slf4j
public class RabbitMQConfirmConfig {
@Resource
private RabbitTemplate rabbitTemplate;
@PostConstruct
public void setRabbitTemplateCallback() {
setConfirmCallback();
}
private void setConfirmCallback(){
this.rabbitTemplate.setConfirmCallback(((correlationData, ack, cause) -> {
// 相关数据为null,或者不是自定义的相关消息,默认不进行确认
if(correlationData == null || !(correlationData instanceof MyCorrelationData)){
return;
}
MyCorrelationData myCorrelationData = (MyCorrelationData)correlationData;
// 不需要确认
if(!myCorrelationData.isRequiresConfirmation()){
return;
}
if (ack){
// 消息已成功到达交换机,判断是否到达队列
if(myCorrelationData.getReturned() != null){
// 未到达队列,则记录到日志中
log.error("消息未到达队列,returnedMessage: {}", myCorrelationData.getReturned());
}
}else{
// 发送到交换机失败,需要重试或记录
if(myCorrelationData.canRetry()) {
// 未超过重试次数
myCorrelationData.retry();
rabbitTemplate.convertAndSend(
myCorrelationData.getOriginalMessage().getExchange(),
myCorrelationData.getOriginalMessage().getRouteKey(),
myCorrelationData.getOriginalMessage().getMessage(),
myCorrelationData
);
}else{
// 已超过重试次数
log.error("消息未到达交换机,correlationData: {}", myCorrelationData);
}
}
}));
}
}::: warn
注意,这里只设置ConfirmCallback回调,returnCallback通过CorrelationData.getReturned()来判断,如果CorrelationData.getReturned()不为空,表示该消息到达了交换机但是没有到达队列
:::
2.3 测试
首先测试消息到达交换机,但是没有到达队列,只会打印出日志消息:


正常打印出日志。
然后测试消息没有正常到达交换机,只需要在启动Spring Boot程序后,手动删除交换机:

然后在Postman中正常发送消息,查看Spring Boot程序后台日志:

可以看到有四次发送消息的记录(第一次发送,其余三次是重试的),最终发送失败,记录在日志中。
3. 消息持久化
3.1 交换机的持久化
在Spring Boot中声明交换机时,可以指定其持久化特性,例如:
java
public DirectExchange(String name, boolean durable, boolean autoDelete) {
super(name, durable, autoDelete);
}其中第二个参数durable表示是否持久化。
如果使用指定交换机名称的构造方法,默认就是持久化的交换机:
java
public DirectExchange(String name) {
super(name);
}
// super.name()
public AbstractExchange(String name) {
this(name, true, false);
}3.2 队列持久化
在声明队列时,同样可以指定是否为持久化的队列:
java
public Queue(String name, boolean durable) {
this(name, durable, false, false, null);
}3.3 消息的持久化
默认情况下,使用rabbitTemplate.convertAndSend()方法发送的消息默认就是持久化的,跟踪方法进入:
java
protected Message convertMessageIfNecessary(final Object object) {
if (object instanceof Message msg) {
return msg;
}
return getRequiredMessageConverter().toMessage(object, new MessageProperties());
}在new MessageProperties()中创建消息属性,其中就有DeliveryMode:
java
public static final MessageDeliveryMode DEFAULT_DELIVERY_MODE = MessageDeliveryMode.PERSISTENT;默认就是持久化的。
4. 消费者确认
消息确认机制是保证消息可靠性的核心:
- 没有确认:如果消费者在处理消息前崩溃,或者处理过程中出现异常,消息可能会丢失,因为它已经被 RabbitMQ 从队列中移除(假设是自动确认)。
- 有了确认:只有当消费者明确告诉 RabbitMQ 消息已经处理完毕后,RabbitMQ 才会将该消息从队列中移除。这确保了即使消费者在处理过程中失败,消息也能被重新投递给其他消费者,或者在消费者恢复后再次处理。
默认情况下,消费者端是自动确认的,在这种模式下,一旦消息被成功投递给消费者(即消费者方法执行完毕且没有抛出异常),Spring AMQP 会自动向 RabbitMQ 发送一个 Acknowledge (ACK) 信号。如果消费者在处理消息时抛出异常,消息会重新入队。
不过,最好还是将确认模式改为手动确认,即在代码中根据需要返回确认信息。
首先需要在配置文件中修改确认模式:
properties
spring.rabbitmq.listener.simple.acknowledge-mode=manual然后,在 @RabbitListener 方法需要额外接收一个参数:Channel
com.rabbitmq.client.Channel: RabbitMQ 客户端的通道,通过它来发送确认指令
Channel 对象提供了几个核心方法来管理消息确认:
channel.basicAck(long deliveryTag, boolean multiple)- 用途:消费者成功处理消息后,向 RabbitMQ 发送确认信号。
deliveryTag:消息的唯一标识符。multiple:布尔值。如果为true,则表示确认deliveryTag之前(包括deliveryTag本身)所有未确认的消息。如果为false,则只确认deliveryTag对应的这条消息。通常建议设置为false,以避免批量确认带来的潜在问题。
channel.basicNack(long deliveryTag, boolean multiple, boolean requeue)- 用途:消费者无法处理消息时(例如,业务逻辑错误),向 RabbitMQ 发送否定确认信号。
deliveryTag:消息的唯一标识符。multiple:同basicAck,通常设为false。requeue:布尔值。- 如果为
true:消息会被重新放入队列,以便其他消费者或当前消费者再次处理。 - 如果为
false:消息不会被重新入队。如果配置了死信交换机(DLX),消息会发送到死信队列;否则,消息将被直接丢弃。
- 如果为
channel.basicReject(long deliveryTag, boolean requeue)- 用途: 功能与
basicNack类似,但它不支持multiple参数,即只能拒绝单条消息。 deliveryTag:消息的唯一标识符。requeue:同basicNack。
- 用途: 功能与
消息的唯一标识符可以通过消息配置获取:
java
// 获取消息唯一标识符
long deliveryTag = message.getMessageProperties().getDeliveryTag();因此,最终代码如下:
java
@Slf4j
@Component
public class RabbitMQListener {
@RabbitListener(queues = {RabbitMQConfig.QUEUE_NAME})
public void processMessage(Message message, Channel channel) throws IOException {
// 获取消息唯一标识符
long deliveryTag = message.getMessageProperties().getDeliveryTag();
try {
// 执行业务逻辑
// 业务逻辑执行完成,消息确认消费
channel.basicAck(deliveryTag, false);
}catch (Exception e){
// 业务逻辑执行失败,消费消息失败,使消息重新入队
channel.basicNack(deliveryTag, false, true);
}
}
}- 对于无法处理的消息,通常不建议无限次地
requeue,因为这可能导致消息循环(Poison Message)。更好的做法是将其发送到死信队列,以便后续人工干预或分析。 - 如果消息可能因为
requeue而被多次处理,确保消费者业务逻辑是幂等的,即多次执行相同操作不会产生不同的结果。