Skip to content

Redis 分布式锁

本文介绍如何利用Redis实现分布式锁、Lua脚本编程与Redisson的使用。

1. 分布式锁介绍

对于并发场景,我们常常使用锁来同步线程,例如,在Java中的synchronized关键字和Lock实现类。但是,这只是针对于单体应用,如果有多个服务(多个JVM),使用synchronizedLock实现类,只会同步各个JVM里的线程,不能协调多服务(多进程)。

也就是说,不同的服务实例运行在不同的机器上,它们各自拥有独立的内存空间。本地锁只能限制单个进程内的并发,无法跨越进程或机器。所以,我们需要分布式锁,目的是确保在任何给定时间,只有一个进程或线程能够访问共享资源。

分布式锁的实现可以分为如下三类:

  • 基于关系型数据库:利用数据库的唯一索引或悲观锁(SELECT ... FOR UPDATE)来实现,这种方式实现简单,但性能和可用性通常不如其他方式。
  • 基于缓存(如 Redis):这是目前非常流行的方式。利用 Redis 的 SETNX (SET if Not eXists) 命令或更高级的 SET key value NX PX milliseconds 命令来尝试设置一个键。如果设置成功,表示获取到锁;如果失败,则表示锁已经被其他进程持有。为了防止死锁,通常会给锁设置一个过期时间(TTL)。
  • 基于分布式协调服务(如 ZooKeeper 或 etcd):这些服务天生就是为分布式协调而设计的。它们通过创建临时有序节点或使用乐观锁机制来管理分布式锁。这类方案通常更健壮,能处理更复杂的分布式一致性问题,但也相对更复杂,有更高的学习和维护成本。

下面介绍基于Redis的分布式锁实现。

2. 自定义分布式锁(初始版本)

2.1 锁接口定义

一个锁的实现,最基本的就是获取锁和释放锁。所以可以定义接口如下:

java
public interface ILock {
  	// 获取锁
    boolean lock();
		// 释放锁
    void unlock();
}

2.2 锁的实现

一个最基本的锁实现如下:

  • 获取锁:利用 Redis 的 SETNX (SET if Not eXists) 命令,如果设置成功,表示成功获取到锁,如果设置失败,表示有其他进程或线程已持有了锁;
  • 释放锁:直接删除代表锁的key;
java
public class RedisLock implements ILock{
    private static final String LOCK_PREFIX = "lock:";

    private RedisTemplate redisTemplate;
    private String lockName;

    public RedisLock(String lockName, RedisTemplate redisTemplate){
        this.lockName = lockName;
        this.redisTemplate = redisTemplate;
    }

    @Override
    public boolean lock() {
        String value = String.valueOf(Thread.currentThread().threadId());
        // 使用setnx,即不存在锁时(其他进程或线程没有获取锁)才能获取锁
        Boolean lockSuccess = redisTemplate.opsForValue().setIfAbsent(LOCK_PREFIX + lockName, value);
        // 防止NPE
        return Boolean.TRUE.equals(lockSuccess);
    }

    @Override
    public void unlock() {
        redisTemplate.delete(LOCK_PREFIX + lockName);
    }
}

2.3 问题分析

在上面的简单实现中,存在以下问题:

  • 释放锁:任何一个进程或线程都可以直接释放锁,没有判断该锁是不是自己加的,所以在释放锁时,需要判断该锁是不是自己加的,可以给该锁增加一个标志,这个标志是该进程+线程特有的,要标识进程,可以使用UUID,要标识线程,可以使用线程ID。因此,在加锁时,将锁的内容设置成UUID+线程ID,在释放锁时,判断锁的内容是不是与自己的相同,如果是则可以成功释放锁。
  • 如果加锁后,进程崩溃了,那么会导致该锁始终存在,无法得到释放,可以使用Redis的键过期机制,在加锁时,通常会给锁设置一个过期时间,即使进程崩溃了,也能保证锁得到释放。

