基于本文回答

播面 播面

文图音视,全方位拆解八股文
0
评论

Java CompletableFuture 异步编程详解

知识点图片

CompletableFuture是Java 8强大的异步编程工具。它通过链式调用、任务组合和优雅的异常处理,解决了传统Future的阻塞痛点,让你能以非阻塞方式构建高性能、高响应的复杂异步流程。

我们来全面深入地讲解一下Java中的CompletableFuture

CompletableFuture 是 Java 8 引入的一个非常强大的类,位于java.util.concurrent包下。它极大地增强了Java的异步编程能力,解决了传统Future接口的诸多痛点。

可以把它理解为:一个“可完成的未来”。它代表一个异步计算的结果,你可以在这个结果完成时(或未完成时)附加各种操作,如转换、组合、处理异常等,而无需阻塞等待。


1. 为什么需要 CompletableFuture?(传统 Future 的痛点)

在 Java 5 中引入的Future接口,虽然开启了异步编程的大门,但有几个明显的缺点:

  1. 无法主动完成:你无法在代码中手动将一个Future标记为“已完成”并设置其结果。它只能被执行它的ExecutorService完成。
  2. 阻塞式获取结果future.get()方法是阻塞的。调用该方法时,如果任务还没完成,当前线程就会被挂起,直到结果返回。这与异步编程的初衷相悖。
  3. 没有回调机制:你无法为Future添加一个回调函数,让它在任务完成后自动执行某个操作。你只能通过循环调用isDone()来检查任务状态,或者直接阻塞在get()上。
  4. 无法链式操作:多个Future之间很难进行串联。例如,你无法方便地表达“当任务A完成后,用它的结果去执行任务B”。
  5. 无法组合多个Future:你无法简单地实现“当多个任务都完成后,再执行某个操作”或者“当任意一个任务完成后,就执行某个操作”。
  6. 异常处理复杂get()方法会抛出受检异常(Checked Exception),使得异常处理代码比较冗长。

CompletableFuture 就是为了解决以上所有问题而设计的。


2. CompletableFuture 的核心优势

  1. 非阻塞:可以以声明式、函数式的方式定义计算流程,避免了get()的阻塞等待。
  2. 可编程:可以手动创建并完成一个CompletableFuture,非常适合将回调式API转换为响应式API。
  3. 强大的链式调用(Fluent API):提供了类似 Stream API 的链式方法(如thenApply, thenAccept, thenCompose等),可以优雅地组织异步任务流。
  4. 灵活的组合能力:可以轻松组合多个CompletableFuture(如allOf, anyOf, thenCombine)。
  5. 完善的异常处理:提供了exceptionally, handle等方法来优雅地处理异步执行中的异常。

3. 如何创建 CompletableFuture

有两种主要方式来创建一个异步任务。

3.1. 手动创建和完成

这在需要将一个基于回调的API适配为CompletableFuture时非常有用。

java
// 创建一个未完成的Future
CompletableFuture<String> future = new CompletableFuture<>();

// 在另一个线程中完成它
new Thread(() -> {
    try {
        Thread.sleep(2000);
        // 手动设置结果,完成Future
        future.complete("Hello, CompletableFuture!");
    } catch (InterruptedException e) {
        // 手动设置异常,完成Future
        future.completeExceptionally(e);
    }
}).start();

// 主线程可以继续做其他事情...

// 当需要结果时,再获取
System.out.println("主线程在等待结果...");
String result = future.join(); // join() 和 get() 类似,但不抛出受检异常
System.out.println("获取到结果: " + result);

3.2. 使用静态工厂方法执行异步任务

这是最常用的方式,CompletableFuture提供了四个静态方法来启动异步计算:

  • runAsync(Runnable runnable): 运行一个没有返回值的异步任务。
  • runAsync(Runnable runnable, Executor executor): 使用指定的线程池运行任务。
  • supplyAsync(Supplier<U> supplier): 运行一个有返回值的异步任务。
  • supplyAsync(Supplier<U> supplier, Executor executor): 使用指定的线程池运行任务。

