Skip to content

JUC Future和CompletableFuture

本文讲解Future和CompletableFuture的使用。

1. Callable接口

在前面介绍的多线程编程中,我们都是去实现Runnable接口:

java
@FunctionalInterface
public interface Runnable {
    public abstract void run();
}

其中的run()方法没有返回值,如果我们要传递结果,则需要使用保护性暂停模式。

那么,有没有一种方式,可以让异步线程直接返回结果呢?当然有,就是Callable接口:

java
@FunctionalInterface
public interface Callable<V> {
    V call() throws Exception;
}

Callable接口中,不仅能返回结果,也可以抛出异常。

我们再回到Thread的构造方法,发现其不接受Callable接口参数,那么如何将Callable接口与Thread联系起来呢?

image-20250514123704335

Runnable接口的子类中,有一个RunnableFuture子类,并且该类有一个实现类FutureTask,该实现类构造方法接受Callable

java
public FutureTask(Callable<V> callable)

至此,我们就通过FutureTaskCallableThread联系起来了。

下面以一个例子演示:

java
@Slf4j
public class FutureDemo01 {
    public static void main(String[] args) {
        FutureTask<Integer> futureTask = new FutureTask<>(new MyTask());

        Thread thread = new Thread(futureTask, "t1");
        thread.start();
    }
}

@Slf4j
class MyTask implements Callable<Integer>{

    @Override
    public Integer call() throws Exception {
        log.info("begin call");
        Sleeper.sleep(1000);
        log.info("end call");
        return 1;
    }
}

运行上述代码,结果如下:

txt
12:42:34.343 [t1] INFO  : begin call
12:42:35.350 [t1] INFO  : end call

可见成功启动了线程。

2.Future是什么

2.1 Future接口介绍

我们说引入Callable接口是为了获得任务执行结果,那么上面的代码中,任务执行了,那结果在哪呢?

结果就在Future对象中。在Future接口中,定义了5个方法,用来判断任务是否执行完毕、中断任务执行以及获取结果:

java
public interface Future<V> {
  	boolean isDone();
    boolean cancel(boolean mayInterruptIfRunning);
    boolean isCancelled();
    V get() throws InterruptedException, ExecutionException;
    V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;
}
  • isDone():用来检查任务是否已经完成。任务完成可能因为正常结束、抛出异常或被取消。

    返回 true 表示任务已完成,false 表示任务还在进行中。

  • cancel(boolean mayInterruptIfRunning):尝试取消正在执行的任务。

    mayInterruptIfRunning 参数的含义是:如果任务当前正在运行,是否应该中断执行任务的线程。

    • true: 如果任务正在运行,会尝试中断该任务的线程。
    • false: 如果任务正在运行,则不会中断其线程;只有在任务尚未启动时,取消操作才会成功。

    返回 true 表示任务成功取消(或之前已经完成或取消),false 表示取消失败(通常是因为任务已经完成或者无法取消)。

  • boolean isCancelled():用来检查任务在正常完成之前是否被取消了。

    • 返回 true 表示任务被取消,false 表示任务没有被取消(可能已完成或还在进行)。
  • V get()阻塞方法。用于获取任务的计算结果。

    • 该方法会一直阻塞当前线程,直到任务完成并返回结果。

    • 如果任务在执行过程中抛出了异常,get() 方法会抛出 ExecutionException,该异常会封装实际的任务异常。

    • 如果任务被中断,get() 方法会抛出 InterruptedException

  • V get(long timeout, TimeUnit unit)带超时时间的阻塞方法。用于在指定的时间内获取任务的计算结果。

    • 如果在指定时间内任务完成,则返回结果。

    • 如果在指定时间内任务未完成,则抛出 TimeoutException

2.2 Future接口使用

2.2.1 get()

下面以上面的代码为基础,来演示Future接口来获取任务结果:

java
public static void main(String[] args) throws ExecutionException, InterruptedException {
    FutureTask<Integer> futureTask = new FutureTask<>(new MyTask());

    Thread thread = new Thread(futureTask, "t1");
    thread.start();

    log.info("主线程获取任务结果");
    Integer i = futureTask.get();
    log.info("结果:{}", i);
}

结果如下:

txt
12:52:03.147 [t1] INFO  : begin call
12:52:03.147 [main] INFO  : 主线程获取任务结果
12:52:04.153 [t1] INFO  : end call
12:52:04.154 [main] INFO  : 结果:1

