Skip to content

JUC 工具类

本文介绍JUC提供的三个工具类:SemaphoreCountDownLatchCyclicBarrier

1. Semaphore

1.1 介绍

Semaphore,译为信号量,是 java.util.concurrent 包中提供的一个同步辅助工具,它主要用于控制对共享资源的访问数量。在之前学习到的synchronizedReentrantLock中,不允许多个线程同时访问共享资源,但通过Semaphore,我们可以限制同时访问共享资源的线程数量。

Semaphore的核心概念称为许可 (Permit),在 Semaphore 内部维护着一定数量的许可。线程在访问共享资源前需要获取 (acquire) 许可,访问完成后需要释放 (release) 许可。

Semaphore的工作流程如下:

  1. 首先创建一个 Semaphore 实例,指定初始可用的许可数量。

  2. 当一个线程需要访问受 Semaphore 保护的资源时,调用 acquire() 方法:

    • 如果当前有可用许可(许可数量大于0),许可数量会减一,线程立即获得许可并继续执行;

    • 如果当前没有可用许可(许可数量等于0),线程会进入等待状态,直到有其他线程调用 release() 方法释放许可;

  3. 当一个线程使用完受保护的资源时,调用 release() 方法:

    • 可用许可数量加一;

    • 如果当前有线程因为等待许可而被阻塞,其中一个线程会被唤醒,获取刚刚释放的许可,然后继续执行;

1.2 使用案例

假设现在有一个停车场,有5个停车位,某一天,有10辆车陆陆续续来停车,那么我们可以使用Semaphore来进行停车场管理:

java
@Slf4j
public class Demo {
    // 停车位 以空字符串代表没车(此停车位空闲),有字符串代表有车(此停车位已被占用)
    static AtomicReference<String>[] parkingSlots = new AtomicReference[5];
    // 信号量,代表可以同时停放的车辆
    static Semaphore semaphore = new Semaphore(5);

    // 初始化停车位
    static {
        for (int i = 0; i < parkingSlots.length; i++) {
            parkingSlots[i] = new AtomicReference<>("");
        }
    }

    public static void main(String[] args) {

        // 创建10个线程,代表10辆车
        for (int i = 1; i <= 10; i++) {
            new Thread(()->{
                // 获取信号量,相当于获取停车场进入权
                // 当信号量为0时,会阻塞等待
                try {
                    semaphore.acquire();
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }

                // 寻找停车位
                for (int j = 0; j < parkingSlots.length; j++) {
                    // 找到了
                    if(parkingSlots[j].get().isEmpty()){
                        boolean b = parkingSlots[j].compareAndSet("", Thread.currentThread().getName());
                        if (b) {
                            // 成功把车停进去了
                            log.info("获得停车位:{}", j+1);
                            break;
                        }else{
                            // 被别人抢了车位,重新找停车位
                            j = -1;
                        }
                    }
                }
                
                // 随机休眠一段时间,代表停车时间
                Sleeper.sleep(new Random().nextInt(2000));

                // 释放车位
                for (int j = 0; j < parkingSlots.length; j++) {
                    if(Thread.currentThread().getName().equals(parkingSlots[j].get())){
                        parkingSlots[j].set("");
                        break;
                    }
                }
                log.info("离开了");
                // 释放信号量
                semaphore.release();

            },"t"+i).start();
        }
    }
}

运行结果:

txt
10:11:37.067 [t1] INFO  : 获得停车位:2
10:11:37.067 [t2] INFO  : 获得停车位:1
10:11:37.067 [t5] INFO  : 获得停车位:5
10:11:37.067 [t4] INFO  : 获得停车位:4
10:11:37.067 [t3] INFO  : 获得停车位:3
10:11:37.357 [t1] INFO  : 离开了
10:11:37.358 [t6] INFO  : 获得停车位:2
10:11:37.594 [t2] INFO  : 离开了
10:11:37.595 [t7] INFO  : 获得停车位:1
10:11:37.605 [t5] INFO  : 离开了
10:11:37.605 [t8] INFO  : 获得停车位:5
10:11:37.844 [t8] INFO  : 离开了
10:11:37.844 [t9] INFO  : 获得停车位:5
10:11:38.092 [t3] INFO  : 离开了
10:11:38.092 [t10] INFO  : 获得停车位:3
10:11:38.663 [t9] INFO  : 离开了
10:11:38.763 [t4] INFO  : 离开了
10:11:38.873 [t6] INFO  : 离开了
10:11:39.423 [t7] INFO  : 离开了
10:11:39.990 [t10] INFO  : 离开了