注意:如果不指定ExecutorCompletableFuture默认会使用ForkJoinPool.commonPool()作为其线程池。对于计算密集型任务这很合适,但对于IO密集型任务,最好提供自定义的线程池,以避免阻塞公共线程池。

示例:

java
// 1. 无返回值的异步任务
CompletableFuture<Void> futureRun = CompletableFuture.runAsync(() -> {
    System.out.println("runAsync: 任务正在执行... Thread: " + Thread.currentThread().getName());
});

// 2. 有返回值的异步任务
CompletableFuture<String> futureSupply = CompletableFuture.supplyAsync(() -> {
    System.out.println("supplyAsync: 任务正在执行... Thread: " + Thread.currentThread().getName());
    try {
        Thread.sleep(1000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    return "Task Result";
});

// 等待并获取结果
System.out.println(futureSupply.get()); // 阻塞等待结果

4. 核心API:链式操作

这是CompletableFuture最强大的地方。当一个异步任务完成后,我们可以触发后续的操作。这些操作分为三类:thenApply (转换), thenAccept (消费), thenRun (执行动作)。

每个方法都有三个变种:

  • method(...): 使用与上一个任务相同的线程。
  • methodAsync(...): 使用默认的ForkJoinPool线程池。
  • methodAsync(..., Executor executor): 使用你指定的线程池。

4.1. thenApply / thenApplyAsync - 转换结果

当上一个任务完成后,将其结果作为输入,进行转换后,返回一个新的CompletableFuture。类似于Stream.map()

java
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> "123")
        .thenApply(s -> s.length()) // 转换:String -> Integer
        .thenApply(i -> i * 10);      // 转换:Integer -> Integer

System.out.println(future.join()); // 输出: 30

4.2. thenAccept / thenAcceptAsync - 消费结果

当上一个任务完成后,将其结果作为输入进行消费,但没有返回值 (CompletableFuture<Void>)。类似于Stream.forEach()

java
CompletableFuture.supplyAsync(() -> "Hello")
        .thenAccept(result -> System.out.println("收到的结果: " + result));
// 控制台会打印 "收到的结果: Hello"

4.3. thenRun / thenRunAsync - 执行动作

当上一个任务完成后,执行一个Runnable,不关心上一个任务的结果,也没有返回值。

java
CompletableFuture.supplyAsync(() -> "some result")
        .thenRun(() -> System.out.println("上一个任务已完成,开始执行新动作。"));

5. 核心API:组合多个 Future

5.1. thenCompose - 串行依赖

用于连接两个有依赖关系的CompletableFuture。当前一个任务完成后,将其结果作为参数传递给一个函数,该函数返回另一个CompletableFuture。最终结果是这个新的CompletableFuture的结果。类似于Stream.flatMap()

场景:先获取用户ID,然后根据用户ID去获取用户详情。

java
CompletableFuture<String> userDetailsFuture = CompletableFuture.supplyAsync(() -> 1001) // 模拟获取用户ID
        .thenCompose(userId -> CompletableFuture.supplyAsync(() -> "用户详情 for " + userId)); // 根据ID获取详情

System.out.println(userDetailsFuture.join()); // 输出: 用户详情 for 1001

5.2. thenCombine - 并行组合 (AND)

将两个独立的CompletableFuture组合起来,当两个都完成时,将它们的结果作为参数传递给一个函数,并返回最终结果。

场景:并行获取商品价格和商品折扣,然后计算最终价格。

java
CompletableFuture<Double> priceFuture = CompletableFuture.supplyAsync(() -> 199.99);
CompletableFuture<Double> discountFuture = CompletableFuture.supplyAsync(() -> 0.8);

CompletableFuture<Double> finalPriceFuture = priceFuture.thenCombine(discountFuture,
        (price, discount) -> price * discount); // (T, U) -> R

System.out.println("最终价格: " + finalPriceFuture.join()); // 输出: 最终价格: 159.992

5.3. allOf - 等待所有完成 (AND)

等待一组CompletableFuture全部完成allOf的返回值是CompletableFuture<Void>。如果你需要获取所有任务的结果,需要一些额外的处理。

java
CompletableFuture<String> f1 = CompletableFuture.supplyAsync(() -> "任务1");
CompletableFuture<String> f2 = CompletableFuture.supplyAsync(() -> "任务2");
CompletableFuture<String> f3 = CompletableFuture.supplyAsync(() -> "任务3");

CompletableFuture<Void> allFutures = CompletableFuture.allOf(f1, f2, f3);

// 等待所有任务完成
allFutures.join();

// 获取所有结果(常用技巧)
// 注意:此时f1, f2, f3都已经完成了,调用join()不会阻塞
System.out.println(f1.join());
System.out.println(f2.join());
System.out.println(f3.join());

5.4. anyOf - 等待任意一个完成 (OR)

等待一组CompletableFuture任意一个完成,它的返回值是CompletableFuture<Object>,其结果是第一个完成的任务的结果。

场景:向多个服务查询同一个数据,谁先返回就用谁的。

java
CompletableFuture<String> f1 = CompletableFuture.supplyAsync(() -> {
    try { Thread.sleep(800); } catch (Exception e) {}
    return "服务A的结果";
});
CompletableFuture<String> f2 = CompletableFuture.supplyAsync(() -> {
    try { Thread.sleep(500); } catch (Exception e) {}
    return "服务B的结果";
});

CompletableFuture<Object> firstResult = CompletableFuture.anyOf(f1, f2);

System.out.println("最快返回的结果: " + firstResult.join()); // 输出: 最快返回的结果: 服务B的结果

6. 核心API:异常处理

异步任务中的异常处理至关重要。

6.1. exceptionally - 捕获异常并返回默认值

如果在执行链中任何一个环节出现异常,exceptionally可以捕获它,并提供一个备用结果。类似于try-catch中的catch块。

java
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
    if (Math.random() > 0.5) {
        throw new RuntimeException("计算出错!");
    }
    return "计算成功";
}).exceptionally(ex -> {
    System.out.println("发生异常: " + ex.getMessage());
    return "默认结果"; // 返回一个备用结果
});

