CompletableFuture 异步:链式调用与组合

FreeGuideOnline 最新 2026-06-17

认识 CompletableFuture:现代 Java 异步编程的基石

CompletableFuture 是 Java 8 引入的异步编程工具,实现了 FutureCompletionStage 接口。它不仅能让你以非阻塞的方式执行任务,更重要的是提供了丰富的操作符来链式调用组合多个异步计算,极大地简化了复杂异步流程的编排。

为什么需要 CompletableFuture

传统的 Future 存在明显痛点:获取结果需要阻塞等待,无法手动完成,更不支持任务间的依赖和组合。CompletableFuture 正好解决了这些难题:

  • 显式完成:你可以主动将结果或异常设置到 Future 中。
  • 链式回调:任务完成后自动触发下一步操作,无需轮询。
  • 灵活组合:能将多个异步任务串联、聚合或竞速。
  • 异常处理:提供了声明式的异常恢复机制。

创建 CompletableFuture 实例

在学习链式与组合之前,先掌握如何快速创建一个异步任务。

基于 Supplier 创建

CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
    // 模拟耗时操作
    return "Hello";
});

supplyAsync 接收一个 Supplier,在默认的 ForkJoinPool 公共线程池中执行。你可以传入自定义线程池:

Executor executor = Executors.newFixedThreadPool(10);
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> "Hello", executor);

基于 Runnable 创建

如果任务不返回结果,使用 runAsync

CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
    System.out.println("执行异步日志操作");
});

手动完成一个 Future

有时任务结果来自外部事件,你可以提前创建一个 CompletableFuture 并稍后完成它:

CompletableFuture<String> future = new CompletableFuture<>();
// 在其他线程中
future.complete("手动结果");

链式调用:单任务的后续处理

链式调用是整个 CompletableFuture 设计理念的体现:就像建造流水线,每一步在上一步完成后自动触发,且不会阻塞主线程。

thenApply:转换结果

当异步任务完成后,对结果进行同步转换,返回一个新的 CompletableFuture。

CompletableFuture<String> future = CompletableFuture
    .supplyAsync(() -> "Hello")
    .thenApply(s -> s + " World");
// future.get() -> "Hello World"

如果转换逻辑本身是异步的,请使用 thenCompose,避免嵌套的 CompletableFuture。

thenAccept:消费结果

只关心最终结果,不需要返回新值时使用 thenAccept

CompletableFuture.supplyAsync(() -> "result")
    .thenAccept(System.out::println);

thenRun:不依赖结果的动作

任务结束后执行一个 Runnable,前后结果无关。

CompletableFuture.supplyAsync(() -> "done")
    .thenRun(() -> System.out.println("操作已完成"));

链式调用的特点

  • 每个步骤默认在上一个任务完成的线程中执行,但你可以通过异步版方法(如 thenApplyAsync)强制切换到其他线程池。
  • 如果前一步抛出异常,后续的链式操作会被跳过,直接进入异常处理流程。

组合多个 CompletableFuture

实际场景中,经常需要等待多个分布式服务的结果,或将两个独立的异步流程合并处理。

两个任务的组合:thenCombine

当两个并行任务都完成后,使用它们的结果进行计算。

CompletableFuture<String> futureA = CompletableFuture.supplyAsync(() -> "Hello");
CompletableFuture<String> futureB = CompletableFuture.supplyAsync(() -> "World");

CompletableFuture<String> combined = futureA.thenCombine(futureB, (a, b) -> a + " " + b);
// combined.get() -> "Hello World"

如果先处理 A 的结果,再与 B 的结果合并,可以用 thenCompose 结合 thenCombine,但直接使用 thenCombine 可以并行执行 A 和 B。

两种消费方式:thenAcceptBoth

只关心两个结果,不做返回转换。

futureA.thenAcceptBoth(futureB, (a, b) -> 
    System.out.println("A: " + a + ", B: " + b));

等待任意完成:applyToEither

两个任务竞速,只要其中一个先完成,就使用它的结果进行后续处理。

CompletableFuture<String> fastSource = CompletableFuture.supplyAsync(() -> {
    sleep(100); return "Fast";
});
CompletableFuture<String> slowSource = CompletableFuture.supplyAsync(() -> {
    sleep(200); return "Slow";
});

