Skip to content

JUC AQS

本文进行AQS的源码解读。

注意

以下源码基于OpenJDK 17,为了方便理解,会做相关改动。

1. AQS是什么

AQS是AbstractQueuedSynchronizer类的简称,这是位于java.util.concurrent.locks包下的抽象类,AQS 提供了一个框架,用于实现依赖于先进先出 (FIFO) 等待队列的阻塞锁和相关的同步器(信号量、事件等),子类通过继承 AQS 并重写特定的方法来管理同步状态以及线程的获取和释放。

简单来说,AQS是在Java层面上实现的Monitor机制。

2. AQS中的重要字段

在AQS中有一些重要字段,需要我们提前熟悉:

  • 同步状态state:状态字段,这是一个volatile int变量,用来表示同步状态。不同的同步器对 state 的使用方式不同,例如 ReentrantLock 用它来表示重入次数,Semaphore 用它来表示可用许可数量,CountDownLatch 用它来表示计数器的值。

  • CLH 队列(Craig, Landin, and Hagersten queue): AQS 使用一个变种的 CLH 双向链表作为等待队列,用于管理所有等待获取同步状态(锁)的线程。当线程尝试获取同步状态(锁)失败时,会被封装成一个 Node 节点并加入到队列中。

    CLH队列就像Monitor对象中的EntryList

    在AQS中,CLH队列表示为头尾指针,Node是AQS的内部类。

    java
    // 头尾指针,用来表示等待队列
    private transient volatile Node head;
    private transient volatile Node tail;

    其中Node结构如下:

    java
    abstract static class Node {
        volatile Node prev;       // 前驱节点
        volatile Node next;       // 后继节点
        Thread waiter;            // 线程
        volatile int status;      // Node状态
    }

    由于Node是抽象类,所以根据是否独占,又分为两个子类,均定义为AQS内部类:

    java
    static final class ExclusiveNode extends Node { }
    static final class SharedNode extends Node { }

    Node的状态以常量定义在AQS中,如下:

    java
    static final int WAITING   = 1;          // must be 1
    static final int CANCELLED = 0x80000000; // must be negative
    static final int COND      = 2;

    我们可以理解为三个状态:负数、1和2。

  • exclusiveOwnerThread:当前锁的持有线程,类似于Monitor中的Owner,这是定义在AQS父类中的字段:

    java
    public abstract class AbstractOwnableSynchronizer
        implements java.io.Serializable {
    
        private static final long serialVersionUID = 3737899427754241961L;
    
        protected AbstractOwnableSynchronizer() { }
    
        private transient Thread exclusiveOwnerThread;
    
        protected final void setExclusiveOwnerThread(Thread thread) {
            exclusiveOwnerThread = thread;
        }
    
        protected final Thread getExclusiveOwnerThread() {
            return exclusiveOwnerThread;
        }
    }
  • 条件变量ConditionObject:这是AQS的内部类,用来定义条件变量,类似于Monitor中的WaitSet,不同的是,AQS支持多个条件变量。

通过以上的分析,我们可以发现,AQS的结构与Monitor结构非常相似。

3. 源码分析

由于ReentrantLock是以AQS为基础的,所以我们以ReentrantLock为入口,分析AQS的源码。

3.1 ReentrantLock结构

RenentrantLock中,定义了一个内部抽象类SyncSync类继承了AQS,并且有两个子类:NonfairSyncFairSync。结构如下:

image-20250518093927804

ReentrantLock中,定义了一个Sync字段:

java
public class ReentrantLock implements Lock, java.io.Serializable {
    private static final long serialVersionUID = 7373984872572414699L;
  
    private final Sync sync;
}

RenentrantLock的同步方法都是调用sync的方法,其实就是基于AQS的。

下面我们根据加解锁流程来了解AQS的工作原理。

3.2 lock():初始加锁流程

首先我们调用ReentrantLock中的lock()方法,其内部调用了Synclock()方法:

java
public void lock() {
    sync.lock();
}

Synclock()方法如下:

  • 首先调用initialTryLock(),用于初始获取锁,这是一个抽象方法;
  • 如果初始获取锁失败,则调用acquire()方法,这是AQS提供的方法;
java
final void lock() {
    if (!initialTryLock())
        acquire(1);
}

我们先关注于非公平锁的initialTryLock()实现:

java
final boolean initialTryLock() {
  	// 获取当前线程
    Thread current = Thread.currentThread();
  	// 使用CAS,直接将AQS中的state从0改为1,试图获取锁
    if (compareAndSetState(0, 1)) { 
      	// 如果修改状态成功,表示获取锁成功,则直接设置锁的拥有线程,
        setExclusiveOwnerThread(current);
        return true;
    } else if (getExclusiveOwnerThread() == current) {
      	// 获取锁失败,则判断当前锁的持有者线程是不是当前线程,
      	// 如果是,则表示重入,则设置state的值+1,表示重入次数
        int c = getState() + 1;
        if (c < 0) 
            throw new Error("Maximum lock count exceeded");
        setState(c);
        return true;
    } else
      	// 获取锁失败
        return false;
}

假设现在Thread-0调用lock()方法加锁,由于没有竞争,调用initialTryLock()时直接将AQS中的state从0改为1,获取锁成功:

image-20250518095944625

3.3 acquire():竞争加锁失败

假设现在Thread - 1Thread - 0持有锁期间,也调用了同一把锁的lock()方法,执行initialTryLock()返回失败,则进入acquire()方法:

java
// arg 传入的是1 
public final void acquire(int arg) {
    if (!tryAcquire(arg))
        acquire(null, arg, false, false, false, 0L);
}

此时会先尝试调用tryAcquire()方法尝试获取锁,这个方法在AQS 中没有实现,需要子类实现,NonfairSync中该方法的实现如下:

java
protected final boolean tryAcquire(int acquires) {
    if (getState() == 0 && compareAndSetState(0, acquires)) {
        setExclusiveOwnerThread(Thread.currentThread());
        return true;
    }
    return false;
}

NonfairSynctryAcquire()的实现很简单,就是判断AQS中的state是否为0,如果为0则使用CAS修改状态,获取锁成功后设置锁的持有者为当前线程。

很显然,由于Thread - 0还没有释放锁,所以这个方法调用失败,接着调用acquire()方法:

java
acquire(null, arg, false, false, false, 0L);
Details
java
final int acquire(Node node, 
                  int arg, 
                  boolean shared,
                  boolean interruptible, 
                  boolean timed, 
                  long time) {
  	// 当前线程
    Thread current = Thread.currentThread();
  	// 自旋重试次数
    byte spins = 0, postSpins = 0;   // retries upon unpark of first thread
  	// 是否被中断
    boolean interrupted = false;
  	// 是否为等待队列中第一个节点
  	boolean first = false;
  	// 前驱节点
    Node pred = null;                // predecessor of node when enqueued

    for (;;) {
      	pred = null;
      	if(node != null){
          pred = node.prev;
        }
      	first = head == pred;
      	
        if (!first && pred != null) {
            if (pred.status < 0) {
                cleanQueue();           // predecessor cancelled
                continue;
            } else if (pred.prev == null) {
                Thread.onSpinWait();    // ensure serialization
                continue;
            }
        }
        if (first || pred == null) {
            boolean acquired;
            try {
                if (shared)
                    acquired = (tryAcquireShared(arg) >= 0);
                else
                    acquired = tryAcquire(arg);
            } catch (Throwable ex) {
                cancelAcquire(node, interrupted, false);
                throw ex;
            }
            if (acquired) {
                if (first) {
                    node.prev = null;
                    head = node;
                    pred.next = null;
                    node.waiter = null;
                    if (shared)
                        signalNextIfShared(node);
                    if (interrupted)
                        current.interrupt();
                }
                return 1;
            }
        }
        if (node == null) {                 // allocate; retry before enqueue
            if (shared)
                node = new SharedNode();
            else
                node = new ExclusiveNode();
        } else if (pred == null) {          // try to enqueue
            node.waiter = current;
            Node t = tail;
            node.setPrevRelaxed(t);         // avoid unnecessary fence
            if (t == null)
                tryInitializeHead();
            else if (!casTail(t, node))
                node.setPrevRelaxed(null);  // back out
            else
                t.next = node;
        } else if (first && spins != 0) {
            --spins;                        // reduce unfairness on rewaits
            Thread.onSpinWait();
        } else if (node.status == 0) {
            node.status = WAITING;          // enable signal and recheck
        } else {
            long nanos;
            spins = postSpins = (byte)((postSpins << 1) | 1);
            if (!timed)
                LockSupport.park(this);
            else if ((nanos = time - System.nanoTime()) > 0L)
                LockSupport.parkNanos(this, nanos);
            else
                break;
            node.clearStatus();
            if ((interrupted |= Thread.interrupted()) && interruptible)
                break;
        }
    }
    return cancelAcquire(node, interrupted, interruptible);
}

