Skip to content

JUC 线程安全集合类

本文主要介绍线程安全集合类。

DANGER

以下源码基于OpenJDK 17。

1. 概述

“线程安全”意味着在多线程环境下,即使多个线程同时对容器进行读写操作,容器的内部状态也能保持一致性和正确性,不会出现数据损坏、丢失或产生不确定的行为(例如,ConcurrentModificationException)。

线程安全的集合类可以分为以下三类:

image-20250522105742737

  • 遗留的线程安全集合:如HashTableVector
  • 修饰的线程安全集合:主要是Collections中对普通集合类经过包装后的线程安全集合,包括Collections.SynchronizedListCollections.SynchronizedMap等;
  • JUC提供的线程安全集合:其中实现分为三种
    • Blocking(内部锁):大部分实现基于锁,会发生阻塞;
    • CopyOnWrite(写时复制):当发生修改时,复制一份副本,在副本上修改,修改完成后重新赋值回原始值;
    • Concurrent(CAS无锁):内部很多操作使用CAS优化,一般可以提供较高吞吐量,但是有弱一致性问题,例如,当利用迭代器进行遍历时,如果集合发生了修改,迭代器仍然可以继续进行遍历,但这时内容是旧的;

2. 内部锁: LinkedBlockingQueue

使用内部锁来实现线程安全集合,思想是通过加锁来强制对共享资源的互斥访问,避免了数据竞争和不一致性。

Collections.synchronized* 包装器 (Vector, Hashtable 也属于此范畴),以及 BlockingQueue 的实现(如 ArrayBlockingQueue, LinkedBlockingQueue)。

  • Collections.synchronized*:通过在每个公共方法上使用 synchronized 关键字(或一个内部锁对象),确保在任何给定时刻,只有一个线程能够执行该集合的任何方法。
  • BlockingQueue:内部使用 java.util.concurrent.locks.ReentrantLock(或多个锁)来保护队列的内部状态,并结合 Condition 变量来实现生产者和消费者的阻塞与唤醒。

我们以LinkedBlockingQueue为例,查看源码讲解原理。

首先查看LinkedBlockingQueue的结构:

java
// 队列容量
private final int capacity;
// 队列中实际元素数量
private final AtomicInteger count = new AtomicInteger();
// 队列头
transient Node<E> head;
// 队列尾
private transient Node<E> last;
// 从队列中取元素时的锁
private final ReentrantLock takeLock = new ReentrantLock();
// 当队列为空时的条件变量,用于阻塞线程
private final Condition notEmpty = takeLock.newCondition();
// 把元素加入队列中的锁
private final ReentrantLock putLock = new ReentrantLock();
// 当队列满时的条件变量,用于阻塞线程
private final Condition notFull = putLock.newCondition();

Node节点的数据结构如下:

java
static class Node<E> {
  	// 实际数据
    E item;
		// 下一个节点
    Node<E> next;

    Node(E x) { item = x; }
}

当我们调用构造函数时,会指定容量,并创建哨兵节点:

java
public LinkedBlockingQueue(int capacity) {
    if (capacity <= 0) 
      throw new IllegalArgumentException();
  
    this.capacity = capacity;
    last = head = new Node<E>(null);
}

示意图如下:

image-20250522113018807

当我们往队列内部添加元素时,调用put()方法:

java
public void put(E e) throws InterruptedException {
  	// 元素校验,不允许null
    if (e == null) 
      throw new NullPointerException();
  
    final int c;
  	// 把元素包装成Node节点
    final Node<E> node = new Node<E>(e);
    final ReentrantLock putLock = this.putLock;
    final AtomicInteger count = this.count;
  	
  	// 加锁
    putLock.lockInterruptibly();
    try {
      	// 如果队列已满,则进入条件变量等待
        while (count.get() == capacity) {
            notFull.await();
        }
      	// 实际元素入队
        enqueue(node);
      	// 队列中的元素数量 +1,c是之前的队列元素数量
        c = count.getAndIncrement();
      	// 本线程添加元素后,队列还有位置,叫醒其他添加元素的线程
        if (c + 1 < capacity)
            notFull.signal();
    } finally {
      	// 释放锁
        putLock.unlock();
    }
  	// 如果c为0,则表示之前队列为空,现在调用put后队列非空,叫醒取元素的线程
    if (c == 0)
        signalNotEmpty();
}

可以看到,调用put()方法会使用putLock进行保护。

相同的,调用take()方法的逻辑类似:

