Skip to content

RabbitMQ 消费端

本文主要介绍RabbitMQ消费者有关的基础知识和细节。

1. 准备工作

为了更清楚地介绍RabbitMQ消费者的相关特性,这里使用RabbitMQ客户端,而不使用Spring Boot AMQP:

xml
<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>5.25.0</version>
</dependency>

并且,准备名为queue.consumer.test的队列。

2. 工作模式

RabbitMQ消息队列和消费端的工作模式如下图:

image-20250624165337294

  • 在RabbitMQ实例中,可以声明多个消息队列Queue
  • 在单个应用程序中,可以声明多个消费者Consumer
  • 消费者和RabbitMQ之间的联系是通过连接Connection实现的(底层是TCP协议),在Connection之上,还有通道Channel,所以,消费者和RabbitMQ实际是通过通道连接的;
  • 一个通道可以供多个消费者使用;

通过上面的工作模式图,我们可以知道:

  • 如果连接Connection关闭了,那么在该连接Connection上的通道都会变得不可用;
  • 如果通道Channel关闭了,那么使用该通道Channel的消费者也将无法接收到消息;

3. 消费者工作流程

根据上面的工作模式图,我们可以得出下面的工作流程:

  1. 在应用程序中,创建到RabbitMQ的连接Connection

    java
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("127.0.0.1");
    factory.setPort(5672);
    
    Connection connection = factory.newConnection();
  2. 通过连接Connection得到该连接上的通道Channel

    java
    Channel channel = connection.createChannel();
  3. 声明消费者Consumer

    在Java中,我么可以实现Consumer接口来获得消费者实例,但是,为了简单期间,RabbitMQ客户端实现了一个默认的消费者DefaultConsumer,我们只需要重写该默认消费者中的方法即可:

    java
    Consumer consumer = new DefaultConsumer(channel){
        @Override
        public void handleDelivery(String consumerTag,
                                   Envelope envelope,
                                   AMQP.BasicProperties properties,
                                   byte[] body)
                throws IOException
        {
            String message = new String(body, "UTF-8");
            String threadName = Thread.currentThread().getName();
            System.out.println(threadName + " [consumer] Received : '" + message + "'");
    
            this.getChannel().basicAck(envelope.getDeliveryTag(), false);
        }
    };
    • 注意默认消费者的构造方法需要通道实例;
    • handleDelivery()即表示接收到消息的处理方法,其中使用this.getChannel()获取通道用于向RabbitMQ发送确认消息;
  4. 指定消费者监听哪个队列,需要三个参数:队列名Queue、消费者Consumer和通道Channel

    java
    String consumerTag = channel.basicConsume(
            QUEUE_NAME,
            false,
            consumer
    );
    System.out.println("consumerTag:" + consumerTag);
    • 第二个参数表示是否自动确认,这里设置为false表示手动确认;

    consumerTag是消费者标识,可以理解为ID。

完整代码如下:

java
import com.rabbitmq.client.*;
import java.io.IOException;

public class ConsumerTest {
    private final static String QUEUE_NAME = "queue.consumer.test";

    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("127.0.0.1");
        factory.setPort(5672);

        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        Consumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag,
                                       Envelope envelope,
                                       AMQP.BasicProperties properties,
                                       byte[] body)
                    throws IOException
            {
                String message = new String(body, "UTF-8");
                String threadName = Thread.currentThread().getName();
                System.out.println(threadName + " [consumer] Received : '" + message + "'");

                this.getChannel().basicAck(envelope.getDeliveryTag(), false);
            }
        };

        // 开始监听,接收消息
        String consumerTag = channel.basicConsume(
                QUEUE_NAME,
                false,
                consumer
        );
        System.out.println("consumerTag:" + consumerTag);
    }
}

当启动上述程序,可以在RabbitMQ后台管理界面的队列详情中,看到连接到该队列的消费者:

image-20250624172443168

并且其中的Consumer Tag与Java控制台中打印的相同。

向队列中发送消息,在消费端控制台打印内容如下:

txt
consumerTag:amq.ctag-IqYv-YiWXS5vL9p891NRXA
pool-1-thread-4 [consumer] Received : '123'
pool-1-thread-5 [consumer] Received : '1234'

4. 消费者接口详解

在上面的案例中,是通过重写DefaultConsumer部分方法来创建消费者的,在RabbitMQ 客户端实现库中,消费者是由接口Consumer定义的:

java
public interface Consumer {

    void handleConsumeOk(String consumerTag);

    void handleCancelOk(String consumerTag);

    void handleCancel(String consumerTag) throws IOException;

