Appearance
RabbitMQ 消费端
本文主要介绍RabbitMQ消费者有关的基础知识和细节。
1. 准备工作
为了更清楚地介绍RabbitMQ消费者的相关特性,这里使用RabbitMQ客户端,而不使用Spring Boot AMQP:
xml
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.25.0</version>
</dependency>并且,准备名为queue.consumer.test的队列。
2. 工作模式
RabbitMQ消息队列和消费端的工作模式如下图:

- 在RabbitMQ实例中,可以声明多个消息队列
Queue; - 在单个应用程序中,可以声明多个消费者
Consumer; - 消费者和RabbitMQ之间的联系是通过连接
Connection实现的(底层是TCP协议),在Connection之上,还有通道Channel,所以,消费者和RabbitMQ实际是通过通道连接的; - 一个通道可以供多个消费者使用;
通过上面的工作模式图,我们可以知道:
- 如果连接
Connection关闭了,那么在该连接Connection上的通道都会变得不可用; - 如果通道
Channel关闭了,那么使用该通道Channel的消费者也将无法接收到消息;
3. 消费者工作流程
根据上面的工作模式图,我们可以得出下面的工作流程:
在应用程序中,创建到RabbitMQ的连接
Connection;javaConnectionFactory factory = new ConnectionFactory(); factory.setHost("127.0.0.1"); factory.setPort(5672); Connection connection = factory.newConnection();通过连接
Connection得到该连接上的通道Channel;javaChannel channel = connection.createChannel();声明消费者
Consumer;在Java中,我么可以实现
Consumer接口来获得消费者实例,但是,为了简单期间,RabbitMQ客户端实现了一个默认的消费者DefaultConsumer,我们只需要重写该默认消费者中的方法即可:javaConsumer consumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); String threadName = Thread.currentThread().getName(); System.out.println(threadName + " [consumer] Received : '" + message + "'"); this.getChannel().basicAck(envelope.getDeliveryTag(), false); } };- 注意默认消费者的构造方法需要通道实例;
handleDelivery()即表示接收到消息的处理方法,其中使用this.getChannel()获取通道用于向RabbitMQ发送确认消息;
指定消费者监听哪个队列,需要三个参数:队列名
Queue、消费者Consumer和通道Channel;javaString consumerTag = channel.basicConsume( QUEUE_NAME, false, consumer ); System.out.println("consumerTag:" + consumerTag);- 第二个参数表示是否自动确认,这里设置为
false表示手动确认;
consumerTag是消费者标识,可以理解为ID。- 第二个参数表示是否自动确认,这里设置为
完整代码如下:
java
import com.rabbitmq.client.*;
import java.io.IOException;
public class ConsumerTest {
private final static String QUEUE_NAME = "queue.consumer.test";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("127.0.0.1");
factory.setPort(5672);
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
Consumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag,
Envelope envelope,
AMQP.BasicProperties properties,
byte[] body)
throws IOException
{
String message = new String(body, "UTF-8");
String threadName = Thread.currentThread().getName();
System.out.println(threadName + " [consumer] Received : '" + message + "'");
this.getChannel().basicAck(envelope.getDeliveryTag(), false);
}
};
// 开始监听,接收消息
String consumerTag = channel.basicConsume(
QUEUE_NAME,
false,
consumer
);
System.out.println("consumerTag:" + consumerTag);
}
}当启动上述程序,可以在RabbitMQ后台管理界面的队列详情中,看到连接到该队列的消费者:

并且其中的Consumer Tag与Java控制台中打印的相同。
向队列中发送消息,在消费端控制台打印内容如下:
txt
consumerTag:amq.ctag-IqYv-YiWXS5vL9p891NRXA
pool-1-thread-4 [consumer] Received : '123'
pool-1-thread-5 [consumer] Received : '1234'4. 消费者接口详解
在上面的案例中,是通过重写DefaultConsumer部分方法来创建消费者的,在RabbitMQ 客户端实现库中,消费者是由接口Consumer定义的:
java
public interface Consumer {
void handleConsumeOk(String consumerTag);
void handleCancelOk(String consumerTag);
void handleCancel(String consumerTag) throws IOException;
void handleShutdownSignal(String consumerTag, ShutdownSignalException sig);
void handleRecoverOk(String consumerTag);
void handleDelivery(String consumerTag,
Envelope envelope,
AMQP.BasicProperties properties,
byte[] body)
throws IOException;
}handleConsumeOk:当消费者通过Channel.basicConsume()方法成功注册到 RabbitMQ 队列时被调用。这个方法通知消费者已经成功启动并开始监听队列,可以在这里执行一些初始化操作,比如记录日志,或者确认消费者已就绪;handleCancelOk:当消费者通过显式调用Channel.basicCancel()方法成功取消订阅(即停止消费)时被调用。这个方法表示主动发起的消费者取消操作已经成功完成,可以在这里执行一些清理操作,比如释放资源。handleCancel:当消费者被非显式地取消订阅时被调用,这意味着消费者不是通过调用Channel.basicCancel()而被取消的。常见原因: 队列被删除(Queue Deleted)、绑定被解除、或者其他服务器端的原因导致消费者被移除;这是一个重要的回调,它表示消费者意外地停止了,需要在这里处理这种情况,可能需要重新注册消费者,或者记录详细日志以进行故障排除。
handleShutdownSignal:当通道(Channel)或底层连接(Connection)因为某些原因被关闭时被调用。其中的参数ShutdownSignalException对象,它包含了关闭的原因(例如,连接丢失、通道被服务器关闭、认证失败等)。这是处理连接或通道中断的核心方法,当连接或通道出现问题时,此方法会提供详细的关闭原因,应该在此处进行错误日志记录,并根据
sig中的信息判断是否需要进行重连或其他恢复操作。handleRecoverOk:当收到basic.recover-ok命令时被调用,这是对之前发送的basic.recover命令的响应。basic.recover命令用于请求 RabbitMQ 重新发送所有未确认的消息。当 RabbitMQ 成功处理了basic.recover命令并开始重新投递消息时,会返回``basic.recover-ok,消费者就会调用此方法。这意味着在此之前收到的所有未确认消息都重新投递了。流程如下:
txt消费者端调用Channel.basicRecover() ---> RabbitMQ收到basic.recover命令,重新投递消息,返回basic.recover-ok ----> 消费者端接收到basic.recover-ok,handleRecoverOk方法被调用handleDelivery:当消费者收到一条来自队列的消息时被调用,这是处理业务逻辑的核心方法。参数解释如下:
consumerTag: 收到消息的消费者的标签。envelope: 包含消息的封装信息,例如deliveryTag(消息的唯一标识,用于确认)、redeliver标志(是否为重投递消息)、exchange和routingKey等。properties: 消息的 AMQP 基本属性,例如contentType、headers、correlationId、replyTo等。body: 消息的实际内容,以字节数组的形式表示。
这是编写消息处理业务逻辑的地方,将在这里解析
body,根据properties和envelope中的信息进行相应的处理。在处理完消息后,如果使用的是手动确认模式(AcknowledgeMode.MANUAL),需要在方法内部调用channel.basicAck()或channel.basicNack()来确认或拒绝消息。
5. 消费者注销
我们可以调用如下方法注销消费者,这样就不会再接收到消息:
java
channel.basicCancel(consumerTag);传入消费者标记。
最后,关闭通道和连接即可:
java
channel.close();
connection.close();6. 消费者确认超时
当确认模式改为手动MANUAL后,如果消费者一直没有返回确认消息(代码中忘记写了、网络超时等原因),RabbitMQ会关闭通道,并发送PRECONDITION_FAILED通道异常信息给消费者,具体信息如下:
txt
channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - delivery acknowledgement on channel 1 timed out. Timeout value used: 61000 ms. This timeout value can be configured, see consumers doc guide to learn more, class-id=0, method-id=0)如果返回这种消息,那么表示通道已关闭,之后消费者就接收不到消息了。
在amqp-client依赖库中,有自动重连机制,但这是针对连接Connection的,所以通道关闭并不会触发重连
默认的超时时间是30分钟,1分钟以下的超时时间不被支持,不建议设置5分钟以下的超时时间。在RabbitMQ中,是以每分钟的间隔来检查消息确认是否超时的。
在rabbitmq.conf中可以修改超时时间:
properties
# 30 minutes in milliseconds
consumer_timeout = 1800000在创建队列时,也可以设置超时时间,参数为x-consumer-timeout。
7. prefetch
在 RabbitMQ 中,prefetch(预取计数)是一个非常重要的 QoS (Quality of Service) 设置,它决定了 RabbitMQ 服务器一次性发送给单个消费者的未确认消息的最大数量。
在Java中,配置prefetch的方法如下:
channel.basicQos(int prefetchCount):配置消费者的预取数量;channel.basicQos(int prefetchCount, boolean global):如果global为true,表示配置通道的预取数量;
注意,prefetchCount的值应该在0-65535之间,0表示不限制预取消息数量。
例1:
java
Channel channel = ...;
Consumer consumer1 = ...;
Consumer consumer2 = ...;
channel.basicQos(10); // 每个消费者的限制
channel.basicConsume("my-queue1", false, consumer1);
channel.basicConsume("my-queue2", false, consumer2);例2:
java
Channel channel = ...;
Consumer consumer1 = ...;
Consumer consumer2 = ...;
channel.basicQos(10, false); // 每个消费者的限制
channel.basicQos(15, true); // 整个通道的限制
channel.basicConsume("my-queue1", false, consumer1);
channel.basicConsume("my-queue2", false, consumer2);如果同时配置了消费者和通道的限制,那么会同时生效。假设多个消费者使用同一条通道,那么某个消费者获取的消息数量过多,那么其他消费者可能就获取不到消息了。
当消费者从 RabbitMQ 队列中接收消息时,prefetch 值控制了以下行为:
- 限制未确认消息的数量:
prefetch设置了一个上限,表示一个消费者在尚未对已接收的消息进行确认 (acknowledgement) 之前,最多可以从队列中获取多少条消息。- 只有当消费者确认(或拒绝)了部分消息,使得未确认消息的数量低于
prefetch限制时,RabbitMQ 才会继续向该消费者发送新的消息。
- 实现公平的消息分发 (Fair Dispatch):
- 当有多个消费者连接到同一个队列时,
prefetch值对于消息的均匀分发至关重要。 - 如果
prefetch设置为1: 这是最公平的设置。RabbitMQ 会在每个消费者处理并确认完一条消息后,才向它发送下一条消息。这确保了每条消息都被立即发送给“空闲”的消费者,从而在处理时间不均匀的消息时,实现最佳的负载均衡。任何一个慢的消费者都不会“霸占”大量的消息,导致其他快速的消费者无事可做。 - 如果
prefetch设置得很高 (或默认的0即无限制): RabbitMQ 会尽可能快地向任何有能力的消费者推送消息,直到通道的缓冲区或消费者自身的内存满载。这可能导致一个消费者一次性拉取了队列中的大部分甚至所有消息,即使它处理速度很慢,而其他消费者却处于空闲状态,无法分担负载。这被称为 "Fast Consumer Starvation" (快消费者饥饿)。
- 当有多个消费者连接到同一个队列时,
- 防止消费者内存溢出 (OOM) 或资源耗尽:
- 如果没有
prefetch限制,一个快速的生产者可能在短时间内向队列发送大量消息,而一个消费者可能会一次性拉取所有这些消息到其本地内存中。如果消息体很大或者消息数量巨大,这很容易导致消费者应用程序内存溢出 (OOM) 或其他资源耗尽。 prefetch限制了消费者本地缓冲的消息数量,从而有效控制了内存使用和资源消耗。
- 如果没有
- 优化网络流量和吞吐量:
- 较小的
prefetch值(如1)可能导致更多的网络往返(每次确认一条消息后拉取一条),从而可能在某些高吞吐量场景下引入一些延迟。 - 适当增大
prefetch值(例如10或20),可以让 RabbitMQ 一次发送多条消息,减少了网络往返次数,从而在消费者处理能力足够强的情况下,提高整体吞吐量。但过高的值又会回到负载不均的问题。
- 较小的
prefetch 的作用只在 手动确认模式 (Manual AcknowledgeMode) 下才能真正体现出来。
AcknowledgeMode.MANUAL(手动确认): 这是prefetch发挥作用的场景。消费者必须显式调用channel.basicAck()来确认消息。prefetch限制了在这些确认发生之前可以发送的消息数量。AcknowledgeMode.AUTO(自动确认): 在自动确认模式下,RabbitMQ 在消息发送给消费者后立即将其标记为已处理,无需等待消费者确认。因此,prefetch设置将失去意义,因为消息一发送就被“确认”了,prefetch机制无法限制未确认消息的数量。
在Spring Boot中,设置prefetch如下:
properties
spring.rabbitmq.listener.simple.prefetch=1