java
public E take() throws InterruptedException {
    final E x;
    final int c;
    final AtomicInteger count = this.count;
    final ReentrantLock takeLock = this.takeLock;
  	// 加锁
    takeLock.lockInterruptibly();
    try {
      	// 如果队列为空,则进入条件变量等待
        while (count.get() == 0) {
            notEmpty.await();
        }
      	// 从队列中获取元素
        x = dequeue();
      	// 队列中的元素数量 -1,c是之前的队列元素数量
        c = count.getAndDecrement();
      	// 如果队列还有其他元素,则叫醒其他取元素的线程
        if (c > 1)
            notEmpty.signal();
    } finally {
      	// 释放锁
        takeLock.unlock();
    }
  	// 如果c等于队列容量,则表示之前队列已满,
  	// 现在取一个元素后,表示队列不满,则唤醒其他添加元素的线程
    if (c == capacity)
        signalNotFull();
    return x;
}

LinkedBlockingQueue设计的高明之处在于内部使用了两把锁和哨兵节点:

  • 使用了两把锁,使得同一时刻,允许两个线程(生产者和消费者)同时执行,提高了并发度;

  • 使用了哨兵节点,使得两把锁的职责分耦,putLock只需要保护last指针,takeLock只需要保护head指针;

    假设现在没有哨兵节点,并且队列中只有一个元素,下面代码是模拟实现(线程非安全):

    一个线程t1执行take()操作,由于队列中有一个元素,所以不会被阻塞(进入条件变量),假设执行到第12行时,校验通过,但是由于时间片用完,进行上下文切换:

    java
    public E take() throws InterruptedException {
        E r = null;
        takeLock.lockInterruptibly();
        try {
            while (count.get() == 0){
                takeCondition.await();
            }
    
            r = head.item;
            head = head.next;
    				// 队列中只有一个元素,该元素出队后,需要将尾指针置为null
            if(count.get() == 1){
                last = null;
            }
    
            count.decrementAndGet();
        }finally {
            takeLock.unlock();
        }
    
        return r;
    }

    图示如下:

    image-20250522124340702

    之后另一个线程t2执行put()操作,由于putLock是另一把锁,并且此时队列未满,所以该线程也不会被阻塞,执行完成:

    java
    public void put(E item) throws InterruptedException {
        if (item == null) {
            throw new NullPointerException();
        }
    
        Node<E> node = new Node<>(item);
        putLock.lockInterruptibly();
        try {
            while (count.get() == capacity){
                putCondition.await();
            }
    
          	// 队列中没有元素,需要将头尾指针指向新入队的元素
            if (head == null){
                head = node;
                last = node;
            }else{
                last.next = node;
                last = node;
            }
    
            count.getAndIncrement();
        }finally {
            putLock.unlock();
        }
    }

    示意图如下:

    image-20250522124502368

    假设之后再切换回线程t1,执行完剩余的逻辑,那么最后结果如下:

    image-20250522124623422

    结果显然不正确,必然要设计更复杂的机制,保证修改头尾指针的原子性。

    当有哨兵节点后,通过设计出队算法,我们可以不修改尾指针;并且入队算法也不会修改头指针。这样就保证了头尾指针互不干涉,可以分别用两把锁来保护:

    java
    // LinkedBlockingQueue的出入队算法
    
    // 出队算法,不修改尾指针
    private E dequeue() {
        Node<E> h = head;
        Node<E> first = h.next;
        h.next = h; // help GC
        head = first;
        E x = first.item;
        first.item = null;
        return x;
    }
    
    // 入队算法,不修改头指针
    private void enqueue(Node<E> node) {
        last = last.next = node;
    }

3. 写时复制: CopyOnWriteArrayList

在JUC中,利用写时复制技术实现并发安全的集合类有CopyOnWriteArrayListCopyOnWriteArraySet,但是CopyOnWriteArraySet是基于CopyOnWriteArrayList的,所以我们以CopyOnWriteArrayList介绍基本原理。

CopyOnWriteArrayListjava.util.concurrent 包提供的一个线程安全的 List 实现。它的核心思想是**“写时复制” (Copy-On-Write):当进行写操作(如添加、修改、删除元素)时,它会创建一个新的底层数组**,在新数组上进行修改,然后原子性地将集合的内部引用指向这个新数组。读操作则始终在旧的(不变的)数组上进行,因此不需要加锁。

CopyOnWriteArrayList中,有以下两个重要结构:

java
// 锁,用来保护写操作
final transient Object lock = new Object();
// 原始数组
private transient volatile Object[] array;

