CompletableFuture 异步:链式调用与组合
认识 CompletableFuture:现代 Java 异步编程的基石
CompletableFuture 是 Java 8 引入的异步编程工具,实现了 Future 和 CompletionStage 接口。它不仅能让你以非阻塞的方式执行任务,更重要的是提供了丰富的操作符来链式调用与组合多个异步计算,极大地简化了复杂异步流程的编排。
为什么需要 CompletableFuture
传统的 Future 存在明显痛点:获取结果需要阻塞等待,无法手动完成,更不支持任务间的依赖和组合。CompletableFuture 正好解决了这些难题:
- 显式完成:你可以主动将结果或异常设置到 Future 中。
- 链式回调:任务完成后自动触发下一步操作,无需轮询。
- 灵活组合:能将多个异步任务串联、聚合或竞速。
- 异常处理:提供了声明式的异常恢复机制。
创建 CompletableFuture 实例
在学习链式与组合之前,先掌握如何快速创建一个异步任务。
基于 Supplier 创建
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
// 模拟耗时操作
return "Hello";
});
supplyAsync 接收一个 Supplier,在默认的 ForkJoinPool 公共线程池中执行。你可以传入自定义线程池:
Executor executor = Executors.newFixedThreadPool(10);
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> "Hello", executor);
基于 Runnable 创建
如果任务不返回结果,使用 runAsync:
CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
System.out.println("执行异步日志操作");
});
手动完成一个 Future
有时任务结果来自外部事件,你可以提前创建一个 CompletableFuture 并稍后完成它:
CompletableFuture<String> future = new CompletableFuture<>();
// 在其他线程中
future.complete("手动结果");
链式调用:单任务的后续处理
链式调用是整个 CompletableFuture 设计理念的体现:就像建造流水线,每一步在上一步完成后自动触发,且不会阻塞主线程。
thenApply:转换结果
当异步任务完成后,对结果进行同步转换,返回一个新的 CompletableFuture。
CompletableFuture<String> future = CompletableFuture
.supplyAsync(() -> "Hello")
.thenApply(s -> s + " World");
// future.get() -> "Hello World"
如果转换逻辑本身是异步的,请使用 thenCompose,避免嵌套的 CompletableFuture。
thenAccept:消费结果
只关心最终结果,不需要返回新值时使用 thenAccept。
CompletableFuture.supplyAsync(() -> "result")
.thenAccept(System.out::println);
thenRun:不依赖结果的动作
任务结束后执行一个 Runnable,前后结果无关。
CompletableFuture.supplyAsync(() -> "done")
.thenRun(() -> System.out.println("操作已完成"));
链式调用的特点
- 每个步骤默认在上一个任务完成的线程中执行,但你可以通过异步版方法(如
thenApplyAsync)强制切换到其他线程池。 - 如果前一步抛出异常,后续的链式操作会被跳过,直接进入异常处理流程。
组合多个 CompletableFuture
实际场景中,经常需要等待多个分布式服务的结果,或将两个独立的异步流程合并处理。
两个任务的组合:thenCombine
当两个并行任务都完成后,使用它们的结果进行计算。
CompletableFuture<String> futureA = CompletableFuture.supplyAsync(() -> "Hello");
CompletableFuture<String> futureB = CompletableFuture.supplyAsync(() -> "World");
CompletableFuture<String> combined = futureA.thenCombine(futureB, (a, b) -> a + " " + b);
// combined.get() -> "Hello World"
如果先处理 A 的结果,再与 B 的结果合并,可以用 thenCompose 结合 thenCombine,但直接使用 thenCombine 可以并行执行 A 和 B。
两种消费方式:thenAcceptBoth
只关心两个结果,不做返回转换。
futureA.thenAcceptBoth(futureB, (a, b) ->
System.out.println("A: " + a + ", B: " + b));
等待任意完成:applyToEither
两个任务竞速,只要其中一个先完成,就使用它的结果进行后续处理。
CompletableFuture<String> fastSource = CompletableFuture.supplyAsync(() -> {
sleep(100); return "Fast";
});
CompletableFuture<String> slowSource = CompletableFuture.supplyAsync(() -> {
sleep(200); return "Slow";
});
CompletableFuture<String> result = fastSource.applyToEither(slowSource, s -> "来自: " + s);
// 大概率得到 "来自: Fast"
同样有 acceptEither 和 runAfterEither 变体。
等待所有完成:allOf
当你需要等待多个任务全部完成,但不需要聚合结果时,allOf 返回一个 CompletableFuture<Void>。
CompletableFuture<Void> all = CompletableFuture.allOf(futureA, futureB);
// 当 A 和 B 都完成后,all 才算完成
all.thenRun(() -> System.out.println("两个任务均已完成"));
allOf 通常配合 join() 使用,或以任务列表的方式收集结果:
CompletableFuture<Void> all = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
CompletableFuture<List<String>> allResults = all.thenApply(v ->
futures.stream()
.map(CompletableFuture::join)
.collect(Collectors.toList())
);
等待任一完成:anyOf
和 allOf 对应,anyOf 只要有一个任务完成即返回,结果是 Object 类型。
CompletableFuture<Object> any = CompletableFuture.anyOf(futureA, futureB);
any.thenAccept(result -> System.out.println("第一个完成的结果: " + result));
异常处理与恢复机制
异步调用中异常不会直接抛出到主线程,而是在 CompletableFuture 内部记录。你可以通过以下方式优雅处理。
exceptionally:异常恢复
当上游任务抛出异常时,提供一个备用值。
CompletableFuture.supplyAsync(() -> {
if (Math.random() > 0.5) throw new RuntimeException("模拟错误");
return "正常结果";
}).exceptionally(ex -> {
System.out.println("捕获异常: " + ex.getMessage());
return "降级值";
});
handle:无论异常与否都执行
handle 回调同时接收结果和异常,你可以据此返回新值。
CompletableFuture.supplyAsync(() -> {
// 可能抛出异常
return 100 / 0;
}).handle((result, ex) -> {
if (ex != null) {
return 0;
}
return result;
});
whenComplete:观察结果与异常
whenComplete 不能改变返回结果,仅用于记录日志或清理操作。
future.whenComplete((result, ex) -> {
if (ex != null) {
log.error("任务失败", ex);
} else {
log.info("任务成功: {}", result);
}
});
实战示例:多服务聚合调用
假设一个电商系统需要同时获取用户信息、商品详情和库存数量,最后聚合返回给前端。
public CompletableFuture<OrderDetail> fetchOrderDetail(long userId, long productId) {
CompletableFuture<User> userFuture = userService.getUserAsync(userId);
CompletableFuture<Product> productFuture = productService.getProductAsync(productId);
CompletableFuture<Integer> stockFuture = inventoryService.getStockAsync(productId);
return CompletableFuture.allOf(userFuture, productFuture, stockFuture)
.thenApply(ignored -> {
User user = userFuture.join(); // 此时不会阻塞,因为已经完成
Product product = productFuture.join();
int stock = stockFuture.join();
return new OrderDetail(user, product, stock);
})
.exceptionally(ex -> {
log.error("获取订单详情失败", ex);
return new OrderDetail(); // 返回默认值
});
}
线程池的选择与注意事项
- 默认线程池:ForkJoinPool.commonPool(),并行度等于 CPU 核心数。适合 CPU 密集型任务。
- 自定义线程池:对于 IO 密集型、高延迟的操作,务必使用专用线程池,避免阻塞公共池。
- 异步后缀:
thenApplyAsync、thenCombineAsync等方法可以指定线程池,用于控制每个步骤的执行线程。
Executor ioPool = Executors.newCachedThreadPool();
CompletableFuture.supplyAsync(() -> fetchFromDB(), ioPool)
.thenApplyAsync(data -> process(data), computationPool);
总结
CompletableFuture 提供了一套函数式、声明式的异步编程模型。通过 thenApply 等链式方法实现流畅的单任务处理,通过 thenCombine、allOf 等组合方法实现复杂的多任务编排,配合 exceptionally 让异常处理同样优雅。掌握这些能力后,你就能写出清晰、高效且易于维护的异步 Java 代码。