Skip to content

RabbitMQ 消息可靠性

本文介绍如何保证消息可靠性,即尽可能地保证消息从生产者发出,经过消息队列,最终被消费者成功处理,不丢失、不重复、不遗漏。

1. 面临的问题及解决思路

在消息可靠性工程中,面临的问题如下:

  • 生产者:有可能由于网络等原因,生产者没有将消息发送到消息队列,然后生产者认为自己成功发送了消息,最终导致消息丢失。

    解决思路:增加确认机制,即生产者发送消息后,等待消息队列的确认(ACK)。一旦收到确认,表明消息已成功写入队列;如果收到 NACK 或超时,则表明消息可能丢失,需要重试。确认过程可以通过回调函数异步确认。

  • 队列:当队列接收到消息后,突发故障崩溃下线,会丢失消息;

    解决思路:可以通过队列持久化机制,将消息保存到硬盘;也可以集群化部署消息队列,保证高可用。

  • 消费者:

    • 如果某个消费者取走了消息,但是由于消费者所在服务器崩溃等原因,导致消费者未成功处理完消息就下线了,就会导致该条消息丢失;

      解决思路:增加消费者确认机制,即消费者消费消息后,消息队列并不会立即删除该条消息,而是将其标记为未确认状态,当收到消费者ACK确认信号,才会将这条消息删除,如果未收到信号或收到NACK信号,则继续保留这条消息;

    • 如果增加了消费者确认机制,那么有可能导致一条消息被重复消费,为了解决重复消费的问题,需要保证消费者接口(业务功能)是幂等性的。所谓幂等性,即多次处理同一条消息,其结果与处理一次是相同的,不会造成数据不一致或业务逻辑错误。

    • 为了防止无限重试或阻塞队列,通常会引入死信队列。当消息达到最大重试次数或无法被消费时,会被发送到死信队列。死信队列中的消息可以被人工干预、分析错误原因,或者重新投递(经过修复后)。

2. 生产者确认

2.1 两个回调函数

我们可以设置回调函数,分别处理消息发送到交换机和消息从交换机发送到队列失败的情况。

要启用回调函数,需要在application.properties开启以下参数:

properties
spring.rabbitmq.publisher-confirm-type=correlated
spring.rabbitmq.publisher-returns=true

然后在在RabbitTemplate实体中,设置以下两个回调函数:

  • rabbitTemplate.setConfirmCallback():用于处理消息发送到交换机的情况,其接收一个函数式接口作为参数,该函数式接口中的函数有三个参数:

    • CorrelationData correlationData:该消息相关的数据,在发送消息时由生产者设置,如果没有设置则为null

      该参数对于实现可靠消息投递(如消息重试、状态更新)至关重要。

    • boolean ack:表示消息是否成功到达 RabbitMQ 交换机 (Exchange)。

      如果 acktrue:表示消息已成功被 RabbitMQ 交换机接收并处理(至少已进入交换机,可以被路由或等待路由)。

      如果 ackfalse:表示消息未成功到达 RabbitMQ 交换机。这通常发生在网络问题、RabbitMQ 服务不可用或交换机不存在等情况下。

    • String cause:如果 ackfalse(即消息未被确认),cause 参数会提供一个字符串描述,说明失败的原因。如果 acktrue,则 cause 通常为 null

  • rabbitTemplate.setReturnsCallback():设置消息从交换机发送到队列失败后的回调,注意:这种回调只有在消息无法路由到任何队列时才触发。该方法接收一个函数式接口作为参数,该函数式接口有一个参数:

    • ReturnMessage returnMessage:返回的消息,其中包含replyCodereplyTextexchangeroutingKeyMessage,可以通过这些参数来确定失败原因以及交换机等信息。

下面通过配置类来设置RabbitTemplate的回调函数:

java
@Configuration
@Slf4j
public class RabbitMQConfirmConfig {

    @Resource
    private RabbitTemplate rabbitTemplate;

    @PostConstruct
    public void setRabbitTemplateCallback() {
        setConfirmCallback();
        setReturnCallback();
    }

    private void setConfirmCallback(){
        this.rabbitTemplate.setConfirmCallback(((correlationData, ack, cause) -> {
            log.info("correlationData:{}", correlationData);
            log.info("ack:{}", ack);
            log.info("cause:{}", cause);
        }));
    }

    private void setReturnCallback(){
        this.rabbitTemplate.setReturnsCallback((returnedMessage)->{
            log.info("replyCode:{}", returnedMessage.getReplyCode());
            log.info("replyText:{}", returnedMessage.getReplyText());
            log.info("message:{}", new String(returnedMessage.getMessage().getBody()));
            log.info("exchange:{}", returnedMessage.getExchange());
            log.info("routeKey:{}", returnedMessage.getRoutingKey());
        });
    }
}

@PostConstruct用于标记一个方法,指示该方法在依赖注入完成之后、初始化方法被调用之前执行。

一个类中可以有多个 @PostConstruct 方法,但它们的执行顺序不保证。如果顺序很重要,应该将逻辑合并到一个方法中。

