Skip to content

JUC ReentrantReadWriteLock和StampedLock

本文介绍读写锁、邮戳锁以及他们的原理。

DANGER

注意,以下源码分析基于OpenJDK 17。

1. ReentrantReadWriteLock

1.1 介绍

在并发编程中,当多个线程访问共享资源时,为了保证数据的一致性,通常需要使用锁机制。常见的锁,比如 synchronized 或者 ReentrantLock,都是排它锁(也称为独占锁),这意味着在任何时刻,只有一个线程能够获得锁并访问共享资源,其他线程必须等待。

然而,在许多应用场景中,对共享资源的读操作远多于写操作。如果使用排它锁,那么即使是多个线程同时进行读操作(读操作本身是线程安全的),也需要排队等待锁的释放,这会极大地降低程序的并发性能。

为了解决这个问题,Java 提供了读写锁。读写锁允许多个线程同时读取共享资源,但在有线程进行写操作时,会阻塞所有的读线程和其他写线程。

读写锁的核心原则:

  1. 读读不互斥: 多个线程可以同时持有读锁。
  2. 读写互斥: 读锁和写锁之间是互斥的,持有读锁时,其他线程无法获得写锁;持有写锁时,其他线程无法获得读锁。
  3. 写写互斥: 多个线程不能同时持有写锁。

在Java中,读写锁的实现是由ReentrantReadWriteLock实现的。

1.2 使用案例

1.2.1 获取读写锁

在Java中,如果要获取读写锁,可以先创建ReentrantReadWriteLock对象,然后通过readLock()writeLock()分别获取读锁和写锁:

java
// 创建读写锁
ReentrantReadWriteLock reentrantReadWriteLock = new ReentrantReadWriteLock();
// 获取读锁
ReentrantReadWriteLock.ReadLock readLock = reentrantReadWriteLock.readLock();
// 获取写锁
ReentrantReadWriteLock.WriteLock writeLock = reentrantReadWriteLock.writeLock();

1.2.2 使用读写锁

  • 当我们需要对共享资源进行读操作时,使用读锁加锁;

  • 当我们需要对共享资源进行写操作时,使用写锁加锁;

例如,现在有一个共享资源整数data,读取data时加读锁,修改data时加写锁:

java
@Slf4j
class SharedData{
    private int data;

    // 创建读写锁
    ReentrantReadWriteLock reentrantReadWriteLock = new ReentrantReadWriteLock();
    // 获取读锁
    ReentrantReadWriteLock.ReadLock readLock = reentrantReadWriteLock.readLock();
    // 获取写锁
    ReentrantReadWriteLock.WriteLock writeLock = reentrantReadWriteLock.writeLock();

    public SharedData(int data){
        this.data = data;
    }

    public int getData(){
        // 获取读锁
        readLock.lock();
        try {
            log.info("正在读取数据...");
            Sleeper.sleep(new Random().nextInt(1000));  // 模拟读取操作
            log.info("读取数据完成:{}", this.data);
            return data;
        }finally {
            // 释放锁
            readLock.unlock();
        }
    }

    public void setData(int data){
        writeLock.lock();
        try {
            log.info("正在修改数据...");
            Sleeper.sleep(new Random().nextInt(1000));  // 模拟修改操作
            log.info("修改数据完成,修改前:{},修改后:{}", this.data, data);
            this.data = data;
        }finally {
            writeLock.unlock();
        }
    }
}

编写测试代码入戏:

java
public static void main(String[] args) {
    SharedData sharedData = new SharedData(1);

    for (int i = 0; i < 10; i++) {
        int j = i;
        new Thread(()->{
            if(j % 2 != 0){
                int data = sharedData.getData();
            }else{
                sharedData.setData(j);
            }
        }).start();
    }
}

结果如下:

