Skip to content

RabbitMQ 队列特性介绍

本文将介绍队列相关特性,包括队列长度、消息超时、死信与死信队列、延迟队列和优先级队列。

1. 队列长度

我们在创建队列时,可以设置队列最大长度或容量:

  • 队列长度:表示该队列最多可以容纳多少条消息;
  • 队列容量:表示该队列消息所占有的最大空间,以字节为单位;

当达到队列最大长度或容量时,默认的溢出处理是抛弃队列头的消息,即入队时间最早的,我们也可以更改溢出处理策略。

两者可以同时设置。

在Java中,可以在创建队列时,提供参数指定队列的最大长度或容量,如下:

java
@Bean
public Queue queue() {
    Map<String, Object> args = new HashMap<>();
    // 队列最大长度
    args.put("x-max-length", 10);
    // 队列最大容量
    args.put("x-max-length-bytes", 1024*10);
    // 队列溢出处理策略
    args.put("x-overflow", "drop-head");

    // 创建队列
    return new Queue(QUEUE_NAME, true, false, false, args);
}

队列溢出处理策略x-overflow的可选值如下:

  • drop-head:默认值,队列头部的(最旧的)消息会被丢弃,消息发送方不会收到任何错误或通知,因为消息已经成功到达 RabbitMQ 并被添加到队列中。

  • reject-publish:当队列达到最大长度限制时,任何尝试将新消息发布到该队列的请求都会被 拒绝

    对于生产者:如果生产者使用了事务(transactions)或者发布确认(publisher confirms),它会收到一个指示消息无法被路由到队列的信号。

    对于RabbitMQ:消息不会进入队列。

  • reject-publish-dlx:当队列达到最大长度限制时,尝试发布新消息到该队列的请求同样会被拒绝,但这些被拒绝的消息会被路由到死信交换机 (Dead Letter Exchange, DLX),而不是直接丢弃。

    对于生产者:与 reject-publish 类似,生产者会收到失败通知(通过事务或发布确认)。

    对于RabbitMQ:被拒绝的消息会被 RabbitMQ 发送到队列配置的死信交换机,进而进入死信队列。

启动Spring Boot程序,可以看到设置成功:

image-20250623135649401

测试如下:

java
@PostMapping("/sendMessage")
public String sendMessage(@RequestBody MQSendMessage message){

    MyCorrelationData myCorrelationData = new MyCorrelationData(true, message);

    int number = 1;
    if(message.getNumber() > 1){
        number = message.getNumber();
    }
    for (int i = 1; i <= number; i++) {
        rabbitTemplate.convertAndSend(message.getExchange(),
                message.getRouteKey(),
                i + ": " + message.getMessage(),
                myCorrelationData
        );
    }

    return "ok";
}
java
@Data
public class MQSendMessage {
    private String exchange;
    private String routeKey;
    private String message;
    private int number;
}

TIP

在向队列发送消息前,关闭消费者

向队列发送15条消息:

image-20250623141007167

结果发现只有10条消息进入了队列。

2. 消息超时

消息超时是指当一条消息在队列中存在超过一定时长(TTL,即Time-To-Live)后,该条消息就会过期并抛弃。

TTL的设置分为两个维度:

  • 队列维度:进入该队列的消息都会有一个TTL时长;
  • 消息维度:针对单条消息,只有设置了TTL的消息才会过期;

如果要为队列设置TTL,在创建队列时指定相关参数:

java
@Bean
public Queue queue() {
    Map<String, Object> args = new HashMap<>();
    // 队列最大长度
    args.put("x-max-length", 10);
    // 队列最大容量
    args.put("x-max-length-bytes", 1024*10);
    // 队列溢出处理
    args.put("x-overflow", "drop-head");
    // 队列TTL,以毫秒为单位
    args.put("x-message-ttl", 6000);

    // 创建队列
    return new Queue(QUEUE_NAME, true, false, false, args);
}

如果要为消息设置TTL,在发送消息时指定MessagePostProcessor

java
@PostMapping("/sendMessage")
public String sendMessage(@RequestBody MQSendMessage mqSendMessage){

    MyCorrelationData myCorrelationData = new MyCorrelationData(true, mqSendMessage);

    MessagePostProcessor messagePostProcessor = message -> {
        // 指定该消息的TTL
        message.getMessageProperties().setExpiration("12000");

        return message;
    };

    rabbitTemplate.convertAndSend(mqSendMessage.getExchange(),
            mqSendMessage.getRouteKey(),
            mqSendMessage.getMessage(),
            messagePostProcessor,
            myCorrelationData
    );

    return "ok";
}