解决上面两个问题,锁的实现修改如下:

java
public class RedisLock implements ILock{
    private static final String LOCK_PREFIX = "lock:";

    private RedisTemplate redisTemplate;
    private String lockName;
    private static final String UUID = java.util.UUID.randomUUID().toString() + ":";

    public RedisLock(String lockName, RedisTemplate redisTemplate){
        this.lockName = lockName;
        this.redisTemplate = redisTemplate;
    }

    @Override
    public boolean lock(long timeout, TimeUnit timeUnit) {
        String value = UUID + Thread.currentThread().threadId();
        // 使用setnx,即不存在锁时(其他进程或线程没有获取锁)才能获取锁
        Boolean lockSuccess = redisTemplate.opsForValue().setIfAbsent(LOCK_PREFIX + lockName, value, timeout, timeUnit);
        // 防止NPE
        return Boolean.TRUE.equals(lockSuccess);
    }

    @Override
    public void unlock() {
        String value = UUID + Thread.currentThread().threadId();
        Object id = redisTemplate.opsForValue().get(LOCK_PREFIX + lockName);
        if(id != null && value.equals(id.toString())){
            redisTemplate.delete(LOCK_PREFIX + lockName);
        }

        throw new RuntimeException("未持有该锁");
    }
}
java
public interface ILock {
    boolean lock(long timeout, TimeUnit timeUnit);

    void unlock();
}

3. 释放锁的原子性问题

在上面的实现中,我们释放锁分为两步:

  • 获取锁标识,判断是不是自己加的锁;
  • 如果是自己加的锁,释放锁;

这两步操作不是原子的,有可能存在一种极端情况:

  1. 首先线程1成功获取了锁,然后执行业务,开始锁放锁,当线程获取锁标识后,由于GC等原因,造成线程阻塞;
  2. 此后由于Redis键超时自动释放锁;
  3. 线程2此时尝试加锁,由于此时没有锁,所以可以加锁成功,线程2开始执行业务;
  4. 线程1阻塞结束,判断获取到的锁标识是自己加的锁,所以执行解锁操作,所以此时线程1删除了线程2加的锁
  5. 线程3此时来加锁,可以成功加锁;
  6. 最后,线程2和线程3并行执行任务,出现问题;

上述流程如下图所示:

image-20250618123649437

问题出在释放锁是分为两步的,所以我们要将释放锁合并成一个原子操作。

4. Lua脚本

Lua是一种轻量级、高效、可嵌入的脚本语言,可以与Redis一起使用:也就是说,在Lua脚本中,可以调用Redis命令,在Redis中,可以执行Lua脚本。

4.1 在Lua中调用Redis命令

在Lua脚本中,可以直接使用如下函数调用Redis命令:

lua
redis.call(command [,arg...])
  • command:即Redis的命令;
  • arg:即命令所需的参数;

例如:

lua
redis.call('set', 'k1', 'v1')

4.2 在Redis中执行脚本

在Redis中可以使用如下指令执行Lua脚本:

txt
EVAL script numkeys [key [key ...]] [arg [arg ...]]
  • script:脚本内容,即用Lua脚本语言编写的程序;
  • numKeys:脚本内容中涉及键的数量;
  • key [key ...]:脚本内容中用到的键名称,在脚本中以KEYS代表键列表,KEYS[1]代表第一个键;
  • arg [arg ...]:脚本内容中用到的值,在脚本中以ARGV代表值列表,ARGV[1]代表第一个值;

例如:

image-20250618143304010

4.3 Lua脚本的好处