txt
11:32:28.453 [Thread-1] INFO  : 正在读取数据...
11:32:28.453 [Thread-3] INFO  : 正在读取数据...
11:32:28.518 [Thread-1] INFO  : 读取数据完成:1
11:32:28.881 [Thread-3] INFO  : 读取数据完成:1
11:32:28.882 [Thread-2] INFO  : 正在修改数据...
11:32:29.369 [Thread-2] INFO  : 修改数据完成,修改前:1,修改后:2
11:32:29.370 [Thread-0] INFO  : 正在修改数据...
11:32:29.982 [Thread-0] INFO  : 修改数据完成,修改前:2,修改后:0
11:32:29.983 [Thread-4] INFO  : 正在修改数据...
11:32:30.451 [Thread-4] INFO  : 修改数据完成,修改前:0,修改后:4
11:32:30.453 [Thread-5] INFO  : 正在读取数据...
11:32:30.603 [Thread-5] INFO  : 读取数据完成:4
11:32:30.604 [Thread-6] INFO  : 正在修改数据...
11:32:31.457 [Thread-6] INFO  : 修改数据完成,修改前:4,修改后:6
11:32:31.458 [Thread-7] INFO  : 正在读取数据...
11:32:31.652 [Thread-7] INFO  : 读取数据完成:6
11:32:31.653 [Thread-8] INFO  : 正在修改数据...
11:32:31.954 [Thread-8] INFO  : 修改数据完成,修改前:6,修改后:8
11:32:31.954 [Thread-9] INFO  : 正在读取数据...
11:32:32.646 [Thread-9] INFO  : 读取数据完成:8

从第1-4行可以看出,同时有两个线程执行读操作,说明允许有多个线程对共享资源加读锁;

第7-8行可以看出,当线程在执行写操作时,其他线程不能进行其他操作,只有等该线程完成后才能操作共享资源,说明写锁是排他的、独占的。

1.2.3 锁升级与锁降级

  • 锁升级:即线程在持有读锁的情况下,去获取写锁,在ReentrantReadWriteLock不支持锁升级
  • 锁降级:即线程在持有写锁的情况下,去获取读锁,在ReentrantReadWriteLock支持锁降级

演示不支持锁升级:

java
public static void main(String[] args) {
    ReentrantReadWriteLock reentrantReadWriteLock = new ReentrantReadWriteLock();
    ReentrantReadWriteLock.ReadLock readLock = reentrantReadWriteLock.readLock();
    ReentrantReadWriteLock.WriteLock writeLock = reentrantReadWriteLock.writeLock();

    readLock.lock();
    try {
        // 在获取读锁后获取写锁,会卡住
        writeLock.lock();
        try {
            log.info("锁升级");
        }finally {
            writeLock.unlock();
        }

    }finally {
        readLock.unlock();
    }
}

运行上述程序,会发现“锁升级”没有输出,程序卡住,说明不支持锁升级。

演示支持锁降级:

java
public static void main(String[] args) {
    ReentrantReadWriteLock reentrantReadWriteLock = new ReentrantReadWriteLock();
    ReentrantReadWriteLock.ReadLock readLock = reentrantReadWriteLock.readLock();
    ReentrantReadWriteLock.WriteLock writeLock = reentrantReadWriteLock.writeLock();

    writeLock.lock();
    try {
        // 在持有写锁的情况下获取读锁,线程不会阻塞
        readLock.lock();
        try {
            log.info("锁降级");
        }finally {
            readLock.unlock();
        }

    }finally {
        writeLock.unlock();
    }
}

运行上述程序,会发现“锁降级”正常输出,说明支持锁降级。

下面是一个来自oracle的例子演示锁升级与锁降级的使用,来源:https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/locks/ReentrantReadWriteLock.html

java
class CachedData {
    Object data;
    volatile boolean cacheValid;
    final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();

    void processCachedData() {
        // 加读锁获取缓存数据
        rwl.readLock().lock();
        // 如果发现缓存失效,则更新缓存数据
        if (!cacheValid) {
            // 在更新前释放读锁,否则加写锁会失败
            rwl.readLock().unlock();
            // 加写锁
            rwl.writeLock().lock();
            try {
                // 再次检查缓存是否有效,避免其他线程已经更新了数据
                if (!cacheValid) {
                    data = "";
                    cacheValid = true;
                }
                // 锁降级,在释放写锁前获取读锁
                rwl.readLock().lock();
            } finally {
                // 释放写锁,仍然持有读锁
                rwl.writeLock().unlock();
            }
        }

        try {
            // 使用数据
            use(data);
        } finally {
            // 释放读锁
            rwl.readLock().unlock();
        }
    }

    public void use(Object data){

    }
}