先解释这几个方法参数的含义:

  • node:当前尝试获取的线程对应的 Node 节点。如果线程是第一次尝试获取失败,node 可能为 null,会在后续创建。
  • arg: 尝试获取的资源数量。
  • shared: 是否是共享模式获取。
  • interruptible: 获取过程是否允许中断。
  • timed: 获取过程是否有超时时间(对应 tryLock(long, TimeUnit))。
  • time: 如果 timedtrue,表示获取的截止时间(纳秒)。

然后进入for(;;)循环中:

  1. 第一次循环:创建Node节点,由于是独占的(方法参数shared传的false),所以执行node = new ExclusiveNode();(上面第63行代码);

  2. 第二次循环:试图将node节点放入等待队列(满足上面第64行代码),在第二次循环中,由于头尾指针没有初始化(都为null),所以会先调用tryInitializeHead()初始化头尾指针,代码如下:

    java
    private void tryInitializeHead() {
        Node h = new ExclusiveNode();
      	// this 表示AQS对象
        if (U.compareAndSetReference(this, HEAD, null, h))
            tail = h;
    }

    结果示例图如下:

    image-20250518104846056

    CLH队列中的第一个节点称为Dummy(哑元)或哨兵,用来占位,并不关联线程。

  3. 第三次循环:还是试图将node节点放入等待队列(满足上面第64行代码),最终完成入队,示意图如下:

    image-20250518105925023

    这里有点难以理解,因为第三行代码中的casTail()会执行返回true,然后取反不满足if条件,接着执行最后的else逻辑:

    java
    if (t == null)
        tryInitializeHead();
    else if (!casTail(t, node))
        node.setPrevRelaxed(null);  // back out
    else
        t.next = node;
  4. 第四次循环:将node节点的状态设置为WAITING状态:

    image-20250518110234263

  5. 第五次循环:设置自旋次数为1(spins = postSpins = (byte)((postSpins << 1) | 1);),然后进入等待状态:

    java
    if (!timed)
        LockSupport.park(this);

    至此,Thread - 1进入等待状态,下图以灰色表示等待状态:

    image-20250518110714877

    spins 方法局部变量,是线程私有状态,不是Node节点状态

3.4 再看acquire()方法

在上面一小节中,我们主要关注node节点的入队流程(第59行代码之后),我们再看看第59行代码之前做了什么事。

首先在循环开始之前,都会执行以下代码:

java
pred = null;
if(node != null){
  pred = node.prev;
}
first = head == pred;

判断当前节点是否为队列中第一个的节点(排除哨兵节点),以及当前节点的前驱节点。