首先执行正常的发送消息:

image-20250622173503336

image-20250622173530884

可以看到ConfirmCallback正常执行。

然后执行异常的发送消息,路由键设置错误:

image-20250622173643676

image-20250622173829640

可以看到消息正常到达交换机(红色线条标注,ack:true),但是,消息并没有被正常路由到队列(蓝色线条标注),原因是NO_ROUTE,即没有匹配的路由键。

2.2 发送消息重试

在上一节中,我们只是了解到RabbitTemplate可以设置异步回调函数,该设置是针对所有消息的,无法精细配置,这一节介绍如何在回调函数中,根据消息配置进行重试,即如果该消息需要进行生产者确认,并且发送消息失败,如何进行重试或保存”消息发送失败“的记录写到数据库。

首先,自定义CorrelationData,增加三个参数用于重试或记录:

  • boolean requiresConfirmation:用于指示该消息是否需要确认回调;
  • MQSendMessage originalMessage:原始消息,便于调试或重发;
  • int retryNumber:重试次数;
java
@Data
public class MyCorrelationData extends CorrelationData {

    // 最大重试次数
    private static final int MAX_RETRY_NUMBER = 3;

    // 用于指示该消息是否需要确认回调
    private boolean requiresConfirmation;
    // 原始消息,便于调试或重发
    private MQSendMessage originalMessage;
    // 重试次数
    private int retryNumber;

    public MyCorrelationData(boolean requiresConfirmation, MQSendMessage originalMessage){
        super();
        this.requiresConfirmation = requiresConfirmation;
        this.originalMessage = originalMessage;
        this.retryNumber = 0;
    }

    public boolean canRetry(){
        return this.retryNumber < MAX_RETRY_NUMBER;
    }

    public void retry(){
        this.retryNumber++;
    }
}
java
@Data
public class MQSendMessage {
    private String exchange;
    private String routeKey;
    private String message;
}

然后,在发送消息时,如果重要消息需要确保消息可靠性,可以同时发送相关消息:

java
@RestController
public class RabbitMQController {

    @Resource
    private RabbitTemplate rabbitTemplate;

    @PostMapping("/sendMessage")
    public String sendMessage(@RequestBody MQSendMessage message){

        MyCorrelationData myCorrelationData = new MyCorrelationData(true, message);

        rabbitTemplate.convertAndSend(message.getExchange(),
                message.getRouteKey(),
                message.getMessage(),
                myCorrelationData
        );
        return "ok";
    }
}

TIP

注意,使用RabbitTemplate对象

最后,在回调函数中,通过CorrelationData来判断消息是否需要可靠性保证:

java
@Configuration
@Slf4j
public class RabbitMQConfirmConfig {

    @Resource
    private RabbitTemplate rabbitTemplate;

    @PostConstruct
    public void setRabbitTemplateCallback() {
        setConfirmCallback();
    }

    private void setConfirmCallback(){
        this.rabbitTemplate.setConfirmCallback(((correlationData, ack, cause) -> {
            // 相关数据为null,或者不是自定义的相关消息,默认不进行确认
            if(correlationData == null || !(correlationData instanceof MyCorrelationData)){
                return;
            }
            MyCorrelationData myCorrelationData = (MyCorrelationData)correlationData;
            // 不需要确认
            if(!myCorrelationData.isRequiresConfirmation()){
                return;
            }

            if (ack){
                // 消息已成功到达交换机,判断是否到达队列
                if(myCorrelationData.getReturned() != null){
                    // 未到达队列,则记录到日志中
                    log.error("消息未到达队列,returnedMessage: {}", myCorrelationData.getReturned());
                }

            }else{
                // 发送到交换机失败,需要重试或记录
                if(myCorrelationData.canRetry()) {
                    // 未超过重试次数
                    myCorrelationData.retry();
                    rabbitTemplate.convertAndSend(
                            myCorrelationData.getOriginalMessage().getExchange(),
                            myCorrelationData.getOriginalMessage().getRouteKey(),
                            myCorrelationData.getOriginalMessage().getMessage(),
                            myCorrelationData
                    );
                }else{
                    // 已超过重试次数
                    log.error("消息未到达交换机,correlationData: {}", myCorrelationData);
                }
            }
        }));
    }
}

::: warn

注意,这里只设置ConfirmCallback回调,returnCallback通过CorrelationData.getReturned()来判断,如果CorrelationData.getReturned()不为空,表示该消息到达了交换机但是没有到达队列

:::

2.3 测试

首先测试消息到达交换机,但是没有到达队列,只会打印出日志消息:

image-20250622190252789

image-20250622190403709

正常打印出日志。

然后测试消息没有正常到达交换机,只需要在启动Spring Boot程序后,手动删除交换机:

image-20250622190640257

然后在Postman中正常发送消息,查看Spring Boot程序后台日志:

image-20250622190739857

可以看到有四次发送消息的记录(第一次发送,其余三次是重试的),最终发送失败,记录在日志中。

3. 消息持久化