Redis 之所以选择集成 Lua 脚本,主要是为了解决以下几个核心问题:

  1. 原子性: 在 Redis 中,单个命令是原子性的。但当需要执行多个命令组成的复合操作时,如果不加以处理,这些命令在执行过程中可能会被其他客户端的请求打断,导致数据不一致。 例如,前面提到的分布式锁释放逻辑(判断当前客户端是否持有锁,然后删除),需要这两个步骤作为一个不可分割的整体来执行。 **Lua 脚本在 Redis 中是原子性执行的。**这意味着,当一个 Lua 脚本在 Redis 服务器上运行时,它会作为一个整体被执行,期间不会被其他命令中断,也不会被其他客户端的请求打断。这完美解决了多命令操作的原子性问题。
  2. 减少网络延迟:如果一个操作需要执行多个 Redis 命令,客户端需要多次往返于 Redis 服务器。每次往返都会产生网络延迟。 通过将多个命令封装到一个 Lua 脚本中,客户端只需发送一次请求到 Redis 服务器,服务器就会执行整个脚本,然后返回最终结果。这大大减少了网络往返次数,提高了效率。
  3. 简化操作:对于一些复杂的、需要条件判断或循环的业务逻辑,如果只使用 Redis 的原生命令很难实现。Lua 脚本提供了更强大的编程能力,可以在 Redis 服务器端直接实现这些复杂逻辑,降低了客户端的实现难度。

5. 释放锁(Lua脚本版)

5.1 使用Lua脚本释放锁

由于Lua脚本的原子性,我们可以将释放锁的逻辑放在脚本中,脚本编写如下:

lua
-- KEYS[1] 是锁的键名
-- ARGV[1] 是当前客户端的唯一标识(当初获取锁时设置的值)

if(redis.call("get", KEYS[1]) == ARGV[1]) then
    return redis.call("del", KEYS[1])
else
    return 0
end
java
public class RedisLock implements ILock{
    private static final String LOCK_PREFIX = "lock:";

    private RedisTemplate redisTemplate;
    private String lockName;
    private static final String UUID = java.util.UUID.randomUUID().toString() + ":";

    private static final String UNLOCK_SCRIPT = """
            if(redis.call("get", KEYS[1]) == ARGV[1]) then
                return redis.call("del", KEYS[1])
            else
                return 0
            end
            """;
    private static final DefaultRedisScript<Long> DEFAULT_REDIS_SCRIPT = new DefaultRedisScript();
    static {
        DEFAULT_REDIS_SCRIPT.setScriptText(UNLOCK_SCRIPT);
        DEFAULT_REDIS_SCRIPT.setResultType(Long.class);
    }

    public RedisLock(String lockName, RedisTemplate redisTemplate){
        this.lockName = lockName;
        this.redisTemplate = redisTemplate;
    }

    @Override
    public boolean lock(long timeout, TimeUnit timeUnit) {
        String value = UUID + Thread.currentThread().threadId();
        // 使用setnx,即不存在锁时(其他进程或线程没有获取锁)才能获取锁
        Boolean lockSuccess = redisTemplate.opsForValue().setIfAbsent(LOCK_PREFIX + lockName, value, timeout, timeUnit);
        // 防止NPE
        return Boolean.TRUE.equals(lockSuccess);
    }

    @Override
    public void unlock() {
        String value = UUID + Thread.currentThread().threadId();
        Long executed = (Long) redisTemplate.execute(
                DEFAULT_REDIS_SCRIPT,
                Collections.singletonList(LOCK_PREFIX + lockName),
                value);
        //System.out.println(executed);
    }
}

使用如下代码,可以测试释放锁成功:

java
@SpringBootTest
public class RedisLockTest {

    @Resource
    private RedisTemplate redisTemplate;

    @Test
    void test01() throws InterruptedException {
        RedisLock redisLock = new RedisLock("order", redisTemplate);

        boolean lock = redisLock.lock(30, TimeUnit.SECONDS);
        System.out.println(lock);
        Thread.sleep(10*1000);

        redisLock.unlock();
    }
}

5.2 Redis事务释放锁实现

推荐使用Lua脚本原子性地释放锁,我们也可以使用Redis事务机制+WATCH命令释放锁:

java
@Override
public void unlock() {
    redisTemplate.execute(new SessionCallback() {
        @Override
        public Object execute(RedisOperations operations) throws DataAccessException {
            String key = LOCK_PREFIX + lockName;
            String value = UUID + Thread.currentThread().threadId();

            // 监视锁,防止在释放锁期间被别人修改或过期
            operations.watch(key);

            // 获取锁标识
            Object id = operations.opsForValue().get(key);
            // 是自己的锁
            if(id != null && value.equals(id.toString())){
                // 开启事务释放锁
                operations.multi();
                operations.delete(key);
                operations.exec();
            }

            // 不是自己的锁,取消监视
            operations.unwatch();
            return null;
        }
    });
}

好像没人使用事务+WATCH的机制来释放锁,这里写一个自己的实现,也不知道有没有问题。

6. Redisson

6.1 问题

在上面我们自己实现的Redis分布式锁,存在以下问题:

  • 不可重入:同一个线程,无法多次获取同一把锁;
  • 不可重试:只会尝试一次获取锁,如果失败就不再进行重试;
  • 超时释放:如果业务执行时间超长,超过了锁的过期时间,会导致锁自动释放,从而引起多个线程并行执行;
  • 主从一致性问题:如果主节点意外下线,而从节点还没有同步到锁信息,也就是意外释放了锁,从而引起多个线程并行执行;

如果要自己解决上面问题,是一大挑战。在开发中,我们不会自己手动实现,而是使用成熟库,例如Redisson。

Redisson是一个非常强大且功能丰富的 Java 客户端,用于简化 Redis 在分布式环境中的使用,让开发者能够像使用本地 Java 对象一样操作 Redis,其中就包括了分布式锁。

6.2 引入Redisson

首先引入依赖:

xml
<dependency>
    <groupId>org.redisson</groupId>
    <artifactId>redisson</artifactId>
    <version>3.50.0</version>
</dependency>

然后创建配置类:

java
@Configuration
public class RedissonConfig {

    @Bean
    public RedissonClient redisClient(){
        Config config = new Config();
        config.useSingleServer().setAddress("redis://127.0.0.1:6379");
        // 如果是集群
//        config.useClusterServers().addNodeAddress(
//                "redis://127.0.0.1:6379",
//                "redis://127.0.0.1:6380"
//                );

        RedissonClient redisson = Redisson.create(config);
        return redisson;
    }

}

之后,就可以使用依赖注入的方式使用RedissonClient了。

6.3 使用Redisson的分布式锁

使用Redisson提供的分布式锁分为三步:

  • 通过RedissonClient获取锁对象;
  • 加锁;
  • 解锁;

如下:

java
@SpringBootTest
@Slf4j
public class RedissonTest {
    @Resource
    private RedissonClient redissonClient;

    @Test
    public void test() throws InterruptedException {
        String lockName = "lock:product:1";
        // 获取锁
        RLock lock = redissonClient.getLock(lockName);

        // 加锁
        lock.lock();
        // 模拟执行业务代码
        Thread.sleep(100*1000);
        // 解锁
        lock.unlock();
    }
}

在加锁期间查看Redis数据库:

image-20250618165028842

可以看到,Redisson加的锁类型是Hash。

再反复查看过期时间:

image-20250618165518157

可以发现,Redisson加的锁,会定期刷新过期时间。

6.4 Redisson分布式锁特性

6.4.1 可重入性

Redisson分布式锁是可重入的。示例代码:

java
@Test
public void test() throws InterruptedException {
    String lockName = "lock:product:1";
    // 获取锁
    RLock lock = redissonClient.getLock(lockName);

    // 加锁
    lock.lock();

    try{
        log.info("第一次获取锁成功");

        lock.lock();
        try{
            log.info("第二次获取锁成功");
            Thread.sleep(100*1000);
        }finally {
            lock.unlock();
        }

    }finally {
        // 解锁
        lock.unlock();
    }
}