1.2.4 锁的可重入性

读锁和写锁均支持重入。

写锁重入演示:

java
public static void main(String[] args) {
    ReentrantReadWriteLock reentrantReadWriteLock = new ReentrantReadWriteLock();
    ReentrantReadWriteLock.ReadLock readLock = reentrantReadWriteLock.readLock();
    ReentrantReadWriteLock.WriteLock writeLock = reentrantReadWriteLock.writeLock();

    // 第一次加写锁
    writeLock.lock();
    try {
        // 第二次加写锁
        writeLock.lock();
        try {
            log.info("写锁重入");
        }finally {
            writeLock.unlock();
        }

    }finally {
        writeLock.unlock();
    }
}

读锁重入演示:

java
public static void main(String[] args) {
    ReentrantReadWriteLock reentrantReadWriteLock = new ReentrantReadWriteLock();
    ReentrantReadWriteLock.ReadLock readLock = reentrantReadWriteLock.readLock();
    ReentrantReadWriteLock.WriteLock writeLock = reentrantReadWriteLock.writeLock();

    // 第一次加读锁
    readLock.lock();
    try {
        // 第二次加读锁
        readLock.lock();
        try {
            log.info("读锁重入");
        }finally {
            readLock.unlock();
        }

    }finally {
        readLock.unlock();
    }
}

1.2.5 条件变量

读锁不支持条件变量,写锁支持条件变量。

下面代码演示获取读锁的条件变量:

java
ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
Condition condition = rwl.readLock().newCondition();

结果如下:

java
Exception in thread "main" java.lang.UnsupportedOperationException
	at java.base/java.util.concurrent.locks.ReentrantReadWriteLock$ReadLock.newCondition(ReentrantReadWriteLock.java:907)

查看源码,发现方法没有实现,直接抛出异常:

java
/**
 * Throws {@code UnsupportedOperationException} because
 * {@code ReadLocks} do not support conditions.
 *
 * @throws UnsupportedOperationException always
 */
public Condition newCondition() {
    throw new UnsupportedOperationException();
}

下面代码演示写锁的条件变量:

java
public static void main(String[] args) {
    ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
    Condition condition = rwl.writeLock().newCondition();
    System.out.println(condition);
}
// 结果:
// java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject@40f08448

发现写锁可以正常获取条件变量。

1.3 原理

1.3.1 创建读写锁

ReentrantReadWriteLock中,存在如下字段:

java
public class ReentrantReadWriteLock
        implements ReadWriteLock, java.io.Serializable {
  	// 读锁
    private final ReentrantReadWriteLock.ReadLock readerLock;
  	// 写锁
    private final ReentrantReadWriteLock.WriteLock writerLock;
  	// 同步器
    final Sync sync;
}

ReadLockWriteLockSync都是ReentrantReadWriteLock的内部类。其中,Sync是AQS的子类。

当我们调用构造方法时,会创建内部字段:

java
public ReentrantReadWriteLock() {
    this(false);
}

public ReentrantReadWriteLock(boolean fair) {
    sync = fair ? new FairSync() : new NonfairSync();
    readerLock = new ReadLock(this);
    writerLock = new WriteLock(this);
}

可以发现,ReentrantReadWriteLock也是支持公平与非公平模式的,默认是非公平模式。

ReadLockWriteLock中,也维护着Sync对象的引用:

java
public static class ReadLock implements Lock, java.io.Serializable {
  
    private final Sync sync;

    protected ReadLock(ReentrantReadWriteLock lock) {
        sync = lock.sync;
    }
}
java
public static class WriteLock implements Lock, java.io.Serializable {
  
    private final Sync sync;

    protected WriteLock(ReentrantReadWriteLock lock) {
        sync = lock.sync;
    }
}

当我们调用writeLock()readLock()时,就是把内部字段返回出去:

java
public ReentrantReadWriteLock.WriteLock writeLock() { return writerLock; }
public ReentrantReadWriteLock.ReadLock  readLock()  { return readerLock; }

1.3.2 state状态解析

在读写锁实现中,AQS中的state仍然表示锁状态,但不同的是,写锁状态占了state的低16位(写锁的重入次数),读锁状态占了state的高16位(多少个线程持有了读锁)。