当我们进行写操作时(例如add()),会进行加锁操作,并且会创建一个新的数组,在新数组上进行修改,然后将原始数组指向新数组:

java
public boolean add(E e) {
  	// 加锁
    synchronized (lock) {
        Object[] es = getArray();
        int len = es.length;
      	// 创建新数组
        es = Arrays.copyOf(es, len + 1);
      	// 在新数组上进行修改
        es[len] = e;
      	// 将原始数组指向新数组
        setArray(es);
        return true;
    }
}

当我们进行读操作时,是不会加锁的,而且是读取的原始数组,例如get()方法:

java
public E get(int index) {
    return elementAt(getArray(), index);
}

final Object[] getArray() {
    return array;
}

static <E> E elementAt(Object[] a, int index) {
    return (E) a[index];
}

这种机制能带来极高的读并发性能,但是,也有一些问题:

  • 写操作开销大 ,内存占用高:每次写操作(add(), set(), remove() 等)都需要创建并复制整个底层数组。对于包含大量元素的列表,这将导致显著的 CPU 消耗和内存分配开销。即使只修改一个元素,也需要复制整个数组。所以写时复制不适用于写操作频繁的场景。

  • 数据一致性弱 / 读操作可能获得旧数据:当进行读操作时,总是在原始数组上进行读取,可能看不到最新的写操作结果,对于某些实时性要求高的应用来说是不可接受的,例如,如果需要读取的是金融交易数据或精确的传感器数据。

    例如,线程t1对数组进行遍历,与此同时,线程t2删除了最后一个元素,但线程t1仍然会读取到最后一个元素:

    java
    public static void main(String[] args) {
        // 新建一个CopyOnWriteArrayList,并插入数据
        CopyOnWriteArrayList copyOnWriteArrayList = new CopyOnWriteArrayList();
        copyOnWriteArrayList.add(1);
        copyOnWriteArrayList.add(2);
        copyOnWriteArrayList.add(3);
        copyOnWriteArrayList.add(4);
    
        // 线程t1遍历数组
        new Thread(()->{
            Iterator iterator = copyOnWriteArrayList.iterator();
            while (iterator.hasNext()){
                Object next = iterator.next();
                log.info(next.toString());
                Sleeper.sleep(1000);  // 睡眠1秒,模拟操作数据
            }
        },"t1").start();
    
        // 线程t2删除最后一个元素
        Sleeper.sleep(100);
        new Thread(()->{
            Object removed = copyOnWriteArrayList.remove(copyOnWriteArrayList.size() - 1);
            log.info("删除最后一个元素: {}", removed);
        }, "t2").start();
    }

    结果:

    txt
    14:00:00.332 [t1] INFO  : 1
    14:00:00.433 [t2] INFO  : 删除最后一个元素: 4
    14:00:01.338 [t1] INFO  : 2
    14:00:02.343 [t1] INFO  : 3
    14:00:03.345 [t1] INFO  : 4

    可以看到,线程t1并没有发现线程t2删除了最后一个元素,读取到的是旧数据。

4. CAS无锁: ConcurrentHashMap

4.1 介绍

在JUC中,Concurrent思想实现的线程安全集合类利用CAS优化,降低锁的粒度,进一步提高并发性。

ConcurrentHashMap 是 Java 并发包 java.util.concurrent 中提供的一个线程安全的 HashMap 实现。它的设计目标是在保证线程安全的前提下,尽可能地提高并发性能,尤其是在读操作远多于写操作的场景下。

我们以ConcurrentHashMap为例子,讲解源码。

首先熟悉ConcurrentHashMap中的数据结构及相关逻辑:

  • ConcurrentHashMap 内部是一个 Node 数组(称为table),每个 Node 包含 hash, key, valuenext 指针。数组中的每个位置称为一个桶(bin)。

  • 最开始,桶中元素以链表结构存在;当链表长度超过一定阈值(默认为 8)时,链表会转换为红黑树 (TreeBin),提高查找效率,称为树化;红黑树的根节点称为TreeBin,继承于Node,其hash值为特殊值-2,表示该桶存储的是红黑树;

  • HashTable中的元素个数超过阈值(默认为数组长度的3/4),会触发扩容逻辑,将Node数组的长度乘以2创建新数组(称为nextTable),并将旧数组(table)中每个桶中的Node节点转移到新数组(nextTable)中,旧数组中每个桶的Node节点转移完成后,会将该桶的节点设置为ForwardingNode(是Node的子类,并且hash值为特殊值-1),最终将旧数组指向新数组(table = nextTable),新数组置空(nextTable = null);

  • 所有 NodeTreeBin 的字段(如 next 指针)都声明为 volatile,确保可见性;