如果当前节点是队列中第一个节点或前驱节点为空,则满足第34行中的if条件。

  • 这个if代码块中主要就是去获取锁,因此,在上一小节的5次循环中,都会试图获取锁,即在进入等待状态前,不放过每一次获取锁的机会,这也算另一种程度的自旋优化吧。

如果当前节点不是队列中第一个节点,并且当前节点的前驱节点不为空,则满足第25行中的if条件。这个if代码块中主要完成一件事:清理已取消的节点。

  • if (pred.status < 0): 检查前驱节点的状态。如果状态小于 0(例如 CANCELLED),说明前驱节点取消了等待。这时会调用 cleanQueue() 方法清理队列中的已取消节点。然后 continue 跳到下一次循环,重新检查队列状态,尝试成为新的第一个等待节点。
  • else if (pred.prev == null): 这个分支处理一种特殊情况,可能是在 cleanQueue() 后前驱的 prev 还没完全设置好,或者 Head 节点刚刚初始化。Thread.onSpinWait() 是一个 JVM/CPU 级别的优化提示。continue 重试。

总结acquire()方法,主要是将node节点入队,在进入等待状态前判断是否为第一个等待节点,如果是则进行自旋优化获取锁,如果不是则进行清理等待队列工作。

假设现在Thread - 2Thread - 3都因为竞争锁失败进入等待队列,示意图如下:

3.5 unlock():释放锁流程

此时Thread - 0调用unlock()释放锁,ReentrantLockunlock()代码如下:

java
public void unlock() {
  sync.release(1);
}

Syncrelease()代码如下:

java
public final boolean release(int arg) {
    if (tryRelease(arg)) {
        signalNext(head);
        return true;
    }
    return false;
}

tryRelease()在AQS中未实现,在Sync中实现如下:

java
protected final boolean tryRelease(int releases) {
  	// c 表示释放锁之后state的值
    int c = getState() - releases;
  	// 只有锁的持有者线程才能释放锁
  	// 如果试图释放锁的线程和持有锁的线程不同,则抛出异常
    if (getExclusiveOwnerThread() != Thread.currentThread())
        throw new IllegalMonitorStateException();
  	// 如果c为0,则表示完全释放了锁,则锁的持有者线程置为null
    boolean free = (c == 0);
    if (free)
        setExclusiveOwnerThread(null);
  	// 设置锁的状态
    setState(c);
  	// 只有完全释放了锁才返回true
    return free;
}

当完全释放锁之后,此时示意图如下:

此时调用signalNext(head)唤醒等待队列中的线程:

java
// h是头队列
private static void signalNext(Node h) {
    Node s = null;
  	if(h != null){
      s = h.next;
    }
    if (s != null && s.status != 0) {
      	// 将节点状态取反
        s.getAndUnsetStatus(WAITING);
      	// 唤醒线程
        LockSupport.unpark(s.waiter);
    }
}

当线程(Thread - 1)被唤醒后,还在acquire()的无限循环中,此时主要完成以下工作:

  • 清除节点状态:node.clearStatus(),由于唤醒时将节点状态置为了WAITING的取反状态,此时应该恢复为0;

  • 判断是否被中断:如果当前线程是可中断的等待,并且在等待过程中被打断,则直接退出循环,取消获取锁。

    注意,如果线程调用LockSupport.park()等待过程中,其他线程调用interrupt()方法打断等待中的线程,等待线程会即时恢复运行,所以这个工作主要是判断等待线程是否由其他线程打断,如果等待线程是被释放锁的线程唤醒的,那么基本不可能进入该段逻辑(即退出循环)。

然后进入下一次循环,成功获取到锁,并且调整CLH队列,将原先的哨兵节点抛弃,将当前节点置为新的哨兵节点:

3.6 不公平锁与公平锁的原理、自旋优化流程