txt
00000000 00000000 00000000 00000000
-------读锁------- -------写锁------

Sync中定义了如下两个工具方法,用来获取读锁和写锁数量:

java
static final int SHARED_SHIFT   = 16;
// EXCLUSIVE_MASK二进制表示:00000000 00000000 11111111 11111111
static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;

// 获取读锁数量
// c就是state
static int sharedCount(int c)    
{ 
  // 无符号右移16位
  return c >>> SHARED_SHIFT; 
}

// 获取写锁数量
// c就是state
static int exclusiveCount(int c) 
{ 
  // state和EXCLUSIVE_MASK做按位与运算
  return c & EXCLUSIVE_MASK; 
}

1.3.3 获取写锁

WriteLock中调用lock()方法,实际就是调用sync的方法:

java
public void lock() {
    sync.acquire(1);
}

acquire()是AQS中的方法,里面内容如下:

java
public final void acquire(int arg) {
    if (!tryAcquire(arg))
        acquire(null, arg, false, false, false, 0L);
}

tryAcquire()需要子类实现,在Sync中如下:

java
protected final boolean tryAcquire(int acquires) {
  	// 获取当前线程
    Thread current = Thread.currentThread();
  	// 获取state
    int c = getState();
  	// 获取写锁数量
    int w = exclusiveCount(c);
  	// 如果state不等于0
    if (c != 0) {
        // 如果 c != 0 并且 w == 0 ,说明读锁数量不为0,此时不能加写锁
        if (w == 0)
            return false;
      	// 如果c != 0 并且 w != 0,
      	// 说明此时写锁数量不为0,读锁数量为0(因为写锁是独占的,加了写锁,就不能加读锁)
      	// 判断加写锁的线程和本线程是否相同,不同则返回false,不能加锁
      	if (current != getExclusiveOwnerThread())
          return false;
      	// 判断加锁数量
        if (w + exclusiveCount(acquires) > MAX_COUNT)
            throw new Error("Maximum lock count exceeded");
        // 直接加锁,可重入
        setState(c + acquires);
        return true;
    }
  	// 此时 state 等于0,说明读锁和写锁都没有,此时可以直接加锁
  	// writerShouldBlock()判断本线程是否应该阻塞,有公平和非公平两种模式实现
  	// CAS修改state写锁状态
    if (writerShouldBlock() || !compareAndSetState(c, c + acquires))
        return false;
  	// 获取写锁成功,设置写锁持有者线程
    setExclusiveOwnerThread(current);
    return true;
}

注意writerShouldBlock()有两种实现:

java
// 非公平模式下,始终返回false,说明试图加写锁的线程不应该阻塞
final boolean writerShouldBlock() {
    return false; 
}

// 公平模式下,判断CLH队列中是否有其他等待线程,如果有,则表示该线程应该阻塞
final boolean writerShouldBlock() {
    return hasQueuedPredecessors();
}

如果获取写锁失败,则调用AQS中的acquire(null, arg, false, false, false, 0L);,进入CLH队列等待获取锁。

1.3.4 释放写锁

WriteLock中释放写锁,调用unlock()方法,内容如下:

java
public void unlock() {
    sync.release(1);
}

实际调用的是Sync(AQS)中的release()方法:

java
public final boolean release(int arg) {
    if (tryRelease(arg)) {
      	// 释放写锁成功后,通知CLH队列中第一个等待线程
        signalNext(head);
        return true;
    }
    return false;
}

Sync中,实现了tryRelease()方法:

java
protected final boolean tryRelease(int releases) {
  	// 判断当前获取锁的线程和本线程是否相同,如果不同直接抛出异常
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
  	// state 下一个值
    int nextc = getState() - releases;
  	// free是否完全释放了写锁,如果完全释放了写锁,则将锁的持有者线程置为null
    boolean free = exclusiveCount(nextc) == 0;
    if (free)
        setExclusiveOwnerThread(null);
  	// 设置新的state值
    setState(nextc);
    return free;
}

1.3.5 读锁相关结构

在分析获取读锁相关源码前,我们先看看读锁相关字段。

Sync类中定义了两个内部类:

java
// 线程持有读锁数量
static final class HoldCounter {
  	// 读锁数量,初始化为0
    int count;          
  	// 线程ID
    final long tid = LockSupport.getThreadId(Thread.currentThread());
}