控制台成功打印出以下内容:

txt
第一次获取锁成功
第二次获取锁成功

说明Redisson分布式锁是可重入的。

在加锁期间,查看Redis中的锁:

image-20250618165940657

6.4.2 超时重试

在我们实现的分布式锁中,如果第一次获取不到锁,则直接返回,并没有陷入阻塞状态,而Redisson实现的分布式锁中,如果未获取到锁,则会

java
@Test
public void test03(){
    String lockName = "lock:product:1";
    // 获取锁
    RLock lock = redissonClient.getLock(lockName);

    Thread t1 = new Thread(()->{
        lock.lock();
        try{
            log.info("获取到锁");
            Sleeper.sleep(100, TimeUnit.SECONDS);
        }finally {
            lock.unlock();
        }
    },"t1");
    t1.start();

    // 让t1线程先运行
    Sleeper.sleep(1, TimeUnit.SECONDS);

    Thread t2 = new Thread(()->{
        lock.lock();
        try{
            log.info("获取到锁");
        }finally {
            lock.unlock();
        }
    },"t2");
    t2.start();

    // 防止整个程序结束
    Sleeper.sleep(100000, TimeUnit.SECONDS);
}

可以看到,线程t2并没有立即打印出获取到锁内容,说明线程t2第一时间没有获取到锁,陷入了阻塞状态。

6.4.3 超时释放

在之前的案例中,我们已经发现Redisson的锁过期时间,会不断刷新,从21秒到30秒,即线程没有结束,锁就不会过期:

image-20250618165518157

6.4.4 主从不一致

针对于单节点锁容易崩溃丢失的问题,Redis的作者提出了RedLock算法:https://redis.io/docs/latest/develop/use/patterns/distributed-locks/#the-redlock-algorithm,在Redisson中也有RedLock算法的实现。

准备三台独立的Redis主机:

java
@Configuration
public class RedissonConfig {

    @Bean
    public RedissonClient redisClient(){
        Config config = new Config();
        config.useSingleServer().setAddress("redis://127.0.0.1:6379");
        RedissonClient redisson = Redisson.create(config);
        return redisson;
    }

    @Bean
    public RedissonClient redisClient2(){
        Config config = new Config();
        config.useSingleServer().setAddress("redis://127.0.0.1:6380");
        RedissonClient redisson = Redisson.create(config);
        return redisson;
    }

    @Bean
    public RedissonClient redisClient3(){
        Config config = new Config();
        config.useSingleServer().setAddress("redis://127.0.0.1:6381");
        RedissonClient redisson = Redisson.create(config);
        return redisson;
    }

}
java
@SpringBootTest
@Slf4j
public class RedissonTest {
    @Resource
    private RedissonClient redissonClient;
    @Resource
    private RedissonClient redissonClient2;
    @Resource
    private RedissonClient redissonClient3;


    @Test
    public void test10(){
        String lockName = "lock";
      
        RLock lock1 = redissonClient.getLock(lockName);
        RLock lock2 = redissonClient2.getLock(lockName);
        RLock lock3 = redissonClient3.getLock(lockName);

        RLock redLock = redissonClient.getRedLock(lock1, lock2, lock3);

        redLock.lock();
        try{
            // 业务代码
        }finally {
            redLock.unlock();
        }
    }
}

6.5 Redisson原理

6.5.1 可重入性原理

Redisson分布式锁可重入的关键在于存储锁标识的结构变为了Hash,如下:

image-20250619125808955

整体流程如下:

image-20250619125938647

在代码中,主要体现在Lua脚本:

java
<T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
    return evalWriteSyncedNoRetryAsync(getRawName(), LongCodec.INSTANCE, command,
            "if ((redis.call('exists', KEYS[1]) == 0) " +
                        "or (redis.call('hexists', KEYS[1], ARGV[2]) == 1)) then " +
                    "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                    "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                    "return nil; " +
                "end; " +
                "return redis.call('pttl', KEYS[1]);",
            Collections.singletonList(getRawName()), unit.toMillis(leaseTime), getLockName(threadId));
}
java
protected RFuture<Boolean> unlockInnerAsync(long threadId, String requestId, int timeout) {
    return evalWriteSyncedNoRetryAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
    			"local val = redis.call('get', KEYS[3]); " +
          "if val ~= false then " +
              "return tonumber(val);" +
          "end; " +

          "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
              "return nil;" +
          "end; " +
          "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
          "if (counter > 0) then " +
              "redis.call('pexpire', KEYS[1], ARGV[2]); " +
              "redis.call('set', KEYS[3], 0, 'px', ARGV[5]); " +
              "return 0; " +
          "else " +
              "redis.call('del', KEYS[1]); " +
              "redis.call(ARGV[4], KEYS[2], ARGV[1]); " +
              "redis.call('set', KEYS[3], 1, 'px', ARGV[5]); " +
              "return 1; " +
          "end; ",
                            Arrays.asList(getRawName(), getChannelName(), getUnlockLatchName(requestId)),
                            LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime,
                            getLockName(threadId), getSubscribeService().getPublishCommand(), timeout);
}

6.5.2 超时重试

Redisson的超时重试机制,主要是通过Redis的发布订阅机制和JUC的信号量。

  • 首先,在加锁时,如果加锁失败,会订阅Redis频道,然后使用信号量阻塞线程;

  • 当其他线程解锁后,会向指定频道发送解锁消息;

  • 订阅频道的线程接收到解锁消息后,释放信号量

    java
    // 当接收到频道消息后
    @Override
    protected void onMessage(RedissonLockEntry value, Long message) {
      	// 解锁消息
        if (message.equals(UNLOCK_MESSAGE)) {
            value.tryRunListener();
          	// value.getLatch()就是信号量,释放信号量
            value.getLatch().release();
        } else if (message.equals(READ_UNLOCK_MESSAGE)) {
            value.tryRunAllListeners();
    
            value.getLatch().release(value.getLatch().getQueueLength());
        }
    }
  • 在加锁过程中由于信号量阻塞的线程,此时可以继续执行,争抢锁:

    java
    private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException {
        long threadId = Thread.currentThread().getId();
      	// 第一次尝试获取锁
        Long ttl = tryAcquire(-1, leaseTime, unit, threadId);
        // 获取锁成功
        if (ttl == null) {
            return;
        }
    
      	// 获取锁失败,订阅频道
        CompletableFuture<RedissonLockEntry> future = subscribe(threadId);
        pubSub.timeout(future);
        RedissonLockEntry entry;
        if (interruptibly) {
            entry = commandExecutor.getInterrupted(future);
        } else {
            entry = commandExecutor.get(future);
        }
    
        try {
            while (true) {
                ttl = tryAcquire(-1, leaseTime, unit, threadId);
                // 获取锁成功
                if (ttl == null) {
                    break;
                }
    
                // 等待解锁消息
              	// entry.getLatch() 就是信号量,
              	// 如果其他线程没解锁,tryAcquire()和acquire()都会阻塞该线程
                if (ttl >= 0) {
                    try {
                        entry.getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
                    } catch (InterruptedException e) {
                        if (interruptibly) {
                            throw e;
                        }
                        entry.getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
                    }
                } else {
                    if (interruptibly) {
                        entry.getLatch().acquire();
                    } else {
                        entry.getLatch().acquireUninterruptibly();
                    }
                }
            }
        } finally {
            unsubscribe(entry, threadId);
        }
    }

6.5.3 超时释放

针对于业务执行时间过长,导致锁过期的问题,Redisson提出了WatchDog机制,即在业务执行过程中,自动刷新锁过期时间。

注意,这项机制只有在自己没有指定锁过期时间才会生效:

java
private RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
    RFuture<Long> ttlRemainingFuture;
    if (leaseTime > 0) {
        ttlRemainingFuture = tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
    } else {
        // internalLockLeaseTime 默认30秒
        ttlRemainingFuture = tryLockInnerAsync(waitTime, internalLockLeaseTime,
                TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
    }
    CompletionStage<Long> s = handleNoSync(threadId, ttlRemainingFuture);
    ttlRemainingFuture = new CompletableFutureWrapper<>(s);

    CompletionStage<Long> f = ttlRemainingFuture.thenApply(ttlRemaining -> {
        if (ttlRemaining == null) {
            if (leaseTime > 0) {
                internalLockLeaseTime = unit.toMillis(leaseTime);
            } else {
              	// 过期时间自动刷新
                scheduleExpirationRenewal(threadId);
            }
        }
        return ttlRemaining;
    });
    return new CompletableFutureWrapper<>(f);
}

继续跟踪scheduleExpirationRenewal()方法,找到如下方法:

java
final void add(String rawName, String lockName, long threadId, LockEntry entry) {
    addSlotName(rawName);

    LockEntry oldEntry = name2entry.putIfAbsent(rawName, entry);
    if (oldEntry != null) {
        oldEntry.addThreadId(threadId, lockName);
    } else {
        if (tryRun()) {
            schedule();
        }
    }
}
java
public void schedule() {
    if (!running.get()) {
        return;
    }

    long internalLockLeaseTime = executor.getServiceManager().getCfg().getLockWatchdogTimeout();
    executor.getServiceManager().newTimeout(this, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);
}

可以看到,过了三分之一的internalLockLeaseTime时间,就会自动执行this任务(这里的this是指RenewalTask类对象,其中有run()方法)

java
@Override
public void run(Timeout timeout) {
    if (executor.getServiceManager().isShuttingDown()) {
        return;
    }

    CompletionStage<Void> future = execute();
    future.whenComplete((result, e) -> {
        if (e != null) {
            log.error("Can't update locks {} expiration", name2entry.keySet(), e);
            schedule();
            return;
        }

        schedule();
    });
}
java
final CompletionStage<Void> execute() {
    if (name2entry.isEmpty()) {
        return CompletableFuture.completedFuture(null);
    }

    if (!executor.getServiceManager().getCfg().isClusterConfig()) {
        return renew(name2entry.keySet().iterator(), chunkSize);
    }

    return renewSlots(slot2names.values().iterator(), chunkSize);
}

由于不是集群,我们查看renew()方法,这是一个抽象方法,我们找到LockTask实现:

java
CompletionStage<Void> renew(Iterator<String> iter, int chunkSize) {
      if (!iter.hasNext()) {
          return CompletableFuture.completedFuture(null);
      }

      Map<String, Long> name2threadId = new HashMap<>(chunkSize);
      List<Object> args = new ArrayList<>(chunkSize + 1);
      args.add(internalLockLeaseTime);

      List<String> keys = new ArrayList<>(chunkSize);
      while (iter.hasNext()) {
          String key = iter.next();

          LockEntry entry = name2entry.get(key);
          if (entry == null) {
              continue;
          }
          Long threadId = entry.getFirstThreadId();
          if (threadId == null) {
              continue;
          }

          keys.add(key);
          args.add(entry.getLockName(threadId));
          name2threadId.put(key, threadId);

          if (keys.size() == chunkSize) {
              break;
          }
      }

      if (keys.isEmpty()) {
          return CompletableFuture.completedFuture(null);
      }

      String firstName = keys.get(0);

      CompletionStage<List<String>> f = executor.syncedEval(firstName, LongCodec.INSTANCE,
              new RedisCommand<>("EVAL", new ContainsDecoder<>(keys)),
            			"local result = {} " +
                  "for i = 1, #KEYS, 1 do " +
                      "if (redis.call('hexists', KEYS[i], ARGV[i + 1]) == 1) then " +
                          "redis.call('pexpire', KEYS[i], ARGV[1]); " +
                          "table.insert(result, 1); " +
                      "else " +
                          "table.insert(result, 0); " +
                      "end; " +
                  "end; " +
                  "return result;",
              new ArrayList<>(keys),
              args.toArray());

      return f.thenCompose(existingNames -> {
          keys.removeAll(existingNames);
          for (String key : keys) {
              cancelExpirationRenewal(key, name2threadId.get(key));
          }
          return renew(iter, chunkSize);
      });
}