我们先看看不公平锁的实现,假设现在Thread - 1线程释放了锁,此时应该唤醒Thread - 2线程,但是此时刚好又有Thread - 4线程来竞争锁,根据前面的流程,Thread - 4调用lock()方法时就会调用initialTryLock()方法获取锁,而Thread - 2被唤醒后还要执行一些检查工作,因此Thread - 4成功获取到锁,而Thread - 2执行tryAcquire()时获取锁失败,此时会进入自旋优化流程:

java
else if (first && spins != 0) {
    --spins;                        // reduce unfairness on rewaits
    Thread.onSpinWait();
}

// Thread.onSpinWait()是一个空方法
public static void onSpinWait() {}

自旋完成后,如果仍然没有获取到锁,则进入等待前,重新设置自旋优化次数:

java
// postSpins 之前是1,那么重新计算后,
// postSpins 和 spins 变为3
spins = postSpins = (byte)((postSpins << 1) | 1);

示意图如下:

假如我们使用公平锁FairSync,那么我们来看看公平锁获取锁的流程:

java
static final class FairSync extends Sync {
    private static final long serialVersionUID = -3000897897090466540L;

    final boolean initialTryLock() {
        Thread current = Thread.currentThread();
        int c = getState();
        if (c == 0) {
            if (!hasQueuedThreads() && 
                compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(current);
                return true;
            }
        } else if (getExclusiveOwnerThread() == current) {
            if (++c < 0) // overflow
                throw new Error("Maximum lock count exceeded");
            setState(c);
            return true;
        }
        return false;
    }

    protected final boolean tryAcquire(int acquires) {
        if (getState() == 0 && 
            !hasQueuedPredecessors() &&
            compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(Thread.currentThread());
            return true;
        }
        return false;
    }
}

可以看到,FairSync中的initialTryLock()tryAcquire()方法中,在获取锁的判断中增加了!hasQueuedThreads()方法,这个方法用来判断等待队列中是否有等待的线程:

  • 当等待队列为空,或者等待队列中第一个节点线程是当前线程,则返回false,表示没有比当前线程等待时间更久的线程了;
  • 反之则返回true
java
public final boolean hasQueuedPredecessors() {
    Thread first = null; 
  	Node h = head;
  	Node s = null;
  	if(h != null){
      s = h.next;
      first = s.waiter;
    }
    if (h != null && (s == null || first == null || s.prev == null))
        first = getFirstQueuedThread(); // retry via getFirstQueuedThread
    return first != null && first != Thread.currentThread();
}

所以公平锁的原理就是在获取锁的时候,判断等待队列是否有比当前线程等待更久的线程,如果有,则入队等待。

3.7 可打断原理

如果我们在获取锁时,调用的是可打断的方法,则源码如下:

java
public void lockInterruptibly() throws InterruptedException {
    sync.lockInterruptibly();
}

则会调用Sync中的lockInterruptibly()方法:

java
final void lockInterruptibly() throws InterruptedException {
  	// 首先判断一下线程是否被打断,如果是,直接抛出异常
    if (Thread.interrupted())
        throw new InterruptedException();
  	// 首先调用初始化获取锁方法,获取失败,调用AQS的acquireInterruptibly(1)方法
    if (!initialTryLock())
        acquireInterruptibly(1);
}

在AQS的acquireInterruptibly()中,源码如下:

java
public final void acquireInterruptibly(int arg) throws InterruptedException {
    if (Thread.interrupted() ||
        (!tryAcquire(arg) && acquire(null, arg, false, true, false, 0L) < 0))
        throw new InterruptedException();
}

最终仍然调用acqire()方法,只不过此时interruptible参数传的是true,在acquire()方法中,如果线程在LockSupport.park()等待期间被打断,则会判断是否被打断,如果是并且允许被打断,则直接退出循环,执行取消获取锁流程:cancelAcquire

java
if ((interrupted |= Thread.interrupted()) && interruptible)
    break;

3.8 限时等待原理