// ThreadLocal 类,线程私有
static final class ThreadLocalHoldCounter extends ThreadLocal<HoldCounter> {
  	// 初始化方法
    public HoldCounter initialValue() {
        return new HoldCounter();
    }
}

Sync中有如下字段:

java
// 线程私有的,记录当前线程持有读锁的数量
private transient ThreadLocalHoldCounter readHolds;
// 缓存上一个成功获取读锁的非 firstReader 线程的 HoldCounter
private transient HoldCounter cachedHoldCounter;

// 第一个获取读锁的线程
private transient Thread firstReader;
// 第一个获取读锁的线程持有读锁的数量
private transient int firstReaderHoldCount;

为什么需要cachedHoldCounter

在某些访问模式下,同一个线程(非 firstReader)可能会连续多次获取读锁(无论是由于重入调用,还是在释放后立即再次获取)。cachedHoldCounter 通过缓存这个线程的 HoldCounter,使得该线程在后续的获取尝试中可以直接命中缓存,从而避免了重复的 ThreadLocal 查找开销,提供了一个微小的性能提升。

1.3.6 获取读锁

当我们调用ReadLocklock()方法时,实际调用的是SyncacquireShared()方法:

java
public void lock() {
    sync.acquireShared(1);
}

acquireShared()是AQS中实现的方法:

java
public final void acquireShared(int arg) {
    if (tryAcquireShared(arg) < 0)
        acquire(null, arg, true, false, false, 0L);
}

tryAcquireShared()需要子类实现,在Sync中的实现如下:

java
protected final int tryAcquireShared(int unused) {
    Thread current = Thread.currentThread();
    int c = getState();
  	// 判断写锁数量以及获取写锁的线程和当前线程是否一样,如果有其他线程获取了写锁,则直接失败
    if (exclusiveCount(c) != 0 && getExclusiveOwnerThread() != current)
        return -1;
  	// r为读锁数量
    int r = sharedCount(c);
  	// !readerShouldBlock(): 检查读线程是否应该被阻塞。
    // 在非公平模式下,只有CLH队列头部的线程是写线程时,读线程才会被阻塞。(为了防止写线程饿死)
    // 在公平模式下,如果队列中有任何等待的线程(无论是读还是写),当前读线程会被阻塞,以保证先到先得。
    // r < MAX_COUNT 读锁数量没有超过最大值
  	// compareAndSetState(c, c + SHARED_UNIT) 修改读锁数量
  	// SHARED_UNIT 是 (1 << 16),表示在 state 的高 16 位加 1。
    if (!readerShouldBlock() && r < MAX_COUNT && 
        compareAndSetState(c, c + SHARED_UNIT)) {
      	// 程序运行到这里,表示该线程已经成功获取到了读锁,此时需要记录一下线程获取读锁的情况
      
      	// 如果之前的读锁计数是 0,说明当前线程是第一个获取读锁的线程
        if (r == 0) {
          	// 记录第一个获取读锁的线程和数量
            firstReader = current;
            firstReaderHoldCount = 1;
        } else if (firstReader == current) {
          	// 之前的读锁计数不为0,但是第一个获取读锁的线程就是本线程,
          	// 增加第一个读线程的重入计数
            firstReaderHoldCount++;
        } else {
          	// 有其他线程先于本线程获取了读锁
           	// cachedHoldCounter是上一个获取读锁的非 firstReader 线程的 HoldCounter 的缓存优化
            HoldCounter rh = cachedHoldCounter;
          	// 如果缓存为空(前一个线程是第一个获取读锁的线程),
          	// 或缓存不是当前线程的,
          	// 则初始化当前线程的读锁计数,并且重新设置读锁缓存
            if (rh == null || rh.tid != LockSupport.getThreadId(current))
                cachedHoldCounter = rh = readHolds.get();
            else if (rh.count == 0)
              	// 如果缓存是当前线程的,但计数为 0,可能是因为之前释放了所有读锁,需要重新设置到 ThreadLocal
                readHolds.set(rh);
          	// 当前线程获取读锁的数量+1
            rh.count++;
        }
        return 1;
    }
  
  	// 如果前面的 if 条件不满足(readerShouldBlock 为 true,或读锁计数已满,或 CAS 失败)
    // 则调用 fullTryAcquireShared 方法进行完整的尝试获取过程
    return fullTryAcquireShared(current);
}