    void handleShutdownSignal(String consumerTag, ShutdownSignalException sig);

    void handleRecoverOk(String consumerTag);

    void handleDelivery(String consumerTag,
                        Envelope envelope,
                        AMQP.BasicProperties properties,
                        byte[] body)
        throws IOException;
}
  • handleConsumeOk:当消费者通过 Channel.basicConsume() 方法成功注册到 RabbitMQ 队列时被调用。这个方法通知消费者已经成功启动并开始监听队列,可以在这里执行一些初始化操作,比如记录日志,或者确认消费者已就绪;

  • handleCancelOk:当消费者通过显式调用 Channel.basicCancel() 方法成功取消订阅(即停止消费)时被调用。这个方法表示主动发起的消费者取消操作已经成功完成,可以在这里执行一些清理操作,比如释放资源。

  • handleCancel:当消费者被非显式地取消订阅时被调用,这意味着消费者不是通过调用 Channel.basicCancel() 而被取消的。常见原因: 队列被删除(Queue Deleted)、绑定被解除、或者其他服务器端的原因导致消费者被移除;

    这是一个重要的回调,它表示消费者意外地停止了,需要在这里处理这种情况,可能需要重新注册消费者,或者记录详细日志以进行故障排除。

  • handleShutdownSignal:当通道(Channel)或底层连接(Connection)因为某些原因被关闭时被调用。其中的参数 ShutdownSignalException 对象,它包含了关闭的原因(例如,连接丢失、通道被服务器关闭、认证失败等)。

    这是处理连接或通道中断的核心方法,当连接或通道出现问题时,此方法会提供详细的关闭原因,应该在此处进行错误日志记录,并根据 sig 中的信息判断是否需要进行重连或其他恢复操作。

  • handleRecoverOk:当收到 basic.recover-ok 命令时被调用,这是对之前发送的 basic.recover 命令的响应。

    basic.recover 命令用于请求 RabbitMQ 重新发送所有未确认的消息。当 RabbitMQ 成功处理了 basic.recover 命令并开始重新投递消息时,会返回``basic.recover-ok,消费者就会调用此方法。这意味着在此之前收到的所有未确认消息都重新投递了。

    流程如下:

    txt
    消费者端调用Channel.basicRecover()   --->  RabbitMQ收到basic.recover命令,重新投递消息,返回basic.recover-ok    ---->   消费者端接收到basic.recover-ok,handleRecoverOk方法被调用
  • handleDelivery:当消费者收到一条来自队列的消息时被调用,这是处理业务逻辑的核心方法。

    参数解释如下:

    • consumerTag: 收到消息的消费者的标签。

    • envelope: 包含消息的封装信息,例如 deliveryTag(消息的唯一标识,用于确认)、redeliver 标志(是否为重投递消息)、exchangeroutingKey 等。

    • properties: 消息的 AMQP 基本属性,例如 contentTypeheaderscorrelationIdreplyTo 等。

    • body: 消息的实际内容,以字节数组的形式表示。

    这是编写消息处理业务逻辑的地方,将在这里解析 body,根据 propertiesenvelope 中的信息进行相应的处理。在处理完消息后,如果使用的是手动确认模式(AcknowledgeMode.MANUAL),需要在方法内部调用 channel.basicAck()channel.basicNack() 来确认或拒绝消息。

5. 消费者注销

我们可以调用如下方法注销消费者,这样就不会再接收到消息:

java
channel.basicCancel(consumerTag);

传入消费者标记。

最后,关闭通道和连接即可:

java
channel.close();
connection.close();

6. 消费者确认超时

当确认模式改为手动MANUAL后,如果消费者一直没有返回确认消息(代码中忘记写了、网络超时等原因),RabbitMQ会关闭通道,并发送PRECONDITION_FAILED通道异常信息给消费者,具体信息如下:

txt
channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - delivery acknowledgement on channel 1 timed out. Timeout value used: 61000 ms. This timeout value can be configured, see consumers doc guide to learn more, class-id=0, method-id=0)

如果返回这种消息,那么表示通道已关闭,之后消费者就接收不到消息了。

在amqp-client依赖库中,有自动重连机制,但这是针对连接Connection的,所以通道关闭并不会触发重连

默认的超时时间是30分钟,1分钟以下的超时时间不被支持,不建议设置5分钟以下的超时时间。在RabbitMQ中,是以每分钟的间隔来检查消息确认是否超时的。

rabbitmq.conf中可以修改超时时间:

properties
# 30 minutes in milliseconds
consumer_timeout = 1800000

在创建队列时,也可以设置超时时间,参数为x-consumer-timeout

