Appearance
JUC Future和CompletableFuture
本文讲解Future和CompletableFuture的使用。
1. Callable接口
在前面介绍的多线程编程中,我们都是去实现Runnable接口:
java
@FunctionalInterface
public interface Runnable {
public abstract void run();
}其中的run()方法没有返回值,如果我们要传递结果,则需要使用保护性暂停模式。
那么,有没有一种方式,可以让异步线程直接返回结果呢?当然有,就是Callable接口:
java
@FunctionalInterface
public interface Callable<V> {
V call() throws Exception;
}在Callable接口中,不仅能返回结果,也可以抛出异常。
我们再回到Thread的构造方法,发现其不接受Callable接口参数,那么如何将Callable接口与Thread联系起来呢?

在Runnable接口的子类中,有一个RunnableFuture子类,并且该类有一个实现类FutureTask,该实现类构造方法接受Callable:
java
public FutureTask(Callable<V> callable)至此,我们就通过FutureTask将Callable与Thread联系起来了。
下面以一个例子演示:
java
@Slf4j
public class FutureDemo01 {
public static void main(String[] args) {
FutureTask<Integer> futureTask = new FutureTask<>(new MyTask());
Thread thread = new Thread(futureTask, "t1");
thread.start();
}
}
@Slf4j
class MyTask implements Callable<Integer>{
@Override
public Integer call() throws Exception {
log.info("begin call");
Sleeper.sleep(1000);
log.info("end call");
return 1;
}
}运行上述代码,结果如下:
txt
12:42:34.343 [t1] INFO : begin call
12:42:35.350 [t1] INFO : end call可见成功启动了线程。
2.Future是什么
2.1 Future接口介绍
我们说引入Callable接口是为了获得任务执行结果,那么上面的代码中,任务执行了,那结果在哪呢?
结果就在Future对象中。在Future接口中,定义了5个方法,用来判断任务是否执行完毕、中断任务执行以及获取结果:
java
public interface Future<V> {
boolean isDone();
boolean cancel(boolean mayInterruptIfRunning);
boolean isCancelled();
V get() throws InterruptedException, ExecutionException;
V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;
}isDone():用来检查任务是否已经完成。任务完成可能因为正常结束、抛出异常或被取消。返回
true表示任务已完成,false表示任务还在进行中。cancel(boolean mayInterruptIfRunning):尝试取消正在执行的任务。mayInterruptIfRunning参数的含义是:如果任务当前正在运行,是否应该中断执行任务的线程。true: 如果任务正在运行,会尝试中断该任务的线程。false: 如果任务正在运行,则不会中断其线程;只有在任务尚未启动时,取消操作才会成功。
返回
true表示任务成功取消(或之前已经完成或取消),false表示取消失败(通常是因为任务已经完成或者无法取消)。boolean isCancelled():用来检查任务在正常完成之前是否被取消了。- 返回
true表示任务被取消,false表示任务没有被取消(可能已完成或还在进行)。
- 返回
V get():阻塞方法。用于获取任务的计算结果。该方法会一直阻塞当前线程,直到任务完成并返回结果。
如果任务在执行过程中抛出了异常,
get()方法会抛出ExecutionException,该异常会封装实际的任务异常。如果任务被中断,
get()方法会抛出InterruptedException。
V get(long timeout, TimeUnit unit):带超时时间的阻塞方法。用于在指定的时间内获取任务的计算结果。如果在指定时间内任务完成,则返回结果。
如果在指定时间内任务未完成,则抛出
TimeoutException。
2.2 Future接口使用
2.2.1 get()
下面以上面的代码为基础,来演示Future接口来获取任务结果:
java
public static void main(String[] args) throws ExecutionException, InterruptedException {
FutureTask<Integer> futureTask = new FutureTask<>(new MyTask());
Thread thread = new Thread(futureTask, "t1");
thread.start();
log.info("主线程获取任务结果");
Integer i = futureTask.get();
log.info("结果:{}", i);
}结果如下:
txt
12:52:03.147 [t1] INFO : begin call
12:52:03.147 [main] INFO : 主线程获取任务结果
12:52:04.153 [t1] INFO : end call
12:52:04.154 [main] INFO : 结果:1可以看到,主线程要等任务结束后才能获取到结果,所以调用get()后阻塞了1秒钟。
2.2.2 cancel()
我们也可以使用cancel()方法来中断任务的执行:
java
public static void main(String[] args) throws ExecutionException, InterruptedException {
FutureTask<Integer> futureTask = new FutureTask<>(new MyTask());
Thread thread = new Thread(futureTask, "t1");
thread.start();
Sleeper.sleep(500);
boolean cancel = futureTask.cancel(true);
log.info("cancel result : {}", cancel);
boolean cancelled = futureTask.isCancelled();
log.info("isCancelled : {}", cancelled);
boolean done = futureTask.isDone();
log.info("is doned : {}", done);
Integer i = futureTask.get();
}结果如下:
java
12:57:14.235 [t1] INFO : begin call
12:57:14.737 [main] INFO : cancel result : true
12:57:14.739 [main] INFO : isCancelled : true
12:57:14.740 [main] INFO : is doned : true
Exception in thread "main" java.util.concurrent.CancellationException可以看到调用cancel()成功中断了任务的执行,并且中断任务执行后,再次使用get()方法获取结果时,会抛出CancellationException异常。
2.3 Future接口缺点分析
虽然使用Future接口可以完成很多事,比如中断异步任务的执行、获取异步任务的结果,但是仍然存在以下问题:
阻塞的
get()方法: 这是最主要的局限性。当调用get()方法时,当前线程会被阻塞,这在某些场景下可能不是最优的。无法在不阻塞的情况下知道结果何时可用,也无法方便地注册一个回调函数在任务完成时自动执行。难以进行复杂的异步流程控制: 如果有多个相互依赖的异步任务(比如任务B需要任务A的结果才能开始),使用
Future很难优雅地实现这种链式或组合的操作。通常需要手动管理这些Future,在需要时调用get(),这容易导致阻塞或复杂的逻辑。异常处理不直观: 任务内部抛出的异常被包装在
ExecutionException中,需要额外的代码来解包。
3. CompletableFuture
为了解决 Future 的这些局限性,Java 8 引入了 CompletableFuture。CompletableFuture 提供了更强大的功能,支持非阻塞的回调、任务的组合、异常处理的链式调用等,极大地简化了异步编程。
3.1 介绍
CompletableFuture是一个类,实现了Future和CompletionStage接口,简单来说,CompletableFuture 代表了一个可能还没有完成的异步操作的结果,并且允许你注册回调函数(actions),在操作完成时自动执行这些回调。它比 Future 更灵活和强大,因为它支持非阻塞的操作链、组合多个异步操作,以及更优雅的异常处理。
3.2 运行异步任务
在CompletableFuture中,运行异步任务有两种方式(都为静态方法):
- 异步任务没有返回值:
public static CompletableFuture<Void> runAsync(Runnable runnable):使用默认的线程池运行没有返回值的异步任务;public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor):使用指定的线程池运行没有返回值的异步任务;
- 异步任务有返回值:
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier):使用默认的线程池运行有返回值的异步任务;public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier,Executor executor):使用指定的线程池运行有返回值的异步任务;
下面的例子展示了运行没有返回值的异步任务:
java
public static void main(String[] args) {
// 默认线程池运行
CompletableFuture.runAsync(() -> {
log.info("task 1");
});
// 指定线程池运行
ExecutorService threadPool = Executors.newFixedThreadPool(2);
try {
CompletableFuture.runAsync(() -> {
log.info("task 2");
}, threadPool);
}finally {
threadPool.shutdown();
}
}结果如下:
txt
13:13:02.263 [ForkJoinPool.commonPool-worker-1] INFO : task 1
13:13:02.263 [pool-1-thread-1] INFO : task 2可以看到,默认的线程池是ForkJoinPool。
下面的例子展示了运行有返回值的异步任务:
java
public static void main(String[] args) throws ExecutionException, InterruptedException {
// 默认线程池运行
CompletableFuture<Integer> completableFuture1 = CompletableFuture.supplyAsync(() -> {
log.info("task 3");
return 3;
});
// CompletableFuture实现了Future接口,所以也可以使用get()方法
Integer r1 = completableFuture1.get();
log.info("r1: {}", r1);
// 指定线程池运行
ExecutorService threadPool = Executors.newFixedThreadPool(2);
try {
CompletableFuture<Integer> completableFuture2 = CompletableFuture.supplyAsync(() -> {
log.info("task 4");
return 4;
}, threadPool);
Integer r2 = completableFuture2.get();
log.info("r2: {}", r2);
}finally {
threadPool.shutdown();
}
}结果如下:
txt
13:16:23.768 [ForkJoinPool.commonPool-worker-1] INFO : task 3
13:16:23.769 [main] INFO : r1: 3
13:16:23.770 [pool-1-thread-1] INFO : task 4
13:16:23.770 [main] INFO : r2: 43.3 获取结果
除了可以使用get()获取结果,也可以使用getNow()获取结果。
public T getNow(T valueIfAbsent):如果任务已完成,则返回任务结果,否则返回valueIfAbsent。
相比于get(),getNow()不会阻塞调用线程。
3.4 注册回调函数
当异步任务执行完成后,如果允许自动执行一些操作,那么这些操作就称为回调函数。
在CompletableFuture中注册回调函数使用whenComplete()方法,该方法接收BiConsumer<? super T, ? super Throwable>为参数,第一个为任务返回值,第二个为执行任务期间抛出的异常。
示例如下:
java
public static void main(String[] args) throws ExecutionException, InterruptedException {
// 默认线程池运行
CompletableFuture<Integer> completableFuture1 = CompletableFuture.supplyAsync(() -> {
log.info("task 3");
Sleeper.sleep(1000);
int r = new Random().nextInt(10);
if(r < 5) {
throw new RuntimeException("发生异常");
}
return 3;
}).whenComplete((v,e)->{
if(e == null){
log.info("v: {}", v);
}else{
e.printStackTrace();
}
});
// 由于默认线程池中的线程是守护线程,所以主线程结束后,整个程序就退出了,此处让主线程休眠
Sleeper.sleep(2000);
}多次运行上述代码,会出现以下情况:
txt
14:57:00.612 [ForkJoinPool.commonPool-worker-1] INFO : task 3
java.util.concurrent.CompletionException: java.lang.RuntimeException: 发生异常txt
14:58:15.266 [ForkJoinPool.commonPool-worker-1] INFO : task 3
14:58:16.272 [ForkJoinPool.commonPool-worker-1] INFO : v: 33.5 异常处理
虽然在whenComplete()方法中可以处理异常,但是异常情况和正常情况逻辑混合在一起,不够直观,所以CompletableFuture还提供了异常情况下的回调函数:exceptionally(),该方法接收Function<Throwable, ? extends T> fn参数,其中消费异常,返回正常值。
代码示例如下:
java
public static void main(String[] args) throws ExecutionException, InterruptedException {
// 默认线程池运行
CompletableFuture<Integer> completableFuture1 = CompletableFuture.supplyAsync(() -> {
log.info("task 3");
Sleeper.sleep(1000);
int r = new Random().nextInt(10);
if(r < 5) {
throw new RuntimeException("发生异常");
}
return 3;
}).whenComplete((v,e)->{
// 正常逻辑逻辑
if(e == null){
log.info("v: {}", v);
}
}).exceptionally(e->{
// 异常逻辑处理
e.printStackTrace();
return null;
});
// 由于默认线程池中的线程是守护线程,所以主线程结束后,整个程序就退出了,此处让主线程休眠
Sleeper.sleep(2000);
}3.6 链式执行任务
如果要求任务B在任务A完成之后执行,那么可以用以下方式:
thenApply(Function<T, R>): 当任务A 完成后,将其结果(类型T)传递给任务B,开始执行任务B,并且任务B也要返回结果(类型T)。thenAccept(Consumer<T>): 当 任务A 完成后,将其结果(类型T)传递给任务B,任务B不返回结果。thenRun(Runnable): 当任务A完成后,开始执行任务B,其中任务B的执行不依赖于任务A的结果,并且任务B不返回结果。
接下来以代码示例演示上面方法的用法:
java
public static void main(String[] args) {
CompletableFuture.supplyAsync(()->{
log.info("task 1");
return 1;
}).thenApply(v->{
log.info("task 2 , accept : {}", v);
return 2;
}).thenApply(v->{
log.info("task 3 , accept : {}", v);
return 3;
});
Sleeper.sleep(1000);
}
// 结果如下:
//15:20:14.688 [ForkJoinPool.commonPool-worker-1] INFO : task 1
//15:20:14.688 [ForkJoinPool.commonPool-worker-1] INFO : task 2 , accept : 1
//15:20:14.689 [ForkJoinPool.commonPool-worker-1] INFO : task 3 , accept : 2java
public static void main(String[] args) {
CompletableFuture.supplyAsync(()->{
log.info("task 1");
return 1;
}).thenAccept(v->{
log.info("task 2 , accept : {}", v);
}).thenAccept(v->{
log.info("task 3 , accept : {}", v);
});
Sleeper.sleep(1000);
}
// 结果如下:
// 15:21:54.533 [ForkJoinPool.commonPool-worker-1] INFO : task 1
// 15:21:54.533 [ForkJoinPool.commonPool-worker-1] INFO : task 2 , accept : 1
// 15:21:54.534 [ForkJoinPool.commonPool-worker-1] INFO : task 3 , accept : nulljava
public static void main(String[] args) {
CompletableFuture.supplyAsync(()->{
log.info("task 1");
return 1;
}).thenRun(()->{
log.info("task 2");
}).thenRun(()->{
log.info("task 3");
});
Sleeper.sleep(1000);
}
// 结果如下:
// 15:24:02.530 [ForkJoinPool.commonPool-worker-1] INFO : task 1
// 15:24:02.531 [ForkJoinPool.commonPool-worker-1] INFO : task 2
// 15:24:02.531 [ForkJoinPool.commonPool-worker-1] INFO : task 3注意,在上面的代码中(thenApply()、thenAccept()、thenRun()),如果链式调用中的某一步出现了异常,则后面的任务不会执行。
例如:
java
public static void main(String[] args) {
CompletableFuture.supplyAsync(()->{
log.info("task 1");
return 1;
}).thenAccept((v)->{
log.info("task 2");
int i = 10 / 0;
}).thenAccept((v)->{
log.info("task 3");
}).exceptionally(e->{
e.printStackTrace();
return null;
});
Sleeper.sleep(1000);
}结果如下(部分):
txt
15:27:27.339 [ForkJoinPool.commonPool-worker-1] INFO : task 1
15:27:27.340 [ForkJoinPool.commonPool-worker-1] INFO : task 2
java.util.concurrent.CompletionException: java.lang.ArithmeticException: / by zero可以看到任务三并没有执行。
如果要让链式执行任务过程中,即使某个任务出现了异常,后续任务也能继续执行,我们可以使用handle()方法,该方法接收BiFunction<? super T, Throwable, ? extends U>,即上一步的结果与异常,我们可以在下一步任务中判断是否有异常,然后执行不同的逻辑。
java
public static void main(String[] args) {
CompletableFuture.supplyAsync(()->{
log.info("task 1");
return 1;
}).handle((v,e)->{
if(e == null) {
log.info("task 2");
int i = 10 / 0;
return 2;
}else{
log.info("task 2: task 1 occur exception - {}", e.getMessage());
return -1;
}
}).handle((v,e)->{
if(e == null) {
log.info("task 3");
return 3;
}else{
log.info("task 3: task 2 occur exception - {}", e.getMessage());
return -1;
}
}).exceptionally(e->{
e.printStackTrace();
return null;
});
Sleeper.sleep(1000);
}结果如下:
txt
15:32:35.458 [ForkJoinPool.commonPool-worker-1] INFO : task 1
15:32:35.459 [ForkJoinPool.commonPool-worker-1] INFO : task 2
15:32:35.459 [ForkJoinPool.commonPool-worker-1] INFO : task 3: task 2 occur exception - java.lang.ArithmeticException: / by zero3.7 任务竞争
CompletableFuture 中的 applyToEither 方法是用于处理“二者择一”的场景。
它的作用是:当调用 applyToEither 方法的当前 CompletableFuture 或者 作为参数传递的 other 那个 CompletionStage 中有任何一个正常完成时,就将先完成的那个 Future 的结果作为输入,应用到一个指定的 Function 函数上,并将函数返回的结果作为新的 CompletableFuture 的结果。
简单来说,它实现了两个异步任务之间的**“赛跑”(race)**:谁先完成,就取谁的结果(经过函数处理后)作为最终的结果。
比如,现在有两个选手(A和B)比赛赛跑,谁先到达终点为冠军,那么可以用代码模拟如下:
java
public static void main(String[] args) {
CompletableFuture<String> playerA = CompletableFuture.supplyAsync(() -> {
log.info("player A start");
Sleeper.sleep(new Random().nextInt(1000));
log.info("player A end");
return "player A";
});
CompletableFuture<String> playerB = CompletableFuture.supplyAsync(() -> {
log.info("player B start");
Sleeper.sleep(new Random().nextInt(1000));
log.info("player B end");
return "player B";
});
CompletableFuture<String> result = playerA.applyToEither(playerB, (v) -> {
return v + " is winner";
});
// 获取结果
log.info(result.join());
}结果如下:
txt
15:39:47.904 [ForkJoinPool.commonPool-worker-1] INFO : player A start
15:39:47.904 [ForkJoinPool.commonPool-worker-2] INFO : player B start
15:39:48.453 [ForkJoinPool.commonPool-worker-2] INFO : player B end
15:39:48.456 [main] INFO : player B is winner可以看到任务B先执行完成,那么就取任务B的结果,并且中断任务A的执行。
除了applyToEither(),还有类似的方法,用于处理竞争情景:
acceptEither(CompletionStage<? extends T> other, Consumer<T> action): 类似于applyToEither,也是谁先完成就用谁的结果,但它不转换结果,而是将结果交给一个Consumer进行消费(副作用)。返回一个CompletableFuture<Void>。runAfterEither(CompletionStage<?> other, Runnable action): 也是谁先完成就触发,但它不关心结果值,只是执行一个Runnable操作。返回一个CompletableFuture<Void>。
3.8 任务组合依赖
在前面的场景中,都是任务A执行完成后开始执行任务B,那么如果某个任务的执行依赖多个任务的结束呢,比如,任务任务C要开始执行的前提条件是任务A和任务B都执行完毕,但是任务A和任务B是互相独立的。这种情况我们可以使用thenCombine()。
java
<U, V> CompletableFuture<V> thenCombine(CompletionStage<? extends U> other, BiFunction<? super T, ? super U, ? extends V> fn)它的作用是:它会等待调用 thenCombine 方法的当前 任务 和作为参数传递给 other 的任务 都正常完成后,会将它们各自的结果作为输入,传递给一个指定的**二元函数(BiFunction)**进行处理,并将该函数的返回值作为新的 任务 的结果。
例如,我们要计算1-n的累加结果,那么可以将其拆分为两个任务,第一个任务计算1-mid的累加和,第二个任务计算mid+1 - n的累加和(其中mid = (1+n) / 2),最后将两个任务的结果累加返回最终结果:
java
public static void main(String[] args) {
int r = compute(100);
log.info("{}", r);
}
private static int compute(int n){
if (n <= 0){
throw new IllegalArgumentException("n 应该大于 0");
}
int middle = (1 + n) / 2;
CompletableFuture<Integer> task1 = CompletableFuture.supplyAsync(() -> {
int result = 0;
for (int i = 1; i <= middle; i++) {
result += i;
}
return result;
});
CompletableFuture<Integer> task2 = CompletableFuture.supplyAsync(() -> {
int result = 0;
for (int i = middle + 1; i <= n; i++) {
result += i;
}
return result;
});
CompletableFuture<Integer> task = task1.thenCombine(task2, (v1, v2) -> {
return v1 + v2;
});
Integer r = task.join();
return r;
}结果:
txt
15:57:30.646 [main] INFO : 5050