CompletableFuture<String> result = fastSource.applyToEither(slowSource, s -> "来自: " + s);
// 大概率得到 "来自: Fast"

同样有 acceptEitherrunAfterEither 变体。

等待所有完成:allOf

当你需要等待多个任务全部完成,但不需要聚合结果时,allOf 返回一个 CompletableFuture<Void>

CompletableFuture<Void> all = CompletableFuture.allOf(futureA, futureB);
// 当 A 和 B 都完成后,all 才算完成
all.thenRun(() -> System.out.println("两个任务均已完成"));

allOf 通常配合 join() 使用,或以任务列表的方式收集结果:

CompletableFuture<Void> all = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
CompletableFuture<List<String>> allResults = all.thenApply(v ->
    futures.stream()
           .map(CompletableFuture::join)
           .collect(Collectors.toList())
);

等待任一完成:anyOf

allOf 对应,anyOf 只要有一个任务完成即返回,结果是 Object 类型。

CompletableFuture<Object> any = CompletableFuture.anyOf(futureA, futureB);
any.thenAccept(result -> System.out.println("第一个完成的结果: " + result));

异常处理与恢复机制

异步调用中异常不会直接抛出到主线程,而是在 CompletableFuture 内部记录。你可以通过以下方式优雅处理。

exceptionally:异常恢复

当上游任务抛出异常时,提供一个备用值。

CompletableFuture.supplyAsync(() -> {
    if (Math.random() > 0.5) throw new RuntimeException("模拟错误");
    return "正常结果";
}).exceptionally(ex -> {
    System.out.println("捕获异常: " + ex.getMessage());
    return "降级值";
});

handle:无论异常与否都执行

handle 回调同时接收结果和异常,你可以据此返回新值。

CompletableFuture.supplyAsync(() -> {
    // 可能抛出异常
    return 100 / 0;
}).handle((result, ex) -> {
    if (ex != null) {
        return 0;
    }
    return result;
});

whenComplete:观察结果与异常

whenComplete 不能改变返回结果,仅用于记录日志或清理操作。

future.whenComplete((result, ex) -> {
    if (ex != null) {
        log.error("任务失败", ex);
    } else {
        log.info("任务成功: {}", result);
    }
});

实战示例:多服务聚合调用

假设一个电商系统需要同时获取用户信息、商品详情和库存数量,最后聚合返回给前端。

public CompletableFuture<OrderDetail> fetchOrderDetail(long userId, long productId) {
    CompletableFuture<User> userFuture = userService.getUserAsync(userId);
    CompletableFuture<Product> productFuture = productService.getProductAsync(productId);
    CompletableFuture<Integer> stockFuture = inventoryService.getStockAsync(productId);

    return CompletableFuture.allOf(userFuture, productFuture, stockFuture)
            .thenApply(ignored -> {
                User user = userFuture.join();    // 此时不会阻塞,因为已经完成
                Product product = productFuture.join();
                int stock = stockFuture.join();
                return new OrderDetail(user, product, stock);
            })
            .exceptionally(ex -> {
                log.error("获取订单详情失败", ex);
                return new OrderDetail(); // 返回默认值
            });
}

线程池的选择与注意事项

  • 默认线程池:ForkJoinPool.commonPool(),并行度等于 CPU 核心数。适合 CPU 密集型任务。
  • 自定义线程池:对于 IO 密集型、高延迟的操作,务必使用专用线程池,避免阻塞公共池。
  • 异步后缀thenApplyAsyncthenCombineAsync 等方法可以指定线程池,用于控制每个步骤的执行线程。
Executor ioPool = Executors.newCachedThreadPool();
CompletableFuture.supplyAsync(() -> fetchFromDB(), ioPool)
    .thenApplyAsync(data -> process(data), computationPool);

总结

CompletableFuture 提供了一套函数式、声明式的异步编程模型。通过 thenApply 等链式方法实现流畅的单任务处理,通过 thenCombineallOf 等组合方法实现复杂的多任务编排,配合 exceptionally 让异常处理同样优雅。掌握这些能力后,你就能写出清晰、高效且易于维护的异步 Java 代码。