Appearance
JUC 线程池
由于频繁地创建和销毁线程会消耗系统资源,所以提出了线程池的概念,通过重用线程资源,提高效率。
1. 线程池-ThreadPoolExecutor
1.1 体系介绍
在Java中,线程池相关接口和类如下所示:

图中的ThreadPoolExecutor就是我们要介绍的线程池。
1.2 构造方法
ThreadPoolExecutor完整的构造方法有7个参数:
java
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)corePoolSize: 核心线程数。线程池会维护的最少线程数量,即使这些线程是空闲的,也不会被销毁。maximumPoolSize: 最大线程数。线程池允许创建的最大线程数量,当核心线程不够用时,允许最多创建maximumPoolSize - corePoolSize数量的救急线程来执行任务。keepAliveTime: 非核心线程(救急线程)的空闲超时时间。当线程池中的线程数量大于corePoolSize时,如果一个非核心线程空闲时间超过keepAliveTime,则该线程会被终止回收。unit:keepAliveTime的时间单位。workQueue:任务队列。用于存放等待执行的任务,常用的队列类型有:ArrayBlockingQueue:基于数组的有界阻塞队列。LinkedBlockingQueue:基于链表的有界或无界阻塞队列。SynchronousQueue:一个不存储元素的阻塞队列,每个插入操作必须等待一个相应的移除操作,反之亦然。
threadFactory: 线程工厂。用于创建新的线程,可以自定义线程工厂来设置线程的名称、优先级等属性。handler: 拒绝策略(RejectedExecutionHandler)。当任务队列已满且线程池中的线程数量达到maximumPoolSize时,新提交的任务将无法被处理,此时会触发拒绝策略。Java 内置了几种拒绝策略,也可以自定义。内置的拒绝策略如下:AbortPolicy:直接抛出RejectedExecutionException运行时异常,这是默认的拒绝策略。DiscardPolicy:丢弃新添加的任务,不进行任何处理,也不抛出异常。DiscardOldestPolicy:丢弃任务队列中最旧(等待时间最长)的任务,然后尝试重新提交当前被拒绝的任务。CallerRunsPolicy:不会抛弃任务,也不会抛出异常,而是由提交任务的线程(调用execute()方法的线程)来执行这个被拒绝的任务。
1.3 提交任务
当我们创建完线程池后,就可以向池中提交任务了,提交任务主要有两种方式:
execute(Runnable command):这个方法用于执行一个不需要返回结果的任务。它不返回任何结果,也无法直接判断任务是否执行完成。submit(Callable<T> task):这个方法用于执行一个需要返回结果的任务。这个方法返回Future对象,后续可以使用Future对象来获取任务结果。
例如,下面的代码展示了如何使用execute提交任务:
java
public static void main(String[] args) {
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(2,
2,
0,
TimeUnit.SECONDS,
new ArrayBlockingQueue<>(2));
threadPoolExecutor.execute(()->{
log.info("begin task 1");
Sleeper.sleep(1000);
log.info("end task 1");
});
threadPoolExecutor.execute(()->{
log.info("begin task 2");
Sleeper.sleep(2000);
log.info("end task 2");
});
}当然,我们可以使用submit提交任务,并且从Future中获取结果(关于Future知识,我们在下一篇文章中介绍):
java
public static void main(String[] args) throws ExecutionException, InterruptedException {
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(2,
2,
0,
TimeUnit.SECONDS,
new ArrayBlockingQueue<>(2));
Future<String> stringFuture = threadPoolExecutor.submit(() -> {
log.info("begin submit task 1");
Sleeper.sleep(1000);
log.info("end submit task 1");
return "hello world";
});
// 阻塞获取结果
log.info("获取结果: ");
String s = stringFuture.get();
log.info(s);
}结果如下:
txt
14:01:52.934 [pool-1-thread-1] INFO : begin submit task 1
14:01:52.934 [main] INFO : 获取结果:
14:01:53.940 [pool-1-thread-1] INFO : end submit task 1
14:01:53.942 [main] INFO : hello world1.4 submit()处理异常
当我们使用submit()提交任务,但是任务执行过程中发生了异常,线程池并不会抛出这个异常,所以我们需要处理异常。现象演示如下:
java
public static void main(String[] args) {
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(1,
1,
0,
TimeUnit.SECONDS,
new ArrayBlockingQueue<>(2));
Future<Integer> integerFuture = threadPoolExecutor.submit(() -> {
log.info("begin task 1");
int i = 10 / 0;
log.info("end task 1");
return 1;
});
Sleeper.sleep(1000);
threadPoolExecutor.execute(()->{
log.info("begin task 2");
Sleeper.sleep(500);
log.info("end task 2");
});
}结果:
txt
14:07:17.495 [pool-1-thread-1] INFO : begin task 1
14:07:18.501 [pool-1-thread-1] INFO : begin task 2
14:07:19.007 [pool-1-thread-1] INFO : end task 2处理方式有两种:
手动处理:自己使用
try-catch块包住任务,手动处理。javaFuture<Integer> integerFuture = threadPoolExecutor.submit(() -> { Integer result = null; try { log.info("begin task 1"); int i = 10 / 0; log.info("end task 1"); result = i; }catch (Exception e){ e.printStackTrace(); } return result; });利用
Future对象处理:当我们想获取任务结果时,调用future.get()方法,此方法会抛出任务执行过程中的异常:javaFuture<Integer> integerFuture = threadPoolExecutor.submit(() -> { log.info("begin task 1"); int i = 10 / 0; log.info("end task 1"); return i; }); // get() 会抛出异常ExecutionException,其中封装了任务执行过程中出现的异常 Integer i = integerFuture.get(); log.info("{}", i);
1.5 线程池执行任务流程
当通过 executor.execute(task) 或 executor.submit(task) 提交一个任务时,线程池会按照如下流程执行:
- 检查当前运行线程数是否少于核心线程数 (
corePoolSize):- 当一个新任务到来时,线程池首先会判断当前正在运行的线程数量是否小于其核心线程数 (
corePoolSize)。 - 如果小于
corePoolSize: 线程池会创建一个新的工作线程来执行这个新提交的任务。即使当前有其他空闲的核心线程,它也倾向于创建新线程(除非使用了prestartCoreThread()或prestartAllCoreThreads()预创建了核心线程)。新创建的线程会立即启动并执行该任务。
- 当一个新任务到来时,线程池首先会判断当前正在运行的线程数量是否小于其核心线程数 (
- 如果当前运行线程数大于或等于核心线程数 (
corePoolSize):- 线程池会判断任务队列 (
workQueue) 是否已满。 - 如果任务队列未满: 新提交的任务会被放入任务队列中等待。线程池中的核心线程(以及后续可能创建的非核心线程)会从队列中取出任务并执行。
- 如果任务队列已满: 线程池会判断当前运行的线程数量是否少于最大线程数 (
maximumPoolSize)。
- 线程池会判断任务队列 (
- 如果任务队列已满且当前运行线程数少于最大线程数 (
maximumPoolSize):- 线程池会创建一个新的非核心工作线程来执行这个新提交的任务。
- 这些非核心线程是为了处理突发的高并发任务量而创建的。
- 如果任务队列已满且当前运行线程数等于最大线程数 (
maximumPoolSize):- 此时,线程池已经达到其处理能力的上限(核心线程都在忙或等待队列中的任务,队列已满,非核心线程也已达到最大数量)。
- 新提交的任务会被拒绝。
- 线程池会根据预设的**拒绝策略(RejectedExecutionHandler)**来处理这个被拒绝的任务。
1.6 关闭线程池
运行上面的代码,我们发现程序不会停止下来,这是由于我们没有关闭线程池。
默认情况下,线程工厂使用的是Executors.defaultThreadFactory(),其实现如下:
java
private static class DefaultThreadFactory implements ThreadFactory {
// 省略其他代码
public Thread newThread(Runnable r) {
Thread t = new Thread(group, r,
namePrefix + threadNumber.getAndIncrement(),
0);
if (t.isDaemon())
t.setDaemon(false);
if (t.getPriority() != Thread.NORM_PRIORITY)
t.setPriority(Thread.NORM_PRIORITY);
return t;
}
}可以发现,默认线程池中的线程都是非守护线程,所以不会自动关闭。
因此,使用完线程池后,需要我们手动关闭线程池,关闭方法有如下两个:
shutdown():启动一个平缓的关闭过程。一旦调用了这个方法,线程池将不再接受新的任务。但是,它会继续执行已经提交到队列中以及当前正在执行的任务,直到所有任务都完成。并且,该方法不会阻塞调用线程的执行。shutdownNow():shutdownNow()方法会尝试立即停止所有正在执行的任务,并且不会处理等待队列中的任务,返回一个尚未执行的任务列表。
下面的代码演示了shutdown()的用法:
java
public static void main(String[] args) throws ExecutionException, InterruptedException {
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(1,
1,
0,
TimeUnit.SECONDS,
new ArrayBlockingQueue<>(2));
threadPoolExecutor.execute(()->{
log.info("begin task 1");
Sleeper.sleep(1000);
log.info("end task 1");
});
threadPoolExecutor.execute(()->{
log.info("begin task 2");
Sleeper.sleep(2000);
log.info("end task 2");
});
Sleeper.sleep(500);
log.info("关闭线程池");
threadPoolExecutor.shutdown();
log.info("shutdown()不会阻塞主线程,可以执行其他任务");
}结果如下:
txt
14:35:25.633 [pool-1-thread-1] INFO : begin task 1
14:35:26.138 [main] INFO : 关闭线程池
14:35:26.139 [main] INFO : shutdown()不会阻塞主线程,可以执行其他任务
14:35:26.639 [pool-1-thread-1] INFO : end task 1
14:35:26.639 [pool-1-thread-1] INFO : begin task 2
14:35:28.640 [pool-1-thread-1] INFO : end task 2下面的代码演示了shutdownNow()的用法:
java
public static void main(String[] args) throws ExecutionException, InterruptedException {
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(1,
1,
0,
TimeUnit.SECONDS,
new ArrayBlockingQueue<>(2));
threadPoolExecutor.execute(()->{
log.info("begin task 1");
Sleeper.sleep(1000);
log.info("end task 1");
});
threadPoolExecutor.execute(()->{
log.info("begin task 2");
Sleeper.sleep(2000);
log.info("end task 2");
});
Sleeper.sleep(500);
log.info("关闭线程池");
List<Runnable> runnables = threadPoolExecutor.shutdownNow();
log.info("shutdownNow()不会阻塞主线程,可以执行其他任务, 返回值:{}", runnables);
}结果:
txt
14:39:26.221 [pool-1-thread-1] INFO : begin task 1
14:39:26.722 [main] INFO : 关闭线程池
14:39:26.723 [main] INFO : shutdownNow()不会阻塞主线程,可以执行其他任务, 返回值:[juc.p7.PoolDemo01$$Lambda$124/0x00000003010d95e8@1efee8e7]
Exception in thread "pool-1-thread-1" java.lang.RuntimeException: java.lang.InterruptedException: sleep interrupted可以看到,shutdownNow()打断了正在执行的任务(如任务1),并且不会再执行等待队列中的任务。
优雅关闭线程池的做法
在调用 shutdown() 方法后,如果需要等待所有任务执行完毕再进行后续操作(例如主线程需要等待所有子任务完成后再退出),可以使用 awaitTermination() 方法:
boolean awaitTermination(long timeout, TimeUnit unit):阻塞当前线程,直到线程池达到终止状态,或者等到指定的时间超时。- 参数:
timeout:等待的最大时间量。unit:timeout参数的时间单位。
- 返回值: 如果线程池在超时时间到达之前终止,则返回
true,说明此时任务已全部完成;否则返回false,说明等待超时后任务还没完成。如果在等待期间当前线程被中断,则抛出InterruptedException。
- 参数:
使用两阶段终止线程池,首先调用shutdown()方法,然后调用awaitTermination()等待一段时间,如果在等待期间线程关闭,则退出,如果等待超时,则直接调用shutdownNow()立即关闭线程池。
java
void shutdownAndAwaitTermination(ExecutorService pool) {
// 关闭线程池
pool.shutdown();
try {
// 等待一段时间(60秒)
if (!pool.awaitTermination(60, TimeUnit.SECONDS)) {
// 等待超时,线程池还在工作,则调用shutdownNow()立即停止线程池
pool.shutdownNow();
// 再等待一段时间,停止线程池
if (!pool.awaitTermination(60, TimeUnit.SECONDS))
System.err.println("Pool did not terminate");
}
} catch (InterruptedException ie) {
// 在等待期间被打断,直接调用shutdownNow()立即停止线程池
pool.shutdownNow();
// 保持打断标志
Thread.currentThread().interrupt();
}
}以上代码来源:https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/ExecutorService.html
1.7 线程池状态
ThreadPoolExecutor的状态由线程池内部的一个ctl变量(AtomicInteger 类型)来控制,这个变量高3位表示线程池状态,低29位表示线程数。
状态表如下:
| 状态名 | 值 | 是否可以接收新任务 | 是否可以处理阻塞队列任务 | 说明 |
|---|---|---|---|---|
| RUNNING | 111 | 是 | 是 | 运行中状态 |
| SHUTDOWN | 000 | 否 | 是 | 关闭状态,当调用 shutdown() 方法时,线程池会进入这个状态。不会接受新任务,会处理阻塞队列剩余的任务 |
| STOP | 001 | 否 | 否 | 停止状态,当调用 shutdownNow() 方法时,线程池会进入这个状态。会中断正在执行的任务,并抛弃阻塞队列中的任务 |
| TIDYING | 010 | - | - | 整理状态,当线程池处于 SHUTDOWN 状态且任务队列为空,并且所有工作线程都已执行完毕时,或者线程池处于 STOP 状态且所有工作线程都已终止时,线程池会进入 TIDYING 状态。在这个状态下,会执行 terminated() 钩子方法 |
| TERMINATED | 011 | - | - | 终结状态 |
可以通过如下方法判断线程池状态:
boolean isShutdown(): 判断线程池是否已经调用了shutdown()或shutdownNow()方法。一旦调用了其中一个方法,该方法就返回true。boolean isTerminated(): 判断线程池是否已经完全终止。只有当所有任务都完成并且所有工作线程都已关闭时,该方法才返回true。
为什么要将线程池状态和线程数量放在一个AtomicInteger中呢?是为了一次CAS操作进行赋值:
java
private void advanceRunState(int targetState) {
for (;;) {
int c = ctl.get();
if (runStateAtLeast(c, targetState) ||
ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c))))
break;
}
}2. 任务调度线程池
2.1 介绍
任务调度线程池(Scheduled Thread Pool)是 Java 并发包 java.util.concurrent 中提供的一种特殊类型的线程池,它在普通线程池的功能基础上,增加了任务的延迟执行和周期性执行的能力。
在 Java 中,任务调度线程池的主要实现类是 ScheduledThreadPoolExecutor,它实现了 ScheduledExecutorService 接口。
ScheduledThreadPoolExecutor提供的完整构造方法如下:
java
public ScheduledThreadPoolExecutor(int corePoolSize,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
super(corePoolSize, Integer.MAX_VALUE,
DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,
new DelayedWorkQueue(), threadFactory, handler);
}corePoolSize:核心线程数。threadFactory:线程工厂,用于创建线程,可以设置线程名称、优先级等。handler:拒绝策略。
默认情况下,任务调度线程池支持最大Integer.MAX_VALUE个线程,并且空闲线程的存活时间是10毫秒(DEFAULT_KEEPALIVE_MILLIS值为10)。
下面会介绍不同的调度方法。
Java原生的ScheduledThreadPoolExecutor不支持使用 Cron 表达式来实现复杂的定时任务调度,如果要使用Cron表达式,则可以使用Quartz或Spring框架。
2.2 schedule()
schedule()用于在指定延时后执行一个任务,根据是否有返回结果,有两种形式:
schedule(Runnable command, long delay, TimeUnit unit):排一个Runnable任务在指定的delay延迟时间后执行一次。<V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit):安排一个Callable任务在指定的delay延迟时间后执行一次,并返回一个ScheduledFuture,可以通过它获取任务的返回值。
代码示例:
java
public static void main(String[] args) {
// 创建只有一个线程的任务调度线程池
ScheduledThreadPoolExecutor threadPoolExecutor = new ScheduledThreadPoolExecutor(1);
log.info("开始");
// 1秒钟后执行任务一
threadPoolExecutor.schedule(()->{
log.info("begin task 1");
Sleeper.sleep(2000); // 睡眠2秒
log.info("end task 1");
}, 1, TimeUnit.SECONDS);
// 1秒钟后执行任务二
threadPoolExecutor.schedule(()->{
log.info("task 2");
}, 1, TimeUnit.SECONDS);
// 关闭线程池
threadPoolExecutor.shutdown();
}结果如下:
txt
15:44:44.295 [main] INFO : 开始
15:44:45.300 [pool-1-thread-1] INFO : begin task 1
15:44:47.303 [pool-1-thread-1] INFO : end task 1
15:44:47.305 [pool-1-thread-1] INFO : task 2可以看到,1秒钟后执行任务一,但是任务一需要2秒钟执行完,由于只有1个线程,所以任务二需要等任务一执行完毕后才开始执行,此时任务二并不是一秒后才开始执行的。
并且,即使任务一抛出了异常,那么也不会影响任务二的执行:
Details
java
public static void main(String[] args) {
// 创建只有一个线程的任务调度线程池
ScheduledThreadPoolExecutor threadPoolExecutor = new ScheduledThreadPoolExecutor(1);
log.info("开始");
// 1秒钟后执行任务一
threadPoolExecutor.schedule(()->{
log.info("begin task 1");
Sleeper.sleep(2000); // 睡眠2秒
int i = 10/0;
log.info("end task 1");
}, 1, TimeUnit.SECONDS);
// 1秒钟后执行任务二
threadPoolExecutor.schedule(()->{
log.info("task 2");
}, 1, TimeUnit.SECONDS);
threadPoolExecutor.shutdown();
}txt
15:48:44.762 [main] INFO : 开始
15:48:45.769 [pool-1-thread-1] INFO : begin task 1
15:48:47.776 [pool-1-thread-1] INFO : task 22.3 scheduleAtFixedRate()
完整形式如下:
java
scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit)创建并执行一个周期性任务,首次执行延迟 initialDelay 时间后开始,之后以固定的 频率 period 重复执行。这意味着下一次任务的开始时间是基于上一次任务的开始时间计算的。如果任务执行时间超过了 period,下一次任务会在当前任务执行完毕后立即开始(不会并发执行)。
下面的代码演示每秒钟输出当前时间:
java
public static void main(String[] args) {
ScheduledThreadPoolExecutor scheduledThreadPoolExecutor = new ScheduledThreadPoolExecutor(1);
scheduledThreadPoolExecutor.scheduleAtFixedRate(()->{
log.info(LocalTime.now().toString());
},0, 1, TimeUnit.SECONDS);
}下面的代码演示每秒钟输出"1",但是每次输出需要等待2秒,结果是每2秒钟才会输出"1":
java
public static void main(String[] args) {
ScheduledThreadPoolExecutor scheduledThreadPoolExecutor = new ScheduledThreadPoolExecutor(2);
scheduledThreadPoolExecutor.scheduleAtFixedRate(()->{
log.info("1");
Sleeper.sleep(2000);
},0, 1, TimeUnit.SECONDS);
}证明了如果任务执行时间超过了 period,下一次任务会在当前任务执行完毕后立即开始(不会并发执行),即时线程池中线程数量大于1(如上线程数量为2)。
2.4 scheduleWithFixedDelay()
完整形式如下:
java
scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit)创建并执行一个周期性任务,首次执行延迟 initialDelay 时间后开始,之后以上一次任务执行完毕到下一次任务开始之间固定的 延迟 delay 重复执行。
下面的代码演示每秒钟输出"1",但是每次输出需要等待2秒,结果是每3秒钟才会输出"1":
java
public static void main(String[] args) {
ScheduledThreadPoolExecutor scheduledThreadPoolExecutor = new ScheduledThreadPoolExecutor(2);
scheduledThreadPoolExecutor.scheduleWithFixedDelay(()->{
log.info("1");
Sleeper.sleep(2000);
},0, 1, TimeUnit.SECONDS);
}3. Executors
Executors 是 Java java.util.concurrent 包中的一个工具类,它提供了一系列静态方法,用于简化线程池的创建。
3.1 创建线程池
newFixedThreadPool(int nThreads): 创建一个拥有固定线程数的线程池。核心线程数和最大线程数都等于nThreads,使用无界队列LinkedBlockingQueue。适合处理已知并发量的任务。newCachedThreadPool(): 创建一个可缓存的线程池。核心线程数为 0,最大线程数为Integer.MAX_VALUE,使用同步队列SynchronousQueue。根据需要创建新线程,空闲线程会快速回收。适合处理大量短时任务。newSingleThreadExecutor(): 创建一个单线程的线程池。只有一个工作线程,使用无界队列LinkedBlockingQueue。保证所有任务按照提交顺序依次执行。适合需要保证任务顺序执行的场景。
3.2 创建任务调度线程池
newScheduledThreadPool(int corePoolSize): 创建一个支持定时及周期性任务执行的固定大小线程池。适合需要延迟或定期执行任务的场景。newSingleThreadScheduledExecutor(): 创建一个单线程的支持定时及周期性任务执行的线程池。
3.3 其他方法
defaultThreadFactory(): 返回用于创建新线程的默认线程工厂。
3.4 注意事项
需要注意的是,虽然 Executors 提供了方便的创建方法,但在生产环境中,特别是一些对线程池参数有严格要求的场景,更推荐直接使用 ThreadPoolExecutor 的构造函数,以便更精细地控制线程池的核心线程数、最大线程数、队列类型、拒绝策略等,以避免潜在的问题(例如,newFixedThreadPool 和 newSingleThreadExecutor 使用的无界队列可能导致内存溢出)。
4. Fork/Join线程池
4.1 介绍
Fork/Join 线程池是 Java 在 JDK 7 中引入的一种专门用于并行计算的线程池框架,它设计用来高效地解决可以被递归地分解成更小任务的问题(即“分治”思想)。
例如,现在有一个大任务,可以分为两个小任务:任务1和任务2,分别耗时1秒和2秒,如果交由一个线程t1执行,那么总共耗时1+2=3秒中,如果线程t1将这大任务分为两个小任务,并把这两个小任务分别交由线程t2和线程t3执行(拆分),最后将两个小任务的结果合并起来(合并)组合成最终结果,那么总共耗时约2秒钟,提升了效率。
所以使用Fork/Join线程池的关键是任务可拆分。
分(Fork): 将一个大任务递归地分解成更小的子任务,直到子任务足够小,可以直接处理。
合(Join): 等待所有子任务执行完毕,然后将它们的计算结果合并,得到最终结果。
使用Fork/Join线程池有两个重要实体:
ForkJoinTask(任务):表示可拆分的任务,有两个主要的抽象子类,用于定义不同类型的任务:
RecursiveAction: 用于没有返回结果的任务。它重写compute()方法来执行任务逻辑,并在需要分解时调用fork()方法提交子任务。RecursiveTask<V>: 用于有返回结果的任务。它重写compute()方法执行任务逻辑并返回结果,在需要分解时调用fork()提交子任务,并使用join()方法等待子任务完成并获取其结果。
ForkJoinPool(线程池):专门用于执行
ForkJoinTask任务- 采用了一种称为**工作窃取(Work-Stealing)**的算法。与传统线程池共享一个任务队列不同,
ForkJoinPool中的每个工作线程通常都有自己的双端队列(Deque)。 - 当一个工作线程完成自己队列中的任务后,它不会空闲下来,而是会尝试从其他工作线程的队列的尾部“窃取”任务来执行。这种机制有助于平衡线程之间的负载,提高处理器的利用率。
- 采用了一种称为**工作窃取(Work-Stealing)**的算法。与传统线程池共享一个任务队列不同,
4.2 使用案例
下面以实际案例介绍如何使用Fork/Join线程池计算斐波那契数列。
在数学上,斐波那契数是以递归的方法来定义:
- F0=0
- F1=1
- F(n)=F(n−1)+F(n−2)
用白话文来说,就是斐波那契数列由0和1开始,之后的斐波那契数就是由之前的两数相加而得出。
首先定义任务类:
java
@Slf4j
class FibonacciTask extends RecursiveTask<Integer>{
private int n;
public FibonacciTask(int n) {
this.n = n;
}
@Override
protected Integer compute() {
log.info("开始执行,n = {}", n);
if (n <= 1) {
return n;
}
FibonacciTask task1 = new FibonacciTask(n-1);
task1.fork(); // 拆分任务,让另一个线程去执行
FibonacciTask task2 = new FibonacciTask(n-2);
task2.fork(); // 拆分任务,让另一个线程去执行
// join() 等待子任务执行完毕返回结果,将两个小任务的结果合并起来
int result = task1.join() + task2.join();
return result;
}
}然后使用ForkJoinPool执行任务:
java
public static void main(String[] args) {
ForkJoinPool forkJoinPool = new ForkJoinPool(5);
Integer result = forkJoinPool.invoke(new FibonacciTask(5));
log.info("{}", result);
}结果如下:
java
16:44:04.064 [ForkJoinPool-1-worker-1] INFO : 开始执行,n = 5
16:44:04.065 [ForkJoinPool-1-worker-1] INFO : 开始执行,n = 4
16:44:04.065 [ForkJoinPool-1-worker-2] INFO : 开始执行,n = 3
16:44:04.066 [ForkJoinPool-1-worker-1] INFO : 开始执行,n = 3
16:44:04.066 [ForkJoinPool-1-worker-3] INFO : 开始执行,n = 2
16:44:04.066 [ForkJoinPool-1-worker-3] INFO : 开始执行,n = 1
16:44:04.066 [ForkJoinPool-1-worker-2] INFO : 开始执行,n = 2
16:44:04.066 [ForkJoinPool-1-worker-4] INFO : 开始执行,n = 1
16:44:04.066 [ForkJoinPool-1-worker-3] INFO : 开始执行,n = 0
16:44:04.066 [ForkJoinPool-1-worker-2] INFO : 开始执行,n = 1
16:44:04.066 [ForkJoinPool-1-worker-4] INFO : 开始执行,n = 0
16:44:04.066 [ForkJoinPool-1-worker-3] INFO : 开始执行,n = 2
16:44:04.066 [ForkJoinPool-1-worker-3] INFO : 开始执行,n = 1
16:44:04.066 [ForkJoinPool-1-worker-1] INFO : 开始执行,n = 0
16:44:04.066 [ForkJoinPool-1-worker-2] INFO : 开始执行,n = 1
16:44:04.066 [main] INFO : 5可以发现,拆分出来的子任务,交给了不同的线程去运行。