4.2 get()方法

下面我们来看ConcurrentHashMap的读操作,以get()方法为例(为了代码更容易理解,改写部分逻辑):

java
public V get(Object key) {
    Node<K,V>[] tab = table; 
  	int n = tab.length;
  	// Node数组还没有创建,此时没有元素,直接返回null
  	if(tab == null || tab.length <= 0)
      return null;
  
  	// spread()方法是对key的哈希值再进行一次hash,并且保证结果为正数
    int h = spread(key.hashCode());
  	// (n-1)&h 即 h%n,用来获取桶下标,即该元素应该放在哪个桶中
  	// tabAt(Node<K,V>[] tab, int i) 用来获取下标为i的桶中第一个元素
  	Node<K,V> e = tabAt(tab, (n - 1) & h);
  	// 桶中没有元素,直接返回null
  	if(e == null)
      return null;
  
    Node<K,V> p; 
  	int eh = e.hash; 
  	K ek = e.key;  // K 是键的类型
  	
    if (eh == h) {
      	// 桶中第一个节点就是要查找的元素
        if (ek == key || (ek != null && key.equals(ek)))
            return e.val;
    }
    else if (eh < 0) {
      	// 桶中第一个节点的hash值为负数,那么有两种情况:
      	// hash = -1: 表示该节点是ForwardingNode,即该桶的节点已转移到nextTable中
        // hash = -2: 表示该节点是TreeBin,即该桶的节点已树化,需要进行红黑树查找
        return (p = e.find(h, key)) != null ? p.val : null;
    }
  
  	// 进行链表查找
    while ((e = e.next) != null) {
        if (e.hash == h &&
            ((ek = e.key) == key || (ek != null && key.equals(ek))))
            return e.val;
    }
    
    return null;
}

在上面第30行中的find()方法,是Node中定义的方法,其实就是普通的链表查找:

java
Node<K,V> find(int h, Object k) {
    Node<K,V> e = this;
    if (k != null) {
        do {
            K ek;
            if (e.hash == h &&
                ((ek = e.key) == k || (ek != null && k.equals(ek))))
                return e;
        } while ((e = e.next) != null);
    }
    return null;
}

为什么普通Node定义了链表查找方法,在get()方法中还要实现一遍链表查找算法呢?这是为了少一次方法调用,最高效地处理最常见的情况(短链表),直接在 get() 方法中进行内联的链表遍历

设计精妙之处在于,TreeBinForwardingNode这两个子类,重写了find()方法,以实现不同情况下的查找算法:

java
final Node<K,V> find(int h, Object k) {
    if (k != null) { // 确保键不为空
        // 1. 遍历 TreeBin 内部的“first”链表
        //    这个 first 链表是一个“备份”或“回退”机制,在某些特殊情况下使用。
        //    例如,在树化过程中,可能存在一些节点尚未完全整合到树结构中。
        for (Node<K,V> e = first; e != null; ) { // 从 first 节点开始遍历
            int s; K ek;
            // 2. 检查 TreeBin 的锁状态(lockState)
            //    WAITER 表示有写线程在等待,WRITER 表示有写线程正在修改树。
            //    如果当前有写操作(WRITER)或写操作正在等待(WAITER),
            //    意味着树可能处于不稳定或修改过程中,此时不宜直接遍历树。
            if (((s = lockState) & (WAITER|WRITER)) != 0) {
                // 3. 如果检测到有写操作或等待写操作,则退化为线性查找 first 链表
                //    这是一个安全的回退策略:在树结构可能不一致时,线性查找一个较短的链表更安全。
                if (e.hash == h &&
                    ((ek = e.key) == k || (ek != null && k.equals(ek))))
                    return e; // 找到则返回
                e = e.next; // 继续遍历链表
            }
            // 4. 如果没有写操作或写等待,尝试获取读锁(乐观读)
            else if (U.compareAndSetInt(this, LOCKSTATE, s, s + READER)) {
                // U 是 Unsafe 实例,LOCKSTATE 是 TreeBin 的一个字段,表示锁状态
                // s 是当前锁状态的快照
                // s + READER 是尝试将读者数量增加1(READER 是一个位掩码或常量,表示一个读者)
                // 成功执行 CAS 意味着我们获得了读锁,可以安全地遍历树。
                TreeNode<K,V> r, p;
                try {
                    // 5. 执行红黑树的查找操作
                    //    root 是 TreeBin 内部实际红黑树的根节点
                    //    r.findTreeNode(h, k, null) 是红黑树的查找算法(O(logN) 复杂度)
                    p = ((r = root) == null ? null :
                         r.findTreeNode(h, k, null));
                } finally {
                    // 6. 释放读锁
                    //    原子地将读锁计数器减去 READER。
                    //    同时,这里包含了一个优化:如果当前读锁计数器减到只剩下 WAITER 位,
                    //    并且有等待的写线程(waiter != null),则唤醒写线程。
                    Thread w;
                    if (U.getAndAddInt(this, LOCKSTATE, -READER) ==
                        (READER|WAITER) && (w = waiter) != null)
                        LockSupport.unpark(w); // 唤醒等待的写线程
                }
                return p; // 返回树查找的结果
            }
            // 7. 如果 CAS 失败(lockState 在读取 s 后被其他线程修改了),
            //    循环会继续,再次尝试从头开始(或者在某些情况下,会重新从 first 遍历)。
            //    这个逻辑在 `for` 循环头部的 `e = first` 和 `e = e.next` 之间,
            //    所以如果 CAS 失败,它会再次尝试 `e = first` 并重新开始整个逻辑。
        }
    }
    return null; // 键不存在
}
java
Node<K,V> find(int h, Object k) {
  	// 去新数组nextTable中查找元素
    outer: for (Node<K,V>[] tab = nextTable;;) {
        Node<K,V> e; int n;
      	// 新数组为null或新数组对应的桶中没有元素,直接返回null
        if (k == null || tab == null || (n = tab.length) == 0 ||
            (e = tabAt(tab, (n - 1) & h)) == null)
            return null;
        for (;;) {
            int eh; K ek;
          	// 链表查找算法一部分
            if ((eh = e.hash) == h &&
                ((ek = e.key) == k || (ek != null && k.equals(ek))))
                return e;
            if (eh < 0) {
              	// 在新数组查找过程中,有其他线程又扩容了,则重新到新数组中查找
                if (e instanceof ForwardingNode) {
                    tab = ((ForwardingNode<K,V>)e).nextTable;
                    continue outer;
                }
                else
                  	// 查找红黑树
                    return e.find(h, k);
            }
          	// 链表查找算法一部分
            if ((e = e.next) == null)
                return null;
        }
    }
}

4.3 put()方法

我们来看看put()方法的实现,其中涉及到树化、扩容:

java
// key 要存放的值
// value 要存放的值
// onlyIfAbsent 只有原map中不存在对应的key值,才会放入新的value
final V putVal(K key, V value, boolean onlyIfAbsent) {
  	// 参数校验,不允许存放null键,null值
    if (key == null || value == null) 
      throw new NullPointerException();
  	// 计算key的哈希值
    int hash = spread(key.hashCode());
  	// binCount表示对应桶中的节点数量,为树化做准备
    int binCount = 0;
    for (Node<K,V>[] tab = table;;) {
        Node<K,V> f; 
      	int n, i, fh; 
      	K fk; 
      	V fv;
      	// 如果table为null,则初始化table
        if (tab == null || (n = tab.length) == 0)
            tab = initTable();
        else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
          	// 如果桶为null,则使用CAS直接将键值对放入桶中
            if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value)))
                break;                   // no lock when adding to empty bin
        }
        else if ((fh = f.hash) == MOVED)
          	// 如果桶中第一个节点的哈希值为 MOVED(-2),表示是ForwardingTable,则调用helpTransfer()帮助转移节点
            tab = helpTransfer(tab, f);
        else if (onlyIfAbsent 
                 && fh == hash
                 && ((fk = f.key) == key || (fk != null && key.equals(fk)))
                 && (fv = f.val) != null)
          	// 如果桶中第一个节点与要放入的节点key值相同,并且不不允许覆盖,则不放入直接返回已存在的节点值
            return fv;
        else {
          	// 开始放入逻辑
            V oldVal = null;
          	// f是桶中第一个元素,对该元素加锁,减少锁的粒度,不影响其他桶的操作,增加了并发度
            synchronized (f) {
              	// 再次判断一下桶中第一个节点就是自己拿到的,有可能在等待锁期间,其他线程进行了树化、扩容操作,导致第一个节点发生了变化,此时需要重新进行put逻辑,即回到for循环开始
                if (tabAt(tab, i) == f) {
                    if (fh >= 0) {
                     	  // 第一个节点的hash值大于0,说明是链表结构
                      	// 默认为1,表示要放入的节点
                        binCount = 1;
                      	// 遍历链表,看是否有key值相同的节点,如果有则替换或什么也不做,并且在遍历过程中,binCount递增
                        for (Node<K,V> e = f;; ++binCount) {
                            K ek;
                            if (e.hash == hash &&
                                ((ek = e.key) == key ||
                                 (ek != null && key.equals(ek)))) {
                                oldVal = e.val;
                                if (!onlyIfAbsent)
                                    e.val = value;
                                break;
                            }
                          	// 放到最后
                            Node<K,V> pred = e;
                            if ((e = e.next) == null) {
                                pred.next = new Node<K,V>(hash, key, value);
                                break;
                            }
                        }
                    }
                    else if (f instanceof TreeBin) {
                      	// 第一个节点是TreeBin,表示该桶已进行树化,调用putTreeVal进行放入逻辑
                        Node<K,V> p;
                      	// 注意,此时binCount为2并且不再改变,不会大于树化阈值(8),不会再次进行树化
                        binCount = 2;
                        if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
                                                       value)) != null) {
                            oldVal = p.val;
                            if (!onlyIfAbsent)
                                p.val = value;
                        }
                    }
                    else if (f instanceof ReservationNode)
                        throw new IllegalStateException("Recursive update");
                }
            }
          	// 最后判断桶中元素数量
            if (binCount != 0) {
              	// 桶中元素数量大于了树化阈值(8),调用treeify()进行树化
                if (binCount >= TREEIFY_THRESHOLD)
                    treeifyBin(tab, i);
                if (oldVal != null)
                    return oldVal;
                break;
            }
        }
    }
  	// 最后计算table数组中的元素数量,如果超过阈值,则进行扩容操作
    addCount(1L, binCount);
    return null;
}