MessagePostProcessor是一个函数式接口,用于在消息转换完成后,修改消息或为消息增加属性,其方法如下,接收一个转换后的消息,并返回修改后的消息:

java
Message postProcessMessage(Message message) throws AmqpException;

队列的TTL和消息的TTL可以同时存在,以最短的TTL为有效TTL。

3. 死信与死信队列

3.1 概述

什么是死信?当一个消息无法被消费,它就变成了死信。

死信产生的原因有以下三种:

  • 拒绝:消费者拒绝消息,调用basicNack()basicReject()方法,并且不把消息重新放入原目标队列requeue=false,该消息成为死信;
  • 溢出:当超过队列最大长度时,队列头的消息成为死信;
  • 超时:消息到达超时时间未被消费,成为死信;

死信的处理方式有如下三种:

  • 丢弃:对不重要的消息直接丢弃,不做处理;
  • 入库:把死信写入数据库,日后处理;
  • 监听:消息变成死信后进入死信队列,专门设置消费者监听死信队列,做后续处理。这也是通常采用的做法;

3.2 创建与配置死信队列

首先,创建死信交换机与死信队列,并绑定死信交换机与死信队列,这与创建一般的交换机和队列没有什么不同:

java
public static final String DEAD_EXCHANGE_NAME = "exchange.dead.letter";
public static final String DEAD_QUEUE_NAME_USER = "queue.dead.letter.user";
public static final String ROUTING_KEY_DEAD_USER = "routing.dead.letter.user";

@Bean
public DirectExchange deadDirectExchange(){
    return new DirectExchange(DEAD_EXCHANGE_NAME);
}

@Bean
public Queue userDeadQueue(){
    return new Queue(DEAD_QUEUE_NAME_USER);
}

@Bean
public Binding userDeadQueueBinding(Queue userDeadQueue, DirectExchange deadDirectExchange) {
    return BindingBuilder.bind(userDeadQueue).to(deadDirectExchange).with(ROUTING_KEY_DEAD_USER);
}

TIP

一般使用DirectExchange作为死信交换机。

然后,创建业务队列时,通过参数配置死信交换机和死信路由:

java
@Bean
public Queue queue() {
    Map<String, Object> args = new HashMap<>();
    // 队列最大长度
    args.put("x-max-length", 10);
    // 队列最大容量
    args.put("x-max-length-bytes", 1024*10);
    // 队列溢出处理
    args.put("x-overflow", "drop-head");
    // 队列TTL,以毫秒为单位
    args.put("x-message-ttl", 6000);

    // 关键点1: 配置死信交换机
    args.put("x-dead-letter-exchange", DEAD_EXCHANGE_NAME);
    // 关键点2: (可选) 配置死信路由键,如果不设置,则使用原消息的路由键
    args.put("x-dead-letter-routing-key", ROUTING_KEY_DEAD_USER);


    // 创建队列
    return new Queue(QUEUE_NAME, true, false, false, args);
}

这两项设置的意思是:当业务队列中的消息成为死信后,将这个死信发送给死信交换机,并且使用配置的死信路由键,这样死信交换机就可以将死信路由到不同的死信队列。

3.3 测试

3.3.1 超时

由于上面创建业务队列时,设置了队列的超时时间,所以只需要向业务队列中发送一条消息,然后一直不消费。

最后,该条消息会变为死信,此时查看死信队列:

image-20250623152224828

可以发现死信队列中有一条消息,并且就是我们发送的那条消息:

image-20250623152402545

3.3.2 拒绝

修改消费者确认逻辑,当消息未消费成功时,不再次进行入队:

java
@Slf4j
@Component
public class RabbitMQListener {

    @RabbitListener(queues = {RabbitMQConfig.QUEUE_NAME})
    public void processMessage(Message message, Channel channel) throws IOException {
        // 获取消息唯一标识符
        long deliveryTag = message.getMessageProperties().getDeliveryTag();

        try {
            // 执行业务逻辑
            int data = Integer.valueOf(new String(message.getBody()));
            int i = 10 / data;

            // 业务逻辑执行完成,消息确认消费
            channel.basicAck(deliveryTag, false);
        }catch (Exception e){
            log.error(e.getMessage(), e);
            // 业务逻辑执行失败,消费消息失败,不重新入队,最终消息会进入死信队列
            channel.basicNack(deliveryTag, false, false);
        }
    }

}

