Skip to content

RabbitMQ 使用案例

本文介绍RabbitMQ的5种使用方式。

1. 引入依赖

本文只讲解RabbitMQ单独使用的例子,所以引入下面依赖:

xml
<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>5.25.0</version>
</dependency>

在讲解下面的案例之前,先明确以下观点:

  • 生产者可以把消息放进交换机(exchange),也可以把消息直接放进队列(queue);需要注意的是,交换机不存放消息,放进交换机的消息最终会路由到队列;
  • 消费者消费消息,是由消息队列将消息推送过来的,而不是消费者去拉取消息;

2. Hello World

最简单的模型就是一个生产者往队列中发送消息,一个消费者消费队列的消息:

image-20250620163101209

  • P:生产者
  • C:消费者
  • hello:消息,黄色方框标识队列

生产者代码:

java
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class Producer {
    // 队列名
    private final static String QUEUE_NAME = "hello";
    public static void main(String[] argv) throws Exception {
        // 1. 创建连接工程,设置消息队列地址
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("127.0.0.1");
        factory.setPort(5672);
        // 2. 获取连接、通道
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            // 3. 创建队列【非必要】
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            // 4. 发送消息
            String message = "Hello World!";
            channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
            
            System.out.println(" [x] Sent '" + message + "'");
        }
    }
}

消费者代码:

java
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;

public class Consumer {
    private final static String QUEUE_NAME = "hello";

    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("127.0.0.1");
        factory.setPort(5672);

        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        // 当收到消息后,业务逻辑
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println(" [x] Received '" + message + "'");
        };
        // 开始监听,接收消息
        channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });

    }
}

运行消费者代码,会发现在控制台成功打印出生产者产生的消息,并且消费者程序不会退出,持续运行接收消息。

3. Work Queues

Work Queues案例是指多个消费者监听同一个队列,他们是竞争对手,即一条消息只能被一个消费者消费:

image-20250620164448774

生产者代码:

java
public class Producer {
    // 队列名
    private final static String QUEUE_NAME = "hello";
    public static void main(String[] argv) throws Exception {
        // 1. 创建连接工程,设置消息队列地址
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("127.0.0.1");
        factory.setPort(5672);
        // 2. 获取连接、通道
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            // 3. 创建队列【非必要】
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            // 4. 发送消息
            for (int i = 0; i < 10; i++) {
                String message = "["+ (i+1) +"] Hello World!";
                channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
            }
        }
    }
}

生产者代码只略微做了一点改动,改为发送10条消息。

消费者代码不做改动,但是需要有两个消费者:

image-20250620165009529

先启动两个消费者程序,然后启动生产者程序。

在消费者1的控制台,打印内容如下:

txt
 [consumer 1] Received '[1] Hello World!'
 [consumer 1] Received '[3] Hello World!'
 [consumer 1] Received '[5] Hello World!'
 [consumer 1] Received '[7] Hello World!'
 [consumer 1] Received '[9] Hello World!'

在消费者2的控制台,打印内容如下:

txt
 [consumer 2] Received '[2] Hello World!'
 [consumer 2] Received '[4] Hello World!'
 [consumer 2] Received '[6] Hello World!'
 [consumer 2] Received '[8] Hello World!'
 [consumer 2] Received '[10] Hello World!'

可以发现,同一个队列中的一条消息只能被一个消费者消费

4. Publish/Subscribe

在发布订阅案例中,引入了交换机(Exchange,图中的X),生产者首先将消息发送给交换机,然后交换机将消息广播到其关联的队列中,消费者便可以收到消息。

image-20250620165342063

在这种模式下,一条消息可以被多个消费者接受。

生产者代码,创建一个交换机和两个队列,并将这两个队列都绑定到交换机上:

java
public class Producer {
    // 队列名
    private final static String QUEUE_NAME_1 = "test_queue_1";
    private final static String QUEUE_NAME_2 = "test_queue_2";
    // 交换机名称
    private static final String EXCHANGE_NAME = "exchange_test";

    public static void main(String[] argv) throws Exception {
        // 1. 创建连接工程,设置消息队列地址
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("127.0.0.1");
        factory.setPort(5672);
        // 2. 获取连接、通道
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            // 3. 创建队列【非必要】
            channel.queueDeclare(QUEUE_NAME_1, false, false, false, null);
            channel.queueDeclare(QUEUE_NAME_2, false, false, false, null);
            // 4. 创建交换机【非必要】
            channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);
            // 5. 绑定交换机和队列【非必要】
            //     第一个参数:队列名称
            //     第二个参数:交换机名称
            //     第三个参数:路由键,如果交换机类型为FANOUT,那么路由键为空
            channel.queueBind(QUEUE_NAME_1, EXCHANGE_NAME, "");
            channel.queueBind(QUEUE_NAME_2, EXCHANGE_NAME, "");

            // 6. 发送消息
            for (int i = 0; i < 5; i++) {
                String message = (i+1) + ": Hello World! exchange";
                // 第一个参数:指定交换机名称
                // 第二个参数:路由键,为空
                channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());

            }

        }
    }
}

启动两个消费者程序,对于消费者来说,只需要监听队列即可,所以改变监听的队列名称:

java
public class Consumer1 {
    private final static String QUEUE_NAME = "test_queue_1";

    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("127.0.0.1");
        factory.setPort(5672);

        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        // 当收到消息后,业务逻辑
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println(" [consumer 1] Received '" + message + "'");
        };
        // 开始监听,接收消息
        channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });

    }
}

5. Routing

Routing在Publish/Subscribe基础上,增加了队列的选择性,即一条消息到达交换机时,根据精准路由,将消息发送到指定的队列,而不是发送到全部队列:

image-20250620184948010

如上图,当一条消息到达交换机时,如果该消息的路由是info,则交换机会将该消息发送到下面的队列,不会发送到上面的队列;如果该消息的路由是error,则会将该消息发送到两个队列。

生产者代码:

java
public class Producer {
    // 队列名
    private final static String QUEUE_NAME_1 = "route_queue_1";
    private final static String QUEUE_NAME_2 = "route_queue_2";
    // 交换机名称
    private static final String EXCHANGE_NAME = "route_test";

    public static void main(String[] argv) throws Exception {
        // 1. 创建连接工程,设置消息队列地址
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("127.0.0.1");
        factory.setPort(5672);
        // 2. 获取连接、通道
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            // 3. 创建队列【非必要】
            channel.queueDeclare(QUEUE_NAME_1, false, false, false, null);
            channel.queueDeclare(QUEUE_NAME_2, false, false, false, null);
            // 4. 创建交换机【非必要】
            channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
            // 5. 绑定交换机和队列【非必要】
            //     第一个参数:队列名称
            //     第二个参数:交换机名称
            //     第三个参数:路由键
            channel.queueBind(QUEUE_NAME_1, EXCHANGE_NAME, "error");
            channel.queueBind(QUEUE_NAME_2, EXCHANGE_NAME, "info");
            channel.queueBind(QUEUE_NAME_2, EXCHANGE_NAME, "warn");
            channel.queueBind(QUEUE_NAME_2, EXCHANGE_NAME, "error");

            // 6. 发送消息
            String[] level = new String[]{"error", "warn", "info"};
            Random random = new Random();
            for (int i = 0; i < 5; i++) {
                // 随机获取路由键
                String routeKey = level[random.nextInt(0, 3)];
                String message = String.format("%d 级别:%s 内容:xxxx",(i+1), routeKey);
                // 第一个参数:指定交换机名称
                // 第二个参数:路由键
                channel.basicPublish(EXCHANGE_NAME, routeKey, null, message.getBytes());

            }
        }
    }
}

TIP

注意,如果是路由交换机,则类型应该为DIRECT

消费者代码只需要改变监听的队列即可。启动两个消费者,监听两个队列。

消费者1接受route_queue_1队列中的消息,结果如下:

txt
 [consumer 1] Received '1 级别:error 内容:xxxx'

消费者2接受route_queue_2队列中的消息,结果如下:

txt
 [consumer 2] Received '1 级别:error 内容:xxxx'
 [consumer 2] Received '2 级别:warn 内容:xxxx'
 [consumer 2] Received '3 级别:warn 内容:xxxx'
 [consumer 2] Received '4 级别:warn 内容:xxxx'
 [consumer 2] Received '5 级别:info 内容:xxxx'

可以看到,消息被路由到了不用的队列中。

6. Topics

Topics 是在Routing基础上,针对路由规则增加了通配符匹配。

Topic 交换机的路由规则基于通配符,允许绑定键进行灵活的匹配。以下是两个核心通配符:

  • * (星号): 匹配路由键中一个单词(由点分隔)。
  • # (井号): 匹配路由键中零个或多个单词。

image-20250620192041814

生产者代码:

java
public class Producer {
    // 队列名
    private final static String QUEUE_NAME_1 = "topic_queue_1";
    private final static String QUEUE_NAME_2 = "topic_queue_2";
    // 交换机名称
    private static final String EXCHANGE_NAME = "topic_test";

    public static void main(String[] argv) throws Exception {
        // 1. 创建连接工程,设置消息队列地址
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("127.0.0.1");
        factory.setPort(5672);
        // 2. 获取连接、通道
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            // 3. 创建队列【非必要】
            channel.queueDeclare(QUEUE_NAME_1, false, false, false, null);
            channel.queueDeclare(QUEUE_NAME_2, false, false, false, null);
            // 4. 创建交换机【非必要】
            channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
            // 5. 绑定交换机和队列【非必要】
            //     第一个参数:队列名称
            //     第二个参数:交换机名称
            //     第三个参数:路由键
            channel.queueBind(QUEUE_NAME_1, EXCHANGE_NAME, "*.orange.*");
            channel.queueBind(QUEUE_NAME_2, EXCHANGE_NAME, "*.*.rabbit");
            channel.queueBind(QUEUE_NAME_2, EXCHANGE_NAME, "lazy.#");

            // 6. 发送消息
            String[] data = new String[]{
                    "*.orange.*", "*.*.rabbit", "lazy.#", "good.orange", "good.orange.good",
                    "a.white.rabbit", "a.rabbit", "laz", "lazy", "lazy.", "lazy.pig"
            };
            for (int i = 0; i < data.length; i++) {
                // 第一个参数:指定交换机名称
                // 第二个参数:路由键
                channel.basicPublish(EXCHANGE_NAME, data[i], null, data[i].getBytes());

            }
        }
    }
}

TIP

如果路由规则要使用通配符,则创建交换机时,类型选择TOPIC

消费者代码只需要调整队列名称。

消费者1监听topic_queue_1队列,结果如下:

txt
 [consumer 1] Received '*.orange.*'
 [consumer 1] Received 'good.orange.good'

消费者2监听topic_queue_2队列,结果如下:

txt
 [consumer 2] Received '*.*.rabbit'
 [consumer 2] Received 'lazy.#'
 [consumer 2] Received 'a.white.rabbit'
 [consumer 2] Received 'lazy'
 [consumer 2] Received 'lazy.'
 [consumer 2] Received 'lazy.pig'

可以看到,如果某条消息的路由键没有匹配到队列,那么该条消息就会丢失。