fullTryAcquireShared()中,主要是处理CAS失败的逻辑,主要通过无限循环for(;;)来实现,如果 compareAndSetState() 失败(多个线程获取读锁,竞争激烈),它不会立即返回。循环会继续执行,线程会在下一轮循环中再次尝试获取最新的 state 并进行 CAS。

总结来说:

  • tryAcquireShared() 是一个快速退出的方法。它只做最基本的检查和一次尝试,如果条件不完全满足或者 CAS 竞争失败,它就放弃本次尝试。

  • fullTryAcquireShared() 是一个坚持获取的方法。它在一个循环中反复执行获取逻辑,直到成功为止或者确定不能获取读锁时(例如目前有线程持有写锁),返回失败。

1.3.7 释放读锁

当我们调用ReadLockunlock()时,源码如下:

java
public void unlock() {
    sync.releaseShared(1);
}

AQS 中的releaseShared()如下:

java
public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {
      	// 完全释放读锁后,唤醒下一个等待的线程
        signalNext(head);
        return true;
    }
    return false;
}

tryReleaseShared()在子类Sync中实现:

java
protected final boolean tryReleaseShared(int unused) {
    Thread current = Thread.currentThread();
  	// 以下处理线程读锁重入逻辑
  	// 如果该线程是第一个获取读锁的线程,将第一个获取读锁线程的锁计数减一
    if (firstReader == current) {
        if (firstReaderHoldCount == 1)
            firstReader = null;
        else
            firstReaderHoldCount--;
    } else {
      	// 如果该线程不是第一个获取读锁的线程,则从ThreadLocal中获取读锁计数
        HoldCounter rh = cachedHoldCounter;
        if (rh == null ||
            rh.tid != LockSupport.getThreadId(current))
            rh = readHolds.get();
        int count = rh.count;
        if (count <= 1) {
          	// 如果锁计数减为0,则直接移除ThreadLocal变量
            readHolds.remove();
            if (count <= 0)
                throw unmatchedUnlockException();
        }
        --rh.count;
    }
  	// CAS更改state状态
    for (;;) {
        int c = getState();
        int nextc = c - SHARED_UNIT;  // 读锁计数减一
        if (compareAndSetState(c, nextc))
            return nextc == 0;
    }
}

1.3.8 唤醒等待线程

如果释放完全释放读锁和写锁后,会调用signalNext()唤醒在CLH队列中第一个等待的线程(在acquire():六个参数版本中阻塞),当该线程获取到锁之后,会执行如下逻辑:

java
if (acquired) {
    if (first) {
        node.prev = null;
        head = node;
        pred.next = null;
        node.waiter = null;
        if (shared)
            signalNextIfShared(node);
        if (interrupted)
            current.interrupt();
    }
    return 1;
}

首先第3-6行会将本线程对应的Node节点置为哨兵节点,即下一个节点成为第一个等待的节点;

在第7-8行中,如果该线程是获取的共享锁(读锁),那么就会调用signalNextIfShared()唤醒下一个线程(如果下一个线程也是获取的共享锁(读锁)):

java
private static void signalNextIfShared(Node h) {
    Node s;
    if (h != null && (s = h.next) != null &&
        (s instanceof SharedNode) && s.status != 0) {
        s.getAndUnsetStatus(WAITING);
        LockSupport.unpark(s.waiter);
    }
}

也就是说,一个获取读锁的线程被唤醒后,那么它会唤醒紧排在其之后获取读锁的线程,依次类推,即最终会唤醒所有相连的获取读锁的线程,反应了读锁是可以并发的。

2. StampedLock

2.1 介绍

StampedLock 是 Java 在 JDK 8 中引入的一个新的锁机制,位于 java.util.concurrent.locks 包下。它提供了一种比 ReentrantReadWriteLock 更细粒度的控制,尤其在读多写少的并发场景下,通过引入“乐观读”模式,可以提供比 ReentrantReadWriteLock 更好的性能。