发送消息如下:

image-20250623152730511

最终在死信队列中找到我们发送的消息:

image-20250623152614830

3.3.3 溢出

由于之前设置了业务队列最大长度为10,所以我们一次性发送11条消息,查看是否有1条消息因为溢出,被放入了死信队列。

在死信队列中,确实有一条消息,是因为溢出的原因成为死信的:

image-20250623153311739

3.4 监听死信队列

为了监听死信队列,可以再开启一个监听器,用于处理死信消息:

java
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class DeadLetterConsumer {

    // 监听死信队列
    @RabbitListener(queues = RabbitMQDeadLetterConfig.DEAD_QUEUE_NAME_USER)
    public void receiveDeadLetterMessage(String deadMessage) {
        System.out.println("--- 收到死信消息: " + deadMessage + " ---");
        // 在这里处理死信消息,例如:
        // 1. 记录日志
        // 2. 发送告警通知
        // 3. 存储到数据库供后续分析
        // 4. 在修正问题后,重新发布到业务队列进行重试 (注意幂等性)
    }
}

4. 延迟队列

4.1 概念与方案

延迟队列,就是指一条消息进入队列后,消费者并不能马上进行处理,而是延迟一段时间后,消费者才能处理该条消息。

延迟队列的实现有两种方案:

  • 方案一:业务队列TTL+死信队列
  • 方案二:使用插件延时交换机rabbitmq_delayed_message_exchange

我们主要使用第二个方案,此处介绍第一个方案。

4.2 方案一介绍

使用业务队列TTL+死信队列实现延时队列,流程如下:

image-20250623161640698

  • 首先创建一个带超时时间的正常业务队列,然后创建一个死信队列,将正常队列的死信发送到死信队列;
  • 生产者发送消息到正常队列;
  • 消费者监听死信队列;
  • 正常队列中的消息由于超时,发送到死信队列,消费者才能接收到消息,从而实现了延迟的效果;

4.3 方案二实现

4.3.1 环境准备

首先下载插件,地址:https://github.com/rabbitmq/rabbitmq-delayed-message-exchange,在Releases中可以下载。

下载完成后,需要将插件放入RabbitMQ插件目录中,使用下面的命令查看插件目录:

bash
rabbitmq-plugins directories -s

image-20250623160923209

可以看到插件目录在/opt/rabbitmq/plugins,由于我们是Docker启动的RabbitMQ,之前没有做目录映射,所以需要删除旧的,重新启动一个,命令如下:

bash
docker run -d --name rabbitmq -v /projects/docker/rabbitmq:/opt/rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:4-management

启动完成后,将下载的插件放入插件目录即可。

然后进入Docker,运行如下命令启用插件:

bash
rabbitmq-plugins enable rabbitmq_delayed_message_exchange

image-20250623160835258

查看已启动的插件:

image-20250623162421290

4.3.2 创建延迟交换机

java
@Configuration
public class RabbitDelayedQueueConfig {

    public static final String EXCHANGE_DELAYED_NAME = "exchange.delayed";
    public static final String QUEUE_DELAYED_NAME = "queue.delayed";
    public static final String ROUTING_DELAYED = "routing.delayed";

    @Bean
    public CustomExchange delayedExchange(){
        Map<String, Object> args = new HashMap<String, Object>();
     	  // 关键点:指定其底层交换机类型,例如 "direct", "topic", "fanout"
        args.put("x-delayed-type", "direct");
	
      	// 创建一个自定义的交换机 CustomExchange
      	// 第二个参数表示自定义交换机类型,如果是延迟交换机,则为 x-delayed-message
        return new CustomExchange(EXCHANGE_DELAYED_NAME,
                "x-delayed-message",
                true,
                false,
                args);
    }

    @Bean
    public Queue delayedQueue(){
        return new Queue(QUEUE_DELAYED_NAME);
    }


    @Bean
    public Binding delayedBinding(Queue delayedQueue, CustomExchange delayedExchange) {
        return BindingBuilder.bind(delayedQueue)
                .to(delayedExchange)
                .with(ROUTING_DELAYED)
                .noargs();
    }
}

image-20250623165422635

4.3.3 发送延迟消息

如果要设置消息的延迟时间,同样在MessagePostProcessor中进行设置:

java
@PostMapping("/sendDelayedMessage")
public String sendDelayedMessage(@RequestBody MQSendMessage mqSendMessage){
    MessagePostProcessor messagePostProcessor = message -> {
        message.getMessageProperties().setDelayLong((long) mqSendMessage.getDelayed());
      return message;
    };

    rabbitTemplate.convertAndSend(mqSendMessage.getExchange(),
            mqSendMessage.getRouteKey(),
            LocalTime.now() + "  " + mqSendMessage.getMessage(),
            messagePostProcessor
    );
    return "ok";
}

此处为了显示延迟效果,在发送消息时,增加了当前时间。

java
@Data
public class MQSendMessage {
    private String exchange;
    private String routeKey;
    private String message;
    private int number;
    // 消息延迟时间,以毫秒为单位
    private int delayed;
}

4.3.4 监听延迟消息

java
@RabbitListener(queues = {RabbitDelayedQueueConfig.QUEUE_DELAYED_NAME})
public void processDelayedMessage(Message message, Channel channel) throws IOException {
    // 获取消息唯一标识符
    long deliveryTag = message.getMessageProperties().getDeliveryTag();

    try {
        // 执行业务逻辑
        System.out.println("---------------");
        System.out.println("消费者当前时间:" + LocalTime.now());
        System.out.println(new String(message.getBody()));
        System.out.println("---------------");

        // 业务逻辑执行完成,消息确认消费
        channel.basicAck(deliveryTag, false);
    }catch (Exception e){
        log.error(e.getMessage(), e);
        // 业务逻辑执行失败,消费消息失败,不重新入队
        channel.basicNack(deliveryTag, false, false);
    }
}

监听延迟队列的处理器,仅仅是简单打印当前时间和消息内容。

4.3.5 测试

image-20250623165842433

控制台打印结果如下:

txt
---------------
消费者当前时间:16:55:10.609444
16:55:05.583761  test delayed queue
---------------

可见确实有5秒钟的延迟。

5. 优先级队列

优先级队列(Priority Queue)是一种特殊的队列,它不像传统队列那样严格遵循“先进先出”(FIFO)的原则。相反,优先级队列中的元素会根据它们的优先级进行排列和处理。优先级最高的元素总是最先被取出

RabbitMQ中的优先级队列工作原理如下:

  • 队列级别配置:需要在声明队列时指定其支持的最大优先级数。例如,如果设置 x-max-priority 为 10,那么该队列将能处理从 0 到 10 的优先级(0 是最低优先级)。

  • 消息级别设置:生产者在发送消息时,可以在消息的属性中设置 priority 字段(一个整数值)。

  • 消费者优先获取:RabbitMQ 会尽力确保优先级较高的消息会比优先级较低的消息更早地被投递给消费者,即使优先级较低的消息先到达队列。

  • 资源消耗:实现优先级队列需要额外的计算和内存开销。RabbitMQ 会维护内部数据结构来高效地管理带有优先级的消息。因此,不建议将 x-max-priority 设置得过高,通常 1 到 10 已经足够满足大多数场景的需求。

首先创建队列时设置优先级:

java
@Bean
public Queue priorityQueue() {
    Map<String, Object> args = new HashMap<>();
    // 队列最大优先级
    args.put("x-max-priority", 10);

    // 创建队列
    return new Queue(QUEUE_PRIORITY_NAME, true, false, false, args);
}

然后,发送消息时,设置消息的优先级:

java
@PostMapping("/sendPriorityMessage")
public String sendPriorityMessage(@RequestBody MQSendMessage mqSendMessage){
    for (int i = 0; i < 5; i++) {
        int finalI = i;
        MessagePostProcessor messagePostProcessor = message -> {
            message.getMessageProperties().setPriority(finalI);
            return message;
        };

        rabbitTemplate.convertAndSend(mqSendMessage.getExchange(),
                mqSendMessage.getRouteKey(),
                LocalTime.now() + "优先级 [" + i + "]" + mqSendMessage.getMessage(),
                messagePostProcessor
        );
    }

    return "ok";
}

最后,在消息处理器中打印消息内容,查看获取消息的顺序,结果如下:

txt
17:17:36.547526优先级 [4]test priority queue
17:17:36.547436优先级 [3]test priority queue
17:17:36.543232优先级 [2]test priority queue
17:17:36.540100优先级 [1]test priority queue
17:17:36.529563优先级 [0]test priority queue

可以发现,优先级高的消息先被处理。