treeifyBin()就是将桶中的链表结构转化为红黑树,注意,转化过程仍然会对桶中第一个元素加锁,因此,在树化过程中,不允许对该桶进行写操作。

然后我们来看addCount()逻辑:

在查看addCount()之前,最好先了解LongAdder的设计思想。

java
// x 表示table数组中新增的元素数量
// check 表示是否进行扩容检查,如果check为-1,表示删除操作,不会进行扩容,如果大于等于0,表示传入的是桶中元素数量
private final void addCount(long x, int check) {
    CounterCell[] cs; 
  	long b, s;
  	// baseCount 表示一个基本的table中所有元素数量
  	// counterCells 表示多个计数的容器,用于避免线程冲突,提高并发性
    if ((cs = counterCells) != null ||
        !U.compareAndSetLong(this, BASECOUNT, b = baseCount, s = b + x)) {
        CounterCell c; long v; int m;
        boolean uncontended = true;
        if (cs == null || (m = cs.length - 1) < 0 ||
            (c = cs[ThreadLocalRandom.getProbe() & m]) == null ||
            !(uncontended =
              U.compareAndSetLong(c, CELLVALUE, v = c.value, v + x))) {
          	// 线程安全地将值加到指定的Cell中
            fullAddCount(x, uncontended);
          	// 在调用 fullAddCount() 之后直接返回,主要是出于效率考虑,因为 fullAddCount() 本身就是一个比较重量级且处理复杂情况的操作
            return;
        }
      	// 当一个元素被添加到一个空的桶中时,这意味着这个桶刚刚开始有元素。局部上,这个桶的密度很低。此时,整个 ConcurrentHashMap 达到容量阈值的可能性相对较小。为了避免每次向空桶添加元素时都执行昂贵的 sumCount() 和扩容检查,ConcurrentHashMap 选择在此处提前返回。它认为这种情况下检查扩容的收益很低,不值得付出额外的计算开销。
        if (check <= 1)
            return;
      	// 计算
        s = sumCount();
    }
  
  	// 总之,经过上面的if代码块后,s表示当前数组中元素总数量
  	
    if (check >= 0) {
        Node<K,V>[] tab, nt; int n, sc;
      	// sizeCtl 正常情况下表示扩容阈值,为数组table大小的3/4,例如,table的长度为16,那么sizeCtl为12
      	// 当 s >= sizeCtl时,会触发扩容机制
        while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
               (n = tab.length) < MAXIMUM_CAPACITY) {
          	// 在继续下面的逻辑前,请看下面备注信息
            int rs = resizeStamp(n) << RESIZE_STAMP_SHIFT;
            if (sc < 0) {
              	// sizeCtl 小于0,则已经有线程在进行扩容了
                if (sc == rs + MAX_RESIZERS || sc == rs + 1 ||
                    (nt = nextTable) == null || transferIndex <= 0)
                  	// 来帮助扩容的线程已经到达最大值、扩容已接近尾声或新数组还没创建出来,直接退出
                    break;
              	// 	将sizeCtl的值+1,就是低位+1,表示当前线程参与扩容
                if (U.compareAndSetInt(this, SIZECTL, sc, sc + 1))
                    transfer(tab, nt);
            }
            else if (U.compareAndSetInt(this, SIZECTL, sc, rs + 2))
              	// sizeCtl大于等于0,此时还没有线程开始扩容,将sizeCtl设置为特殊值,并开始扩容
                transfer(tab, null);
            s = sumCount();
        }
    }
}