可以看到,主线程要等任务结束后才能获取到结果,所以调用get()后阻塞了1秒钟。

2.2.2 cancel()

我们也可以使用cancel()方法来中断任务的执行:

java
public static void main(String[] args) throws ExecutionException, InterruptedException {
    FutureTask<Integer> futureTask = new FutureTask<>(new MyTask());

    Thread thread = new Thread(futureTask, "t1");
    thread.start();

    Sleeper.sleep(500);

    boolean cancel = futureTask.cancel(true);
    log.info("cancel result : {}", cancel);

    boolean cancelled = futureTask.isCancelled();
    log.info("isCancelled : {}", cancelled);

    boolean done = futureTask.isDone();
    log.info("is doned : {}", done);

    Integer i = futureTask.get();

}

结果如下:

java
12:57:14.235 [t1] INFO  : begin call
12:57:14.737 [main] INFO  : cancel result : true
12:57:14.739 [main] INFO  : isCancelled : true
12:57:14.740 [main] INFO  : is doned : true
Exception in thread "main" java.util.concurrent.CancellationException

可以看到调用cancel()成功中断了任务的执行,并且中断任务执行后,再次使用get()方法获取结果时,会抛出CancellationException异常。

2.3 Future接口缺点分析

虽然使用Future接口可以完成很多事,比如中断异步任务的执行、获取异步任务的结果,但是仍然存在以下问题:

  • 阻塞的 get() 方法: 这是最主要的局限性。当调用 get() 方法时,当前线程会被阻塞,这在某些场景下可能不是最优的。无法在不阻塞的情况下知道结果何时可用,也无法方便地注册一个回调函数在任务完成时自动执行。

  • 难以进行复杂的异步流程控制: 如果有多个相互依赖的异步任务(比如任务B需要任务A的结果才能开始),使用 Future 很难优雅地实现这种链式或组合的操作。通常需要手动管理这些 Future,在需要时调用 get(),这容易导致阻塞或复杂的逻辑。

  • 异常处理不直观: 任务内部抛出的异常被包装在 ExecutionException 中,需要额外的代码来解包。

3. CompletableFuture

为了解决 Future 的这些局限性,Java 8 引入了 CompletableFutureCompletableFuture 提供了更强大的功能,支持非阻塞的回调、任务的组合、异常处理的链式调用等,极大地简化了异步编程。

3.1 介绍

CompletableFuture是一个类,实现了FutureCompletionStage接口,简单来说,CompletableFuture 代表了一个可能还没有完成的异步操作的结果,并且允许你注册回调函数(actions),在操作完成时自动执行这些回调。它比 Future 更灵活和强大,因为它支持非阻塞的操作链、组合多个异步操作,以及更优雅的异常处理。

3.2 运行异步任务

CompletableFuture中,运行异步任务有两种方式(都为静态方法):

  • 异步任务没有返回值:
    • public static CompletableFuture<Void> runAsync(Runnable runnable):使用默认的线程池运行没有返回值的异步任务;
    • public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor):使用指定的线程池运行没有返回值的异步任务;
  • 异步任务有返回值:
    • public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier):使用默认的线程池运行有返回值的异步任务;
    • public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier,Executor executor):使用指定的线程池运行有返回值的异步任务;

下面的例子展示了运行没有返回值的异步任务:

java
public static void main(String[] args) {
    // 默认线程池运行
    CompletableFuture.runAsync(() -> {
        log.info("task 1");
    });

    // 指定线程池运行
    ExecutorService threadPool = Executors.newFixedThreadPool(2);
    try {
        CompletableFuture.runAsync(() -> {
            log.info("task 2");
        }, threadPool);
    }finally {
        threadPool.shutdown();
    }
}

结果如下:

txt
13:13:02.263 [ForkJoinPool.commonPool-worker-1] INFO  : task 1
13:13:02.263 [pool-1-thread-1] INFO  : task 2

可以看到,默认的线程池是ForkJoinPool

下面的例子展示了运行有返回值的异步任务:

java
public static void main(String[] args) throws ExecutionException, InterruptedException {
    // 默认线程池运行
    CompletableFuture<Integer> completableFuture1 = CompletableFuture.supplyAsync(() -> {
        log.info("task 3");
        return 3;
    });

    // CompletableFuture实现了Future接口,所以也可以使用get()方法 
    Integer r1 = completableFuture1.get();
    log.info("r1: {}", r1);

    // 指定线程池运行
    ExecutorService threadPool = Executors.newFixedThreadPool(2);
    try {
        CompletableFuture<Integer> completableFuture2 = CompletableFuture.supplyAsync(() -> {
            log.info("task 4");
            return 4;
        }, threadPool);

        Integer r2 = completableFuture2.get();
        log.info("r2: {}", r2);

    }finally {
        threadPool.shutdown();
    }
}

结果如下:

txt
13:16:23.768 [ForkJoinPool.commonPool-worker-1] INFO  : task 3
13:16:23.769 [main] INFO  : r1: 3
13:16:23.770 [pool-1-thread-1] INFO  : task 4
13:16:23.770 [main] INFO  : r2: 4

3.3 获取结果

除了可以使用get()获取结果,也可以使用getNow()获取结果。

  • public T getNow(T valueIfAbsent):如果任务已完成,则返回任务结果,否则返回valueIfAbsent

相比于get()getNow()不会阻塞调用线程。

3.4 注册回调函数

当异步任务执行完成后,如果允许自动执行一些操作,那么这些操作就称为回调函数。

CompletableFuture中注册回调函数使用whenComplete()方法,该方法接收BiConsumer<? super T, ? super Throwable>为参数,第一个为任务返回值,第二个为执行任务期间抛出的异常。

示例如下:

java
public static void main(String[] args) throws ExecutionException, InterruptedException {
    // 默认线程池运行
    CompletableFuture<Integer> completableFuture1 = CompletableFuture.supplyAsync(() -> {
        log.info("task 3");
        Sleeper.sleep(1000);
      	int r = new Random().nextInt(10);
        if(r < 5) {
            throw new RuntimeException("发生异常");
        }
        return 3;
    }).whenComplete((v,e)->{
        if(e == null){
            log.info("v: {}", v);
        }else{
            e.printStackTrace();
        }
    });

    // 由于默认线程池中的线程是守护线程,所以主线程结束后,整个程序就退出了,此处让主线程休眠
    Sleeper.sleep(2000);
}

多次运行上述代码,会出现以下情况:

txt
14:57:00.612 [ForkJoinPool.commonPool-worker-1] INFO  : task 3
java.util.concurrent.CompletionException: java.lang.RuntimeException: 发生异常
txt
14:58:15.266 [ForkJoinPool.commonPool-worker-1] INFO  : task 3
14:58:16.272 [ForkJoinPool.commonPool-worker-1] INFO  : v: 3

3.5 异常处理

虽然在whenComplete()方法中可以处理异常,但是异常情况和正常情况逻辑混合在一起,不够直观,所以CompletableFuture还提供了异常情况下的回调函数:exceptionally(),该方法接收Function<Throwable, ? extends T> fn参数,其中消费异常,返回正常值。

代码示例如下:

java
public static void main(String[] args) throws ExecutionException, InterruptedException {
    // 默认线程池运行
    CompletableFuture<Integer> completableFuture1 = CompletableFuture.supplyAsync(() -> {
        log.info("task 3");
        Sleeper.sleep(1000);
        int r = new Random().nextInt(10);
        if(r < 5) {
            throw new RuntimeException("发生异常");
        }
        return 3;
    }).whenComplete((v,e)->{
        // 正常逻辑逻辑
        if(e == null){
            log.info("v: {}", v);
        }
    }).exceptionally(e->{
        // 异常逻辑处理
        e.printStackTrace();
        return null;
    });

    // 由于默认线程池中的线程是守护线程,所以主线程结束后,整个程序就退出了,此处让主线程休眠
    Sleeper.sleep(2000);
}

3.6 链式执行任务

如果要求任务B在任务A完成之后执行,那么可以用以下方式:

  • thenApply(Function<T, R>): 当任务A 完成后,将其结果(类型 T)传递给任务B,开始执行任务B,并且任务B也要返回结果(类型T)。
  • thenAccept(Consumer<T>): 当 任务A 完成后,将其结果(类型 T)传递给任务B,任务B不返回结果。
  • thenRun(Runnable): 当任务A完成后,开始执行任务B,其中任务B的执行不依赖于任务A的结果,并且任务B不返回结果。