2. CountDownLatch

2.1 介绍

CountDownLatchjava.util.concurrent 包中提供的一个同步辅助工具,它的核心作用是允许一个或多个线程等待,直到在其他线程中执行的一组操作完成

核心概念如下:

  1. 初始计数 (Initial Count): 创建 CountDownLatch 时需要指定一个正整数作为初始计数,这个计数代表了需要等待的事件或任务的数量。
  2. 递减计数 (Count Down): 当一个线程完成了它负责的任务或某个事件发生时,它会调用 countDown() 方法,这会使 CountDownLatch 的内部计数器减一。
  3. 等待 (Await): 一个或多个线程(通常是主线程或协调线程)会调用 await() 方法,调用 await() 的线程会一直阻塞,直到 CountDownLatch 的内部计数器减为零。

举个例子,假设现在一扇大门被6把锁(初始计数为6)锁住,一群人(主线程或协调线程)因为没有钥匙,只能在门前等待(调用await()方法),看门者(有可能1个,也有可能多个)接受到命令后把锁打开(调用countDown()方法),当所有的锁被打开后,等在门前的人就可以通过门继续前进了(继续运行)。

2.2 使用案例

在校园生活里,一般都是由当天的值日生关教室门最后一个离开。假设现在教室里还有5名同学在学习、收拾东西,值日生只能等他们都离开后才能锁门,我们可以使用CountDownLatch来模拟这个场景:

java
@Slf4j
public class Demo {
    public static void main(String[] args) throws InterruptedException {
        CountDownLatch count = new CountDownLatch(5);

        log.info("值日生等所有同学离开教室...");

        for (int i = 1; i <= 5; i++) {
            new Thread(()->{
                Sleeper.sleep(new Random().nextInt(2000));
                log.info("离开了教室");
                count.countDown();
            },"同学" + i).start();
        }

        count.await();
        log.info("所有同学离开了,锁门。");
    }
}

结果如下:

txt
10:31:47.920 [main] INFO  : 值日生等所有同学离开教室...
10:31:48.419 [同学4] INFO  : 离开了教室
10:31:48.957 [同学2] INFO  : 离开了教室
10:31:49.450 [同学3] INFO  : 离开了教室
10:31:49.590 [同学1] INFO  : 离开了教室
10:31:49.661 [同学5] INFO  : 离开了教室
10:31:49.662 [main] INFO  : 所有同学离开了,锁门。

3. CyclicBarrier

3.1 介绍

CyclicBarrierjava.util.concurrent 包中提供的一个同步辅助工具,它的主要作用是让一组线程互相等待,直到所有线程都到达一个共同的屏障点 (barrier point),然后所有线程才能继续执行。

核心概念如下:

  1. 屏障点 (Barrier Point): 就是所有参与的线程需要汇合、等待的地方。
  2. 参与者数量 (Parties): 创建 CyclicBarrier 时需要指定有多少个线程需要到达这个屏障点。
  3. 循环性 (Cyclic): 这是 CyclicBarrier 的一个重要特点。一旦所有线程都到达了屏障点并被释放,屏障就会被重置,可以被同一组线程再次使用。这使得 CyclicBarrier 非常适合用于解决需要重复同步的问题,例如分阶段的任务处理。
  4. 屏障动作 (Barrier Action - 可选): 可以在创建 CyclicBarrier 时提供一个 Runnable 对象作为屏障动作。当所有线程都到达屏障点时,在释放所有线程之前,会由其中一个到达屏障的线程来执行这个 Runnable。这通常用于在进入下一阶段之前执行一些汇总、清理或准备工作。

CyclicBarrier的工作流程如下:

  • 创建一个 CyclicBarrier 实例,并指定需要等待的线程数量(parties);

  • 每个参与的线程在执行到需要同步的地方(到达屏障点)时,调用 await() 方法,调用 await() 的线程会进入等待状态,直到满足以下条件之一:

    • 最后一个线程也调用了 await() 方法(即到达屏障的线程数量达到了 parties);
    • 等待的线程被中断;
    • 等待的线程发生了超时;
    • 其他等待的线程因异常或中断离开了屏障,导致屏障被破坏 (BrokenBarrierException);
  • 当最后一个线程到达屏障点时:

    • 如果定义了屏障动作 (barrierAction),该动作会被执行;
    • 然后,所有等待的线程都被释放,可以继续执行 await() 方法之后的代码;
    • 屏障被重置,可以供下一轮使用;