StampedLock 提供了三种模式来访问共享资源:

  1. 写锁:
    • 这是独占锁,一次只能有一个线程获取写锁。
    • 获取写锁使用 writeLock() 方法,成功获取会返回一个称为 stamp(邮戳)的 long 值。
    • 释放写锁使用 unlockWrite(long stamp) 方法,需要传入获取时返回的 stamp
  2. 读锁 :
    • 这是共享锁,多个线程可以同时持有读锁。
    • 获取读锁使用 readLock() 方法,成功获取会返回一个称为 stamplong 值。
    • 释放读锁使用 unlockRead(long stamp) 方法,需要传入获取时返回的 stamp
  3. 乐观读:
    • 这是 StampedLock 最具特色的地方,也是其性能优势的关键。
    • 乐观读是非阻塞的。在进行乐观读时,线程不加锁,直接去读取共享资源。
    • 执行流程:
      • a. 调用 tryOptimisticRead() 方法,这个方法会立即返回当前的 stamp 值(如果当前没有写锁被持有),表示一个乐观读的开始。
      • b. 线程不加锁地读取共享资源的数据。
      • c. 读取完数据后,调用 validate(long stamp) 方法,传入第一步获取的 stamp 值。
      • d. validate() 方法会检查在获取 stamp 到现在这段时间内,是否发生了写操作。
      • e. 如果 validate() 返回 true,说明在读取过程中没有写操作干扰,读取的数据是一致且有效的
      • f. 如果 validate() 返回 false,说明在读取过程中发生了写操作,读取的数据可能是不一致或无效的。此时,通常需要回退到获取读锁 (readLock()),然后在读锁的保护下重新读取数据。

2.2 使用案例

2.2.1 基本使用案例

现有一个共享资源整数data,其中有多个线程会对其进行读写操作,我们可以使用StampedLock来保证多线程操作下的安全性:

java
@Slf4j
class DataContainer{
    private int data;
    private StampedLock stampedLock = new StampedLock();

    public DataContainer(int data){
        this.data = data;
    }

    public int readData(int readCostTime){
        log.info("开始乐观读取数据");
        long optimisticReadStamp = stampedLock.tryOptimisticRead();
        Sleeper.sleep(readCostTime);
        log.info("乐观读数据完成,开始校验");
        // 校验在乐观读过程中,有没有其他线程修改了数据
        if(stampedLock.validate(optimisticReadStamp)){
            // 校验通过,说明没有线程修改数据,直接返回
            log.info("乐观读校验通过,乐观读取数据成功");
            return this.data;
        }

        // 校验不通过,说明有线程修改了数据,需要进行悲观读
        long readLockStamp = stampedLock.readLock();
        try{
            log.info("乐观读校验不通过,开始悲观读取数据");
            Sleeper.sleep(readCostTime);
            log.info("完成悲观读取数据");
            return this.data;
        }finally {
            // 释放读锁
            stampedLock.unlockRead(readLockStamp);
        }
    }

    public void writeData(int data){
        // 加写锁
        long writeLockStamp = stampedLock.writeLock();
        try {
            log.info("开始修改数据,修改前:{}", this.data);
            Sleeper.sleep(1000);
            this.data = data;
            log.info("修改数据完成,修改后:{}", this.data);
        }finally {
            // 释放写锁
            stampedLock.unlockWrite(writeLockStamp);
        }
    }
}

基于上面的代码,我们可以测试以下场景:

场景一:多个线程同时读取数据

java
public static void main(String[] args) {
    DataContainer dataContainer = new DataContainer(1);

    new Thread(()->{
        dataContainer.readData(1000);
    },"t1").start();

    new Thread(()->{
        dataContainer.readData(500);
    }, "t2").start();
}

结果:

txt
18:56:07.169 [t2] INFO  : 开始乐观读取数据
18:56:07.169 [t1] INFO  : 开始乐观读取数据
18:56:07.676 [t2] INFO  : 乐观读数据完成,开始校验
18:56:07.677 [t2] INFO  : 乐观读校验通过,乐观读取数据成功
18:56:08.174 [t1] INFO  : 乐观读数据完成,开始校验
18:56:08.175 [t1] INFO  : 乐观读校验通过,乐观读取数据成功

可以发现,多个线程同时读取数据时,都是使用的乐观读,可以并发执行。

