Appearance
RabbitMQ 使用案例
本文介绍RabbitMQ的5种使用方式。
1. 引入依赖
本文只讲解RabbitMQ单独使用的例子,所以引入下面依赖:
xml
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.25.0</version>
</dependency>在讲解下面的案例之前,先明确以下观点:
- 生产者可以把消息放进交换机(exchange),也可以把消息直接放进队列(queue);需要注意的是,交换机不存放消息,放进交换机的消息最终会路由到队列;
- 消费者消费消息,是由消息队列将消息推送过来的,而不是消费者去拉取消息;
2. Hello World
最简单的模型就是一个生产者往队列中发送消息,一个消费者消费队列的消息:

P:生产者C:消费者hello:消息,黄色方框标识队列
生产者代码:
java
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class Producer {
// 队列名
private final static String QUEUE_NAME = "hello";
public static void main(String[] argv) throws Exception {
// 1. 创建连接工程,设置消息队列地址
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("127.0.0.1");
factory.setPort(5672);
// 2. 获取连接、通道
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
// 3. 创建队列【非必要】
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 4. 发送消息
String message = "Hello World!";
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
System.out.println(" [x] Sent '" + message + "'");
}
}
}消费者代码:
java
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;
public class Consumer {
private final static String QUEUE_NAME = "hello";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("127.0.0.1");
factory.setPort(5672);
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 当收到消息后,业务逻辑
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [x] Received '" + message + "'");
};
// 开始监听,接收消息
channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });
}
}运行消费者代码,会发现在控制台成功打印出生产者产生的消息,并且消费者程序不会退出,持续运行接收消息。
3. Work Queues
Work Queues案例是指多个消费者监听同一个队列,他们是竞争对手,即一条消息只能被一个消费者消费:

生产者代码:
java
public class Producer {
// 队列名
private final static String QUEUE_NAME = "hello";
public static void main(String[] argv) throws Exception {
// 1. 创建连接工程,设置消息队列地址
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("127.0.0.1");
factory.setPort(5672);
// 2. 获取连接、通道
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
// 3. 创建队列【非必要】
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 4. 发送消息
for (int i = 0; i < 10; i++) {
String message = "["+ (i+1) +"] Hello World!";
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
}
}
}
}生产者代码只略微做了一点改动,改为发送10条消息。
消费者代码不做改动,但是需要有两个消费者:

先启动两个消费者程序,然后启动生产者程序。
在消费者1的控制台,打印内容如下:
txt
[consumer 1] Received '[1] Hello World!'
[consumer 1] Received '[3] Hello World!'
[consumer 1] Received '[5] Hello World!'
[consumer 1] Received '[7] Hello World!'
[consumer 1] Received '[9] Hello World!'在消费者2的控制台,打印内容如下:
txt
[consumer 2] Received '[2] Hello World!'
[consumer 2] Received '[4] Hello World!'
[consumer 2] Received '[6] Hello World!'
[consumer 2] Received '[8] Hello World!'
[consumer 2] Received '[10] Hello World!'可以发现,同一个队列中的一条消息只能被一个消费者消费
4. Publish/Subscribe
在发布订阅案例中,引入了交换机(Exchange,图中的X),生产者首先将消息发送给交换机,然后交换机将消息广播到其关联的队列中,消费者便可以收到消息。

在这种模式下,一条消息可以被多个消费者接受。
生产者代码,创建一个交换机和两个队列,并将这两个队列都绑定到交换机上:
java
public class Producer {
// 队列名
private final static String QUEUE_NAME_1 = "test_queue_1";
private final static String QUEUE_NAME_2 = "test_queue_2";
// 交换机名称
private static final String EXCHANGE_NAME = "exchange_test";
public static void main(String[] argv) throws Exception {
// 1. 创建连接工程,设置消息队列地址
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("127.0.0.1");
factory.setPort(5672);
// 2. 获取连接、通道
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
// 3. 创建队列【非必要】
channel.queueDeclare(QUEUE_NAME_1, false, false, false, null);
channel.queueDeclare(QUEUE_NAME_2, false, false, false, null);
// 4. 创建交换机【非必要】
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);
// 5. 绑定交换机和队列【非必要】
// 第一个参数:队列名称
// 第二个参数:交换机名称
// 第三个参数:路由键,如果交换机类型为FANOUT,那么路由键为空
channel.queueBind(QUEUE_NAME_1, EXCHANGE_NAME, "");
channel.queueBind(QUEUE_NAME_2, EXCHANGE_NAME, "");
// 6. 发送消息
for (int i = 0; i < 5; i++) {
String message = (i+1) + ": Hello World! exchange";
// 第一个参数:指定交换机名称
// 第二个参数:路由键,为空
channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());
}
}
}
}启动两个消费者程序,对于消费者来说,只需要监听队列即可,所以改变监听的队列名称:
java
public class Consumer1 {
private final static String QUEUE_NAME = "test_queue_1";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("127.0.0.1");
factory.setPort(5672);
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 当收到消息后,业务逻辑
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [consumer 1] Received '" + message + "'");
};
// 开始监听,接收消息
channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });
}
}5. Routing
Routing在Publish/Subscribe基础上,增加了队列的选择性,即一条消息到达交换机时,根据精准路由,将消息发送到指定的队列,而不是发送到全部队列:

如上图,当一条消息到达交换机时,如果该消息的路由是info,则交换机会将该消息发送到下面的队列,不会发送到上面的队列;如果该消息的路由是error,则会将该消息发送到两个队列。
生产者代码:
java
public class Producer {
// 队列名
private final static String QUEUE_NAME_1 = "route_queue_1";
private final static String QUEUE_NAME_2 = "route_queue_2";
// 交换机名称
private static final String EXCHANGE_NAME = "route_test";
public static void main(String[] argv) throws Exception {
// 1. 创建连接工程,设置消息队列地址
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("127.0.0.1");
factory.setPort(5672);
// 2. 获取连接、通道
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
// 3. 创建队列【非必要】
channel.queueDeclare(QUEUE_NAME_1, false, false, false, null);
channel.queueDeclare(QUEUE_NAME_2, false, false, false, null);
// 4. 创建交换机【非必要】
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
// 5. 绑定交换机和队列【非必要】
// 第一个参数:队列名称
// 第二个参数:交换机名称
// 第三个参数:路由键
channel.queueBind(QUEUE_NAME_1, EXCHANGE_NAME, "error");
channel.queueBind(QUEUE_NAME_2, EXCHANGE_NAME, "info");
channel.queueBind(QUEUE_NAME_2, EXCHANGE_NAME, "warn");
channel.queueBind(QUEUE_NAME_2, EXCHANGE_NAME, "error");
// 6. 发送消息
String[] level = new String[]{"error", "warn", "info"};
Random random = new Random();
for (int i = 0; i < 5; i++) {
// 随机获取路由键
String routeKey = level[random.nextInt(0, 3)];
String message = String.format("%d 级别:%s 内容:xxxx",(i+1), routeKey);
// 第一个参数:指定交换机名称
// 第二个参数:路由键
channel.basicPublish(EXCHANGE_NAME, routeKey, null, message.getBytes());
}
}
}
}TIP
注意,如果是路由交换机,则类型应该为DIRECT
消费者代码只需要改变监听的队列即可。启动两个消费者,监听两个队列。
消费者1接受route_queue_1队列中的消息,结果如下:
txt
[consumer 1] Received '1 级别:error 内容:xxxx'消费者2接受route_queue_2队列中的消息,结果如下:
txt
[consumer 2] Received '1 级别:error 内容:xxxx'
[consumer 2] Received '2 级别:warn 内容:xxxx'
[consumer 2] Received '3 级别:warn 内容:xxxx'
[consumer 2] Received '4 级别:warn 内容:xxxx'
[consumer 2] Received '5 级别:info 内容:xxxx'可以看到,消息被路由到了不用的队列中。
6. Topics
Topics 是在Routing基础上,针对路由规则增加了通配符匹配。
Topic 交换机的路由规则基于通配符,允许绑定键进行灵活的匹配。以下是两个核心通配符:
*(星号): 匹配路由键中一个单词(由点分隔)。#(井号): 匹配路由键中零个或多个单词。

生产者代码:
java
public class Producer {
// 队列名
private final static String QUEUE_NAME_1 = "topic_queue_1";
private final static String QUEUE_NAME_2 = "topic_queue_2";
// 交换机名称
private static final String EXCHANGE_NAME = "topic_test";
public static void main(String[] argv) throws Exception {
// 1. 创建连接工程,设置消息队列地址
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("127.0.0.1");
factory.setPort(5672);
// 2. 获取连接、通道
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
// 3. 创建队列【非必要】
channel.queueDeclare(QUEUE_NAME_1, false, false, false, null);
channel.queueDeclare(QUEUE_NAME_2, false, false, false, null);
// 4. 创建交换机【非必要】
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
// 5. 绑定交换机和队列【非必要】
// 第一个参数:队列名称
// 第二个参数:交换机名称
// 第三个参数:路由键
channel.queueBind(QUEUE_NAME_1, EXCHANGE_NAME, "*.orange.*");
channel.queueBind(QUEUE_NAME_2, EXCHANGE_NAME, "*.*.rabbit");
channel.queueBind(QUEUE_NAME_2, EXCHANGE_NAME, "lazy.#");
// 6. 发送消息
String[] data = new String[]{
"*.orange.*", "*.*.rabbit", "lazy.#", "good.orange", "good.orange.good",
"a.white.rabbit", "a.rabbit", "laz", "lazy", "lazy.", "lazy.pig"
};
for (int i = 0; i < data.length; i++) {
// 第一个参数:指定交换机名称
// 第二个参数:路由键
channel.basicPublish(EXCHANGE_NAME, data[i], null, data[i].getBytes());
}
}
}
}TIP
如果路由规则要使用通配符,则创建交换机时,类型选择TOPIC
消费者代码只需要调整队列名称。
消费者1监听topic_queue_1队列,结果如下:
txt
[consumer 1] Received '*.orange.*'
[consumer 1] Received 'good.orange.good'消费者2监听topic_queue_2队列,结果如下:
txt
[consumer 2] Received '*.*.rabbit'
[consumer 2] Received 'lazy.#'
[consumer 2] Received 'a.white.rabbit'
[consumer 2] Received 'lazy'
[consumer 2] Received 'lazy.'
[consumer 2] Received 'lazy.pig'可以看到,如果某条消息的路由键没有匹配到队列,那么该条消息就会丢失。