Skip to content

Redis 事务、管道和发布订阅

本文介绍Redis中的事务、管道和发布订阅机制。

1. 事务

1.1 简介

在Redis中,事务是指一组命令串行化执行,不允许其他客户端的命令插入在这组命令中执行。

有关事务的基础操作命令如下:

  • multi:开启事务;
  • exec:开始执行事务中的命令;
  • discard:放弃执行事务;

1.2 案例演示

下面以几个案例演示事务的特性。

1.2.1 成功提交事务

在下面的例子中,我们可以看到:

  • 使用multi开启事务后,在事务中的命令返回结果是QUEUED,表示命令被放入到队列中;
  • 使用exec开始执行事务中的命令,会以列表的形式返回每个命令的响应;

image-20250609111412390

1.2.2 放弃执行事务

当我们使用discard放弃执行事务时,返回是OK,并没有执行事务中的指令。

image-20250609111649208

1.2.3 命令入队出错

如果在调用exec之前,由于语法错误等原因,造成命令入队失败,则调用exec后,并不会成功执行事务中的指令:

image-20250609111926939

1.2.4 命令执行出错

如果命令入队成功,但是调用exec后执行时失败,并不会造成事务中的其他命令回滚或失败,即Redis事务不保证原子性:

Redis does not support rollbacks of transactions since supporting rollbacks would have a significant impact on the simplicity and performance of Redis.

为了Redis的简单和性能,并不支持回滚操作。

image-20250609112419051

1.2.5 事务不可嵌套

在Redis中,不能在一个事务中开启另一个事务:

image-20250609112826254

1.2.6 事务的隔离性

如下,Redis的事务隔离级别可以理解为读已提交:

image-20250609115121868

1.3 小结

综合以上的案例,我们可以理解为Redis为每个客户端连接维护一个独立的命令队列。

  • 当开启事务后,事务中的命令就会依次进入队列;
  • 如果命令入队失败,则有标志(假设 trxValidflagfalse)表示该事务已失效;
  • 调用exec
    • 先检查trxValidflag,如果为false,则清空队列,返回失败;
    • 如果为true,则从队列中取出命令执行,其中某个命令失败不影响其他命令执行;
  • 调用discard:直接清空命令队列;

Redis的事务不保证原子性、一致性。

持久性:当启用AOF时,Redis会确保使用一个单独的 write(2) 系统调用将事务写入磁盘。然而,如果 Redis 服务器崩溃或被系统管理员以某种粗暴方式(如 kill -9)终止,那么 AOF 文件中可能只会记录部分操作。Redis 在重启时会检测到这种不完整的情况,并会报错退出。此时,可以使用 redis-check-aof 工具来修复这个 AOF 文件,它会移除不完整的操作,从而让服务器能够重新启动。

隔离性:Redis的事务隔离级别可以理解为读已提交,只有事务执行后,命令才生效,其他客户端才能读到命令结果。

1.4 watch命令

watch命令可以让事务执行exec命令有条件地执行:如果被监视的键(key)在事务提交前被修改了,那么事务不会执行。这里的修改包括客户端执行写操作,也包括Redis执行键过期操作。

watch命令提供了乐观锁,另一种形式的CAS。

例如:

image-20250609134215635

可以看到,exec返回NULL,表示事务没有执行。

2. 管道

2.1 问题背景

Redis使用基于TCP的CS架构,也就是说客户端发送一条命令到服务端,服务端处理后返回结果给客户端:

可以看到,即便Redis服务端处理一条命令很快,但是客户端也不得不等待命令发送到服务端以及结果从服务端返回客户端,这取决于网络延迟,称为Round Trip Time,RTT。假设RTT时间为250毫秒,服务端处理命令时间为5毫秒,那么客户端每秒可执行的命令数也就只有4条。

假设Redis有机制可以一次性发送多条命令到服务端,然后服务端批处理再返回结果到客户端,就可以减少Round Trip次数了,从而提高命令执行效率,这就是管道。例如,RTT时间仍然为250毫秒,那么1秒内,服务端可以处理命令的时间为750毫秒,每条命令处理时间5毫秒,即客户端可以一次性发送150条命令,也就是说客户端每秒可执行的命令数提高到150条。

2.2 管道演示

管道需根据使用的客户端进行使用,这里以Java中的Jedis为例:

java
import redis.clients.jedis.Jedis;
import redis.clients.jedis.Pipeline;
import redis.clients.jedis.Response;

import java.util.List;