3.2 使用案例

我们以大学小组作业为例子,演示CyclicBarrier的使用。假设现在有3名同学组成一个小组完成大作业,并且约定周三和周五分别碰头一次,整合各自工作成果。

3.2.1 基本使用

java
public static void main(String[] args) {
    CyclicBarrier cyclicBarrier = new CyclicBarrier(3, ()->{
        log.info("碰头了,汇总工作结果");
    });

    for (int i = 1; i <= 3; i++) {
        new Thread(()->{
            log.info("各自开始工作...");
            // 随机数模拟有人早到,有人迟到
            Sleeper.sleep(2000 + new Random().nextInt(-100, 100));
            log.info("周三了,到达集合点,准备碰头");
            try {
                cyclicBarrier.await();
            } catch (InterruptedException | BrokenBarrierException e) {
                throw new RuntimeException(e);
            }

            log.info("各自开始新一轮工作...");
            Sleeper.sleep(2000+ new Random().nextInt(-100, 100));
            log.info("周五了,到达集合点,准备碰头");
            try {
                cyclicBarrier.await();
            } catch (InterruptedException | BrokenBarrierException e) {
                throw new RuntimeException(e);
            }

        },"同学" + i).start();
    }
}

结果:

txt
11:01:43.957 [同学1] INFO  : 各自开始工作...
11:01:43.957 [同学3] INFO  : 各自开始工作...
11:01:43.957 [同学2] INFO  : 各自开始工作...
11:01:46.026 [同学3] INFO  : 周三了,到达集合点,准备碰头
11:01:46.040 [同学1] INFO  : 周三了,到达集合点,准备碰头
11:01:46.045 [同学2] INFO  : 周三了,到达集合点,准备碰头
11:01:46.045 [同学2] INFO  : 碰头了,汇总工作结果
11:01:46.045 [同学2] INFO  : 各自开始新一轮工作...
11:01:46.045 [同学3] INFO  : 各自开始新一轮工作...
11:01:46.045 [同学1] INFO  : 各自开始新一轮工作...
11:01:48.094 [同学3] INFO  : 周五了,到达集合点,准备碰头
11:01:48.112 [同学2] INFO  : 周五了,到达集合点,准备碰头
11:01:48.141 [同学1] INFO  : 周五了,到达集合点,准备碰头
11:01:48.142 [同学1] INFO  : 碰头了,汇总工作结果

可以看到,当调用await()方法时,线程会阻塞,直到所有的等待线程都到达屏障点后,才继续执行。并且,屏障动作是由其中一个等待线程执行的。

3.2.2 破坏屏障情形一

假设现在同学1不喜欢别人迟到,每次他到集合点之后,只会等待其他同学30分钟(程序中以10毫秒计算),如果等待超时后,他就不等了,自己离开继续其他工作。那么这是,他们之间的约定就失效了(屏障破坏)。

java
public static void main(String[] args) {
    CyclicBarrier cyclicBarrier = new CyclicBarrier(3, ()->{
        log.info("碰头了,汇总工作结果");
    });

    for (int i = 1; i <= 3; i++) {
        new Thread(()->{
            log.info("各自开始工作...");
            // 随机数模拟有人早到,有人迟到
            Sleeper.sleep(2000 + new Random().nextInt(-100, 100));
            log.info("周三了,到达集合点,准备碰头");
            try {
                if("同学1".equals(Thread.currentThread().getName())) {
                  	// 只等待10毫秒
                    cyclicBarrier.await(10, TimeUnit.MILLISECONDS);
                }else{
                    cyclicBarrier.await();
                }
            } catch (InterruptedException | TimeoutException | BrokenBarrierException e) {
                log.error(e.getMessage(), e);
            }

            log.info("各自开始新一轮工作...");
            Sleeper.sleep(2000+ new Random().nextInt(-100, 100));
            log.info("周五了,到达集合点,准备碰头");
            try {
                if("同学1".equals(Thread.currentThread().getName())) {
                    cyclicBarrier.await(10, TimeUnit.MILLISECONDS);
                }else{
                    cyclicBarrier.await();
                }
            } catch (InterruptedException | TimeoutException | BrokenBarrierException e) {
                log.error(e.getMessage(), e);
            }

        },"同学" + i).start();
    }
}

结果(部分):