备注信息:sizeCtl 在扩容中的状态

ConcurrentHashMap 进行扩容时,sizeCtl 会被设置为一个负数,这个负数编码了两个信息:

  1. 扩容戳 (Resize Stamp): rs = resizeStamp(n) << RESIZE_STAMP_SHIFT。这是一个基于旧表长度 n 生成的唯一标识符,用于标识当前的扩容操作。
  2. 正在进行扩容的线程数: sizeCtl 的低位表示有多少个线程正在参与 transfer() (数据转移) 操作。

4.4 transfer()方法

这是主要的扩容方法,其中涉及到新表创建、节点转移、线程安全控制等。

transfer()的思想大致如下:将旧数组按索引分段,然后每个线程领取一段,处理段中每个桶中的节点,处理完成后,将旧数组中该桶第一个节点置为ForwardingNode表示该桶已完成转移。当处理完该段后,再去领下一段,直到所有的段都被领完了,那么该线程退出。退出之前,判断该线程是不是最后一个退出的,如果是,那么需要重新检查一遍,看看旧数组中的每个桶是否都处理完成了。

transfer()中的大致流程如下:

  • 首先判断新数组nextTable是否已创建,如果没创建,那么就创建新数组,新数组的长度为旧数组长度的2倍,并且将transferIndex设置为旧数组的长度。如果新数组没有创建,那么其他线程不会进入到transfer()方法(在上面addCount()中的40-43行代码有所体现)

    transferIndex 为段的起始位置,默认从尾到头领取,段长又称为步长,在transfer()中以stride变量表示

  • 当新数组创建完成后,其他线程也可以进入transfer()方法,此时线程利用CAS领取一段,领取成功即表示将transferIndex的值减去段长;

    在transfer()中,假设我们现在的段长为4,旧数组长度为16

    nextIndex 表示段的结束位置,例如16;nextBound表示段的开始位置,例如12;处理的段应该是[12, 16),即包括索引为12的位置,不包括索引为16的位置;

    i 表示当前处理的桶索引,从后往前处理,即 i 的初始值为nextIndex -1 ,结束值为 nextBound ;

  • 线程领取一段后,就处理该段中的桶,按照情况处理

    • 如果桶为null,直接将ForwardingNode节点设置为该桶第一个元素,然后继续处理下一个桶;
    • 如果桶第一个元素就是ForwardingNode节点,表示该桶已被处理过,跳过该桶,继续处理下一个桶;
    • 其他情况,表示该桶存储的是正常的Node数据节点,则对桶中第一个元素加锁:
      • 如果桶中是红黑树,则进行红黑树的转移;
      • 如果桶中是链表,则进行链表的转移
  • 线程处理完一段后,会再试图领取下一段,如果此时已没有段可领,则将i置为-1,当i等于-1时,表示旧数组已处理完毕,可以退出。但是,在退出之前,判断自己是不是最后一个退出的线程,如果是,则要负责做最后的坚持,即重新遍历一遍旧数组。

代码如下:

java
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
    int n = tab.length, stride;
  	// 计算步长(段长)
    if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
        stride = MIN_TRANSFER_STRIDE; 
  	// 新数组还没创建,则先初始化新数组
    if (nextTab == null) {            // initiating
        try {
            @SuppressWarnings("unchecked")
            Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
            nextTab = nt;
        } catch (Throwable ex) {      // try to cope with OOME
            sizeCtl = Integer.MAX_VALUE;
            return;
        }
        nextTable = nextTab;
      	// 将transferIndex设置为旧数组长度,用于分配段
        transferIndex = n;
    }
    int nextn = nextTab.length;
  	// ForwardingNode 节点
    ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
    boolean advance = true;	// 是否要继续处理
    boolean finishing = false; // 用于最后一个线程清扫工作
    for (int i = 0, bound = 0;;) {
        Node<K,V> f; int fh;
      	// 下面的while循环用于获取段长,并且改变要处理的桶索引
        while (advance) {
            int nextIndex, nextBound;
          	// --i 后桶索引仍然在领取的段内或者有线程正在结束,则跳出while循环
          	// finishing 为true表示该线程正在结束扩容流程,即使bound不为0,--i >= bound || finishing 整个表达式也会为true,最终导致 i 递减到0,从而可以检查整个旧数组
            if (--i >= bound || finishing)
                advance = false;
          	// 没有段可领
            else if ((nextIndex = transferIndex) <= 0) {
                i = -1;
                advance = false;
            }
          	// 尝试领取一个新段
            else if (U.compareAndSetInt
                     (this, TRANSFERINDEX, nextIndex,
                      nextBound = (nextIndex > stride ?
                                   nextIndex - stride : 0))) {
                bound = nextBound;
                i = nextIndex - 1;
                advance = false;
            }
        }
      
      	// 如果没有段可领,表示旧数组处理完成,则进入退出流程
        if (i < 0 || i >= n || i + n >= nextn) {
            int sc;
          	// 正在退出,则将旧数组指针指向新数组,并且更新sizeCtl的值为新的扩容阈值
            if (finishing) {
                nextTable = null;
                table = nextTab;
                sizeCtl = (n << 1) - (n >>> 1);
                return;
            }
            if (U.compareAndSetInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
              	// 这解释了第一个进行扩容的线程为什么是将 sizeCtl设置为 rs+2 了
                if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
                    return;
              	// 设置该线程是最后一个退出扩容的线程,要进行检查、清理工作
                finishing = advance = true;
              	// 将 i 设置为旧数组长度,从尾到头进行检查
                i = n; 
            }
        }
        else if ((f = tabAt(tab, i)) == null)
          	// 桶中为null,直接将第一个元素置为fwd节点
            advance = casTabAt(tab, i, null, fwd);
        else if ((fh = f.hash) == MOVED)
          	// 该桶已处理过
            advance = true; // already processed
        else {
          	// 该桶没处理过,加锁处理
            synchronized (f) {
                if (tabAt(tab, i) == f) {
                    Node<K,V> ln, hn;
                  	// 桶中是链表
                    if (fh >= 0) {
                        int runBit = fh & n;
                        Node<K,V> lastRun = f;
                        for (Node<K,V> p = f.next; p != null; p = p.next) {
                            int b = p.hash & n;
                            if (b != runBit) {
                                runBit = b;
                                lastRun = p;
                            }
                        }
                        if (runBit == 0) {
                            ln = lastRun;
                            hn = null;
                        }
                        else {
                            hn = lastRun;
                            ln = null;
                        }
                        for (Node<K,V> p = f; p != lastRun; p = p.next) {
                            int ph = p.hash; K pk = p.key; V pv = p.val;
                            if ((ph & n) == 0)
                                ln = new Node<K,V>(ph, pk, pv, ln);
                            else
                                hn = new Node<K,V>(ph, pk, pv, hn);
                        }
                        setTabAt(nextTab, i, ln);
                        setTabAt(nextTab, i + n, hn);
                        setTabAt(tab, i, fwd);
                        advance = true;
                    }
                  	// 桶中是红黑树
                    else if (f instanceof TreeBin) {
                        TreeBin<K,V> t = (TreeBin<K,V>)f;
                        TreeNode<K,V> lo = null, loTail = null;
                        TreeNode<K,V> hi = null, hiTail = null;
                        int lc = 0, hc = 0;
                        for (Node<K,V> e = t.first; e != null; e = e.next) {
                            int h = e.hash;
                            TreeNode<K,V> p = new TreeNode<K,V>
                                (h, e.key, e.val, null, null);
                            if ((h & n) == 0) {
                                if ((p.prev = loTail) == null)
                                    lo = p;
                                else
                                    loTail.next = p;
                                loTail = p;
                                ++lc;
                            }
                            else {
                                if ((p.prev = hiTail) == null)
                                    hi = p;
                                else
                                    hiTail.next = p;
                                hiTail = p;
                                ++hc;
                            }
                        }
                        ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
                            (hc != 0) ? new TreeBin<K,V>(lo) : t;
                        hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
                            (lc != 0) ? new TreeBin<K,V>(hi) : t;
                        setTabAt(nextTab, i, ln);
                        setTabAt(nextTab, i + n, hn);
                        setTabAt(tab, i, fwd);
                        advance = true;
                    }
                    else if (f instanceof ReservationNode)
                        throw new IllegalStateException("Recursive update");
                }
            }
        }
    }
}

关于链表和红黑树的转移,此处不再详细讲解。