3.1 交换机的持久化

在Spring Boot中声明交换机时,可以指定其持久化特性,例如:

java
public DirectExchange(String name, boolean durable, boolean autoDelete) {
  super(name, durable, autoDelete);
}

其中第二个参数durable表示是否持久化。

如果使用指定交换机名称的构造方法,默认就是持久化的交换机:

java
public DirectExchange(String name) {
  super(name);
}

// super.name()
public AbstractExchange(String name) {
  this(name, true, false);
}

3.2 队列持久化

在声明队列时,同样可以指定是否为持久化的队列:

java
public Queue(String name, boolean durable) {
  this(name, durable, false, false, null);
}

3.3 消息的持久化

默认情况下,使用rabbitTemplate.convertAndSend()方法发送的消息默认就是持久化的,跟踪方法进入:

java
protected Message convertMessageIfNecessary(final Object object) {
  if (object instanceof Message msg) {
    return msg;
  }
  return getRequiredMessageConverter().toMessage(object, new MessageProperties());
}

new MessageProperties()中创建消息属性,其中就有DeliveryMode

java
public static final MessageDeliveryMode DEFAULT_DELIVERY_MODE = MessageDeliveryMode.PERSISTENT;

默认就是持久化的。

4. 消费者确认

消息确认机制是保证消息可靠性的核心:

  • 没有确认:如果消费者在处理消息前崩溃,或者处理过程中出现异常,消息可能会丢失,因为它已经被 RabbitMQ 从队列中移除(假设是自动确认)。
  • 有了确认:只有当消费者明确告诉 RabbitMQ 消息已经处理完毕后,RabbitMQ 才会将该消息从队列中移除。这确保了即使消费者在处理过程中失败,消息也能被重新投递给其他消费者,或者在消费者恢复后再次处理。

默认情况下,消费者端是自动确认的,在这种模式下,一旦消息被成功投递给消费者(即消费者方法执行完毕且没有抛出异常),Spring AMQP 会自动向 RabbitMQ 发送一个 Acknowledge (ACK) 信号。如果消费者在处理消息时抛出异常,消息会重新入队。

不过,最好还是将确认模式改为手动确认,即在代码中根据需要返回确认信息。

首先需要在配置文件中修改确认模式:

properties
spring.rabbitmq.listener.simple.acknowledge-mode=manual

然后,在 @RabbitListener 方法需要额外接收一个参数:Channel

  • com.rabbitmq.client.Channel: RabbitMQ 客户端的通道,通过它来发送确认指令

Channel 对象提供了几个核心方法来管理消息确认:

  • channel.basicAck(long deliveryTag, boolean multiple)
    • 用途:消费者成功处理消息后,向 RabbitMQ 发送确认信号。
    • deliveryTag:消息的唯一标识符。
    • multiple:布尔值。如果为 true,则表示确认 deliveryTag 之前(包括 deliveryTag 本身)所有未确认的消息。如果为 false,则只确认 deliveryTag 对应的这条消息。通常建议设置为 false,以避免批量确认带来的潜在问题。
  • channel.basicNack(long deliveryTag, boolean multiple, boolean requeue)
    • 用途:消费者无法处理消息时(例如,业务逻辑错误),向 RabbitMQ 发送否定确认信号。
    • deliveryTag:消息的唯一标识符。
    • multiple:同 basicAck,通常设为 false
    • requeue:布尔值。
      • 如果为 true:消息会被重新放入队列,以便其他消费者或当前消费者再次处理。
      • 如果为 false:消息不会被重新入队。如果配置了死信交换机(DLX),消息会发送到死信队列;否则,消息将被直接丢弃。
  • channel.basicReject(long deliveryTag, boolean requeue)
    • 用途: 功能与 basicNack 类似,但它不支持 multiple 参数,即只能拒绝单条消息
    • deliveryTag:消息的唯一标识符。
    • requeue:同 basicNack

消息的唯一标识符可以通过消息配置获取:

java
// 获取消息唯一标识符
long deliveryTag = message.getMessageProperties().getDeliveryTag();

因此,最终代码如下:

java
@Slf4j
@Component
public class RabbitMQListener {

    @RabbitListener(queues = {RabbitMQConfig.QUEUE_NAME})
    public void processMessage(Message message, Channel channel) throws IOException {
        // 获取消息唯一标识符
        long deliveryTag = message.getMessageProperties().getDeliveryTag();

        try {
            // 执行业务逻辑

            // 业务逻辑执行完成,消息确认消费
            channel.basicAck(deliveryTag, false);
        }catch (Exception e){

            // 业务逻辑执行失败,消费消息失败,使消息重新入队
            channel.basicNack(deliveryTag, false, true);
        }
    }
}
  • 对于无法处理的消息,通常不建议无限次地 requeue,因为这可能导致消息循环(Poison Message)。更好的做法是将其发送到死信队列,以便后续人工干预或分析。
  • 如果消息可能因为 requeue 而被多次处理,确保消费者业务逻辑是幂等的,即多次执行相同操作不会产生不同的结果。