场景二:一个线程先执行读操作(耗时2秒),然后另一个线程执行写操作(耗时1秒)

java
public static void main(String[] args) {
    DataContainer dataContainer = new DataContainer(1);

    new Thread(()->{
        dataContainer.readData(2000);
    },"t1").start();

    Sleeper.sleep(100);
    new Thread(()->{
        dataContainer.writeData(2);
    }, "t2").start();
}

结果:

java
18:56:37.163 [t1] INFO  : 开始乐观读取数据
18:56:37.265 [t2] INFO  : 开始修改数据,修改前:1
18:56:38.271 [t2] INFO  : 修改数据完成,修改后:2
18:56:39.169 [t1] INFO  : 乐观读数据完成,开始校验
18:56:39.170 [t1] INFO  : 乐观读校验不通过,开始悲观读取数据
18:56:41.173 [t1] INFO  : 完成悲观读取数据

可以看到,在乐观读期间,数据被修改了,所以乐观读校验失败,触发悲观读获取数据。

场景三:一个线程先执行读操作(0.5秒),然后另一个线程执行写操作(1秒)

java
public static void main(String[] args) {
    DataContainer dataContainer = new DataContainer(1);

    new Thread(()->{
        dataContainer.readData(500);
    },"t1").start();

    Sleeper.sleep(100);
    new Thread(()->{
        dataContainer.writeData(2);
    }, "t2").start();
}

结果:

txt
18:59:26.290 [t1] INFO  : 开始乐观读取数据
18:59:26.391 [t2] INFO  : 开始修改数据,修改前:1
18:59:26.797 [t1] INFO  : 乐观读数据完成,开始校验
18:59:27.397 [t2] INFO  : 修改数据完成,修改后:2
18:59:27.397 [t1] INFO  : 乐观读校验不通过,开始悲观读取数据
18:59:27.901 [t1] INFO  : 完成悲观读取数据

可以看到,线程t1先开始进行乐观读数据,但是读数据完成后,开始校验,由于此时t2线程已启动修改数据,t1线程校验失败,开始获取读锁,此时由于t2线程获取了写锁,所以t1线程需要等待t2线程释放了写锁后,才能获取读锁。

场景四:一个线程先执行写操作(1秒),然后另一个线程执行读操作(2秒)

java
public static void main(String[] args) {
    DataContainer dataContainer = new DataContainer(1);

    new Thread(()->{
        dataContainer.writeData(3);
    },"t1").start();

    Sleeper.sleep(100);
    new Thread(()->{
        dataContainer.readData(2000);
    }, "t2").start();
}

结果:

txt
19:05:41.925 [t1] INFO  : 开始修改数据,修改前:1
19:05:42.030 [t2] INFO  : 开始乐观读取数据
19:05:42.926 [t1] INFO  : 修改数据完成,修改后:3
19:05:44.035 [t2] INFO  : 乐观读数据完成,开始校验
19:05:44.036 [t2] INFO  : 乐观读校验不通过,开始悲观读取数据
19:05:46.039 [t2] INFO  : 完成悲观读取数据

可以看到,即使修改数据完成先于乐观读校验,也会导致乐观校验失败,从而触发读锁。

场景五:一个线程先执行写操作(1秒),然后另一个线程执行读操作(0.5秒)

java
public static void main(String[] args) {
    DataContainer dataContainer = new DataContainer(1);

    new Thread(()->{
        dataContainer.writeData(3);
    },"t1").start();

    Sleeper.sleep(100);
    new Thread(()->{
        dataContainer.readData(500);
    }, "t2").start();
}

结果:

txt
19:09:40.847 [t1] INFO  : 开始修改数据,修改前:1
19:09:40.952 [t2] INFO  : 开始乐观读取数据
19:09:41.458 [t2] INFO  : 乐观读数据完成,开始校验
19:09:41.854 [t1] INFO  : 修改数据完成,修改后:3
19:09:41.855 [t2] INFO  : 乐观读校验不通过,开始悲观读取数据
19:09:42.360 [t2] INFO  : 完成悲观读取数据

可以看到,乐观读校验先于修改完成发生,但是乐观读校验会阻塞当前线程,只有等释放了写锁后,才会完成校验,当然是校验不通过。