Java CompletableFuture 异步编程详解
CompletableFuture是Java 8强大的异步编程工具。它通过链式调用、任务组合和优雅的异常处理,解决了传统Future的阻塞痛点,让你能以非阻塞方式构建高性能、高响应的复杂异步流程。
我们来全面深入地讲解一下Java中的CompletableFuture。
CompletableFuture 是 Java 8 引入的一个非常强大的类,位于java.util.concurrent包下。它极大地增强了Java的异步编程能力,解决了传统Future接口的诸多痛点。
可以把它理解为:一个“可完成的未来”。它代表一个异步计算的结果,你可以在这个结果完成时(或未完成时)附加各种操作,如转换、组合、处理异常等,而无需阻塞等待。
1. 为什么需要 CompletableFuture?(传统 Future 的痛点)
在 Java 5 中引入的Future接口,虽然开启了异步编程的大门,但有几个明显的缺点:
- 无法主动完成:你无法在代码中手动将一个
Future标记为“已完成”并设置其结果。它只能被执行它的ExecutorService完成。 - 阻塞式获取结果:
future.get()方法是阻塞的。调用该方法时,如果任务还没完成,当前线程就会被挂起,直到结果返回。这与异步编程的初衷相悖。 - 没有回调机制:你无法为
Future添加一个回调函数,让它在任务完成后自动执行某个操作。你只能通过循环调用isDone()来检查任务状态,或者直接阻塞在get()上。 - 无法链式操作:多个
Future之间很难进行串联。例如,你无法方便地表达“当任务A完成后,用它的结果去执行任务B”。 - 无法组合多个Future:你无法简单地实现“当多个任务都完成后,再执行某个操作”或者“当任意一个任务完成后,就执行某个操作”。
- 异常处理复杂:
get()方法会抛出受检异常(Checked Exception),使得异常处理代码比较冗长。
CompletableFuture 就是为了解决以上所有问题而设计的。
2. CompletableFuture 的核心优势
- 非阻塞:可以以声明式、函数式的方式定义计算流程,避免了
get()的阻塞等待。 - 可编程:可以手动创建并完成一个
CompletableFuture,非常适合将回调式API转换为响应式API。 - 强大的链式调用(Fluent API):提供了类似 Stream API 的链式方法(如
thenApply,thenAccept,thenCompose等),可以优雅地组织异步任务流。 - 灵活的组合能力:可以轻松组合多个
CompletableFuture(如allOf,anyOf,thenCombine)。 - 完善的异常处理:提供了
exceptionally,handle等方法来优雅地处理异步执行中的异常。
3. 如何创建 CompletableFuture
有两种主要方式来创建一个异步任务。
3.1. 手动创建和完成
这在需要将一个基于回调的API适配为CompletableFuture时非常有用。
// 创建一个未完成的Future
CompletableFuture<String> future = new CompletableFuture<>();
// 在另一个线程中完成它
new Thread(() -> {
try {
Thread.sleep(2000);
// 手动设置结果,完成Future
future.complete("Hello, CompletableFuture!");
} catch (InterruptedException e) {
// 手动设置异常,完成Future
future.completeExceptionally(e);
}
}).start();
// 主线程可以继续做其他事情...
// 当需要结果时,再获取
System.out.println("主线程在等待结果...");
String result = future.join(); // join() 和 get() 类似,但不抛出受检异常
System.out.println("获取到结果: " + result);
3.2. 使用静态工厂方法执行异步任务
这是最常用的方式,CompletableFuture提供了四个静态方法来启动异步计算:
runAsync(Runnable runnable): 运行一个没有返回值的异步任务。runAsync(Runnable runnable, Executor executor): 使用指定的线程池运行任务。supplyAsync(Supplier<U> supplier): 运行一个有返回值的异步任务。supplyAsync(Supplier<U> supplier, Executor executor): 使用指定的线程池运行任务。
注意:如果不指定Executor,CompletableFuture默认会使用ForkJoinPool.commonPool()作为其线程池。对于计算密集型任务这很合适,但对于IO密集型任务,最好提供自定义的线程池,以避免阻塞公共线程池。
示例:
// 1. 无返回值的异步任务
CompletableFuture<Void> futureRun = CompletableFuture.runAsync(() -> {
System.out.println("runAsync: 任务正在执行... Thread: " + Thread.currentThread().getName());
});
// 2. 有返回值的异步任务
CompletableFuture<String> futureSupply = CompletableFuture.supplyAsync(() -> {
System.out.println("supplyAsync: 任务正在执行... Thread: " + Thread.currentThread().getName());
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "Task Result";
});
// 等待并获取结果
System.out.println(futureSupply.get()); // 阻塞等待结果
4. 核心API:链式操作
这是CompletableFuture最强大的地方。当一个异步任务完成后,我们可以触发后续的操作。这些操作分为三类:thenApply (转换), thenAccept (消费), thenRun (执行动作)。
每个方法都有三个变种:
method(...): 使用与上一个任务相同的线程。methodAsync(...): 使用默认的ForkJoinPool线程池。methodAsync(..., Executor executor): 使用你指定的线程池。
4.1. thenApply / thenApplyAsync - 转换结果
当上一个任务完成后,将其结果作为输入,进行转换后,返回一个新的CompletableFuture。类似于Stream.map()。
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> "123")
.thenApply(s -> s.length()) // 转换:String -> Integer
.thenApply(i -> i * 10); // 转换:Integer -> Integer
System.out.println(future.join()); // 输出: 30
4.2. thenAccept / thenAcceptAsync - 消费结果
当上一个任务完成后,将其结果作为输入进行消费,但没有返回值 (CompletableFuture<Void>)。类似于Stream.forEach()。
CompletableFuture.supplyAsync(() -> "Hello")
.thenAccept(result -> System.out.println("收到的结果: " + result));
// 控制台会打印 "收到的结果: Hello"
4.3. thenRun / thenRunAsync - 执行动作
当上一个任务完成后,执行一个Runnable,不关心上一个任务的结果,也没有返回值。
CompletableFuture.supplyAsync(() -> "some result")
.thenRun(() -> System.out.println("上一个任务已完成,开始执行新动作。"));
5. 核心API:组合多个 Future
5.1. thenCompose - 串行依赖
用于连接两个有依赖关系的CompletableFuture。当前一个任务完成后,将其结果作为参数传递给一个函数,该函数返回另一个CompletableFuture。最终结果是这个新的CompletableFuture的结果。类似于Stream.flatMap()。
场景:先获取用户ID,然后根据用户ID去获取用户详情。
CompletableFuture<String> userDetailsFuture = CompletableFuture.supplyAsync(() -> 1001) // 模拟获取用户ID
.thenCompose(userId -> CompletableFuture.supplyAsync(() -> "用户详情 for " + userId)); // 根据ID获取详情
System.out.println(userDetailsFuture.join()); // 输出: 用户详情 for 1001
5.2. thenCombine - 并行组合 (AND)
将两个独立的CompletableFuture组合起来,当两个都完成时,将它们的结果作为参数传递给一个函数,并返回最终结果。
场景:并行获取商品价格和商品折扣,然后计算最终价格。
CompletableFuture<Double> priceFuture = CompletableFuture.supplyAsync(() -> 199.99);
CompletableFuture<Double> discountFuture = CompletableFuture.supplyAsync(() -> 0.8);
CompletableFuture<Double> finalPriceFuture = priceFuture.thenCombine(discountFuture,
(price, discount) -> price * discount); // (T, U) -> R
System.out.println("最终价格: " + finalPriceFuture.join()); // 输出: 最终价格: 159.992
5.3. allOf - 等待所有完成 (AND)
等待一组CompletableFuture全部完成。allOf的返回值是CompletableFuture<Void>。如果你需要获取所有任务的结果,需要一些额外的处理。
CompletableFuture<String> f1 = CompletableFuture.supplyAsync(() -> "任务1");
CompletableFuture<String> f2 = CompletableFuture.supplyAsync(() -> "任务2");
CompletableFuture<String> f3 = CompletableFuture.supplyAsync(() -> "任务3");
CompletableFuture<Void> allFutures = CompletableFuture.allOf(f1, f2, f3);
// 等待所有任务完成
allFutures.join();
// 获取所有结果(常用技巧)
// 注意:此时f1, f2, f3都已经完成了,调用join()不会阻塞
System.out.println(f1.join());
System.out.println(f2.join());
System.out.println(f3.join());
5.4. anyOf - 等待任意一个完成 (OR)
等待一组CompletableFuture中任意一个完成,它的返回值是CompletableFuture<Object>,其结果是第一个完成的任务的结果。
场景:向多个服务查询同一个数据,谁先返回就用谁的。
CompletableFuture<String> f1 = CompletableFuture.supplyAsync(() -> {
try { Thread.sleep(800); } catch (Exception e) {}
return "服务A的结果";
});
CompletableFuture<String> f2 = CompletableFuture.supplyAsync(() -> {
try { Thread.sleep(500); } catch (Exception e) {}
return "服务B的结果";
});
CompletableFuture<Object> firstResult = CompletableFuture.anyOf(f1, f2);
System.out.println("最快返回的结果: " + firstResult.join()); // 输出: 最快返回的结果: 服务B的结果
6. 核心API:异常处理
异步任务中的异常处理至关重要。
6.1. exceptionally - 捕获异常并返回默认值
如果在执行链中任何一个环节出现异常,exceptionally可以捕获它,并提供一个备用结果。类似于try-catch中的catch块。
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
if (Math.random() > 0.5) {
throw new RuntimeException("计算出错!");
}
return "计算成功";
}).exceptionally(ex -> {
System.out.println("发生异常: " + ex.getMessage());
return "默认结果"; // 返回一个备用结果
});
System.out.println(future.join()); // 可能输出 "计算成功" 或 "默认结果"
6.2. handle - 更通用的处理方式
无论上一步是正常完成还是出现异常,handle都会被执行。它接收两个参数:结果和异常(其中一个为null)。这让你可以在一个地方同时处理成功和失败的情况。类似于try-catch-finally。
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
if (Math.random() < 0.5) {
throw new RuntimeException("计算出错!");
}
return "计算成功";
}).handle((result, ex) -> {
if (ex != null) { // 如果有异常
System.out.println("处理异常: " + ex.getMessage());
return "异常处理后的结果";
}
return "处理成功: " + result.toUpperCase(); // 处理正常结果
});
System.out.println(future.join());
6.3. whenComplete
与handle类似,但它不改变结果。它只是一个"side-effect"操作,用于在完成时(无论成功与否)执行一些操作,比如记录日志。
CompletableFuture.supplyAsync(() -> "Hello")
.whenComplete((res, ex) -> {
if (ex == null) {
System.out.println("任务成功,结果: " + res);
} else {
System.out.println("任务失败,异常: " + ex.getMessage());
}
});
7. 总结与最佳实践
- 使用自定义线程池:对于IO密集型或耗时长的任务,强烈建议使用自定义的
Executor,避免ForkJoinPool.commonPool()被长时间阻塞,影响其他任务。 - 避免在链式调用中写阻塞代码:
CompletableFuture的目的是为了避免阻塞,如果在thenApply等方法中写入Thread.sleep()或进行同步IO,就失去了其意义。 get()vsjoin():get()会抛出受检异常(InterruptedException,ExecutionException),需要try-catch。join()则将受检异常包装成非受检异常(CompletionException)抛出,代码更简洁。在主线程等待最终结果时,两者都可以用。- 理解
...和...Async的区别:不带Async后缀的方法,通常会由上一个阶段的线程来执行。带Async后缀的,会提交到线程池中执行,这有助于任务的快速切换,避免单个线程执行过长的任务链。
CompletableFuture是现代Java并发编程的基石,它让异步代码的编写变得如同步代码般流畅和直观。掌握它对于编写高性能、高响应性的Java应用程序至关重要。