接下来以代码示例演示上面方法的用法:

java
public static void main(String[] args) {
    CompletableFuture.supplyAsync(()->{
        log.info("task 1");
        return 1;
    }).thenApply(v->{
        log.info("task 2 , accept : {}", v);
        return 2;
    }).thenApply(v->{
        log.info("task 3 , accept : {}", v);
        return 3;
    });

    Sleeper.sleep(1000);
}

// 结果如下:
//15:20:14.688 [ForkJoinPool.commonPool-worker-1] INFO  : task 1
//15:20:14.688 [ForkJoinPool.commonPool-worker-1] INFO  : task 2 , accept : 1
//15:20:14.689 [ForkJoinPool.commonPool-worker-1] INFO  : task 3 , accept : 2
java
public static void main(String[] args) {
    CompletableFuture.supplyAsync(()->{
        log.info("task 1");
        return 1;
    }).thenAccept(v->{
        log.info("task 2 , accept : {}", v);
    }).thenAccept(v->{
        log.info("task 3 , accept : {}", v);
    });

    Sleeper.sleep(1000);
}

// 结果如下:
// 15:21:54.533 [ForkJoinPool.commonPool-worker-1] INFO  : task 1
// 15:21:54.533 [ForkJoinPool.commonPool-worker-1] INFO  : task 2 , accept : 1
// 15:21:54.534 [ForkJoinPool.commonPool-worker-1] INFO  : task 3 , accept : null
java
public static void main(String[] args) {
    CompletableFuture.supplyAsync(()->{
        log.info("task 1");
        return 1;
    }).thenRun(()->{
        log.info("task 2");
    }).thenRun(()->{
        log.info("task 3");
    });

    Sleeper.sleep(1000);
}

// 结果如下:
// 15:24:02.530 [ForkJoinPool.commonPool-worker-1] INFO  : task 1
// 15:24:02.531 [ForkJoinPool.commonPool-worker-1] INFO  : task 2
// 15:24:02.531 [ForkJoinPool.commonPool-worker-1] INFO  : task 3

注意,在上面的代码中(thenApply()thenAccept()thenRun()),如果链式调用中的某一步出现了异常,则后面的任务不会执行。

例如:

java
public static void main(String[] args) {
    CompletableFuture.supplyAsync(()->{
        log.info("task 1");
        return 1;
    }).thenAccept((v)->{
        log.info("task 2");
        int i = 10 / 0;
    }).thenAccept((v)->{
        log.info("task 3");
    }).exceptionally(e->{
        e.printStackTrace();
        return null;
    });

    Sleeper.sleep(1000);
}

结果如下(部分):

txt
15:27:27.339 [ForkJoinPool.commonPool-worker-1] INFO  : task 1
15:27:27.340 [ForkJoinPool.commonPool-worker-1] INFO  : task 2
java.util.concurrent.CompletionException: java.lang.ArithmeticException: / by zero

可以看到任务三并没有执行。

如果要让链式执行任务过程中,即使某个任务出现了异常,后续任务也能继续执行,我们可以使用handle()方法,该方法接收BiFunction<? super T, Throwable, ? extends U>,即上一步的结果与异常,我们可以在下一步任务中判断是否有异常,然后执行不同的逻辑。

java
public static void main(String[] args) {
    CompletableFuture.supplyAsync(()->{
        log.info("task 1");
        return 1;
    }).handle((v,e)->{
        if(e == null) {
            log.info("task 2");
            int i = 10 / 0;
            return 2;
        }else{
            log.info("task 2: task 1 occur exception - {}", e.getMessage());
            return -1;
        }
    }).handle((v,e)->{
        if(e == null) {
            log.info("task 3");
            return 3;
        }else{
            log.info("task 3: task 2 occur exception - {}", e.getMessage());
            return -1;
        }
    }).exceptionally(e->{
        e.printStackTrace();
        return null;
    });

    Sleeper.sleep(1000);
}

结果如下:

txt
15:32:35.458 [ForkJoinPool.commonPool-worker-1] INFO  : task 1
15:32:35.459 [ForkJoinPool.commonPool-worker-1] INFO  : task 2
15:32:35.459 [ForkJoinPool.commonPool-worker-1] INFO  : task 3: task 2 occur exception - java.lang.ArithmeticException: / by zero