txt
11:11:37.526 [同学1] INFO  : 各自开始工作...
11:11:37.526 [同学2] INFO  : 各自开始工作...
11:11:37.526 [同学3] INFO  : 各自开始工作...
11:11:39.529 [同学1] INFO  : 周三了,到达集合点,准备碰头
11:11:39.543 [同学1] ERROR : null
java.util.concurrent.TimeoutException: null
11:11:39.547 [同学1] INFO  : 各自开始新一轮工作...
11:11:39.610 [同学3] INFO  : 周三了,到达集合点,准备碰头
11:11:39.611 [同学3] ERROR : null
java.util.concurrent.BrokenBarrierException: null
11:11:39.611 [同学3] INFO  : 各自开始新一轮工作...
11:11:39.614 [同学2] INFO  : 周三了,到达集合点,准备碰头
11:11:39.614 [同学2] ERROR : null
java.util.concurrent.BrokenBarrierException: null
11:11:39.614 [同学2] INFO  : 各自开始新一轮工作...
11:11:41.522 [同学1] INFO  : 周五了,到达集合点,准备碰头
11:11:41.522 [同学1] ERROR : null
java.util.concurrent.BrokenBarrierException: null
11:11:41.622 [同学2] INFO  : 周五了,到达集合点,准备碰头
11:11:41.623 [同学2] ERROR : null
java.util.concurrent.BrokenBarrierException: null
11:11:41.643 [同学3] INFO  : 周五了,到达集合点,准备碰头
11:11:41.644 [同学3] ERROR : null
java.util.concurrent.BrokenBarrierException: null

可以看到,同学1在第529毫秒到达了集合点,但是等待了10毫秒后,其他同学还没到达,约543毫秒时因等待超时离开了集合点,此时已经破坏了屏障(可以理解为在现场留下了一个纸条,说你们迟到了,我们不约了);同学2和同学3之后姗姗来迟,发现约定被破坏了,所以也不再等了。之后的约定自然而然也被破坏了,所以都会抛出错误。

3.2.3 破坏屏障情形二

假设现在同学1在到达集合点等待其他同学时,被老师打断了一下去做其他事情,之后他就忘记回去了,导致约定被破坏:

java
public static void main(String[] args) {
    CyclicBarrier cyclicBarrier = new CyclicBarrier(3, ()->{
        log.info("碰头了,汇总工作结果");
    });

    List<Thread> threadList = new ArrayList<>();
    for (int i = 1; i <= 3; i++) {
        Thread thread = new Thread(() -> {
            log.info("各自开始工作...");
            if(!"同学1".equals(Thread.currentThread().getName())) {
                // 随机数模拟有人早到,有人迟到
                Sleeper.sleep(2000 + new Random().nextInt(-100, 100));
            }else{
                // 同学1早到,是为了保证能在await()期间被打断
                Sleeper.sleep(1800);
            }
            log.info("周三了,到达集合点,准备碰头");
            try {
                cyclicBarrier.await();
            } catch (InterruptedException | BrokenBarrierException e) {
                log.error(e.getMessage(), e);
            }

            log.info("各自开始新一轮工作...");
            Sleeper.sleep(2000 + new Random().nextInt(-100, 100));
            log.info("周五了,到达集合点,准备碰头");
            try {
                cyclicBarrier.await();
            } catch (InterruptedException | BrokenBarrierException e) {
                log.error(e.getMessage(), e);
            }

        }, "同学" + i);
        threadList.add(thread);

        thread.start();
    }

    // 打断同学1
    Sleeper.sleep(1900);
    threadList.get(0).interrupt();
}

结果:

txt
11:35:02.678 [同学1] INFO  : 各自开始工作...
11:35:02.678 [同学3] INFO  : 各自开始工作...
11:35:02.678 [同学2] INFO  : 各自开始工作...
11:35:04.484 [同学1] INFO  : 周三了,到达集合点,准备碰头
11:35:04.579 [同学1] ERROR : null
java.lang.InterruptedException: null
11:35:04.584 [同学1] INFO  : 各自开始新一轮工作...
11:35:04.597 [同学3] INFO  : 周三了,到达集合点,准备碰头
11:35:04.597 [同学3] ERROR : null
java.util.concurrent.BrokenBarrierException: null
11:35:04.598 [同学3] INFO  : 各自开始新一轮工作...
11:35:04.625 [同学2] INFO  : 周三了,到达集合点,准备碰头
11:35:04.625 [同学2] ERROR : null
java.util.concurrent.BrokenBarrierException: null
11:35:04.626 [同学2] INFO  : 各自开始新一轮工作...
11:35:06.571 [同学2] INFO  : 周五了,到达集合点,准备碰头
11:35:06.571 [同学2] ERROR : null
java.util.concurrent.BrokenBarrierException: null
11:35:06.628 [同学3] INFO  : 周五了,到达集合点,准备碰头
11:35:06.629 [同学3] ERROR : null
java.util.concurrent.BrokenBarrierException: null
11:35:06.647 [同学1] INFO  : 周五了,到达集合点,准备碰头
11:35:06.648 [同学1] ERROR : null
java.util.concurrent.BrokenBarrierException: null