System.out.println(future.join()); // 可能输出 "计算成功" 或 "默认结果"

6.2. handle - 更通用的处理方式

无论上一步是正常完成还是出现异常,handle都会被执行。它接收两个参数:结果和异常(其中一个为null)。这让你可以在一个地方同时处理成功和失败的情况。类似于try-catch-finally

java
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
    if (Math.random() < 0.5) {
        throw new RuntimeException("计算出错!");
    }
    return "计算成功";
}).handle((result, ex) -> {
    if (ex != null) { // 如果有异常
        System.out.println("处理异常: " + ex.getMessage());
        return "异常处理后的结果";
    }
    return "处理成功: " + result.toUpperCase(); // 处理正常结果
});

System.out.println(future.join());

6.3. whenComplete

handle类似,但它不改变结果。它只是一个"side-effect"操作,用于在完成时(无论成功与否)执行一些操作,比如记录日志。

java
CompletableFuture.supplyAsync(() -> "Hello")
    .whenComplete((res, ex) -> {
        if (ex == null) {
            System.out.println("任务成功,结果: " + res);
        } else {
            System.out.println("任务失败,异常: " + ex.getMessage());
        }
    });

7. 总结与最佳实践

  • 使用自定义线程池:对于IO密集型或耗时长的任务,强烈建议使用自定义的Executor,避免ForkJoinPool.commonPool()被长时间阻塞,影响其他任务。
  • 避免在链式调用中写阻塞代码CompletableFuture的目的是为了避免阻塞,如果在thenApply等方法中写入Thread.sleep()或进行同步IO,就失去了其意义。
  • get() vs join()get()会抛出受检异常(InterruptedException, ExecutionException),需要try-catchjoin()则将受检异常包装成非受检异常(CompletionException)抛出,代码更简洁。在主线程等待最终结果时,两者都可以用。
  • 理解......Async的区别:不带Async后缀的方法,通常会由上一个阶段的线程来执行。带Async后缀的,会提交到线程池中执行,这有助于任务的快速切换,避免单个线程执行过长的任务链。

CompletableFuture是现代Java并发编程的基石,它让异步代码的编写变得如同步代码般流畅和直观。掌握它对于编写高性能、高响应性的Java应用程序至关重要。

00:00
00:00