Appearance
Redis 分布式锁
本文介绍如何利用Redis实现分布式锁、Lua脚本编程与Redisson的使用。
1. 分布式锁介绍
对于并发场景,我们常常使用锁来同步线程,例如,在Java中的synchronized关键字和Lock实现类。但是,这只是针对于单体应用,如果有多个服务(多个JVM),使用synchronized或Lock实现类,只会同步各个JVM里的线程,不能协调多服务(多进程)。
也就是说,不同的服务实例运行在不同的机器上,它们各自拥有独立的内存空间。本地锁只能限制单个进程内的并发,无法跨越进程或机器。所以,我们需要分布式锁,目的是确保在任何给定时间,只有一个进程或线程能够访问共享资源。
分布式锁的实现可以分为如下三类:
- 基于关系型数据库:利用数据库的唯一索引或悲观锁(
SELECT ... FOR UPDATE)来实现,这种方式实现简单,但性能和可用性通常不如其他方式。 - 基于缓存(如 Redis):这是目前非常流行的方式。利用 Redis 的
SETNX(SET if Not eXists) 命令或更高级的SET key value NX PX milliseconds命令来尝试设置一个键。如果设置成功,表示获取到锁;如果失败,则表示锁已经被其他进程持有。为了防止死锁,通常会给锁设置一个过期时间(TTL)。 - 基于分布式协调服务(如 ZooKeeper 或 etcd):这些服务天生就是为分布式协调而设计的。它们通过创建临时有序节点或使用乐观锁机制来管理分布式锁。这类方案通常更健壮,能处理更复杂的分布式一致性问题,但也相对更复杂,有更高的学习和维护成本。
下面介绍基于Redis的分布式锁实现。
2. 自定义分布式锁(初始版本)
2.1 锁接口定义
一个锁的实现,最基本的就是获取锁和释放锁。所以可以定义接口如下:
java
public interface ILock {
// 获取锁
boolean lock();
// 释放锁
void unlock();
}2.2 锁的实现
一个最基本的锁实现如下:
- 获取锁:利用 Redis 的
SETNX(SET if Not eXists) 命令,如果设置成功,表示成功获取到锁,如果设置失败,表示有其他进程或线程已持有了锁; - 释放锁:直接删除代表锁的key;
java
public class RedisLock implements ILock{
private static final String LOCK_PREFIX = "lock:";
private RedisTemplate redisTemplate;
private String lockName;
public RedisLock(String lockName, RedisTemplate redisTemplate){
this.lockName = lockName;
this.redisTemplate = redisTemplate;
}
@Override
public boolean lock() {
String value = String.valueOf(Thread.currentThread().threadId());
// 使用setnx,即不存在锁时(其他进程或线程没有获取锁)才能获取锁
Boolean lockSuccess = redisTemplate.opsForValue().setIfAbsent(LOCK_PREFIX + lockName, value);
// 防止NPE
return Boolean.TRUE.equals(lockSuccess);
}
@Override
public void unlock() {
redisTemplate.delete(LOCK_PREFIX + lockName);
}
}2.3 问题分析
在上面的简单实现中,存在以下问题:
- 释放锁:任何一个进程或线程都可以直接释放锁,没有判断该锁是不是自己加的,所以在释放锁时,需要判断该锁是不是自己加的,可以给该锁增加一个标志,这个标志是该进程+线程特有的,要标识进程,可以使用UUID,要标识线程,可以使用线程ID。因此,在加锁时,将锁的内容设置成UUID+线程ID,在释放锁时,判断锁的内容是不是与自己的相同,如果是则可以成功释放锁。
- 如果加锁后,进程崩溃了,那么会导致该锁始终存在,无法得到释放,可以使用Redis的键过期机制,在加锁时,通常会给锁设置一个过期时间,即使进程崩溃了,也能保证锁得到释放。
解决上面两个问题,锁的实现修改如下:
java
public class RedisLock implements ILock{
private static final String LOCK_PREFIX = "lock:";
private RedisTemplate redisTemplate;
private String lockName;
private static final String UUID = java.util.UUID.randomUUID().toString() + ":";
public RedisLock(String lockName, RedisTemplate redisTemplate){
this.lockName = lockName;
this.redisTemplate = redisTemplate;
}
@Override
public boolean lock(long timeout, TimeUnit timeUnit) {
String value = UUID + Thread.currentThread().threadId();
// 使用setnx,即不存在锁时(其他进程或线程没有获取锁)才能获取锁
Boolean lockSuccess = redisTemplate.opsForValue().setIfAbsent(LOCK_PREFIX + lockName, value, timeout, timeUnit);
// 防止NPE
return Boolean.TRUE.equals(lockSuccess);
}
@Override
public void unlock() {
String value = UUID + Thread.currentThread().threadId();
Object id = redisTemplate.opsForValue().get(LOCK_PREFIX + lockName);
if(id != null && value.equals(id.toString())){
redisTemplate.delete(LOCK_PREFIX + lockName);
}
throw new RuntimeException("未持有该锁");
}
}java
public interface ILock {
boolean lock(long timeout, TimeUnit timeUnit);
void unlock();
}3. 释放锁的原子性问题
在上面的实现中,我们释放锁分为两步:
- 获取锁标识,判断是不是自己加的锁;
- 如果是自己加的锁,释放锁;
这两步操作不是原子的,有可能存在一种极端情况:
- 首先线程1成功获取了锁,然后执行业务,开始锁放锁,当线程获取锁标识后,由于GC等原因,造成线程阻塞;
- 此后由于Redis键超时自动释放锁;
- 线程2此时尝试加锁,由于此时没有锁,所以可以加锁成功,线程2开始执行业务;
- 线程1阻塞结束,判断获取到的锁标识是自己加的锁,所以执行解锁操作,所以此时线程1删除了线程2加的锁;
- 线程3此时来加锁,可以成功加锁;
- 最后,线程2和线程3并行执行任务,出现问题;
上述流程如下图所示:

问题出在释放锁是分为两步的,所以我们要将释放锁合并成一个原子操作。
4. Lua脚本
Lua是一种轻量级、高效、可嵌入的脚本语言,可以与Redis一起使用:也就是说,在Lua脚本中,可以调用Redis命令,在Redis中,可以执行Lua脚本。
4.1 在Lua中调用Redis命令
在Lua脚本中,可以直接使用如下函数调用Redis命令:
lua
redis.call(command [,arg...])command:即Redis的命令;arg:即命令所需的参数;
例如:
lua
redis.call('set', 'k1', 'v1')4.2 在Redis中执行脚本
在Redis中可以使用如下指令执行Lua脚本:
txt
EVAL script numkeys [key [key ...]] [arg [arg ...]]script:脚本内容,即用Lua脚本语言编写的程序;numKeys:脚本内容中涉及键的数量;key [key ...]:脚本内容中用到的键名称,在脚本中以KEYS代表键列表,KEYS[1]代表第一个键;arg [arg ...]:脚本内容中用到的值,在脚本中以ARGV代表值列表,ARGV[1]代表第一个值;
例如:

4.3 Lua脚本的好处
Redis 之所以选择集成 Lua 脚本,主要是为了解决以下几个核心问题:
- 原子性: 在 Redis 中,单个命令是原子性的。但当需要执行多个命令组成的复合操作时,如果不加以处理,这些命令在执行过程中可能会被其他客户端的请求打断,导致数据不一致。 例如,前面提到的分布式锁释放逻辑(判断当前客户端是否持有锁,然后删除),需要这两个步骤作为一个不可分割的整体来执行。 **Lua 脚本在 Redis 中是原子性执行的。**这意味着,当一个 Lua 脚本在 Redis 服务器上运行时,它会作为一个整体被执行,期间不会被其他命令中断,也不会被其他客户端的请求打断。这完美解决了多命令操作的原子性问题。
- 减少网络延迟:如果一个操作需要执行多个 Redis 命令,客户端需要多次往返于 Redis 服务器。每次往返都会产生网络延迟。 通过将多个命令封装到一个 Lua 脚本中,客户端只需发送一次请求到 Redis 服务器,服务器就会执行整个脚本,然后返回最终结果。这大大减少了网络往返次数,提高了效率。
- 简化操作:对于一些复杂的、需要条件判断或循环的业务逻辑,如果只使用 Redis 的原生命令很难实现。Lua 脚本提供了更强大的编程能力,可以在 Redis 服务器端直接实现这些复杂逻辑,降低了客户端的实现难度。
5. 释放锁(Lua脚本版)
5.1 使用Lua脚本释放锁
由于Lua脚本的原子性,我们可以将释放锁的逻辑放在脚本中,脚本编写如下:
lua
-- KEYS[1] 是锁的键名
-- ARGV[1] 是当前客户端的唯一标识(当初获取锁时设置的值)
if(redis.call("get", KEYS[1]) == ARGV[1]) then
return redis.call("del", KEYS[1])
else
return 0
endjava
public class RedisLock implements ILock{
private static final String LOCK_PREFIX = "lock:";
private RedisTemplate redisTemplate;
private String lockName;
private static final String UUID = java.util.UUID.randomUUID().toString() + ":";
private static final String UNLOCK_SCRIPT = """
if(redis.call("get", KEYS[1]) == ARGV[1]) then
return redis.call("del", KEYS[1])
else
return 0
end
""";
private static final DefaultRedisScript<Long> DEFAULT_REDIS_SCRIPT = new DefaultRedisScript();
static {
DEFAULT_REDIS_SCRIPT.setScriptText(UNLOCK_SCRIPT);
DEFAULT_REDIS_SCRIPT.setResultType(Long.class);
}
public RedisLock(String lockName, RedisTemplate redisTemplate){
this.lockName = lockName;
this.redisTemplate = redisTemplate;
}
@Override
public boolean lock(long timeout, TimeUnit timeUnit) {
String value = UUID + Thread.currentThread().threadId();
// 使用setnx,即不存在锁时(其他进程或线程没有获取锁)才能获取锁
Boolean lockSuccess = redisTemplate.opsForValue().setIfAbsent(LOCK_PREFIX + lockName, value, timeout, timeUnit);
// 防止NPE
return Boolean.TRUE.equals(lockSuccess);
}
@Override
public void unlock() {
String value = UUID + Thread.currentThread().threadId();
Long executed = (Long) redisTemplate.execute(
DEFAULT_REDIS_SCRIPT,
Collections.singletonList(LOCK_PREFIX + lockName),
value);
//System.out.println(executed);
}
}使用如下代码,可以测试释放锁成功:
java
@SpringBootTest
public class RedisLockTest {
@Resource
private RedisTemplate redisTemplate;
@Test
void test01() throws InterruptedException {
RedisLock redisLock = new RedisLock("order", redisTemplate);
boolean lock = redisLock.lock(30, TimeUnit.SECONDS);
System.out.println(lock);
Thread.sleep(10*1000);
redisLock.unlock();
}
}5.2 Redis事务释放锁实现
推荐使用Lua脚本原子性地释放锁,我们也可以使用Redis事务机制+WATCH命令释放锁:
java
@Override
public void unlock() {
redisTemplate.execute(new SessionCallback() {
@Override
public Object execute(RedisOperations operations) throws DataAccessException {
String key = LOCK_PREFIX + lockName;
String value = UUID + Thread.currentThread().threadId();
// 监视锁,防止在释放锁期间被别人修改或过期
operations.watch(key);
// 获取锁标识
Object id = operations.opsForValue().get(key);
// 是自己的锁
if(id != null && value.equals(id.toString())){
// 开启事务释放锁
operations.multi();
operations.delete(key);
operations.exec();
}
// 不是自己的锁,取消监视
operations.unwatch();
return null;
}
});
}好像没人使用事务+WATCH的机制来释放锁,这里写一个自己的实现,也不知道有没有问题。
6. Redisson
6.1 问题
在上面我们自己实现的Redis分布式锁,存在以下问题:
- 不可重入:同一个线程,无法多次获取同一把锁;
- 不可重试:只会尝试一次获取锁,如果失败就不再进行重试;
- 超时释放:如果业务执行时间超长,超过了锁的过期时间,会导致锁自动释放,从而引起多个线程并行执行;
- 主从一致性问题:如果主节点意外下线,而从节点还没有同步到锁信息,也就是意外释放了锁,从而引起多个线程并行执行;
如果要自己解决上面问题,是一大挑战。在开发中,我们不会自己手动实现,而是使用成熟库,例如Redisson。
Redisson是一个非常强大且功能丰富的 Java 客户端,用于简化 Redis 在分布式环境中的使用,让开发者能够像使用本地 Java 对象一样操作 Redis,其中就包括了分布式锁。
6.2 引入Redisson
首先引入依赖:
xml
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>3.50.0</version>
</dependency>然后创建配置类:
java
@Configuration
public class RedissonConfig {
@Bean
public RedissonClient redisClient(){
Config config = new Config();
config.useSingleServer().setAddress("redis://127.0.0.1:6379");
// 如果是集群
// config.useClusterServers().addNodeAddress(
// "redis://127.0.0.1:6379",
// "redis://127.0.0.1:6380"
// );
RedissonClient redisson = Redisson.create(config);
return redisson;
}
}之后,就可以使用依赖注入的方式使用RedissonClient了。
6.3 使用Redisson的分布式锁
使用Redisson提供的分布式锁分为三步:
- 通过
RedissonClient获取锁对象; - 加锁;
- 解锁;
如下:
java
@SpringBootTest
@Slf4j
public class RedissonTest {
@Resource
private RedissonClient redissonClient;
@Test
public void test() throws InterruptedException {
String lockName = "lock:product:1";
// 获取锁
RLock lock = redissonClient.getLock(lockName);
// 加锁
lock.lock();
// 模拟执行业务代码
Thread.sleep(100*1000);
// 解锁
lock.unlock();
}
}在加锁期间查看Redis数据库:

可以看到,Redisson加的锁类型是Hash。
再反复查看过期时间:

可以发现,Redisson加的锁,会定期刷新过期时间。
6.4 Redisson分布式锁特性
6.4.1 可重入性
Redisson分布式锁是可重入的。示例代码:
java
@Test
public void test() throws InterruptedException {
String lockName = "lock:product:1";
// 获取锁
RLock lock = redissonClient.getLock(lockName);
// 加锁
lock.lock();
try{
log.info("第一次获取锁成功");
lock.lock();
try{
log.info("第二次获取锁成功");
Thread.sleep(100*1000);
}finally {
lock.unlock();
}
}finally {
// 解锁
lock.unlock();
}
}控制台成功打印出以下内容:
txt
第一次获取锁成功
第二次获取锁成功说明Redisson分布式锁是可重入的。
在加锁期间,查看Redis中的锁:

6.4.2 超时重试
在我们实现的分布式锁中,如果第一次获取不到锁,则直接返回,并没有陷入阻塞状态,而Redisson实现的分布式锁中,如果未获取到锁,则会
java
@Test
public void test03(){
String lockName = "lock:product:1";
// 获取锁
RLock lock = redissonClient.getLock(lockName);
Thread t1 = new Thread(()->{
lock.lock();
try{
log.info("获取到锁");
Sleeper.sleep(100, TimeUnit.SECONDS);
}finally {
lock.unlock();
}
},"t1");
t1.start();
// 让t1线程先运行
Sleeper.sleep(1, TimeUnit.SECONDS);
Thread t2 = new Thread(()->{
lock.lock();
try{
log.info("获取到锁");
}finally {
lock.unlock();
}
},"t2");
t2.start();
// 防止整个程序结束
Sleeper.sleep(100000, TimeUnit.SECONDS);
}可以看到,线程t2并没有立即打印出获取到锁内容,说明线程t2第一时间没有获取到锁,陷入了阻塞状态。
6.4.3 超时释放
在之前的案例中,我们已经发现Redisson的锁过期时间,会不断刷新,从21秒到30秒,即线程没有结束,锁就不会过期:

6.4.4 主从不一致
针对于单节点锁容易崩溃丢失的问题,Redis的作者提出了RedLock算法:https://redis.io/docs/latest/develop/use/patterns/distributed-locks/#the-redlock-algorithm,在Redisson中也有RedLock算法的实现。
准备三台独立的Redis主机:
java
@Configuration
public class RedissonConfig {
@Bean
public RedissonClient redisClient(){
Config config = new Config();
config.useSingleServer().setAddress("redis://127.0.0.1:6379");
RedissonClient redisson = Redisson.create(config);
return redisson;
}
@Bean
public RedissonClient redisClient2(){
Config config = new Config();
config.useSingleServer().setAddress("redis://127.0.0.1:6380");
RedissonClient redisson = Redisson.create(config);
return redisson;
}
@Bean
public RedissonClient redisClient3(){
Config config = new Config();
config.useSingleServer().setAddress("redis://127.0.0.1:6381");
RedissonClient redisson = Redisson.create(config);
return redisson;
}
}java
@SpringBootTest
@Slf4j
public class RedissonTest {
@Resource
private RedissonClient redissonClient;
@Resource
private RedissonClient redissonClient2;
@Resource
private RedissonClient redissonClient3;
@Test
public void test10(){
String lockName = "lock";
RLock lock1 = redissonClient.getLock(lockName);
RLock lock2 = redissonClient2.getLock(lockName);
RLock lock3 = redissonClient3.getLock(lockName);
RLock redLock = redissonClient.getRedLock(lock1, lock2, lock3);
redLock.lock();
try{
// 业务代码
}finally {
redLock.unlock();
}
}
}6.5 Redisson原理
6.5.1 可重入性原理
Redisson分布式锁可重入的关键在于存储锁标识的结构变为了Hash,如下:

整体流程如下:

在代码中,主要体现在Lua脚本:
java
<T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
return evalWriteSyncedNoRetryAsync(getRawName(), LongCodec.INSTANCE, command,
"if ((redis.call('exists', KEYS[1]) == 0) " +
"or (redis.call('hexists', KEYS[1], ARGV[2]) == 1)) then " +
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"return redis.call('pttl', KEYS[1]);",
Collections.singletonList(getRawName()), unit.toMillis(leaseTime), getLockName(threadId));
}java
protected RFuture<Boolean> unlockInnerAsync(long threadId, String requestId, int timeout) {
return evalWriteSyncedNoRetryAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"local val = redis.call('get', KEYS[3]); " +
"if val ~= false then " +
"return tonumber(val);" +
"end; " +
"if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
"return nil;" +
"end; " +
"local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
"if (counter > 0) then " +
"redis.call('pexpire', KEYS[1], ARGV[2]); " +
"redis.call('set', KEYS[3], 0, 'px', ARGV[5]); " +
"return 0; " +
"else " +
"redis.call('del', KEYS[1]); " +
"redis.call(ARGV[4], KEYS[2], ARGV[1]); " +
"redis.call('set', KEYS[3], 1, 'px', ARGV[5]); " +
"return 1; " +
"end; ",
Arrays.asList(getRawName(), getChannelName(), getUnlockLatchName(requestId)),
LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime,
getLockName(threadId), getSubscribeService().getPublishCommand(), timeout);
}6.5.2 超时重试
Redisson的超时重试机制,主要是通过Redis的发布订阅机制和JUC的信号量。
首先,在加锁时,如果加锁失败,会订阅Redis频道,然后使用信号量阻塞线程;
当其他线程解锁后,会向指定频道发送解锁消息;
订阅频道的线程接收到解锁消息后,释放信号量
java// 当接收到频道消息后 @Override protected void onMessage(RedissonLockEntry value, Long message) { // 解锁消息 if (message.equals(UNLOCK_MESSAGE)) { value.tryRunListener(); // value.getLatch()就是信号量,释放信号量 value.getLatch().release(); } else if (message.equals(READ_UNLOCK_MESSAGE)) { value.tryRunAllListeners(); value.getLatch().release(value.getLatch().getQueueLength()); } }在加锁过程中由于信号量阻塞的线程,此时可以继续执行,争抢锁:
javaprivate void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException { long threadId = Thread.currentThread().getId(); // 第一次尝试获取锁 Long ttl = tryAcquire(-1, leaseTime, unit, threadId); // 获取锁成功 if (ttl == null) { return; } // 获取锁失败,订阅频道 CompletableFuture<RedissonLockEntry> future = subscribe(threadId); pubSub.timeout(future); RedissonLockEntry entry; if (interruptibly) { entry = commandExecutor.getInterrupted(future); } else { entry = commandExecutor.get(future); } try { while (true) { ttl = tryAcquire(-1, leaseTime, unit, threadId); // 获取锁成功 if (ttl == null) { break; } // 等待解锁消息 // entry.getLatch() 就是信号量, // 如果其他线程没解锁,tryAcquire()和acquire()都会阻塞该线程 if (ttl >= 0) { try { entry.getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS); } catch (InterruptedException e) { if (interruptibly) { throw e; } entry.getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS); } } else { if (interruptibly) { entry.getLatch().acquire(); } else { entry.getLatch().acquireUninterruptibly(); } } } } finally { unsubscribe(entry, threadId); } }
6.5.3 超时释放
针对于业务执行时间过长,导致锁过期的问题,Redisson提出了WatchDog机制,即在业务执行过程中,自动刷新锁过期时间。
注意,这项机制只有在自己没有指定锁过期时间才会生效:
java
private RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
RFuture<Long> ttlRemainingFuture;
if (leaseTime > 0) {
ttlRemainingFuture = tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
} else {
// internalLockLeaseTime 默认30秒
ttlRemainingFuture = tryLockInnerAsync(waitTime, internalLockLeaseTime,
TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
}
CompletionStage<Long> s = handleNoSync(threadId, ttlRemainingFuture);
ttlRemainingFuture = new CompletableFutureWrapper<>(s);
CompletionStage<Long> f = ttlRemainingFuture.thenApply(ttlRemaining -> {
if (ttlRemaining == null) {
if (leaseTime > 0) {
internalLockLeaseTime = unit.toMillis(leaseTime);
} else {
// 过期时间自动刷新
scheduleExpirationRenewal(threadId);
}
}
return ttlRemaining;
});
return new CompletableFutureWrapper<>(f);
}继续跟踪scheduleExpirationRenewal()方法,找到如下方法:
java
final void add(String rawName, String lockName, long threadId, LockEntry entry) {
addSlotName(rawName);
LockEntry oldEntry = name2entry.putIfAbsent(rawName, entry);
if (oldEntry != null) {
oldEntry.addThreadId(threadId, lockName);
} else {
if (tryRun()) {
schedule();
}
}
}java
public void schedule() {
if (!running.get()) {
return;
}
long internalLockLeaseTime = executor.getServiceManager().getCfg().getLockWatchdogTimeout();
executor.getServiceManager().newTimeout(this, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);
}可以看到,过了三分之一的internalLockLeaseTime时间,就会自动执行this任务(这里的this是指RenewalTask类对象,其中有run()方法)
java
@Override
public void run(Timeout timeout) {
if (executor.getServiceManager().isShuttingDown()) {
return;
}
CompletionStage<Void> future = execute();
future.whenComplete((result, e) -> {
if (e != null) {
log.error("Can't update locks {} expiration", name2entry.keySet(), e);
schedule();
return;
}
schedule();
});
}java
final CompletionStage<Void> execute() {
if (name2entry.isEmpty()) {
return CompletableFuture.completedFuture(null);
}
if (!executor.getServiceManager().getCfg().isClusterConfig()) {
return renew(name2entry.keySet().iterator(), chunkSize);
}
return renewSlots(slot2names.values().iterator(), chunkSize);
}由于不是集群,我们查看renew()方法,这是一个抽象方法,我们找到LockTask实现:
java
CompletionStage<Void> renew(Iterator<String> iter, int chunkSize) {
if (!iter.hasNext()) {
return CompletableFuture.completedFuture(null);
}
Map<String, Long> name2threadId = new HashMap<>(chunkSize);
List<Object> args = new ArrayList<>(chunkSize + 1);
args.add(internalLockLeaseTime);
List<String> keys = new ArrayList<>(chunkSize);
while (iter.hasNext()) {
String key = iter.next();
LockEntry entry = name2entry.get(key);
if (entry == null) {
continue;
}
Long threadId = entry.getFirstThreadId();
if (threadId == null) {
continue;
}
keys.add(key);
args.add(entry.getLockName(threadId));
name2threadId.put(key, threadId);
if (keys.size() == chunkSize) {
break;
}
}
if (keys.isEmpty()) {
return CompletableFuture.completedFuture(null);
}
String firstName = keys.get(0);
CompletionStage<List<String>> f = executor.syncedEval(firstName, LongCodec.INSTANCE,
new RedisCommand<>("EVAL", new ContainsDecoder<>(keys)),
"local result = {} " +
"for i = 1, #KEYS, 1 do " +
"if (redis.call('hexists', KEYS[i], ARGV[i + 1]) == 1) then " +
"redis.call('pexpire', KEYS[i], ARGV[1]); " +
"table.insert(result, 1); " +
"else " +
"table.insert(result, 0); " +
"end; " +
"end; " +
"return result;",
new ArrayList<>(keys),
args.toArray());
return f.thenCompose(existingNames -> {
keys.removeAll(existingNames);
for (String key : keys) {
cancelExpirationRenewal(key, name2threadId.get(key));
}
return renew(iter, chunkSize);
});
}其中最主要的就是40-49行的Lua脚本,其中会刷新锁过期时间。
7. RedLock
7.1 RedLock介绍
为了解决单点故障,Redis的作者提出了RedLock算法,内容如下:
- 首先准备N个独立的Redis节点(N为奇数);
- 在开始获取RedLock时,先获取开始时间(毫秒值);
- 然后按顺序获取N个Redis节点锁(使用相同的键值对),在获取锁时,要设置一个较短的超时时间,以防止客户端阻塞在一个下线的Redis节点,从而尽快去获取下一个Redis节点的锁。例如,假设锁过期时间为10秒中,那么获取锁的超时时间应该在5-50毫秒。
- 如果客户端能在获取锁的超时时间内,获取到
(即大部分锁),即视为获取锁成功,仍然采用WatchDog机制延长锁的过期时间; - 如果不能在获取锁的超时时间内获取到大部分锁,则认为获取锁失败,需要释放已获取的锁;
虽然RedLock算法听起来很好,但是,在实际应用中仍有问题,问题出在WatchDog机制上。
如果JVM有GC导致长时间STW(包括实行WatchDog机制的线程),或者由于网络延迟,那么Redis中的锁会自动过期,而客户端仍然认为自己持有锁,但是此时其他客户端也可以去获取锁。这在单节点锁和RedLock中都有问题。
在RedLock中出问题的情况:
- 假设现在有5个Redis节点,分别称为A,B,C,D,E;
- 客户端1先获取锁,在节点A、B、C上设置了锁标识,D和E节点由于网络不可达,没有设置标识;
- 由于网络波动,或者STW暂停,导致客户端1没有续上C节点的锁过期时间,导致C节点的锁标识过期;
- 此时,客户端2去获取锁,可以获取到节点C、D、E上的锁,由于获取到大部分锁,客户端2成功获取到锁;
- 最终,客户端1和客户端2都成功获取到了锁,造成并行执行;
7.2 FencedLock
为了解决RedLock的问题,Martin Kleppmann在文章中提出了FencedLock,流程图如下:

即客户端获取锁的时候,也会得到一个Token(可以理解为版本),当去数据库(Storage)更新数据时,会拒绝掉较小的Token(较老的版本),也就是说旧数据不能覆盖掉新数据。
在Redisson中也有FencedLock的实现:
java
@Test
public void test11(){
RFencedLock lock = redissonClient.getFencedLock("lock");
Long token = lock.lockAndGetToken();
System.out.println(token);
lock.unlock();
Sleeper.sleep(1, TimeUnit.SECONDS);
new Thread(()->{
Long token1 = lock.lockAndGetToken();
System.out.println(token1);
lock.unlock();
}).start();
Sleeper.sleep(1000, TimeUnit.SECONDS);
}运行上述代码,查看Redis数据:

可以发现,不仅有锁标识,还有锁的Token。
参考资料
[1] Redis中执行Lua脚本:https://redis.io/docs/latest/develop/interact/programmability/eval-intro/
[2] RedLock:https://redis.io/docs/latest/develop/use/patterns/distributed-locks/#the-redlock-algorithm
[3] RedLock的反对:https://martin.kleppmann.com/2016/02/08/how-to-do-distributed-locking.html