Appearance
JUC 线程安全集合类
本文主要介绍线程安全集合类。
DANGER
以下源码基于OpenJDK 17。
1. 概述
“线程安全”意味着在多线程环境下,即使多个线程同时对容器进行读写操作,容器的内部状态也能保持一致性和正确性,不会出现数据损坏、丢失或产生不确定的行为(例如,ConcurrentModificationException)。
线程安全的集合类可以分为以下三类:

- 遗留的线程安全集合:如
HashTable、Vector; - 修饰的线程安全集合:主要是
Collections中对普通集合类经过包装后的线程安全集合,包括Collections.SynchronizedList,Collections.SynchronizedMap等; - JUC提供的线程安全集合:其中实现分为三种
- Blocking(内部锁):大部分实现基于锁,会发生阻塞;
- CopyOnWrite(写时复制):当发生修改时,复制一份副本,在副本上修改,修改完成后重新赋值回原始值;
- Concurrent(CAS无锁):内部很多操作使用CAS优化,一般可以提供较高吞吐量,但是有弱一致性问题,例如,当利用迭代器进行遍历时,如果集合发生了修改,迭代器仍然可以继续进行遍历,但这时内容是旧的;
2. 内部锁: LinkedBlockingQueue
使用内部锁来实现线程安全集合,思想是通过加锁来强制对共享资源的互斥访问,避免了数据竞争和不一致性。
Collections.synchronized* 包装器 (Vector, Hashtable 也属于此范畴),以及 BlockingQueue 的实现(如 ArrayBlockingQueue, LinkedBlockingQueue)。
Collections.synchronized*:通过在每个公共方法上使用synchronized关键字(或一个内部锁对象),确保在任何给定时刻,只有一个线程能够执行该集合的任何方法。BlockingQueue:内部使用java.util.concurrent.locks.ReentrantLock(或多个锁)来保护队列的内部状态,并结合Condition变量来实现生产者和消费者的阻塞与唤醒。
我们以LinkedBlockingQueue为例,查看源码讲解原理。
首先查看LinkedBlockingQueue的结构:
java
// 队列容量
private final int capacity;
// 队列中实际元素数量
private final AtomicInteger count = new AtomicInteger();
// 队列头
transient Node<E> head;
// 队列尾
private transient Node<E> last;
// 从队列中取元素时的锁
private final ReentrantLock takeLock = new ReentrantLock();
// 当队列为空时的条件变量,用于阻塞线程
private final Condition notEmpty = takeLock.newCondition();
// 把元素加入队列中的锁
private final ReentrantLock putLock = new ReentrantLock();
// 当队列满时的条件变量,用于阻塞线程
private final Condition notFull = putLock.newCondition();Node节点的数据结构如下:
java
static class Node<E> {
// 实际数据
E item;
// 下一个节点
Node<E> next;
Node(E x) { item = x; }
}当我们调用构造函数时,会指定容量,并创建哨兵节点:
java
public LinkedBlockingQueue(int capacity) {
if (capacity <= 0)
throw new IllegalArgumentException();
this.capacity = capacity;
last = head = new Node<E>(null);
}示意图如下:

当我们往队列内部添加元素时,调用put()方法:
java
public void put(E e) throws InterruptedException {
// 元素校验,不允许null
if (e == null)
throw new NullPointerException();
final int c;
// 把元素包装成Node节点
final Node<E> node = new Node<E>(e);
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
// 加锁
putLock.lockInterruptibly();
try {
// 如果队列已满,则进入条件变量等待
while (count.get() == capacity) {
notFull.await();
}
// 实际元素入队
enqueue(node);
// 队列中的元素数量 +1,c是之前的队列元素数量
c = count.getAndIncrement();
// 本线程添加元素后,队列还有位置,叫醒其他添加元素的线程
if (c + 1 < capacity)
notFull.signal();
} finally {
// 释放锁
putLock.unlock();
}
// 如果c为0,则表示之前队列为空,现在调用put后队列非空,叫醒取元素的线程
if (c == 0)
signalNotEmpty();
}可以看到,调用put()方法会使用putLock进行保护。
相同的,调用take()方法的逻辑类似:
java
public E take() throws InterruptedException {
final E x;
final int c;
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
// 加锁
takeLock.lockInterruptibly();
try {
// 如果队列为空,则进入条件变量等待
while (count.get() == 0) {
notEmpty.await();
}
// 从队列中获取元素
x = dequeue();
// 队列中的元素数量 -1,c是之前的队列元素数量
c = count.getAndDecrement();
// 如果队列还有其他元素,则叫醒其他取元素的线程
if (c > 1)
notEmpty.signal();
} finally {
// 释放锁
takeLock.unlock();
}
// 如果c等于队列容量,则表示之前队列已满,
// 现在取一个元素后,表示队列不满,则唤醒其他添加元素的线程
if (c == capacity)
signalNotFull();
return x;
}LinkedBlockingQueue设计的高明之处在于内部使用了两把锁和哨兵节点:
使用了两把锁,使得同一时刻,允许两个线程(生产者和消费者)同时执行,提高了并发度;
使用了哨兵节点,使得两把锁的职责分耦,
putLock只需要保护last指针,takeLock只需要保护head指针;假设现在没有哨兵节点,并且队列中只有一个元素,下面代码是模拟实现(线程非安全):
一个线程t1执行
take()操作,由于队列中有一个元素,所以不会被阻塞(进入条件变量),假设执行到第12行时,校验通过,但是由于时间片用完,进行上下文切换:javapublic E take() throws InterruptedException { E r = null; takeLock.lockInterruptibly(); try { while (count.get() == 0){ takeCondition.await(); } r = head.item; head = head.next; // 队列中只有一个元素,该元素出队后,需要将尾指针置为null if(count.get() == 1){ last = null; } count.decrementAndGet(); }finally { takeLock.unlock(); } return r; }图示如下:

之后另一个线程t2执行
put()操作,由于putLock是另一把锁,并且此时队列未满,所以该线程也不会被阻塞,执行完成:javapublic void put(E item) throws InterruptedException { if (item == null) { throw new NullPointerException(); } Node<E> node = new Node<>(item); putLock.lockInterruptibly(); try { while (count.get() == capacity){ putCondition.await(); } // 队列中没有元素,需要将头尾指针指向新入队的元素 if (head == null){ head = node; last = node; }else{ last.next = node; last = node; } count.getAndIncrement(); }finally { putLock.unlock(); } }示意图如下:

假设之后再切换回线程t1,执行完剩余的逻辑,那么最后结果如下:

结果显然不正确,必然要设计更复杂的机制,保证修改头尾指针的原子性。
当有哨兵节点后,通过设计出队算法,我们可以不修改尾指针;并且入队算法也不会修改头指针。这样就保证了头尾指针互不干涉,可以分别用两把锁来保护:
java// LinkedBlockingQueue的出入队算法 // 出队算法,不修改尾指针 private E dequeue() { Node<E> h = head; Node<E> first = h.next; h.next = h; // help GC head = first; E x = first.item; first.item = null; return x; } // 入队算法,不修改头指针 private void enqueue(Node<E> node) { last = last.next = node; }
3. 写时复制: CopyOnWriteArrayList
在JUC中,利用写时复制技术实现并发安全的集合类有CopyOnWriteArrayList和CopyOnWriteArraySet,但是CopyOnWriteArraySet是基于CopyOnWriteArrayList的,所以我们以CopyOnWriteArrayList介绍基本原理。
CopyOnWriteArrayList 是 java.util.concurrent 包提供的一个线程安全的 List 实现。它的核心思想是**“写时复制” (Copy-On-Write):当进行写操作(如添加、修改、删除元素)时,它会创建一个新的底层数组**,在新数组上进行修改,然后原子性地将集合的内部引用指向这个新数组。读操作则始终在旧的(不变的)数组上进行,因此不需要加锁。
在CopyOnWriteArrayList中,有以下两个重要结构:
java
// 锁,用来保护写操作
final transient Object lock = new Object();
// 原始数组
private transient volatile Object[] array;当我们进行写操作时(例如add()),会进行加锁操作,并且会创建一个新的数组,在新数组上进行修改,然后将原始数组指向新数组:
java
public boolean add(E e) {
// 加锁
synchronized (lock) {
Object[] es = getArray();
int len = es.length;
// 创建新数组
es = Arrays.copyOf(es, len + 1);
// 在新数组上进行修改
es[len] = e;
// 将原始数组指向新数组
setArray(es);
return true;
}
}当我们进行读操作时,是不会加锁的,而且是读取的原始数组,例如get()方法:
java
public E get(int index) {
return elementAt(getArray(), index);
}
final Object[] getArray() {
return array;
}
static <E> E elementAt(Object[] a, int index) {
return (E) a[index];
}这种机制能带来极高的读并发性能,但是,也有一些问题:
写操作开销大 ,内存占用高:每次写操作(
add(),set(),remove()等)都需要创建并复制整个底层数组。对于包含大量元素的列表,这将导致显著的 CPU 消耗和内存分配开销。即使只修改一个元素,也需要复制整个数组。所以写时复制不适用于写操作频繁的场景。数据一致性弱 / 读操作可能获得旧数据:当进行读操作时,总是在原始数组上进行读取,可能看不到最新的写操作结果,对于某些实时性要求高的应用来说是不可接受的,例如,如果需要读取的是金融交易数据或精确的传感器数据。
例如,线程t1对数组进行遍历,与此同时,线程t2删除了最后一个元素,但线程t1仍然会读取到最后一个元素:
javapublic static void main(String[] args) { // 新建一个CopyOnWriteArrayList,并插入数据 CopyOnWriteArrayList copyOnWriteArrayList = new CopyOnWriteArrayList(); copyOnWriteArrayList.add(1); copyOnWriteArrayList.add(2); copyOnWriteArrayList.add(3); copyOnWriteArrayList.add(4); // 线程t1遍历数组 new Thread(()->{ Iterator iterator = copyOnWriteArrayList.iterator(); while (iterator.hasNext()){ Object next = iterator.next(); log.info(next.toString()); Sleeper.sleep(1000); // 睡眠1秒,模拟操作数据 } },"t1").start(); // 线程t2删除最后一个元素 Sleeper.sleep(100); new Thread(()->{ Object removed = copyOnWriteArrayList.remove(copyOnWriteArrayList.size() - 1); log.info("删除最后一个元素: {}", removed); }, "t2").start(); }结果:
txt14:00:00.332 [t1] INFO : 1 14:00:00.433 [t2] INFO : 删除最后一个元素: 4 14:00:01.338 [t1] INFO : 2 14:00:02.343 [t1] INFO : 3 14:00:03.345 [t1] INFO : 4可以看到,线程t1并没有发现线程t2删除了最后一个元素,读取到的是旧数据。
4. CAS无锁: ConcurrentHashMap
4.1 介绍
在JUC中,Concurrent思想实现的线程安全集合类利用CAS优化,降低锁的粒度,进一步提高并发性。
ConcurrentHashMap 是 Java 并发包 java.util.concurrent 中提供的一个线程安全的 HashMap 实现。它的设计目标是在保证线程安全的前提下,尽可能地提高并发性能,尤其是在读操作远多于写操作的场景下。
我们以ConcurrentHashMap为例子,讲解源码。
首先熟悉ConcurrentHashMap中的数据结构及相关逻辑:
ConcurrentHashMap内部是一个Node数组(称为table),每个Node包含hash,key,value和next指针。数组中的每个位置称为一个桶(bin)。最开始,桶中元素以链表结构存在;当链表长度超过一定阈值(默认为 8)时,链表会转换为红黑树 (TreeBin),提高查找效率,称为树化;红黑树的根节点称为TreeBin,继承于
Node,其hash值为特殊值-2,表示该桶存储的是红黑树;当
HashTable中的元素个数超过阈值(默认为数组长度的3/4),会触发扩容逻辑,将Node数组的长度乘以2创建新数组(称为nextTable),并将旧数组(table)中每个桶中的Node节点转移到新数组(nextTable)中,旧数组中每个桶的Node节点转移完成后,会将该桶的节点设置为ForwardingNode(是Node的子类,并且hash值为特殊值-1),最终将旧数组指向新数组(table = nextTable),新数组置空(nextTable = null);所有
Node和TreeBin的字段(如next指针)都声明为volatile,确保可见性;
4.2 get()方法
下面我们来看ConcurrentHashMap的读操作,以get()方法为例(为了代码更容易理解,改写部分逻辑):
java
public V get(Object key) {
Node<K,V>[] tab = table;
int n = tab.length;
// Node数组还没有创建,此时没有元素,直接返回null
if(tab == null || tab.length <= 0)
return null;
// spread()方法是对key的哈希值再进行一次hash,并且保证结果为正数
int h = spread(key.hashCode());
// (n-1)&h 即 h%n,用来获取桶下标,即该元素应该放在哪个桶中
// tabAt(Node<K,V>[] tab, int i) 用来获取下标为i的桶中第一个元素
Node<K,V> e = tabAt(tab, (n - 1) & h);
// 桶中没有元素,直接返回null
if(e == null)
return null;
Node<K,V> p;
int eh = e.hash;
K ek = e.key; // K 是键的类型
if (eh == h) {
// 桶中第一个节点就是要查找的元素
if (ek == key || (ek != null && key.equals(ek)))
return e.val;
}
else if (eh < 0) {
// 桶中第一个节点的hash值为负数,那么有两种情况:
// hash = -1: 表示该节点是ForwardingNode,即该桶的节点已转移到nextTable中
// hash = -2: 表示该节点是TreeBin,即该桶的节点已树化,需要进行红黑树查找
return (p = e.find(h, key)) != null ? p.val : null;
}
// 进行链表查找
while ((e = e.next) != null) {
if (e.hash == h &&
((ek = e.key) == key || (ek != null && key.equals(ek))))
return e.val;
}
return null;
}在上面第30行中的find()方法,是Node中定义的方法,其实就是普通的链表查找:
java
Node<K,V> find(int h, Object k) {
Node<K,V> e = this;
if (k != null) {
do {
K ek;
if (e.hash == h &&
((ek = e.key) == k || (ek != null && k.equals(ek))))
return e;
} while ((e = e.next) != null);
}
return null;
}为什么普通Node定义了链表查找方法,在get()方法中还要实现一遍链表查找算法呢?这是为了少一次方法调用,最高效地处理最常见的情况(短链表),直接在 get() 方法中进行内联的链表遍历。
设计精妙之处在于,TreeBin和ForwardingNode这两个子类,重写了find()方法,以实现不同情况下的查找算法:
java
final Node<K,V> find(int h, Object k) {
if (k != null) { // 确保键不为空
// 1. 遍历 TreeBin 内部的“first”链表
// 这个 first 链表是一个“备份”或“回退”机制,在某些特殊情况下使用。
// 例如,在树化过程中,可能存在一些节点尚未完全整合到树结构中。
for (Node<K,V> e = first; e != null; ) { // 从 first 节点开始遍历
int s; K ek;
// 2. 检查 TreeBin 的锁状态(lockState)
// WAITER 表示有写线程在等待,WRITER 表示有写线程正在修改树。
// 如果当前有写操作(WRITER)或写操作正在等待(WAITER),
// 意味着树可能处于不稳定或修改过程中,此时不宜直接遍历树。
if (((s = lockState) & (WAITER|WRITER)) != 0) {
// 3. 如果检测到有写操作或等待写操作,则退化为线性查找 first 链表
// 这是一个安全的回退策略:在树结构可能不一致时,线性查找一个较短的链表更安全。
if (e.hash == h &&
((ek = e.key) == k || (ek != null && k.equals(ek))))
return e; // 找到则返回
e = e.next; // 继续遍历链表
}
// 4. 如果没有写操作或写等待,尝试获取读锁(乐观读)
else if (U.compareAndSetInt(this, LOCKSTATE, s, s + READER)) {
// U 是 Unsafe 实例,LOCKSTATE 是 TreeBin 的一个字段,表示锁状态
// s 是当前锁状态的快照
// s + READER 是尝试将读者数量增加1(READER 是一个位掩码或常量,表示一个读者)
// 成功执行 CAS 意味着我们获得了读锁,可以安全地遍历树。
TreeNode<K,V> r, p;
try {
// 5. 执行红黑树的查找操作
// root 是 TreeBin 内部实际红黑树的根节点
// r.findTreeNode(h, k, null) 是红黑树的查找算法(O(logN) 复杂度)
p = ((r = root) == null ? null :
r.findTreeNode(h, k, null));
} finally {
// 6. 释放读锁
// 原子地将读锁计数器减去 READER。
// 同时,这里包含了一个优化:如果当前读锁计数器减到只剩下 WAITER 位,
// 并且有等待的写线程(waiter != null),则唤醒写线程。
Thread w;
if (U.getAndAddInt(this, LOCKSTATE, -READER) ==
(READER|WAITER) && (w = waiter) != null)
LockSupport.unpark(w); // 唤醒等待的写线程
}
return p; // 返回树查找的结果
}
// 7. 如果 CAS 失败(lockState 在读取 s 后被其他线程修改了),
// 循环会继续,再次尝试从头开始(或者在某些情况下,会重新从 first 遍历)。
// 这个逻辑在 `for` 循环头部的 `e = first` 和 `e = e.next` 之间,
// 所以如果 CAS 失败,它会再次尝试 `e = first` 并重新开始整个逻辑。
}
}
return null; // 键不存在
}java
Node<K,V> find(int h, Object k) {
// 去新数组nextTable中查找元素
outer: for (Node<K,V>[] tab = nextTable;;) {
Node<K,V> e; int n;
// 新数组为null或新数组对应的桶中没有元素,直接返回null
if (k == null || tab == null || (n = tab.length) == 0 ||
(e = tabAt(tab, (n - 1) & h)) == null)
return null;
for (;;) {
int eh; K ek;
// 链表查找算法一部分
if ((eh = e.hash) == h &&
((ek = e.key) == k || (ek != null && k.equals(ek))))
return e;
if (eh < 0) {
// 在新数组查找过程中,有其他线程又扩容了,则重新到新数组中查找
if (e instanceof ForwardingNode) {
tab = ((ForwardingNode<K,V>)e).nextTable;
continue outer;
}
else
// 查找红黑树
return e.find(h, k);
}
// 链表查找算法一部分
if ((e = e.next) == null)
return null;
}
}
}4.3 put()方法
我们来看看put()方法的实现,其中涉及到树化、扩容:
java
// key 要存放的值
// value 要存放的值
// onlyIfAbsent 只有原map中不存在对应的key值,才会放入新的value
final V putVal(K key, V value, boolean onlyIfAbsent) {
// 参数校验,不允许存放null键,null值
if (key == null || value == null)
throw new NullPointerException();
// 计算key的哈希值
int hash = spread(key.hashCode());
// binCount表示对应桶中的节点数量,为树化做准备
int binCount = 0;
for (Node<K,V>[] tab = table;;) {
Node<K,V> f;
int n, i, fh;
K fk;
V fv;
// 如果table为null,则初始化table
if (tab == null || (n = tab.length) == 0)
tab = initTable();
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
// 如果桶为null,则使用CAS直接将键值对放入桶中
if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value)))
break; // no lock when adding to empty bin
}
else if ((fh = f.hash) == MOVED)
// 如果桶中第一个节点的哈希值为 MOVED(-2),表示是ForwardingTable,则调用helpTransfer()帮助转移节点
tab = helpTransfer(tab, f);
else if (onlyIfAbsent
&& fh == hash
&& ((fk = f.key) == key || (fk != null && key.equals(fk)))
&& (fv = f.val) != null)
// 如果桶中第一个节点与要放入的节点key值相同,并且不不允许覆盖,则不放入直接返回已存在的节点值
return fv;
else {
// 开始放入逻辑
V oldVal = null;
// f是桶中第一个元素,对该元素加锁,减少锁的粒度,不影响其他桶的操作,增加了并发度
synchronized (f) {
// 再次判断一下桶中第一个节点就是自己拿到的,有可能在等待锁期间,其他线程进行了树化、扩容操作,导致第一个节点发生了变化,此时需要重新进行put逻辑,即回到for循环开始
if (tabAt(tab, i) == f) {
if (fh >= 0) {
// 第一个节点的hash值大于0,说明是链表结构
// 默认为1,表示要放入的节点
binCount = 1;
// 遍历链表,看是否有key值相同的节点,如果有则替换或什么也不做,并且在遍历过程中,binCount递增
for (Node<K,V> e = f;; ++binCount) {
K ek;
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
oldVal = e.val;
if (!onlyIfAbsent)
e.val = value;
break;
}
// 放到最后
Node<K,V> pred = e;
if ((e = e.next) == null) {
pred.next = new Node<K,V>(hash, key, value);
break;
}
}
}
else if (f instanceof TreeBin) {
// 第一个节点是TreeBin,表示该桶已进行树化,调用putTreeVal进行放入逻辑
Node<K,V> p;
// 注意,此时binCount为2并且不再改变,不会大于树化阈值(8),不会再次进行树化
binCount = 2;
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
else if (f instanceof ReservationNode)
throw new IllegalStateException("Recursive update");
}
}
// 最后判断桶中元素数量
if (binCount != 0) {
// 桶中元素数量大于了树化阈值(8),调用treeify()进行树化
if (binCount >= TREEIFY_THRESHOLD)
treeifyBin(tab, i);
if (oldVal != null)
return oldVal;
break;
}
}
}
// 最后计算table数组中的元素数量,如果超过阈值,则进行扩容操作
addCount(1L, binCount);
return null;
}treeifyBin()就是将桶中的链表结构转化为红黑树,注意,转化过程仍然会对桶中第一个元素加锁,因此,在树化过程中,不允许对该桶进行写操作。
然后我们来看addCount()逻辑:
在查看addCount()之前,最好先了解LongAdder的设计思想。
java
// x 表示table数组中新增的元素数量
// check 表示是否进行扩容检查,如果check为-1,表示删除操作,不会进行扩容,如果大于等于0,表示传入的是桶中元素数量
private final void addCount(long x, int check) {
CounterCell[] cs;
long b, s;
// baseCount 表示一个基本的table中所有元素数量
// counterCells 表示多个计数的容器,用于避免线程冲突,提高并发性
if ((cs = counterCells) != null ||
!U.compareAndSetLong(this, BASECOUNT, b = baseCount, s = b + x)) {
CounterCell c; long v; int m;
boolean uncontended = true;
if (cs == null || (m = cs.length - 1) < 0 ||
(c = cs[ThreadLocalRandom.getProbe() & m]) == null ||
!(uncontended =
U.compareAndSetLong(c, CELLVALUE, v = c.value, v + x))) {
// 线程安全地将值加到指定的Cell中
fullAddCount(x, uncontended);
// 在调用 fullAddCount() 之后直接返回,主要是出于效率考虑,因为 fullAddCount() 本身就是一个比较重量级且处理复杂情况的操作
return;
}
// 当一个元素被添加到一个空的桶中时,这意味着这个桶刚刚开始有元素。局部上,这个桶的密度很低。此时,整个 ConcurrentHashMap 达到容量阈值的可能性相对较小。为了避免每次向空桶添加元素时都执行昂贵的 sumCount() 和扩容检查,ConcurrentHashMap 选择在此处提前返回。它认为这种情况下检查扩容的收益很低,不值得付出额外的计算开销。
if (check <= 1)
return;
// 计算
s = sumCount();
}
// 总之,经过上面的if代码块后,s表示当前数组中元素总数量
if (check >= 0) {
Node<K,V>[] tab, nt; int n, sc;
// sizeCtl 正常情况下表示扩容阈值,为数组table大小的3/4,例如,table的长度为16,那么sizeCtl为12
// 当 s >= sizeCtl时,会触发扩容机制
while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
(n = tab.length) < MAXIMUM_CAPACITY) {
// 在继续下面的逻辑前,请看下面备注信息
int rs = resizeStamp(n) << RESIZE_STAMP_SHIFT;
if (sc < 0) {
// sizeCtl 小于0,则已经有线程在进行扩容了
if (sc == rs + MAX_RESIZERS || sc == rs + 1 ||
(nt = nextTable) == null || transferIndex <= 0)
// 来帮助扩容的线程已经到达最大值、扩容已接近尾声或新数组还没创建出来,直接退出
break;
// 将sizeCtl的值+1,就是低位+1,表示当前线程参与扩容
if (U.compareAndSetInt(this, SIZECTL, sc, sc + 1))
transfer(tab, nt);
}
else if (U.compareAndSetInt(this, SIZECTL, sc, rs + 2))
// sizeCtl大于等于0,此时还没有线程开始扩容,将sizeCtl设置为特殊值,并开始扩容
transfer(tab, null);
s = sumCount();
}
}
}备注信息:sizeCtl 在扩容中的状态
当 ConcurrentHashMap 进行扩容时,sizeCtl 会被设置为一个负数,这个负数编码了两个信息:
- 扩容戳 (Resize Stamp):
rs = resizeStamp(n) << RESIZE_STAMP_SHIFT。这是一个基于旧表长度n生成的唯一标识符,用于标识当前的扩容操作。 - 正在进行扩容的线程数:
sizeCtl的低位表示有多少个线程正在参与transfer()(数据转移) 操作。
4.4 transfer()方法
这是主要的扩容方法,其中涉及到新表创建、节点转移、线程安全控制等。
transfer()的思想大致如下:将旧数组按索引分段,然后每个线程领取一段,处理段中每个桶中的节点,处理完成后,将旧数组中该桶第一个节点置为ForwardingNode表示该桶已完成转移。当处理完该段后,再去领下一段,直到所有的段都被领完了,那么该线程退出。退出之前,判断该线程是不是最后一个退出的,如果是,那么需要重新检查一遍,看看旧数组中的每个桶是否都处理完成了。
transfer()中的大致流程如下:
首先判断新数组
nextTable是否已创建,如果没创建,那么就创建新数组,新数组的长度为旧数组长度的2倍,并且将transferIndex设置为旧数组的长度。如果新数组没有创建,那么其他线程不会进入到transfer()方法(在上面addCount()中的40-43行代码有所体现)transferIndex 为段的起始位置,默认从尾到头领取,段长又称为步长,在transfer()中以stride变量表示
当新数组创建完成后,其他线程也可以进入
transfer()方法,此时线程利用CAS领取一段,领取成功即表示将transferIndex的值减去段长;在transfer()中,假设我们现在的段长为4,旧数组长度为16
nextIndex 表示段的结束位置,例如16;nextBound表示段的开始位置,例如12;处理的段应该是[12, 16),即包括索引为12的位置,不包括索引为16的位置;
i 表示当前处理的桶索引,从后往前处理,即 i 的初始值为nextIndex -1 ,结束值为 nextBound ;
线程领取一段后,就处理该段中的桶,按照情况处理
- 如果桶为
null,直接将ForwardingNode节点设置为该桶第一个元素,然后继续处理下一个桶; - 如果桶第一个元素就是
ForwardingNode节点,表示该桶已被处理过,跳过该桶,继续处理下一个桶; - 其他情况,表示该桶存储的是正常的
Node数据节点,则对桶中第一个元素加锁:- 如果桶中是红黑树,则进行红黑树的转移;
- 如果桶中是链表,则进行链表的转移
- 如果桶为
线程处理完一段后,会再试图领取下一段,如果此时已没有段可领,则将
i置为-1,当i等于-1时,表示旧数组已处理完毕,可以退出。但是,在退出之前,判断自己是不是最后一个退出的线程,如果是,则要负责做最后的坚持,即重新遍历一遍旧数组。
代码如下:
java
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
int n = tab.length, stride;
// 计算步长(段长)
if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
stride = MIN_TRANSFER_STRIDE;
// 新数组还没创建,则先初始化新数组
if (nextTab == null) { // initiating
try {
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
nextTab = nt;
} catch (Throwable ex) { // try to cope with OOME
sizeCtl = Integer.MAX_VALUE;
return;
}
nextTable = nextTab;
// 将transferIndex设置为旧数组长度,用于分配段
transferIndex = n;
}
int nextn = nextTab.length;
// ForwardingNode 节点
ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
boolean advance = true; // 是否要继续处理
boolean finishing = false; // 用于最后一个线程清扫工作
for (int i = 0, bound = 0;;) {
Node<K,V> f; int fh;
// 下面的while循环用于获取段长,并且改变要处理的桶索引
while (advance) {
int nextIndex, nextBound;
// --i 后桶索引仍然在领取的段内或者有线程正在结束,则跳出while循环
// finishing 为true表示该线程正在结束扩容流程,即使bound不为0,--i >= bound || finishing 整个表达式也会为true,最终导致 i 递减到0,从而可以检查整个旧数组
if (--i >= bound || finishing)
advance = false;
// 没有段可领
else if ((nextIndex = transferIndex) <= 0) {
i = -1;
advance = false;
}
// 尝试领取一个新段
else if (U.compareAndSetInt
(this, TRANSFERINDEX, nextIndex,
nextBound = (nextIndex > stride ?
nextIndex - stride : 0))) {
bound = nextBound;
i = nextIndex - 1;
advance = false;
}
}
// 如果没有段可领,表示旧数组处理完成,则进入退出流程
if (i < 0 || i >= n || i + n >= nextn) {
int sc;
// 正在退出,则将旧数组指针指向新数组,并且更新sizeCtl的值为新的扩容阈值
if (finishing) {
nextTable = null;
table = nextTab;
sizeCtl = (n << 1) - (n >>> 1);
return;
}
if (U.compareAndSetInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
// 这解释了第一个进行扩容的线程为什么是将 sizeCtl设置为 rs+2 了
if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
return;
// 设置该线程是最后一个退出扩容的线程,要进行检查、清理工作
finishing = advance = true;
// 将 i 设置为旧数组长度,从尾到头进行检查
i = n;
}
}
else if ((f = tabAt(tab, i)) == null)
// 桶中为null,直接将第一个元素置为fwd节点
advance = casTabAt(tab, i, null, fwd);
else if ((fh = f.hash) == MOVED)
// 该桶已处理过
advance = true; // already processed
else {
// 该桶没处理过,加锁处理
synchronized (f) {
if (tabAt(tab, i) == f) {
Node<K,V> ln, hn;
// 桶中是链表
if (fh >= 0) {
int runBit = fh & n;
Node<K,V> lastRun = f;
for (Node<K,V> p = f.next; p != null; p = p.next) {
int b = p.hash & n;
if (b != runBit) {
runBit = b;
lastRun = p;
}
}
if (runBit == 0) {
ln = lastRun;
hn = null;
}
else {
hn = lastRun;
ln = null;
}
for (Node<K,V> p = f; p != lastRun; p = p.next) {
int ph = p.hash; K pk = p.key; V pv = p.val;
if ((ph & n) == 0)
ln = new Node<K,V>(ph, pk, pv, ln);
else
hn = new Node<K,V>(ph, pk, pv, hn);
}
setTabAt(nextTab, i, ln);
setTabAt(nextTab, i + n, hn);
setTabAt(tab, i, fwd);
advance = true;
}
// 桶中是红黑树
else if (f instanceof TreeBin) {
TreeBin<K,V> t = (TreeBin<K,V>)f;
TreeNode<K,V> lo = null, loTail = null;
TreeNode<K,V> hi = null, hiTail = null;
int lc = 0, hc = 0;
for (Node<K,V> e = t.first; e != null; e = e.next) {
int h = e.hash;
TreeNode<K,V> p = new TreeNode<K,V>
(h, e.key, e.val, null, null);
if ((h & n) == 0) {
if ((p.prev = loTail) == null)
lo = p;
else
loTail.next = p;
loTail = p;
++lc;
}
else {
if ((p.prev = hiTail) == null)
hi = p;
else
hiTail.next = p;
hiTail = p;
++hc;
}
}
ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
(hc != 0) ? new TreeBin<K,V>(lo) : t;
hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
(lc != 0) ? new TreeBin<K,V>(hi) : t;
setTabAt(nextTab, i, ln);
setTabAt(nextTab, i + n, hn);
setTabAt(tab, i, fwd);
advance = true;
}
else if (f instanceof ReservationNode)
throw new IllegalStateException("Recursive update");
}
}
}
}
}关于链表和红黑树的转移,此处不再详细讲解。