可以看到,同学1线程因中断破坏了屏障,导致之后屏障都不可用了。

3.2.4 屏障破坏的应对方法

如果屏障被破坏后,我们可以调用reset()方法恢复屏障,但是,调用reset()的问题是难以同步线程,容易造成线程无限期阻塞。

假设下面这种场景,第一个线程到达屏障后,由于超时或中断破坏了屏障,那么这个线程调用reset()方法恢复屏障,但是注意,此时第二个和第三个线程都还没有到达屏障,此时第一个线程继续运行,第二个线程和第三个线程到达屏障后就会阻塞等待:

image-20250520120803432

对于屏障被破坏的情况,推荐使用多个屏障的方法,当某个阶段完成后,就在一个屏障点等待,另一个阶段完成后,就在另一个屏障点等待。这样,即使一个屏障被破坏,也不会影响另外的屏障。

java
public static void main(String[] args) {
    CyclicBarrier cyclicBarrier1 = new CyclicBarrier(3, ()->{
        log.info("周三碰头了,汇总工作结果");
    });
    CyclicBarrier cyclicBarrier2 = new CyclicBarrier(3, ()->{
        log.info("周五碰头了,汇总工作结果");
    });

    List<Thread> threadList = new ArrayList<>();
    for (int i = 1; i <= 3; i++) {
        Thread thread = new Thread(() -> {
            log.info("各自开始工作...");
            if(!"同学1".equals(Thread.currentThread().getName())) {
                // 随机数模拟有人早到,有人迟到
                Sleeper.sleep(2000 + new Random().nextInt(-100, 100));
            }else{
                // 同学1早到,是为了保证能在await()期间被打断
                Sleeper.sleep(1800);
            }
            log.info("周三了,到达集合点,准备碰头");
            try {
                cyclicBarrier1.await();
            } catch (InterruptedException | BrokenBarrierException e) {
                log.error(e.getMessage(), e);
            }

            log.info("各自开始新一轮工作...");
            Sleeper.sleep(2000 + new Random().nextInt(-100, 100));
            log.info("周五了,到达集合点,准备碰头");
            try {
                cyclicBarrier2.await();
            } catch (InterruptedException | BrokenBarrierException e) {
                log.error(e.getMessage(), e);
            }

        }, "同学" + i);
        threadList.add(thread);

        thread.start();
    }

    // 打断同学1
    Sleeper.sleep(1900);
    threadList.get(0).interrupt();
}

结果:

txt
12:11:54.007 [同学3] INFO  : 各自开始工作...
12:11:54.007 [同学2] INFO  : 各自开始工作...
12:11:54.007 [同学1] INFO  : 各自开始工作...
12:11:55.813 [同学1] INFO  : 周三了,到达集合点,准备碰头
12:11:55.908 [同学1] ERROR : null
java.lang.InterruptedException: null
12:11:55.911 [同学1] INFO  : 各自开始新一轮工作...
12:11:55.920 [同学2] INFO  : 周三了,到达集合点,准备碰头
12:11:55.921 [同学2] ERROR : null
java.util.concurrent.BrokenBarrierException: null
12:11:55.921 [同学2] INFO  : 各自开始新一轮工作...
12:11:56.072 [同学3] INFO  : 周三了,到达集合点,准备碰头
12:11:56.073 [同学3] ERROR : null
java.util.concurrent.BrokenBarrierException: null
12:11:56.073 [同学3] INFO  : 各自开始新一轮工作...
12:11:57.864 [同学1] INFO  : 周五了,到达集合点,准备碰头
12:11:57.977 [同学2] INFO  : 周五了,到达集合点,准备碰头
12:11:58.141 [同学3] INFO  : 周五了,到达集合点,准备碰头
12:11:58.142 [同学3] INFO  : 周五碰头了,汇总工作结果

可以发现,第一个屏障被破坏后,第二个屏障也会正常工作。