Appearance
JUC AQS
本文进行AQS的源码解读。
注意
以下源码基于OpenJDK 17,为了方便理解,会做相关改动。
1. AQS是什么
AQS是AbstractQueuedSynchronizer类的简称,这是位于java.util.concurrent.locks包下的抽象类,AQS 提供了一个框架,用于实现依赖于先进先出 (FIFO) 等待队列的阻塞锁和相关的同步器(信号量、事件等),子类通过继承 AQS 并重写特定的方法来管理同步状态以及线程的获取和释放。
简单来说,AQS是在Java层面上实现的Monitor机制。
2. AQS中的重要字段
在AQS中有一些重要字段,需要我们提前熟悉:
同步状态
state:状态字段,这是一个volatile int变量,用来表示同步状态。不同的同步器对state的使用方式不同,例如ReentrantLock用它来表示重入次数,Semaphore用它来表示可用许可数量,CountDownLatch用它来表示计数器的值。CLH 队列(Craig, Landin, and Hagersten queue): AQS 使用一个变种的 CLH 双向链表作为等待队列,用于管理所有等待获取同步状态(锁)的线程。当线程尝试获取同步状态(锁)失败时,会被封装成一个
Node节点并加入到队列中。CLH队列就像Monitor对象中的
EntryList。在AQS中,CLH队列表示为头尾指针,
Node是AQS的内部类。java// 头尾指针,用来表示等待队列 private transient volatile Node head; private transient volatile Node tail;其中
Node结构如下:javaabstract static class Node { volatile Node prev; // 前驱节点 volatile Node next; // 后继节点 Thread waiter; // 线程 volatile int status; // Node状态 }由于
Node是抽象类,所以根据是否独占,又分为两个子类,均定义为AQS内部类:javastatic final class ExclusiveNode extends Node { } static final class SharedNode extends Node { }Node的状态以常量定义在AQS中,如下:javastatic final int WAITING = 1; // must be 1 static final int CANCELLED = 0x80000000; // must be negative static final int COND = 2;我们可以理解为三个状态:负数、1和2。
exclusiveOwnerThread:当前锁的持有线程,类似于Monitor中的Owner,这是定义在AQS父类中的字段:javapublic abstract class AbstractOwnableSynchronizer implements java.io.Serializable { private static final long serialVersionUID = 3737899427754241961L; protected AbstractOwnableSynchronizer() { } private transient Thread exclusiveOwnerThread; protected final void setExclusiveOwnerThread(Thread thread) { exclusiveOwnerThread = thread; } protected final Thread getExclusiveOwnerThread() { return exclusiveOwnerThread; } }条件变量
ConditionObject:这是AQS的内部类,用来定义条件变量,类似于Monitor中的WaitSet,不同的是,AQS支持多个条件变量。
通过以上的分析,我们可以发现,AQS的结构与Monitor结构非常相似。
3. 源码分析
由于ReentrantLock是以AQS为基础的,所以我们以ReentrantLock为入口,分析AQS的源码。
3.1 ReentrantLock结构
在RenentrantLock中,定义了一个内部抽象类Sync,Sync类继承了AQS,并且有两个子类:NonfairSync和FairSync。结构如下:

在ReentrantLock中,定义了一个Sync字段:
java
public class ReentrantLock implements Lock, java.io.Serializable {
private static final long serialVersionUID = 7373984872572414699L;
private final Sync sync;
}RenentrantLock的同步方法都是调用sync的方法,其实就是基于AQS的。
下面我们根据加解锁流程来了解AQS的工作原理。
3.2 lock():初始加锁流程
首先我们调用ReentrantLock中的lock()方法,其内部调用了Sync的lock()方法:
java
public void lock() {
sync.lock();
}Sync的lock()方法如下:
- 首先调用
initialTryLock(),用于初始获取锁,这是一个抽象方法; - 如果初始获取锁失败,则调用
acquire()方法,这是AQS提供的方法;
java
final void lock() {
if (!initialTryLock())
acquire(1);
}我们先关注于非公平锁的initialTryLock()实现:
java
final boolean initialTryLock() {
// 获取当前线程
Thread current = Thread.currentThread();
// 使用CAS,直接将AQS中的state从0改为1,试图获取锁
if (compareAndSetState(0, 1)) {
// 如果修改状态成功,表示获取锁成功,则直接设置锁的拥有线程,
setExclusiveOwnerThread(current);
return true;
} else if (getExclusiveOwnerThread() == current) {
// 获取锁失败,则判断当前锁的持有者线程是不是当前线程,
// 如果是,则表示重入,则设置state的值+1,表示重入次数
int c = getState() + 1;
if (c < 0)
throw new Error("Maximum lock count exceeded");
setState(c);
return true;
} else
// 获取锁失败
return false;
}假设现在Thread-0调用lock()方法加锁,由于没有竞争,调用initialTryLock()时直接将AQS中的state从0改为1,获取锁成功:

3.3 acquire():竞争加锁失败
假设现在Thread - 1在Thread - 0持有锁期间,也调用了同一把锁的lock()方法,执行initialTryLock()返回失败,则进入acquire()方法:
java
// arg 传入的是1
public final void acquire(int arg) {
if (!tryAcquire(arg))
acquire(null, arg, false, false, false, 0L);
}此时会先尝试调用tryAcquire()方法尝试获取锁,这个方法在AQS 中没有实现,需要子类实现,NonfairSync中该方法的实现如下:
java
protected final boolean tryAcquire(int acquires) {
if (getState() == 0 && compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}在NonfairSync中tryAcquire()的实现很简单,就是判断AQS中的state是否为0,如果为0则使用CAS修改状态,获取锁成功后设置锁的持有者为当前线程。
很显然,由于Thread - 0还没有释放锁,所以这个方法调用失败,接着调用acquire()方法:
java
acquire(null, arg, false, false, false, 0L);Details
java
final int acquire(Node node,
int arg,
boolean shared,
boolean interruptible,
boolean timed,
long time) {
// 当前线程
Thread current = Thread.currentThread();
// 自旋重试次数
byte spins = 0, postSpins = 0; // retries upon unpark of first thread
// 是否被中断
boolean interrupted = false;
// 是否为等待队列中第一个节点
boolean first = false;
// 前驱节点
Node pred = null; // predecessor of node when enqueued
for (;;) {
pred = null;
if(node != null){
pred = node.prev;
}
first = head == pred;
if (!first && pred != null) {
if (pred.status < 0) {
cleanQueue(); // predecessor cancelled
continue;
} else if (pred.prev == null) {
Thread.onSpinWait(); // ensure serialization
continue;
}
}
if (first || pred == null) {
boolean acquired;
try {
if (shared)
acquired = (tryAcquireShared(arg) >= 0);
else
acquired = tryAcquire(arg);
} catch (Throwable ex) {
cancelAcquire(node, interrupted, false);
throw ex;
}
if (acquired) {
if (first) {
node.prev = null;
head = node;
pred.next = null;
node.waiter = null;
if (shared)
signalNextIfShared(node);
if (interrupted)
current.interrupt();
}
return 1;
}
}
if (node == null) { // allocate; retry before enqueue
if (shared)
node = new SharedNode();
else
node = new ExclusiveNode();
} else if (pred == null) { // try to enqueue
node.waiter = current;
Node t = tail;
node.setPrevRelaxed(t); // avoid unnecessary fence
if (t == null)
tryInitializeHead();
else if (!casTail(t, node))
node.setPrevRelaxed(null); // back out
else
t.next = node;
} else if (first && spins != 0) {
--spins; // reduce unfairness on rewaits
Thread.onSpinWait();
} else if (node.status == 0) {
node.status = WAITING; // enable signal and recheck
} else {
long nanos;
spins = postSpins = (byte)((postSpins << 1) | 1);
if (!timed)
LockSupport.park(this);
else if ((nanos = time - System.nanoTime()) > 0L)
LockSupport.parkNanos(this, nanos);
else
break;
node.clearStatus();
if ((interrupted |= Thread.interrupted()) && interruptible)
break;
}
}
return cancelAcquire(node, interrupted, interruptible);
}先解释这几个方法参数的含义:
node:当前尝试获取的线程对应的 Node 节点。如果线程是第一次尝试获取失败,node可能为 null,会在后续创建。arg: 尝试获取的资源数量。shared: 是否是共享模式获取。interruptible: 获取过程是否允许中断。timed: 获取过程是否有超时时间(对应tryLock(long, TimeUnit))。time: 如果timed为true,表示获取的截止时间(纳秒)。
然后进入for(;;)循环中:
第一次循环:创建
Node节点,由于是独占的(方法参数shared传的false),所以执行node = new ExclusiveNode();(上面第63行代码);第二次循环:试图将node节点放入等待队列(满足上面第64行代码),在第二次循环中,由于头尾指针没有初始化(都为null),所以会先调用
tryInitializeHead()初始化头尾指针,代码如下:javaprivate void tryInitializeHead() { Node h = new ExclusiveNode(); // this 表示AQS对象 if (U.compareAndSetReference(this, HEAD, null, h)) tail = h; }结果示例图如下:

CLH队列中的第一个节点称为Dummy(哑元)或哨兵,用来占位,并不关联线程。
第三次循环:还是试图将node节点放入等待队列(满足上面第64行代码),最终完成入队,示意图如下:

这里有点难以理解,因为第三行代码中的casTail()会执行返回true,然后取反不满足if条件,接着执行最后的else逻辑:
javaif (t == null) tryInitializeHead(); else if (!casTail(t, node)) node.setPrevRelaxed(null); // back out else t.next = node;第四次循环:将node节点的状态设置为
WAITING状态:
第五次循环:设置自旋次数为1(
spins = postSpins = (byte)((postSpins << 1) | 1);),然后进入等待状态:javaif (!timed) LockSupport.park(this);至此,
Thread - 1进入等待状态,下图以灰色表示等待状态:
spins 方法局部变量,是线程私有状态,不是Node节点状态
3.4 再看acquire()方法
在上面一小节中,我们主要关注node节点的入队流程(第59行代码之后),我们再看看第59行代码之前做了什么事。
首先在循环开始之前,都会执行以下代码:
java
pred = null;
if(node != null){
pred = node.prev;
}
first = head == pred;判断当前节点是否为队列中第一个的节点(排除哨兵节点),以及当前节点的前驱节点。
如果当前节点是队列中第一个节点或前驱节点为空,则满足第34行中的if条件。
- 这个
if代码块中主要就是去获取锁,因此,在上一小节的5次循环中,都会试图获取锁,即在进入等待状态前,不放过每一次获取锁的机会,这也算另一种程度的自旋优化吧。
如果当前节点不是队列中第一个节点,并且当前节点的前驱节点不为空,则满足第25行中的if条件。这个if代码块中主要完成一件事:清理已取消的节点。
if (pred.status < 0): 检查前驱节点的状态。如果状态小于 0(例如CANCELLED),说明前驱节点取消了等待。这时会调用cleanQueue()方法清理队列中的已取消节点。然后continue跳到下一次循环,重新检查队列状态,尝试成为新的第一个等待节点。else if (pred.prev == null): 这个分支处理一种特殊情况,可能是在cleanQueue()后前驱的prev还没完全设置好,或者 Head 节点刚刚初始化。Thread.onSpinWait()是一个 JVM/CPU 级别的优化提示。continue重试。
总结acquire()方法,主要是将node节点入队,在进入等待状态前判断是否为第一个等待节点,如果是则进行自旋优化获取锁,如果不是则进行清理等待队列工作。
假设现在Thread - 2 和 Thread - 3都因为竞争锁失败进入等待队列,示意图如下:

3.5 unlock():释放锁流程
此时Thread - 0调用unlock()释放锁,ReentrantLock中unlock()代码如下:
java
public void unlock() {
sync.release(1);
}Sync中release()代码如下:
java
public final boolean release(int arg) {
if (tryRelease(arg)) {
signalNext(head);
return true;
}
return false;
}tryRelease()在AQS中未实现,在Sync中实现如下:
java
protected final boolean tryRelease(int releases) {
// c 表示释放锁之后state的值
int c = getState() - releases;
// 只有锁的持有者线程才能释放锁
// 如果试图释放锁的线程和持有锁的线程不同,则抛出异常
if (getExclusiveOwnerThread() != Thread.currentThread())
throw new IllegalMonitorStateException();
// 如果c为0,则表示完全释放了锁,则锁的持有者线程置为null
boolean free = (c == 0);
if (free)
setExclusiveOwnerThread(null);
// 设置锁的状态
setState(c);
// 只有完全释放了锁才返回true
return free;
}当完全释放锁之后,此时示意图如下:

此时调用signalNext(head)唤醒等待队列中的线程:
java
// h是头队列
private static void signalNext(Node h) {
Node s = null;
if(h != null){
s = h.next;
}
if (s != null && s.status != 0) {
// 将节点状态取反
s.getAndUnsetStatus(WAITING);
// 唤醒线程
LockSupport.unpark(s.waiter);
}
}当线程(Thread - 1)被唤醒后,还在acquire()的无限循环中,此时主要完成以下工作:
清除节点状态:
node.clearStatus(),由于唤醒时将节点状态置为了WAITING的取反状态,此时应该恢复为0;判断是否被中断:如果当前线程是可中断的等待,并且在等待过程中被打断,则直接退出循环,取消获取锁。
注意,如果线程调用
LockSupport.park()等待过程中,其他线程调用interrupt()方法打断等待中的线程,等待线程会即时恢复运行,所以这个工作主要是判断等待线程是否由其他线程打断,如果等待线程是被释放锁的线程唤醒的,那么基本不可能进入该段逻辑(即退出循环)。
然后进入下一次循环,成功获取到锁,并且调整CLH队列,将原先的哨兵节点抛弃,将当前节点置为新的哨兵节点:

3.6 不公平锁与公平锁的原理、自旋优化流程
我们先看看不公平锁的实现,假设现在Thread - 1线程释放了锁,此时应该唤醒Thread - 2线程,但是此时刚好又有Thread - 4线程来竞争锁,根据前面的流程,Thread - 4调用lock()方法时就会调用initialTryLock()方法获取锁,而Thread - 2被唤醒后还要执行一些检查工作,因此Thread - 4成功获取到锁,而Thread - 2执行tryAcquire()时获取锁失败,此时会进入自旋优化流程:
java
else if (first && spins != 0) {
--spins; // reduce unfairness on rewaits
Thread.onSpinWait();
}
// Thread.onSpinWait()是一个空方法
public static void onSpinWait() {}自旋完成后,如果仍然没有获取到锁,则进入等待前,重新设置自旋优化次数:
java
// postSpins 之前是1,那么重新计算后,
// postSpins 和 spins 变为3
spins = postSpins = (byte)((postSpins << 1) | 1);示意图如下:

假如我们使用公平锁FairSync,那么我们来看看公平锁获取锁的流程:
java
static final class FairSync extends Sync {
private static final long serialVersionUID = -3000897897090466540L;
final boolean initialTryLock() {
Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (!hasQueuedThreads() &&
compareAndSetState(0, 1)) {
setExclusiveOwnerThread(current);
return true;
}
} else if (getExclusiveOwnerThread() == current) {
if (++c < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(c);
return true;
}
return false;
}
protected final boolean tryAcquire(int acquires) {
if (getState() == 0 &&
!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
}可以看到,FairSync中的initialTryLock()和tryAcquire()方法中,在获取锁的判断中增加了!hasQueuedThreads()方法,这个方法用来判断等待队列中是否有等待的线程:
- 当等待队列为空,或者等待队列中第一个节点线程是当前线程,则返回
false,表示没有比当前线程等待时间更久的线程了; - 反之则返回
true;
java
public final boolean hasQueuedPredecessors() {
Thread first = null;
Node h = head;
Node s = null;
if(h != null){
s = h.next;
first = s.waiter;
}
if (h != null && (s == null || first == null || s.prev == null))
first = getFirstQueuedThread(); // retry via getFirstQueuedThread
return first != null && first != Thread.currentThread();
}所以公平锁的原理就是在获取锁的时候,判断等待队列是否有比当前线程等待更久的线程,如果有,则入队等待。
3.7 可打断原理
如果我们在获取锁时,调用的是可打断的方法,则源码如下:
java
public void lockInterruptibly() throws InterruptedException {
sync.lockInterruptibly();
}则会调用Sync中的lockInterruptibly()方法:
java
final void lockInterruptibly() throws InterruptedException {
// 首先判断一下线程是否被打断,如果是,直接抛出异常
if (Thread.interrupted())
throw new InterruptedException();
// 首先调用初始化获取锁方法,获取失败,调用AQS的acquireInterruptibly(1)方法
if (!initialTryLock())
acquireInterruptibly(1);
}在AQS的acquireInterruptibly()中,源码如下:
java
public final void acquireInterruptibly(int arg) throws InterruptedException {
if (Thread.interrupted() ||
(!tryAcquire(arg) && acquire(null, arg, false, true, false, 0L) < 0))
throw new InterruptedException();
}最终仍然调用acqire()方法,只不过此时interruptible参数传的是true,在acquire()方法中,如果线程在LockSupport.park()等待期间被打断,则会判断是否被打断,如果是并且允许被打断,则直接退出循环,执行取消获取锁流程:cancelAcquire:
java
if ((interrupted |= Thread.interrupted()) && interruptible)
break;3.8 限时等待原理
当我们调用带时间的tryLock()时,会将等待时间转换为纳秒,然后调用Sync类中的tryLockNanos()方法:
java
public boolean tryLock(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryLockNanos(unit.toNanos(timeout));
}java
final boolean tryLockNanos(long nanos) throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
return initialTryLock() || tryAcquireNanos(1, nanos);
}最终调用AQS中的tryAcquireNanos()方法:
java
public final boolean tryAcquireNanos(int arg, long nanosTimeout)
throws InterruptedException {
// 线程没有被打断
if (!Thread.interrupted()) {
// 第一次获取锁成功,则直接返回
if (tryAcquire(arg))
return true;
// 方法参数校验
if (nanosTimeout <= 0L)
return false;
// interruptible:true,可打断
// timed: true,限时等待
// time: System.nanoTime() + nanosTimeout,当前时间的纳秒值+需等待的纳秒值,
// time表示等待截止时间
int stat = acquire(null, arg, false, true, true,
System.nanoTime() + nanosTimeout);
if (stat > 0)
return true;
if (stat == 0)
return false;
}
throw new InterruptedException();
}在acquire()方法中,如果是限时等待,则计算每次需要等待的时间nanos,计算逻辑就是等待截止时间减去当前时间,如果小于0,则表示已等待足够时间,直接退出循环。
java
if (!timed)
LockSupport.park(this);
else if ((nanos = time - System.nanoTime()) > 0L)
LockSupport.parkNanos(this, nanos);
else
break;4. 条件变量实现原理
下面我们来看看AQS中如何实现条件变量。
4.1 创建条件变量
在ReentrantLock中,创建条件变量调用的是newCondition()方法:
java
public Condition newCondition() {
return sync.newCondition();
}实际调用的是Sync的newCondition()方法:
java
final ConditionObject newCondition() {
return new ConditionObject();
}其实就是创建了一个ConditionObject对象,这是定义在AQS中的内部类。该类也定义了一个等待队列,通过首尾指针:
java
public class ConditionObject implements Condition, java.io.Serializable {
/** First node of condition queue. */
private transient ConditionNode firstWaiter;
/** Last node of condition queue. */
private transient ConditionNode lastWaiter;
}ConditionNode是Node的子类,其中多定义了一个nextWaiter字段,指向条件变量等待队列中下一个节点:
java
static final class ConditionNode extends Node
implements ForkJoinPool.ManagedBlocker {
ConditionNode nextWaiter; // 指向下一个节点
public final boolean isReleasable() {
return status <= 1 || Thread.currentThread().isInterrupted();
}
public final boolean block() {
while (!isReleasable())
LockSupport.park();
return true;
}
}注意到 ConditionNode 实现了 ForkJoinPool.ManagedBlocker 接口,这是关键点。
当一个 ForkJoinPool 的工作线程遇到阻塞操作时,如果这个阻塞操作实现了 ManagedBlocker 接口并通过 ForkJoinPool.managedBlock(ManagedBlocker blocker) 方法执行,ForkJoinPool 可以感知到这个阻塞,并可能为了维持足够的并行度而创建或激活额外的线程。
ManagedBlocker 接口有两个方法:
boolean isReleasable(): 检查当前阻塞是否可以解除(即是否可以停止阻塞)。在上面的实现中,当节点状态小于等于1(即为
WAITING或CANCELLED状态)时、或者线程被打断时,表示可以解除阻塞状态。boolean block(): 执行实际的阻塞操作。如果当前线程不能解除阻塞状态,则执行
LockSupport.park()进入等待状态。
4.2 await():进入条件变量等待(上)
如果某个线程因为不满足执行条件,需要释放锁进入条件变量时,会调用await()方法:
Details
java
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
ConditionNode node = new ConditionNode();
int savedState = enableWait(node);
boolean interrupted = false, cancelled = false, rejected = false;
while (!canReacquire(node)) {
if (interrupted |= Thread.interrupted()) {
if (cancelled = (node.getAndUnsetStatus(COND) & COND) != 0)
break; // else interrupted after signal
} else if ((node.status & COND) != 0) {
try {
if (rejected)
node.block();
else
ForkJoinPool.managedBlock(node);
} catch (RejectedExecutionException ex) {
rejected = true;
} catch (InterruptedException ie) {
interrupted = true;
}
} else
Thread.onSpinWait(); // awoke while enqueuing
}
node.clearStatus();
acquire(node, savedState, false, false, false, 0L);
if (interrupted) {
if (cancelled) {
unlinkCancelledWaiters(node);
throw new InterruptedException();
}
Thread.currentThread().interrupt();
}
}上面的逻辑分为前后两部分:进入条件变量等待、被唤醒后。
这里只讲解进入条件变量等待部分,主要完成以下工作:
创建一个
ConditionNode对象,用来封装等待线程;调用
enableWait()方法,进入条件变量并且释放锁,唤醒在AQS等待队列中的第一个等待线程;Details
javaprivate int enableWait(ConditionNode node) { // isHeldExclusively() 判断当前线程是否为持有锁的线程,只有持有锁的线程才能进入条件变量 if (isHeldExclusively()) { // 设置节点的waiter为当前线程 node.waiter = Thread.currentThread(); // 设置节点的状态 COND|WAITING 的结果为3 node.setStatusRelaxed(COND | WAITING); // 以下代码是将节点放入条件变量等待队列中 ConditionNode last = lastWaiter; if (last == null) firstWaiter = node; else last.nextWaiter = node; lastWaiter = node; // 完全释放锁,有可能重入 int savedState = getState(); if (release(savedState)) return savedState; } node.status = CANCELLED; // lock not held or inconsistent throw new IllegalMonitorStateException(); }假如此时持有锁的线程
Thread - 4进入条件变量,此时可以用如下示意图表示:
然后进入
while()循环,判断该节点是不是在AQS的CLH队列中,如果不在,则调用block()方法(底层就是LockSupport.park()方法)进入阻塞状态;为什么这里有ForkJoinPool.managedBlock(node);?
假如现在要阻塞的线程是ForkJoinPool线程池中的线程,那么调用ForkJoinPool.managedBlock(node)可以告诉线程池,本线程要阻塞了,你可能需要增加线程来补偿,以免影响整体并行度。
rejected 变量有什么用?
如果调用
managedBlock的ForkJoinPool实例已经处于非运行状态,例如:- 已经被关闭 (shutdown, shutdownNow)
- 正在终止 (terminating)
那么,
ForkJoinPool将无法执行其管理阻塞的任务(它不能再创建或激活新的线程来补偿了)。在这种情况下,ForkJoinPool.managedBlock()方法就会抛出RejectedExecutionException,表明它拒绝为你管理这个阻塞操作。此时通过把rejected设置为true,则在下一次循环中退化为
node.block(),即等待条件变量的线程仍然能够使用底层的LockSupport.park()机制正确地挂起和等待信号,只是ForkJoinPool不会再为其阻塞进行补偿。
至此,该线程(Thread - 4)就进入条件变量中进行等待了,并且线程Thread - 2获得了锁的所有权:

4.3 更多线程进入条件变量
假设现在Thread - 2也进入了条件变量,并且由于不公平锁的原因,Thread - 5抢占了锁,则示意图如下:

4.4 signal():“唤醒”等待队列
注意:这里所说的”唤醒“并不是真正唤醒。
假如现在持有锁的线程调用了signal(),方法如下:
java
public final void signal() {
// 判断当前线程是否持有锁
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
// 获取条件变量中第一个等待的线程
ConditionNode first = firstWaiter;
if (first != null)
// 执行“唤醒”操作
doSignal(first, false);
}其实signalAll()和上面的逻辑一样,只是调用doSignal()时,第二个参数传的是true。
实际工作是在doSignal()中执行的:
java
// first: 第一个节点
// all : 是否“唤醒”全部
private void doSignal(ConditionNode first, boolean all) {
while (first != null) {
ConditionNode next = first.nextWaiter;
// 将第一个节点出队
firstWaiter = next;
if (firstWaiter == null)
lastWaiter = null;
// 如果当前线程没有被取消
if ((first.getAndUnsetStatus(COND) & COND) != 0) {
// 入队
enqueue(first);
// 如果不是“唤醒”全部,则直接退出循环
if (!all)
break;
}
// 接着“唤醒”下一个节点
first = next;
}
}我们接着看enqueue()方法(这是AQS中的方法):
java
final void enqueue(Node node) {
if (node != null) {
for (;;) {
Node t = tail;
node.setPrevRelaxed(t);
if (t == null)
tryInitializeHead();
else if (casTail(t, node)) {
t.next = node;
if (t.status < 0)
LockSupport.unpark(node.waiter);
break;
}
}
}
}enqueue()方法,主要就是把条件变量中等待的节点放入CLH队列的尾部。示意图如下:

所以,signal()方法并没有真正唤醒等待线程,而是将其放入CLH队列中,使其可以被前驱节点唤醒来竞争锁。
在qneueue()中,节点入队后有下面两行代码:
java
if (t.status < 0)
LockSupport.unpark(node.waiter);这两行代码的作用是,当发现新入队的节点的前驱节点状态小于0(即被取消了),那么就唤醒新入队的线程(即时没轮到该线程竞争锁),使其执行清理CLH队列工作。
4.5 await():进入条件变量等待(下)
当进入过条件变量的线程被唤醒后,注意,此时仍然在ConditionObject.await()方法中,由于此时该节点已经被放入了CLH队列中,所以while(!canReacquire(node))不再满足,跳出循环。
java
private boolean canReacquire(ConditionNode node) {
return node != null && node.prev != null && isEnqueued(node);
}接着就清理节点状态(设置为0),并调用acquire()方法,开始竞争锁的争抢:
java
node.clearStatus();
acquire(node, savedState, false, false, false, 0L);注意,在acquire()中有可能再次调用LockSupport.park()进入阻塞状态。
最后,在其获得到锁(在await()方法中调用的acquire()方法,结束的唯一可能就是获得到锁),判断在条件变量中等待期间是否被中断过,如果中断了,则判断是否被取消过,如果是,则执行条件变量等待队列的清理工作,最后抛出中断异常。
为什么要在获得到锁之后,还判断是否中断过并抛出中断异常?
Condition.await() 方法的完整含义是:原子地释放当前持有的锁,然后等待某个条件满足,并在条件满足后,重新获取之前释放的锁,最后才返回。如果在条件变量等待期间中断过,则最后抛出异常。
5. 总结
- 只能把握大的方向,但是一些细节就不能完全理解,一些边界条件(尤其涉及到多线程的情况)无法掌握;
- AQS源码中把条件判断和代码执行写在一起,虽然提高了效率,但是降低了代码可读性,对于业务代码来说,还是可读性更重要;