public class JedisPipelineExample {
    public static void main(String[] args) {
        Jedis jedis = new Jedis("localhost", 6379); // 连接 Redis 服务器

        try {
            // 1. 获取管道对象
            Pipeline pipeline = jedis.pipelined();

            // 2. 将命令放入管道 (此时命令不会立即执行,而是排队)
            Response<String> setResponse = pipeline.set("pipeline_key1", "value1");
            Response<String> setResponse2 = pipeline.set("pipeline_key2", "value2");
            Response<Long> incrResponse = pipeline.incr("mycounter");
            Response<String> getResponse = pipeline.get("pipeline_key1");
            Response<Long> delResponse = pipeline.del("pipeline_key2");

            // 3. 执行管道中的所有命令,并获取所有结果
            // sync() 方法会阻塞直到所有命令执行完毕并返回结果
            List<Object> results = pipeline.syncAndReturnAll();

            // 4. 解析结果 (通过 Response 对象或直接从 List 中获取)
            System.out.println("SET pipeline_key1 result: " + setResponse.get());
            System.out.println("SET pipeline_key2 result: " + setResponse2.get());
            System.out.println("INCR mycounter result: " + incrResponse.get());
            System.out.println("GET pipeline_key1 result: " + getResponse.get());
            System.out.println("DEL pipeline_key2 result: " + delResponse.get());

            // 也可以通过 results 列表按顺序获取
            System.out.println("\nResults from syncAndReturnAll:");
            System.out.println("SET pipeline_key1 result from list: " + results.get(0));
            System.out.println("SET pipeline_key2 result from list: " + results.get(1));
            System.out.println("INCR mycounter result from list: " + results.get(2));
            System.out.println("GET pipeline_key1 result from list: " + results.get(3));
            System.out.println("DEL pipeline_key2 result from list: " + results.get(4));

        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            jedis.close(); // 关闭连接
        }
    }
}

3. 发布订阅

3.1 简介

发布订阅用于进程间通信机制,即客户端A可以订阅一个或多个频道,客户端B可以向某个频道发消息,那么订阅这个频道的客户端就会接收到该消息。

有关发布订阅的命令如下:

  • SUBSCRIBE channel [channel ...]:订阅频道;
  • UNSUBSCRIBE [channel [channel ...]]:取消订阅指定的频道,如果没有频道指定,那么取消订阅所有的频道;
  • PUBLISH channel message:向指定的频道发送消息,返回值表示接到消息的订阅者数量;
  • PSUBSCRIBE pattern [pattern ...]:订阅一个或多个符合给定模式的频道。模式支持通配符 *(匹配任意零个或多个字符)和 ?(匹配任意一个字符);
  • PUNSUBSCRIBE [pattern [pattern ...]]:取消订阅一个或多个符合给定模式的频道。如果没有指定模式,则取消订阅所有之前通过 PSUBSCRIBE 订阅的模式;
  • PUBSUB CHANNELS [pattern]:列出当前所有活跃的频道(即至少有一个客户端订阅的频道)。可以根据可选的 pattern 进行过滤;
  • PUBSUB NUMSUB [channel [channel ...]]:返回给定频道名称的当前订阅者数量;

3.2 案例演示

在Navicat中,有专门的Pub/Sub标签页:

image-20250609143147653

点击订阅即可创建频道并订阅:

image-20250609143248667

在命令行中发布消息:

image-20250609143341165

返回值表示接受到消息的订阅者数量,可以看到c3频道没有客户端订阅,所以返回值为0。

回到订阅者界面,可以发现正常接收到消息:

image-20250609143435185

3.3 注意事项

无持久化: Redis 的 Pub/Sub 消息是即时性的,不进行持久化。如果订阅者在消息发布时没有在线或没有订阅该频道,那么它将错过该消息。

客户端模式: 当客户端通过 SUBSCRIBEPSUBSCRIBE 进入订阅模式后,它就变成了一个专门的订阅连接。除非使用 UNSUBSCRIBEPUNSUBSCRIBE 退出订阅模式,否则该连接不能再用于执行常规的 Redis 命令(除了少数几个命令)。

性能: Redis 的 Pub/Sub 性能非常高,可以处理大量的消息吞吐。

适用场景: 实时聊天、通知系统、事件驱动架构、消息广播等。不适用于需要消息持久化、消息确认或复杂路由的场景(这些场景更适合使用 RabbitMQ、Kafka 等专业消息队列)。

Redis的Pub/Sub了解即可。

参考资料

[1] 事务:https://redis.io/docs/latest/develop/interact/transactions/

[2] 管道:https://redis.io/docs/latest/develop/use/pipelining/

[3] 发布订阅:https://redis.io/docs/latest/develop/interact/pubsub/

[4] https://redis.io/blog/get-redis-cli-without-installing-redis-server/

[5] 尚硅谷视频教程:https://www.bilibili.com/video/BV13R4y1v7sP

附录

单独安装Redis-cli

使用node.js和npm安装Redis-cli:

bash
npm install -g redis-cli

使用:

bash
rdcli -h your.redis.host -a yourredispassword -p port

但是,这个库很久没更新了,执行multi有问题,可以执行一些简单命令。