Appearance
JUC 工具类
本文介绍JUC提供的三个工具类:Semaphore、CountDownLatch和CyclicBarrier。
1. Semaphore
1.1 介绍
Semaphore,译为信号量,是 java.util.concurrent 包中提供的一个同步辅助工具,它主要用于控制对共享资源的访问数量。在之前学习到的synchronized和ReentrantLock中,不允许多个线程同时访问共享资源,但通过Semaphore,我们可以限制同时访问共享资源的线程数量。
Semaphore的核心概念称为许可 (Permit),在 Semaphore 内部维护着一定数量的许可。线程在访问共享资源前需要获取 (acquire) 许可,访问完成后需要释放 (release) 许可。
Semaphore的工作流程如下:
首先创建一个
Semaphore实例,指定初始可用的许可数量。当一个线程需要访问受
Semaphore保护的资源时,调用acquire()方法:如果当前有可用许可(许可数量大于0),许可数量会减一,线程立即获得许可并继续执行;
如果当前没有可用许可(许可数量等于0),线程会进入等待状态,直到有其他线程调用
release()方法释放许可;
当一个线程使用完受保护的资源时,调用
release()方法:可用许可数量加一;
如果当前有线程因为等待许可而被阻塞,其中一个线程会被唤醒,获取刚刚释放的许可,然后继续执行;
1.2 使用案例
假设现在有一个停车场,有5个停车位,某一天,有10辆车陆陆续续来停车,那么我们可以使用Semaphore来进行停车场管理:
java
@Slf4j
public class Demo {
// 停车位 以空字符串代表没车(此停车位空闲),有字符串代表有车(此停车位已被占用)
static AtomicReference<String>[] parkingSlots = new AtomicReference[5];
// 信号量,代表可以同时停放的车辆
static Semaphore semaphore = new Semaphore(5);
// 初始化停车位
static {
for (int i = 0; i < parkingSlots.length; i++) {
parkingSlots[i] = new AtomicReference<>("");
}
}
public static void main(String[] args) {
// 创建10个线程,代表10辆车
for (int i = 1; i <= 10; i++) {
new Thread(()->{
// 获取信号量,相当于获取停车场进入权
// 当信号量为0时,会阻塞等待
try {
semaphore.acquire();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
// 寻找停车位
for (int j = 0; j < parkingSlots.length; j++) {
// 找到了
if(parkingSlots[j].get().isEmpty()){
boolean b = parkingSlots[j].compareAndSet("", Thread.currentThread().getName());
if (b) {
// 成功把车停进去了
log.info("获得停车位:{}", j+1);
break;
}else{
// 被别人抢了车位,重新找停车位
j = -1;
}
}
}
// 随机休眠一段时间,代表停车时间
Sleeper.sleep(new Random().nextInt(2000));
// 释放车位
for (int j = 0; j < parkingSlots.length; j++) {
if(Thread.currentThread().getName().equals(parkingSlots[j].get())){
parkingSlots[j].set("");
break;
}
}
log.info("离开了");
// 释放信号量
semaphore.release();
},"t"+i).start();
}
}
}运行结果:
txt
10:11:37.067 [t1] INFO : 获得停车位:2
10:11:37.067 [t2] INFO : 获得停车位:1
10:11:37.067 [t5] INFO : 获得停车位:5
10:11:37.067 [t4] INFO : 获得停车位:4
10:11:37.067 [t3] INFO : 获得停车位:3
10:11:37.357 [t1] INFO : 离开了
10:11:37.358 [t6] INFO : 获得停车位:2
10:11:37.594 [t2] INFO : 离开了
10:11:37.595 [t7] INFO : 获得停车位:1
10:11:37.605 [t5] INFO : 离开了
10:11:37.605 [t8] INFO : 获得停车位:5
10:11:37.844 [t8] INFO : 离开了
10:11:37.844 [t9] INFO : 获得停车位:5
10:11:38.092 [t3] INFO : 离开了
10:11:38.092 [t10] INFO : 获得停车位:3
10:11:38.663 [t9] INFO : 离开了
10:11:38.763 [t4] INFO : 离开了
10:11:38.873 [t6] INFO : 离开了
10:11:39.423 [t7] INFO : 离开了
10:11:39.990 [t10] INFO : 离开了2. CountDownLatch
2.1 介绍
CountDownLatch是 java.util.concurrent 包中提供的一个同步辅助工具,它的核心作用是允许一个或多个线程等待,直到在其他线程中执行的一组操作完成。
核心概念如下:
- 初始计数 (Initial Count): 创建
CountDownLatch时需要指定一个正整数作为初始计数,这个计数代表了需要等待的事件或任务的数量。 - 递减计数 (Count Down): 当一个线程完成了它负责的任务或某个事件发生时,它会调用
countDown()方法,这会使CountDownLatch的内部计数器减一。 - 等待 (Await): 一个或多个线程(通常是主线程或协调线程)会调用
await()方法,调用await()的线程会一直阻塞,直到CountDownLatch的内部计数器减为零。
举个例子,假设现在一扇大门被6把锁(初始计数为6)锁住,一群人(主线程或协调线程)因为没有钥匙,只能在门前等待(调用await()方法),看门者(有可能1个,也有可能多个)接受到命令后把锁打开(调用countDown()方法),当所有的锁被打开后,等在门前的人就可以通过门继续前进了(继续运行)。
2.2 使用案例
在校园生活里,一般都是由当天的值日生关教室门最后一个离开。假设现在教室里还有5名同学在学习、收拾东西,值日生只能等他们都离开后才能锁门,我们可以使用CountDownLatch来模拟这个场景:
java
@Slf4j
public class Demo {
public static void main(String[] args) throws InterruptedException {
CountDownLatch count = new CountDownLatch(5);
log.info("值日生等所有同学离开教室...");
for (int i = 1; i <= 5; i++) {
new Thread(()->{
Sleeper.sleep(new Random().nextInt(2000));
log.info("离开了教室");
count.countDown();
},"同学" + i).start();
}
count.await();
log.info("所有同学离开了,锁门。");
}
}结果如下:
txt
10:31:47.920 [main] INFO : 值日生等所有同学离开教室...
10:31:48.419 [同学4] INFO : 离开了教室
10:31:48.957 [同学2] INFO : 离开了教室
10:31:49.450 [同学3] INFO : 离开了教室
10:31:49.590 [同学1] INFO : 离开了教室
10:31:49.661 [同学5] INFO : 离开了教室
10:31:49.662 [main] INFO : 所有同学离开了,锁门。3. CyclicBarrier
3.1 介绍
CyclicBarrier 是 java.util.concurrent 包中提供的一个同步辅助工具,它的主要作用是让一组线程互相等待,直到所有线程都到达一个共同的屏障点 (barrier point),然后所有线程才能继续执行。
核心概念如下:
- 屏障点 (Barrier Point): 就是所有参与的线程需要汇合、等待的地方。
- 参与者数量 (Parties): 创建
CyclicBarrier时需要指定有多少个线程需要到达这个屏障点。 - 循环性 (Cyclic): 这是
CyclicBarrier的一个重要特点。一旦所有线程都到达了屏障点并被释放,屏障就会被重置,可以被同一组线程再次使用。这使得CyclicBarrier非常适合用于解决需要重复同步的问题,例如分阶段的任务处理。 - 屏障动作 (Barrier Action - 可选): 可以在创建
CyclicBarrier时提供一个Runnable对象作为屏障动作。当所有线程都到达屏障点时,在释放所有线程之前,会由其中一个到达屏障的线程来执行这个Runnable。这通常用于在进入下一阶段之前执行一些汇总、清理或准备工作。
CyclicBarrier的工作流程如下:
创建一个
CyclicBarrier实例,并指定需要等待的线程数量(parties);每个参与的线程在执行到需要同步的地方(到达屏障点)时,调用
await()方法,调用await()的线程会进入等待状态,直到满足以下条件之一:- 最后一个线程也调用了
await()方法(即到达屏障的线程数量达到了parties); - 等待的线程被中断;
- 等待的线程发生了超时;
- 其他等待的线程因异常或中断离开了屏障,导致屏障被破坏 (
BrokenBarrierException);
- 最后一个线程也调用了
当最后一个线程到达屏障点时:
- 如果定义了屏障动作 (
barrierAction),该动作会被执行; - 然后,所有等待的线程都被释放,可以继续执行
await()方法之后的代码; - 屏障被重置,可以供下一轮使用;
- 如果定义了屏障动作 (
3.2 使用案例
我们以大学小组作业为例子,演示CyclicBarrier的使用。假设现在有3名同学组成一个小组完成大作业,并且约定周三和周五分别碰头一次,整合各自工作成果。
3.2.1 基本使用
java
public static void main(String[] args) {
CyclicBarrier cyclicBarrier = new CyclicBarrier(3, ()->{
log.info("碰头了,汇总工作结果");
});
for (int i = 1; i <= 3; i++) {
new Thread(()->{
log.info("各自开始工作...");
// 随机数模拟有人早到,有人迟到
Sleeper.sleep(2000 + new Random().nextInt(-100, 100));
log.info("周三了,到达集合点,准备碰头");
try {
cyclicBarrier.await();
} catch (InterruptedException | BrokenBarrierException e) {
throw new RuntimeException(e);
}
log.info("各自开始新一轮工作...");
Sleeper.sleep(2000+ new Random().nextInt(-100, 100));
log.info("周五了,到达集合点,准备碰头");
try {
cyclicBarrier.await();
} catch (InterruptedException | BrokenBarrierException e) {
throw new RuntimeException(e);
}
},"同学" + i).start();
}
}结果:
txt
11:01:43.957 [同学1] INFO : 各自开始工作...
11:01:43.957 [同学3] INFO : 各自开始工作...
11:01:43.957 [同学2] INFO : 各自开始工作...
11:01:46.026 [同学3] INFO : 周三了,到达集合点,准备碰头
11:01:46.040 [同学1] INFO : 周三了,到达集合点,准备碰头
11:01:46.045 [同学2] INFO : 周三了,到达集合点,准备碰头
11:01:46.045 [同学2] INFO : 碰头了,汇总工作结果
11:01:46.045 [同学2] INFO : 各自开始新一轮工作...
11:01:46.045 [同学3] INFO : 各自开始新一轮工作...
11:01:46.045 [同学1] INFO : 各自开始新一轮工作...
11:01:48.094 [同学3] INFO : 周五了,到达集合点,准备碰头
11:01:48.112 [同学2] INFO : 周五了,到达集合点,准备碰头
11:01:48.141 [同学1] INFO : 周五了,到达集合点,准备碰头
11:01:48.142 [同学1] INFO : 碰头了,汇总工作结果可以看到,当调用await()方法时,线程会阻塞,直到所有的等待线程都到达屏障点后,才继续执行。并且,屏障动作是由其中一个等待线程执行的。
3.2.2 破坏屏障情形一
假设现在同学1不喜欢别人迟到,每次他到集合点之后,只会等待其他同学30分钟(程序中以10毫秒计算),如果等待超时后,他就不等了,自己离开继续其他工作。那么这是,他们之间的约定就失效了(屏障破坏)。
java
public static void main(String[] args) {
CyclicBarrier cyclicBarrier = new CyclicBarrier(3, ()->{
log.info("碰头了,汇总工作结果");
});
for (int i = 1; i <= 3; i++) {
new Thread(()->{
log.info("各自开始工作...");
// 随机数模拟有人早到,有人迟到
Sleeper.sleep(2000 + new Random().nextInt(-100, 100));
log.info("周三了,到达集合点,准备碰头");
try {
if("同学1".equals(Thread.currentThread().getName())) {
// 只等待10毫秒
cyclicBarrier.await(10, TimeUnit.MILLISECONDS);
}else{
cyclicBarrier.await();
}
} catch (InterruptedException | TimeoutException | BrokenBarrierException e) {
log.error(e.getMessage(), e);
}
log.info("各自开始新一轮工作...");
Sleeper.sleep(2000+ new Random().nextInt(-100, 100));
log.info("周五了,到达集合点,准备碰头");
try {
if("同学1".equals(Thread.currentThread().getName())) {
cyclicBarrier.await(10, TimeUnit.MILLISECONDS);
}else{
cyclicBarrier.await();
}
} catch (InterruptedException | TimeoutException | BrokenBarrierException e) {
log.error(e.getMessage(), e);
}
},"同学" + i).start();
}
}结果(部分):
txt
11:11:37.526 [同学1] INFO : 各自开始工作...
11:11:37.526 [同学2] INFO : 各自开始工作...
11:11:37.526 [同学3] INFO : 各自开始工作...
11:11:39.529 [同学1] INFO : 周三了,到达集合点,准备碰头
11:11:39.543 [同学1] ERROR : null
java.util.concurrent.TimeoutException: null
11:11:39.547 [同学1] INFO : 各自开始新一轮工作...
11:11:39.610 [同学3] INFO : 周三了,到达集合点,准备碰头
11:11:39.611 [同学3] ERROR : null
java.util.concurrent.BrokenBarrierException: null
11:11:39.611 [同学3] INFO : 各自开始新一轮工作...
11:11:39.614 [同学2] INFO : 周三了,到达集合点,准备碰头
11:11:39.614 [同学2] ERROR : null
java.util.concurrent.BrokenBarrierException: null
11:11:39.614 [同学2] INFO : 各自开始新一轮工作...
11:11:41.522 [同学1] INFO : 周五了,到达集合点,准备碰头
11:11:41.522 [同学1] ERROR : null
java.util.concurrent.BrokenBarrierException: null
11:11:41.622 [同学2] INFO : 周五了,到达集合点,准备碰头
11:11:41.623 [同学2] ERROR : null
java.util.concurrent.BrokenBarrierException: null
11:11:41.643 [同学3] INFO : 周五了,到达集合点,准备碰头
11:11:41.644 [同学3] ERROR : null
java.util.concurrent.BrokenBarrierException: null可以看到,同学1在第529毫秒到达了集合点,但是等待了10毫秒后,其他同学还没到达,约543毫秒时因等待超时离开了集合点,此时已经破坏了屏障(可以理解为在现场留下了一个纸条,说你们迟到了,我们不约了);同学2和同学3之后姗姗来迟,发现约定被破坏了,所以也不再等了。之后的约定自然而然也被破坏了,所以都会抛出错误。
3.2.3 破坏屏障情形二
假设现在同学1在到达集合点等待其他同学时,被老师打断了一下去做其他事情,之后他就忘记回去了,导致约定被破坏:
java
public static void main(String[] args) {
CyclicBarrier cyclicBarrier = new CyclicBarrier(3, ()->{
log.info("碰头了,汇总工作结果");
});
List<Thread> threadList = new ArrayList<>();
for (int i = 1; i <= 3; i++) {
Thread thread = new Thread(() -> {
log.info("各自开始工作...");
if(!"同学1".equals(Thread.currentThread().getName())) {
// 随机数模拟有人早到,有人迟到
Sleeper.sleep(2000 + new Random().nextInt(-100, 100));
}else{
// 同学1早到,是为了保证能在await()期间被打断
Sleeper.sleep(1800);
}
log.info("周三了,到达集合点,准备碰头");
try {
cyclicBarrier.await();
} catch (InterruptedException | BrokenBarrierException e) {
log.error(e.getMessage(), e);
}
log.info("各自开始新一轮工作...");
Sleeper.sleep(2000 + new Random().nextInt(-100, 100));
log.info("周五了,到达集合点,准备碰头");
try {
cyclicBarrier.await();
} catch (InterruptedException | BrokenBarrierException e) {
log.error(e.getMessage(), e);
}
}, "同学" + i);
threadList.add(thread);
thread.start();
}
// 打断同学1
Sleeper.sleep(1900);
threadList.get(0).interrupt();
}结果:
txt
11:35:02.678 [同学1] INFO : 各自开始工作...
11:35:02.678 [同学3] INFO : 各自开始工作...
11:35:02.678 [同学2] INFO : 各自开始工作...
11:35:04.484 [同学1] INFO : 周三了,到达集合点,准备碰头
11:35:04.579 [同学1] ERROR : null
java.lang.InterruptedException: null
11:35:04.584 [同学1] INFO : 各自开始新一轮工作...
11:35:04.597 [同学3] INFO : 周三了,到达集合点,准备碰头
11:35:04.597 [同学3] ERROR : null
java.util.concurrent.BrokenBarrierException: null
11:35:04.598 [同学3] INFO : 各自开始新一轮工作...
11:35:04.625 [同学2] INFO : 周三了,到达集合点,准备碰头
11:35:04.625 [同学2] ERROR : null
java.util.concurrent.BrokenBarrierException: null
11:35:04.626 [同学2] INFO : 各自开始新一轮工作...
11:35:06.571 [同学2] INFO : 周五了,到达集合点,准备碰头
11:35:06.571 [同学2] ERROR : null
java.util.concurrent.BrokenBarrierException: null
11:35:06.628 [同学3] INFO : 周五了,到达集合点,准备碰头
11:35:06.629 [同学3] ERROR : null
java.util.concurrent.BrokenBarrierException: null
11:35:06.647 [同学1] INFO : 周五了,到达集合点,准备碰头
11:35:06.648 [同学1] ERROR : null
java.util.concurrent.BrokenBarrierException: null可以看到,同学1线程因中断破坏了屏障,导致之后屏障都不可用了。
3.2.4 屏障破坏的应对方法
如果屏障被破坏后,我们可以调用reset()方法恢复屏障,但是,调用reset()的问题是难以同步线程,容易造成线程无限期阻塞。
假设下面这种场景,第一个线程到达屏障后,由于超时或中断破坏了屏障,那么这个线程调用reset()方法恢复屏障,但是注意,此时第二个和第三个线程都还没有到达屏障,此时第一个线程继续运行,第二个线程和第三个线程到达屏障后就会阻塞等待:

对于屏障被破坏的情况,推荐使用多个屏障的方法,当某个阶段完成后,就在一个屏障点等待,另一个阶段完成后,就在另一个屏障点等待。这样,即使一个屏障被破坏,也不会影响另外的屏障。
java
public static void main(String[] args) {
CyclicBarrier cyclicBarrier1 = new CyclicBarrier(3, ()->{
log.info("周三碰头了,汇总工作结果");
});
CyclicBarrier cyclicBarrier2 = new CyclicBarrier(3, ()->{
log.info("周五碰头了,汇总工作结果");
});
List<Thread> threadList = new ArrayList<>();
for (int i = 1; i <= 3; i++) {
Thread thread = new Thread(() -> {
log.info("各自开始工作...");
if(!"同学1".equals(Thread.currentThread().getName())) {
// 随机数模拟有人早到,有人迟到
Sleeper.sleep(2000 + new Random().nextInt(-100, 100));
}else{
// 同学1早到,是为了保证能在await()期间被打断
Sleeper.sleep(1800);
}
log.info("周三了,到达集合点,准备碰头");
try {
cyclicBarrier1.await();
} catch (InterruptedException | BrokenBarrierException e) {
log.error(e.getMessage(), e);
}
log.info("各自开始新一轮工作...");
Sleeper.sleep(2000 + new Random().nextInt(-100, 100));
log.info("周五了,到达集合点,准备碰头");
try {
cyclicBarrier2.await();
} catch (InterruptedException | BrokenBarrierException e) {
log.error(e.getMessage(), e);
}
}, "同学" + i);
threadList.add(thread);
thread.start();
}
// 打断同学1
Sleeper.sleep(1900);
threadList.get(0).interrupt();
}结果:
txt
12:11:54.007 [同学3] INFO : 各自开始工作...
12:11:54.007 [同学2] INFO : 各自开始工作...
12:11:54.007 [同学1] INFO : 各自开始工作...
12:11:55.813 [同学1] INFO : 周三了,到达集合点,准备碰头
12:11:55.908 [同学1] ERROR : null
java.lang.InterruptedException: null
12:11:55.911 [同学1] INFO : 各自开始新一轮工作...
12:11:55.920 [同学2] INFO : 周三了,到达集合点,准备碰头
12:11:55.921 [同学2] ERROR : null
java.util.concurrent.BrokenBarrierException: null
12:11:55.921 [同学2] INFO : 各自开始新一轮工作...
12:11:56.072 [同学3] INFO : 周三了,到达集合点,准备碰头
12:11:56.073 [同学3] ERROR : null
java.util.concurrent.BrokenBarrierException: null
12:11:56.073 [同学3] INFO : 各自开始新一轮工作...
12:11:57.864 [同学1] INFO : 周五了,到达集合点,准备碰头
12:11:57.977 [同学2] INFO : 周五了,到达集合点,准备碰头
12:11:58.141 [同学3] INFO : 周五了,到达集合点,准备碰头
12:11:58.142 [同学3] INFO : 周五碰头了,汇总工作结果可以发现,第一个屏障被破坏后,第二个屏障也会正常工作。