其中最主要的就是40-49行的Lua脚本,其中会刷新锁过期时间。

7. RedLock

7.1 RedLock介绍

为了解决单点故障,Redis的作者提出了RedLock算法,内容如下:

  • 首先准备N个独立的Redis节点(N为奇数);
  • 在开始获取RedLock时,先获取开始时间(毫秒值);
  • 然后按顺序获取N个Redis节点锁(使用相同的键值对),在获取锁时,要设置一个较短的超时时间,以防止客户端阻塞在一个下线的Redis节点,从而尽快去获取下一个Redis节点的锁。例如,假设锁过期时间为10秒中,那么获取锁的超时时间应该在5-50毫秒。
  • 如果客户端能在获取锁的超时时间内,获取到N/2+1(即大部分锁),即视为获取锁成功,仍然采用WatchDog机制延长锁的过期时间;
  • 如果不能在获取锁的超时时间内获取到大部分锁,则认为获取锁失败,需要释放已获取的锁;

虽然RedLock算法听起来很好,但是,在实际应用中仍有问题,问题出在WatchDog机制上。

如果JVM有GC导致长时间STW(包括实行WatchDog机制的线程),或者由于网络延迟,那么Redis中的锁会自动过期,而客户端仍然认为自己持有锁,但是此时其他客户端也可以去获取锁。这在单节点锁和RedLock中都有问题。

在RedLock中出问题的情况:

  • 假设现在有5个Redis节点,分别称为A,B,C,D,E;
  • 客户端1先获取锁,在节点A、B、C上设置了锁标识,D和E节点由于网络不可达,没有设置标识;
  • 由于网络波动,或者STW暂停,导致客户端1没有续上C节点的锁过期时间,导致C节点的锁标识过期;
  • 此时,客户端2去获取锁,可以获取到节点C、D、E上的锁,由于获取到大部分锁,客户端2成功获取到锁;
  • 最终,客户端1和客户端2都成功获取到了锁,造成并行执行;

7.2 FencedLock

为了解决RedLock的问题,Martin Kleppmann文章中提出了FencedLock,流程图如下:

Using fencing tokens to make resource access safe

即客户端获取锁的时候,也会得到一个Token(可以理解为版本),当去数据库(Storage)更新数据时,会拒绝掉较小的Token(较老的版本),也就是说旧数据不能覆盖掉新数据。

在Redisson中也有FencedLock的实现:

java
@Test
public void test11(){
    RFencedLock lock = redissonClient.getFencedLock("lock");
    Long token = lock.lockAndGetToken();
    System.out.println(token);
    lock.unlock();

    Sleeper.sleep(1, TimeUnit.SECONDS);

    new Thread(()->{
        Long token1 = lock.lockAndGetToken();
        System.out.println(token1);
        lock.unlock();
    }).start();

    Sleeper.sleep(1000, TimeUnit.SECONDS);

}

运行上述代码,查看Redis数据:

image-20250620121336966

可以发现,不仅有锁标识,还有锁的Token。

参考资料

[1] Redis中执行Lua脚本:https://redis.io/docs/latest/develop/interact/programmability/eval-intro/

[2] RedLock:https://redis.io/docs/latest/develop/use/patterns/distributed-locks/#the-redlock-algorithm

[3] RedLock的反对:https://martin.kleppmann.com/2016/02/08/how-to-do-distributed-locking.html