7. prefetch

在 RabbitMQ 中,prefetch(预取计数)是一个非常重要的 QoS (Quality of Service) 设置,它决定了 RabbitMQ 服务器一次性发送给单个消费者的未确认消息的最大数量。

在Java中,配置prefetch的方法如下:

  • channel.basicQos(int prefetchCount):配置消费者的预取数量;
  • channel.basicQos(int prefetchCount, boolean global):如果global为true,表示配置通道的预取数量;

注意,prefetchCount的值应该在0-65535之间,0表示不限制预取消息数量。

例1:

java
Channel channel = ...;
Consumer consumer1 = ...;
Consumer consumer2 = ...;
channel.basicQos(10); // 每个消费者的限制
channel.basicConsume("my-queue1", false, consumer1);
channel.basicConsume("my-queue2", false, consumer2);

例2:

java
Channel channel = ...;
Consumer consumer1 = ...;
Consumer consumer2 = ...;
channel.basicQos(10, false); // 每个消费者的限制
channel.basicQos(15, true);  // 整个通道的限制
channel.basicConsume("my-queue1", false, consumer1);
channel.basicConsume("my-queue2", false, consumer2);

如果同时配置了消费者和通道的限制,那么会同时生效。假设多个消费者使用同一条通道,那么某个消费者获取的消息数量过多,那么其他消费者可能就获取不到消息了。

当消费者从 RabbitMQ 队列中接收消息时,prefetch 值控制了以下行为:

  1. 限制未确认消息的数量:
    • prefetch 设置了一个上限,表示一个消费者在尚未对已接收的消息进行确认 (acknowledgement) 之前,最多可以从队列中获取多少条消息。
    • 只有当消费者确认(或拒绝)了部分消息,使得未确认消息的数量低于 prefetch 限制时,RabbitMQ 才会继续向该消费者发送新的消息。
  2. 实现公平的消息分发 (Fair Dispatch):
    • 当有多个消费者连接到同一个队列时,prefetch 值对于消息的均匀分发至关重要。
    • 如果 prefetch 设置为 1 这是最公平的设置。RabbitMQ 会在每个消费者处理并确认完一条消息后,才向它发送下一条消息。这确保了每条消息都被立即发送给“空闲”的消费者,从而在处理时间不均匀的消息时,实现最佳的负载均衡。任何一个慢的消费者都不会“霸占”大量的消息,导致其他快速的消费者无事可做。
    • 如果 prefetch 设置得很高 (或默认的 0 即无限制): RabbitMQ 会尽可能快地向任何有能力的消费者推送消息,直到通道的缓冲区或消费者自身的内存满载。这可能导致一个消费者一次性拉取了队列中的大部分甚至所有消息,即使它处理速度很慢,而其他消费者却处于空闲状态,无法分担负载。这被称为 "Fast Consumer Starvation" (快消费者饥饿)
  3. 防止消费者内存溢出 (OOM) 或资源耗尽:
    • 如果没有 prefetch 限制,一个快速的生产者可能在短时间内向队列发送大量消息,而一个消费者可能会一次性拉取所有这些消息到其本地内存中。如果消息体很大或者消息数量巨大,这很容易导致消费者应用程序内存溢出 (OOM) 或其他资源耗尽。
    • prefetch 限制了消费者本地缓冲的消息数量,从而有效控制了内存使用和资源消耗。
  4. 优化网络流量和吞吐量:
    • 较小的 prefetch 值(如 1)可能导致更多的网络往返(每次确认一条消息后拉取一条),从而可能在某些高吞吐量场景下引入一些延迟。
    • 适当增大 prefetch 值(例如 1020),可以让 RabbitMQ 一次发送多条消息,减少了网络往返次数,从而在消费者处理能力足够强的情况下,提高整体吞吐量。但过高的值又会回到负载不均的问题。

prefetch 的作用只在 手动确认模式 (Manual AcknowledgeMode) 下才能真正体现出来。

  • AcknowledgeMode.MANUAL (手动确认): 这是 prefetch 发挥作用的场景。消费者必须显式调用 channel.basicAck() 来确认消息。prefetch 限制了在这些确认发生之前可以发送的消息数量。
  • AcknowledgeMode.AUTO (自动确认): 在自动确认模式下,RabbitMQ 在消息发送给消费者后立即将其标记为已处理,无需等待消费者确认。因此,prefetch 设置将失去意义,因为消息一发送就被“确认”了,prefetch 机制无法限制未确认消息的数量。

在Spring Boot中,设置prefetch如下:

properties
spring.rabbitmq.listener.simple.prefetch=1