当我们调用带时间的tryLock()时,会将等待时间转换为纳秒,然后调用Sync类中的tryLockNanos()方法:

java
public boolean tryLock(long timeout, TimeUnit unit)
        throws InterruptedException {
    return sync.tryLockNanos(unit.toNanos(timeout));
}
java
final boolean tryLockNanos(long nanos) throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    return initialTryLock() || tryAcquireNanos(1, nanos);
}

最终调用AQS中的tryAcquireNanos()方法:

java
public final boolean tryAcquireNanos(int arg, long nanosTimeout)
    throws InterruptedException {
  	// 线程没有被打断
    if (!Thread.interrupted()) {
      	// 第一次获取锁成功,则直接返回
        if (tryAcquire(arg))
            return true;
      	// 方法参数校验
        if (nanosTimeout <= 0L)
            return false;
      	// interruptible:true,可打断
      	// timed: true,限时等待
      	// time: System.nanoTime() + nanosTimeout,当前时间的纳秒值+需等待的纳秒值,
      	// 		time表示等待截止时间
        int stat = acquire(null, arg, false, true, true,
                           System.nanoTime() + nanosTimeout);
        if (stat > 0)
            return true;
        if (stat == 0)
            return false;
    }
    throw new InterruptedException();
}

acquire()方法中,如果是限时等待,则计算每次需要等待的时间nanos,计算逻辑就是等待截止时间减去当前时间,如果小于0,则表示已等待足够时间,直接退出循环。

java
if (!timed)
    LockSupport.park(this);
else if ((nanos = time - System.nanoTime()) > 0L)
    LockSupport.parkNanos(this, nanos);
else
  break;

4. 条件变量实现原理

下面我们来看看AQS中如何实现条件变量。

4.1 创建条件变量

ReentrantLock中,创建条件变量调用的是newCondition()方法:

java
public Condition newCondition() {
    return sync.newCondition();
}

实际调用的是SyncnewCondition()方法:

java
final ConditionObject newCondition() {
    return new ConditionObject();
}

其实就是创建了一个ConditionObject对象,这是定义在AQS中的内部类。该类也定义了一个等待队列,通过首尾指针:

java
public class ConditionObject implements Condition, java.io.Serializable {
    /** First node of condition queue. */
    private transient ConditionNode firstWaiter;
    /** Last node of condition queue. */
    private transient ConditionNode lastWaiter;
}

ConditionNodeNode的子类,其中多定义了一个nextWaiter字段,指向条件变量等待队列中下一个节点:

java
static final class ConditionNode extends Node
    implements ForkJoinPool.ManagedBlocker {
  
    ConditionNode nextWaiter;            // 指向下一个节点

    public final boolean isReleasable() {
        return status <= 1 || Thread.currentThread().isInterrupted();
    }

    public final boolean block() {
        while (!isReleasable()) 
          LockSupport.park();
        return true;
    }
}

注意到 ConditionNode 实现了 ForkJoinPool.ManagedBlocker 接口,这是关键点。

当一个 ForkJoinPool 的工作线程遇到阻塞操作时,如果这个阻塞操作实现了 ManagedBlocker 接口并通过 ForkJoinPool.managedBlock(ManagedBlocker blocker) 方法执行,ForkJoinPool 可以感知到这个阻塞,并可能为了维持足够的并行度而创建或激活额外的线程。

ManagedBlocker 接口有两个方法:

  1. boolean isReleasable(): 检查当前阻塞是否可以解除(即是否可以停止阻塞)。

    在上面的实现中,当节点状态小于等于1(即为WAITINGCANCELLED状态)时、或者线程被打断时,表示可以解除阻塞状态。

  2. boolean block(): 执行实际的阻塞操作。

    如果当前线程不能解除阻塞状态,则执行LockSupport.park()进入等待状态。

4.2 await():进入条件变量等待(上)

如果某个线程因为不满足执行条件,需要释放锁进入条件变量时,会调用await()方法:

Details
java
public final void await() throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    ConditionNode node = new ConditionNode();
    int savedState = enableWait(node);
    boolean interrupted = false, cancelled = false, rejected = false;
    while (!canReacquire(node)) {
        if (interrupted |= Thread.interrupted()) {
            if (cancelled = (node.getAndUnsetStatus(COND) & COND) != 0)
                break;              // else interrupted after signal
        } else if ((node.status & COND) != 0) {
            try {
                if (rejected)
                    node.block();
                else
                    ForkJoinPool.managedBlock(node);
            } catch (RejectedExecutionException ex) {
                rejected = true;
            } catch (InterruptedException ie) {
                interrupted = true;
            }
        } else
            Thread.onSpinWait();    // awoke while enqueuing
    }
    node.clearStatus();
    acquire(node, savedState, false, false, false, 0L);
    if (interrupted) {
        if (cancelled) {
            unlinkCancelledWaiters(node);
            throw new InterruptedException();
        }
        Thread.currentThread().interrupt();
    }
}

上面的逻辑分为前后两部分:进入条件变量等待、被唤醒后。

这里只讲解进入条件变量等待部分,主要完成以下工作:

  • 创建一个ConditionNode对象,用来封装等待线程;

  • 调用enableWait()方法,进入条件变量并且释放锁,唤醒在AQS等待队列中的第一个等待线程;

    Details
    java
    private int enableWait(ConditionNode node) {
      	// isHeldExclusively() 判断当前线程是否为持有锁的线程,只有持有锁的线程才能进入条件变量
        if (isHeldExclusively()) {
          	// 设置节点的waiter为当前线程
            node.waiter = Thread.currentThread();
          	// 设置节点的状态 COND|WAITING 的结果为3
            node.setStatusRelaxed(COND | WAITING);
          	// 以下代码是将节点放入条件变量等待队列中
            ConditionNode last = lastWaiter;
            if (last == null)
                firstWaiter = node;
            else
                last.nextWaiter = node;
            lastWaiter = node;
          	// 完全释放锁,有可能重入
            int savedState = getState();
            if (release(savedState))
                return savedState;
        }
        node.status = CANCELLED; // lock not held or inconsistent
        throw new IllegalMonitorStateException();
    }

    假如此时持有锁的线程Thread - 4进入条件变量,此时可以用如下示意图表示:

  • 然后进入while()循环,判断该节点是不是在AQS的CLH队列中,如果不在,则调用block()方法(底层就是LockSupport.park()方法)进入阻塞状态;

    为什么这里有ForkJoinPool.managedBlock(node);?

    假如现在要阻塞的线程是ForkJoinPool线程池中的线程,那么调用ForkJoinPool.managedBlock(node)可以告诉线程池,本线程要阻塞了,你可能需要增加线程来补偿,以免影响整体并行度。

    rejected 变量有什么用?

    如果调用 managedBlockForkJoinPool 实例已经处于非运行状态,例如:

    • 已经被关闭 (shutdown, shutdownNow)
    • 正在终止 (terminating)

    那么,ForkJoinPool 将无法执行其管理阻塞的任务(它不能再创建或激活新的线程来补偿了)。在这种情况下,ForkJoinPool.managedBlock() 方法就会抛出 RejectedExecutionException,表明它拒绝为你管理这个阻塞操作。

    此时通过把rejected设置为true,则在下一次循环中退化为node.block(),即等待条件变量的线程仍然能够使用底层的 LockSupport.park() 机制正确地挂起和等待信号,只是 ForkJoinPool 不会再为其阻塞进行补偿。

至此,该线程(Thread - 4)就进入条件变量中进行等待了,并且线程Thread - 2获得了锁的所有权:

image-20250518160031299

4.3 更多线程进入条件变量

假设现在Thread - 2也进入了条件变量,并且由于不公平锁的原因,Thread - 5抢占了锁,则示意图如下:

image-20250518160612960

4.4 signal():“唤醒”等待队列

注意:这里所说的”唤醒“并不是真正唤醒。

假如现在持有锁的线程调用了signal(),方法如下:

java
public final void signal() {
    // 判断当前线程是否持有锁
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
  	// 获取条件变量中第一个等待的线程
  	ConditionNode first = firstWaiter;
    if (first != null)
      	// 执行“唤醒”操作
        doSignal(first, false);
}

其实signalAll()和上面的逻辑一样,只是调用doSignal()时,第二个参数传的是true

实际工作是在doSignal()中执行的:

java
// first: 第一个节点
// all : 是否“唤醒”全部
private void doSignal(ConditionNode first, boolean all) {
    while (first != null) {
        ConditionNode next = first.nextWaiter;
      	// 将第一个节点出队
      	firstWaiter = next;
        if (firstWaiter == null)
            lastWaiter = null;
      	// 如果当前线程没有被取消
        if ((first.getAndUnsetStatus(COND) & COND) != 0) {
          	// 入队
            enqueue(first);
          	// 如果不是“唤醒”全部,则直接退出循环
            if (!all)
                break;
        }
      	// 接着“唤醒”下一个节点
        first = next;
    }
}

我们接着看enqueue()方法(这是AQS中的方法):

java
final void enqueue(Node node) {
    if (node != null) {
        for (;;) {
            Node t = tail;
            node.setPrevRelaxed(t);        
            if (t == null)                 
                tryInitializeHead();
            else if (casTail(t, node)) {
                t.next = node;
                if (t.status < 0)          
                    LockSupport.unpark(node.waiter);
                break;
            }
        }
    }
}

enqueue()方法,主要就是把条件变量中等待的节点放入CLH队列的尾部。示意图如下:

所以,signal()方法并没有真正唤醒等待线程,而是将其放入CLH队列中,使其可以被前驱节点唤醒来竞争锁。

qneueue()中,节点入队后有下面两行代码:

java
if (t.status < 0)          
  LockSupport.unpark(node.waiter);

这两行代码的作用是,当发现新入队的节点的前驱节点状态小于0(即被取消了),那么就唤醒新入队的线程(即时没轮到该线程竞争锁),使其执行清理CLH队列工作。

4.5 await():进入条件变量等待(下)

当进入过条件变量的线程被唤醒后,注意,此时仍然在ConditionObject.await()方法中,由于此时该节点已经被放入了CLH队列中,所以while(!canReacquire(node))不再满足,跳出循环。

java
private boolean canReacquire(ConditionNode node) {
    return node != null && node.prev != null && isEnqueued(node);
}

接着就清理节点状态(设置为0),并调用acquire()方法,开始竞争锁的争抢:

java
node.clearStatus();
acquire(node, savedState, false, false, false, 0L);

注意,在acquire()中有可能再次调用LockSupport.park()进入阻塞状态。

最后,在其获得到锁(在await()方法中调用的acquire()方法,结束的唯一可能就是获得到锁),判断在条件变量中等待期间是否被中断过,如果中断了,则判断是否被取消过,如果是,则执行条件变量等待队列的清理工作,最后抛出中断异常。

为什么要在获得到锁之后,还判断是否中断过并抛出中断异常?

Condition.await() 方法的完整含义是:原子地释放当前持有的锁,然后等待某个条件满足,并在条件满足后,重新获取之前释放的锁,最后才返回。如果在条件变量等待期间中断过,则最后抛出异常。

5. 总结

  1. 只能把握大的方向,但是一些细节就不能完全理解,一些边界条件(尤其涉及到多线程的情况)无法掌握;
  2. AQS源码中把条件判断和代码执行写在一起,虽然提高了效率,但是降低了代码可读性,对于业务代码来说,还是可读性更重要;

参考资料

[1] 黑马程序员:https://www.bilibili.com/video/BV16J411h7Rd