3.7 任务竞争

CompletableFuture 中的 applyToEither 方法是用于处理“二者择一”的场景。

它的作用是:当调用 applyToEither 方法的当前 CompletableFuture 或者 作为参数传递的 other 那个 CompletionStage有任何一个正常完成时,就将先完成的那个 Future 的结果作为输入,应用到一个指定的 Function 函数上,并将函数返回的结果作为新的 CompletableFuture 的结果。

简单来说,它实现了两个异步任务之间的**“赛跑”(race)**:谁先完成,就取谁的结果(经过函数处理后)作为最终的结果。

比如,现在有两个选手(A和B)比赛赛跑,谁先到达终点为冠军,那么可以用代码模拟如下:

java
public static void main(String[] args) {
    CompletableFuture<String> playerA = CompletableFuture.supplyAsync(() -> {
        log.info("player A start");
        Sleeper.sleep(new Random().nextInt(1000));
        log.info("player A end");
        return "player A";
    });

    CompletableFuture<String> playerB = CompletableFuture.supplyAsync(() -> {
        log.info("player B start");
        Sleeper.sleep(new Random().nextInt(1000));
        log.info("player B end");
        return "player B";
    });

    CompletableFuture<String> result = playerA.applyToEither(playerB, (v) -> {
        return v + " is winner";
    });

    // 获取结果
    log.info(result.join());

}

结果如下:

txt
15:39:47.904 [ForkJoinPool.commonPool-worker-1] INFO  : player A start
15:39:47.904 [ForkJoinPool.commonPool-worker-2] INFO  : player B start
15:39:48.453 [ForkJoinPool.commonPool-worker-2] INFO  : player B end
15:39:48.456 [main] INFO  : player B is winner

可以看到任务B先执行完成,那么就取任务B的结果,并且中断任务A的执行。

除了applyToEither(),还有类似的方法,用于处理竞争情景:

  • acceptEither(CompletionStage<? extends T> other, Consumer<T> action): 类似于 applyToEither,也是谁先完成就用谁的结果,但它不转换结果,而是将结果交给一个 Consumer 进行消费(副作用)。返回一个 CompletableFuture<Void>
  • runAfterEither(CompletionStage<?> other, Runnable action): 也是谁先完成就触发,但它不关心结果值,只是执行一个 Runnable 操作。返回一个 CompletableFuture<Void>

3.8 任务组合依赖

在前面的场景中,都是任务A执行完成后开始执行任务B,那么如果某个任务的执行依赖多个任务的结束呢,比如,任务任务C要开始执行的前提条件是任务A和任务B都执行完毕,但是任务A和任务B是互相独立的。这种情况我们可以使用thenCombine()

java
<U, V> CompletableFuture<V> thenCombine(CompletionStage<? extends U> other, BiFunction<? super T, ? super U, ? extends V> fn)

它的作用是:它会等待调用 thenCombine 方法的当前 任务 和作为参数传递给 other 的任务 都正常完成后,会将它们各自的结果作为输入,传递给一个指定的**二元函数(BiFunction)**进行处理,并将该函数的返回值作为新的 任务 的结果。

例如,我们要计算1-n的累加结果,那么可以将其拆分为两个任务,第一个任务计算1-mid的累加和,第二个任务计算mid+1 - n的累加和(其中mid = (1+n) / 2),最后将两个任务的结果累加返回最终结果:

java
public static void main(String[] args) {
    int r = compute(100);
    log.info("{}", r);
}

private static int compute(int n){
    if (n <= 0){
        throw new IllegalArgumentException("n 应该大于 0");
    }
    int middle = (1 + n) / 2;

    CompletableFuture<Integer> task1 = CompletableFuture.supplyAsync(() -> {
        int result = 0;
        for (int i = 1; i <= middle; i++) {
            result += i;
        }
        return result;
    });

    CompletableFuture<Integer> task2 = CompletableFuture.supplyAsync(() -> {
        int result = 0;
        for (int i = middle + 1; i <= n; i++) {
            result += i;
        }
        return result;
    });

    CompletableFuture<Integer> task = task1.thenCombine(task2, (v1, v2) -> {
        return v1 + v2;
    });

    Integer r = task.join();
    return r;
}

结果:

txt
15:57:30.646 [main] INFO  : 5050