Callable和Future

  • Callable 负责 定义任务逻辑(类似 Runnable,但能返回结果和抛出异常)。
  • Future 负责 管理任务生命周期获取计算结果
  • 二者结合才能完整实现 “提交任务 → 异步执行 → 获取结果” 的流程。

功能互补

职责 对方的作用
Callable 定义可返回结果的任务逻辑 需要 Future 来接收和管理它的计算结果
Future 监控任务状态、获取结果或取消任务 需要 Callable 作为实际的任务提供者
// 典型使用场景
ExecutorService executor = Executors.newCachedThreadPool();
Callable<String> task = () -> "Hello World";  // 定义任务
Future<String> future = executor.submit(task); // 提交任务并获取Future
String result = future.get();                  // 通过Future获取结果

Callable接口

Callable是Java 5引入的接口,与Runnable类似,但有三个关键区别:

public interface Callable<V> {
    V call() throws Exception;
}
  • 可以返回结果(泛型V)
  • 可以抛出受检异常
  • 需要实现 call() 方法而非 run()

与 Runnable 的对比

特性 Callable Runnable
返回值 支持 不支持
异常 可以抛出受检异常 不能抛出受检异常
方法签名 V call() throws Exception void run()
使用场景 需要返回结果的异步任务 简单的异步操作

Callable的基本使用

基本用法示例:

// 创建Callable任务
Callable<Integer> task = () -> {
    int sum = 0;
    for (int i = 1; i <= 10; i++) {
        sum += i;
        Thread.sleep(100); // 模拟耗时操作
    }
    return sum;
};

// 使用FutureTask包装Callable
FutureTask<Integer> futureTask = new FutureTask<>(task);

// 启动线程执行任务
new Thread(futureTask).start();

try {
    // 获取计算结果(会阻塞直到计算完成)
    int result = futureTask.get();
    System.out.println("1到10的和为: " + result);
} catch (InterruptedException | ExecutionException e) {
    e.printStackTrace();
}

与 ExecutorService 结合使用:

// 创建线程池
ExecutorService executor = Executors.newFixedThreadPool(4);

// 创建Callable任务列表
List<Callable<Integer>> tasks = new ArrayList<>();
for (int i = 0; i < 10; i++) {
    final int taskId = i;
    tasks.add(() -> {
        System.out.println("任务" + taskId + "开始执行");
        Thread.sleep(1000); // 模拟耗时操作
        return taskId * taskId; // 返回平方值
    });
}

try {
    // 提交所有任务并获取Future列表
    List<Future<Integer>> futures = executor.invokeAll(tasks);

    // 处理结果
    for (Future<Integer> future : futures) {
        System.out.println("任务结果: " + future.get());
    }
} catch (InterruptedException | ExecutionException e) {
    e.printStackTrace();
} finally {
    // 关闭线程池
    executor.shutdown();
}

应用场景

场景1:并行计算

import java.util.concurrent.*;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class ParallelComputation {
    public static void main(String[] args) {
        List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);

        ExecutorService executor = Executors.newFixedThreadPool(4);

        try {
            // 并行计算每个数字的阶乘
            List<Future<Long>> futures = executor.invokeAll(
                    numbers.stream()
                            .map(n -> (Callable<Long>) () -> factorial(n))
                            .collect(Collectors.toList())
            );

            for (int i = 0; i < numbers.size(); i++) {
                System.out.printf("%d! = %d%n", numbers.get(i), futures.get(i).get());
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            executor.shutdown();
        }
    }

    private static long factorial(int n) {
        if (n == 0) return 1;
        long result = 1;
        for (int i = 1; i <= n; i++) {
            result *= i;
            try {
                Thread.sleep(100); // 模拟计算耗时
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return result;
    }
}

场景2:超时控制

import java.util.concurrent.*;

public class TimeoutExample {
    public static void main(String[] args) {
        ExecutorService executor = Executors.newSingleThreadExecutor();

        Callable<String> task = () -> {
            Thread.sleep(2000); // 模拟长时间运行的任务
            return "任务完成";
        };

        Future<String> future = executor.submit(task);

        try {
            // 设置1秒超时
            String result = future.get(1, TimeUnit.SECONDS);
            System.out.println(result);
        } catch (TimeoutException e) {
            System.out.println("任务超时,取消任务");
            future.cancel(true); // 尝试中断任务
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            executor.shutdown();
        }
    }
}

场景3:多个任务竞争(获取第一个完成的结果)

import java.util.concurrent.*;
import java.util.Arrays;
import java.util.List;
import java.util.Random;

public class FirstCompletedExample {
    public static void main(String[] args) {
        ExecutorService executor = Executors.newCachedThreadPool();

        List<Callable<String>> tasks = Arrays.asList(
            () -> { Thread.sleep(new Random().nextInt(1000)); return "任务1完成"; },
            () -> { Thread.sleep(new Random().nextInt(1000)); return "任务2完成"; },
            () -> { Thread.sleep(new Random().nextInt(1000)); return "任务3完成"; }
        );

        try {
            // 获取第一个完成的任务结果
            String result = executor.invokeAny(tasks);
            System.out.println("第一个完成的任务: " + result);
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            executor.shutdown();
        }
    }
}

Future接口

Future是Java并发编程中一个非常重要的接口,它位于 java.util.concurrent 包中。Future 表示一个异步计算的结果,它提供了检查计算是否完成的方法,以及等待计算完成并检索其结果的方法。

Future接口为Java并发编程提供了基础的异步计算支持,通过它可以:

  • 提交任务并获取未来可能的结果
  • 查询任务状态和取消任务
  • 实现任务的超时控制
package java.util.concurrent;

public interface Future<V> {
    // 尝试取消任务的执行
    boolean cancel(boolean mayInterruptIfRunning);

    // 判断任务是否被取消
    boolean isCancelled();

    // 判断任务是否完成
    boolean isDone();

    // 获取计算结果(阻塞直到计算完成)
    V get() throws InterruptedException, ExecutionException;

    // 获取计算结果(带超时限制)
    V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
}
  • isCancelled():只有任务未启动,或者在完成之前被取消,才会返回true,表示任务已经被成功取消。其他情况都会返回false
  • get():方法是阻塞的,可能会影响系统响应性。使用带超时的 get() 方法避免无限期等待

Future的使用

基本使用

案例1:基本使用

import java.util.concurrent.*;

public class FutureBasicDemo {
    public static void main(String[] args) throws Exception {
        ExecutorService executor = Executors.newSingleThreadExecutor();

        // 提交Callable任务,返回Future对象
        Future<String> future = executor.submit(() -> {
            Thread.sleep(2000); // 模拟耗时操作
            return "Hello, Future!";
        });

        System.out.println("任务已提交,可以继续做其他事情...");

        // 检查任务是否完成
        if (!future.isDone()) {
            System.out.println("任务还未完成,可以继续处理其他逻辑");
        }

        // 获取结果(会阻塞直到任务完成)
        String result = future.get();
        System.out.println("获取到结果: " + result);

        executor.shutdown();
    }
}

案例2:超时控制

import java.util.concurrent.*;

public class FutureTimeoutDemo {
    public static void main(String[] args) {
        ExecutorService executor = Executors.newSingleThreadExecutor();

        Future<String> future = executor.submit(() -> {
            Thread.sleep(3000);
            return "Result after long processing";
        });

        try {
            // 设置超时时间为2秒
            String result = future.get(2, TimeUnit.SECONDS);
            System.out.println("结果: " + result);
        } catch (TimeoutException e) {
            System.out.println("任务执行超时");
            future.cancel(true); // 取消任务
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            executor.shutdown();
        }
    }
}

案例3:任务取消

import java.util.concurrent.*;

public class FutureCancelDemo {
    public static void main(String[] args) throws Exception {
        ExecutorService executor = Executors.newSingleThreadExecutor();

        Future<String> future = executor.submit(() -> {
            Thread.sleep(5000);
            return "This should not be reached if cancelled";
        });

        // 模拟执行一段时间后决定取消任务
        Thread.sleep(1000);

        // 尝试取消任务(参数true表示可以中断正在执行的任务)
        boolean cancelled = future.cancel(true);
        System.out.println("任务取消" + (cancelled ? "成功" : "失败"));

        // 检查任务状态
        System.out.println("任务是否取消: " + future.isCancelled());
        System.out.println("任务是否完成: " + future.isDone());

        try {
            // 尝试获取已取消任务的结果会抛出CancellationException
            String result = future.get();
            System.out.println(result);
        } catch (CancellationException e) {
            System.out.println("无法获取结果,任务已被取消");
        }

        executor.shutdown();
    }
}

应用场景

场景1:并行处理多个任务

import java.util.concurrent.*;

public class MultipleFuturesDemo {
    public static void main(String[] args) throws Exception {
        ExecutorService executor = Executors.newFixedThreadPool(3);

        // 提交多个任务
        Future<String> future1 = executor.submit(() -> {
            Thread.sleep(1000);
            return "Result from Task 1";
        });

        Future<String> future2 = executor.submit(() -> {
            Thread.sleep(1500);
            return "Result from Task 2";
        });

        Future<String> future3 = executor.submit(() -> {
            Thread.sleep(800);
            return "Result from Task 3";
        });

        // 获取所有结果(会按调用顺序阻塞)
        System.out.println(future1.get());
        System.out.println(future2.get());
        System.out.println(future3.get());

        executor.shutdown();
    }
}

场景2:电商价格聚合案例

import java.util.concurrent.*;

public class PriceAggregator {
    private ExecutorService executor = Executors.newFixedThreadPool(3);

    public double getFinalPrice(String product) throws Exception {
        // 并行获取不同平台的价格
        Future<Double> taobaoPrice = executor.submit(() -> getPriceFromTaobao(product));
        Future<Double> jdPrice = executor.submit(() -> getPriceFromJD(product));
        Future<Double> pddPrice = executor.submit(() -> getPriceFromPDD(product));

        // 模拟其他处理
        System.out.println("正在处理其他业务逻辑...");
        Thread.sleep(500);

        // 获取所有价格并计算最低价
        double finalPrice = Math.min(taobaoPrice.get(), Math.min(jdPrice.get(), pddPrice.get()));

        return finalPrice;
    }

    private double getPriceFromTaobao(String product) throws InterruptedException {
        Thread.sleep(1000);
        return 199.0; // 模拟价格
    }

    private double getPriceFromJD(String product) throws InterruptedException {
        Thread.sleep(1200);
        return 219.0; // 模拟价格
    }

    private double getPriceFromPDD(String product) throws InterruptedException {
        Thread.sleep(800);
        return 189.0; // 模拟价格
    }

    public static void main(String[] args) throws Exception {
        PriceAggregator aggregator = new PriceAggregator();
        double price = aggregator.getFinalPrice("iPhone 13");
        System.out.println("最终价格: " + price);
        aggregator.executor.shutdown();
    }
}

Future实现类

JUC 并发包中,Future 接口有几个重要的实现类,每个实现类都有其特定的使用场景和优势:

  1. FutureTask:最基础的 Future 实现,同时实现了 Runnable 接口
  2. ScheduledFuture:用于定时任务的 Future 实现
  3. ForkJoinTask:Fork/Join 框架中的 Future 实现
  4. CompletableFuture(Java 8+):功能最丰富的Future实现

FutureTask

FutureTask 是 Future 接口的一个实现类,特点:

  • 同时实现了 Future 和 Runnable 接口,因此可作为任务提交给线程池 Executor 执行
  • 只能执行一次,不能重复使用
  • 内部基于AQS(AbstractQueuedSynchronizer)实现状态控制

示例

import java.util.concurrent.FutureTask;

public class FutureTaskDemo {
    public static void main(String[] args) throws Exception {
        // 创建FutureTask
        FutureTask<Integer> futureTask = new FutureTask<>(() -> {
            int sum = 0;
            for (int i = 0; i < 100; i++) {
                sum += i;
                Thread.sleep(10);
            }
            return sum;
        });

        // 创建线程执行FutureTask
        new Thread(futureTask).start();

        // 主线程可以继续做其他事情
        System.out.println("主线程正在处理其他任务...");
        Thread.sleep(500);

        // 获取计算结果
        if (!futureTask.isDone()) {
            System.out.println("计算任务还未完成,请稍候...");
        }

        int result = futureTask.get();
        System.out.println("计算结果: " + result);
    }
}

案例:缓存系统实现

基于 FutureTask 实现的高并发缓存系统,解决了缓存系统中常见的”缓存击穿”问题,并提供了线程安全的缓存访问机制:

import java.math.BigInteger;
import java.util.Map;
import java.util.concurrent.*;

public class CacheSystem<K, V> {
    private final Map<K, FutureTask<V>> cache = new ConcurrentHashMap<>();
    private final Computable<K, V> computable;

    public CacheSystem(Computable<K, V> computable) {
        this.computable = computable;
    }

    public V get(final K key) throws InterruptedException {
        while (true) {
            FutureTask<V> future = cache.get(key);
            if (future == null) {
                FutureTask<V> newTask = new FutureTask<>(() -> computable.compute(key));
                future = cache.putIfAbsent(key, newTask);
                if (future == null) {
                    future = newTask;
                    future.run(); // 开始计算
                }
            }

            try {
                return future.get();
            } catch (CancellationException e) {
                cache.remove(key);
            } catch (ExecutionException e) {
                throw launderThrowable(e.getCause());
            }
        }
    }

    private RuntimeException launderThrowable(Throwable t) {
        if (t instanceof RuntimeException) return (RuntimeException) t;
        else if (t instanceof Error) throw (Error) t;
        else throw new IllegalStateException("Not unchecked", t);
    }

    public interface Computable<K, V> {
        V compute(K key) throws InterruptedException;
    }
}

// 使用示例
class CacheDemo {
    public static void main(String[] args) {
        CacheSystem<String, BigInteger> cache = new CacheSystem<>(key -> {
            // 模拟耗时计算
            Thread.sleep(1000);
            return new BigInteger(key);
        });

        ExecutorService executor = Executors.newFixedThreadPool(10);
        for (int i = 0; i < 10; i++) {
            final String key = "12345";
            executor.execute(() -> {
                try {
                    System.out.println(Thread.currentThread().getName() +
                            " 获取结果: " + cache.get(key));
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        executor.shutdown();
    }
}

工作流程:

[客户端调用get(key)]
       |
       v
检查缓存是否存在 --> 存在 --> 返回缓存结果
       |
       v
不存在 --> 创建FutureTask --> 放入缓存 --> 执行计算
       |
       v
其他线程再次请求相同key --> 获取已存在的FutureTask --> 等待计算结果

使用场景:

// 电商系统-商品详情缓存
CacheSystem<Long, ProductDetail> productCache = new CacheSystem<>(productId -> {
    // 从数据库获取商品详情(耗时操作)
    return productDao.getDetailById(productId);
});

// 获取商品详情(数万QPS场景下能有效防止缓存击穿)
ProductDetail detail = productCache.get(12345L);

ScheduledFuture与定时任务

ScheduledFuture 用于表示延迟或周期性任务的执行结果,通常与 ScheduledExecutorService 配合使用。

核心特性

  • 延迟/周期性任务:支持固定延迟(scheduleWithFixedDelay)或固定速率(scheduleAtFixedRate)的任务调度。
  • 继承自 Future:提供任务取消、结果获取等基础功能。
  • 时间控制:通过 getDelay() 方法获取剩余延迟时间。

示例

ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
ScheduledFuture<?> future = executor.schedule(
        () -> System.out.println("Task executed after 1 second"),
        1, TimeUnit.SECONDS
);

案例:订单超时取消

import java.util.Map;
import java.util.concurrent.*;

public class OrderTimeoutSystem {
    private final ScheduledExecutorService scheduler =
            Executors.newScheduledThreadPool(1);
    private final Map<Long, ScheduledFuture<?>> orderTasks = new ConcurrentHashMap<>();

    public void placeOrder(long orderId) {
        // 提交定时任务,30分钟后检查订单状态
        ScheduledFuture<?> future = scheduler.schedule(() -> {
            if (!isOrderPaid(orderId)) {
                cancelOrder(orderId);
                orderTasks.remove(orderId);
            }
        }, 30, TimeUnit.MINUTES);

        orderTasks.put(orderId, future);
    }

    public void orderPaid(long orderId) {
        ScheduledFuture<?> future = orderTasks.get(orderId);
        if (future != null) {
            future.cancel(false); // 订单已支付,取消定时任务
            orderTasks.remove(orderId);
        }
    }

    private boolean isOrderPaid(long orderId) {
        // 查询数据库或缓存
        return false;
    }

    private void cancelOrder(long orderId) {
        System.out.println("订单超时取消: " + orderId);
        // 实际业务中会更新订单状态等操作
    }

    public void shutdown() {
        scheduler.shutdown();
    }
}

// 使用示例
class OrderTimeoutDemo {
    public static void main(String[] args) throws InterruptedException {
        OrderTimeoutSystem system = new OrderTimeoutSystem();

        // 模拟订单创建
        long orderId = 10001L;
        system.placeOrder(orderId);
        System.out.println("订单创建: " + orderId);

        // 模拟15分钟后支付
        Thread.sleep(TimeUnit.MINUTES.toMillis(15));
        system.orderPaid(orderId);
        System.out.println("订单已支付: " + orderId);

        system.shutdown();
    }
}

ForkJoinTask

用途:支持分治(Divide-and-Conquer)任务的抽象基类,通常用于 ForkJoinPool 中处理递归或并行任务。

核心特性

  • 分治策略:通过 fork() 分解任务,join() 合并结果。
  • 自动负载均衡ForkJoinPool 使用工作窃取(Work-Stealing)算法,空闲线程从忙碌线程的队列尾部窃取任务。
  • 两种子类
    • RecursiveTask:返回结果的任务。
    • RecursiveAction:无返回结果的任务。

示例(计算斐波那契数列):

斐波那契数列的定义:

  • F(0) = 0
  • F(1) = 1
  • F(n) = F(n-1) + F(n-2) (当 n > 1 时)
    例如:0, 1, 1, 2, 3, 5, 8, 13...
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

class FibonacciTask extends RecursiveTask<Integer> {
    final int n;

    FibonacciTask(int n) {
        this.n = n;
    }

    protected Integer compute() {
        if (n <= 1) return n;
        FibonacciTask f1 = new FibonacciTask(n - 1);
        f1.fork(); // 分解任务
        FibonacciTask f2 = new FibonacciTask(n - 2);
        return f2.compute() + f1.join(); // 合并结果
    }
}

class FibonacciTest {
    public static void main(String[] args) {
        int n = 10; // 计算斐波那契数列的第10项
        ForkJoinPool pool = new ForkJoinPool(); // 创建ForkJoin线程池
        FibonacciTask task = new FibonacciTask(n);
        int result = pool.invoke(task); // 提交任务并获取结果
        System.out.println("F(" + n + ") = " + result); // 输出结果
        pool.shutdown(); // 关闭线程池
    }
}

分治逻辑:

  • 将问题拆解为计算 F(n-1)F(n-2) 的子任务。
  • fork() 提交一个子任务到线程池异步执行,compute() 同步执行另一个子任务。
  • join() 等待异步任务的完成并获取结果。

案例:统计大量字符串出现的频率

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.*;

public class DataProcessor extends RecursiveTask<Map<String, Integer>> {
    // 任务拆分的阈值,当 数据量≤1000 时直接处理
    private static final int THRESHOLD = 1000;
    private final List<String> data; // 待处理的字符串列表

    public DataProcessor(List<String> data) {
        this.data = data;
    }

    /**
     *
     * @return 一个统计字符串频率的 Map
     */
    @Override
    protected Map<String, Integer> compute() {
        if (data.size() <= THRESHOLD) {
            return processDirectly(); // 数据量小,直接统计
        } else {
            int mid = data.size() / 2;
            DataProcessor leftTask = new DataProcessor(data.subList(0, mid));
            DataProcessor rightTask = new DataProcessor(data.subList(mid, data.size()));

            leftTask.fork(); // 异步提交左半部分任务
            Map<String, Integer> rightResult = rightTask.compute(); // 同步处理右半部分
            Map<String, Integer> leftResult = leftTask.join(); // 等待左半部分结果

            // 合并左右结果
            rightResult.forEach((k, v) ->
                    leftResult.merge(k, v, Integer::sum));
            return leftResult;
        }
    }

    // 遍历列表,使用 map.merge 方法统计词频
    private Map<String, Integer> processDirectly() {
        Map<String, Integer> result = new HashMap<>();
        for (String item : data) {
            result.merge(item, 1, Integer::sum);
        }
        return result;
    }
}

// 使用示例
class ForkJoinDemo {
    public static void main(String[] args) {
        // 模拟大数据集。生成 10_000 个随机字符串(格式为 item-0 到 item-99)
        List<String> bigData = new ArrayList<>();
        for (int i = 0; i < 10_000; i++) {
            bigData.add("item-" + ThreadLocalRandom.current().nextInt(100));
        }

        ForkJoinPool pool = new ForkJoinPool();
        DataProcessor task = new DataProcessor(bigData);
        Map<String, Integer> result = pool.invoke(task);

        System.out.println("处理结果大小: " + result.size());
        // 按词频降序排序,输出前5个高频词
        result.entrySet().stream()
                .sorted(Map.Entry.<String, Integer>comparingByValue().reversed())
                .limit(5)
                .forEach(e -> System.out.println(e.getKey() + ": " + e.getValue()));
    }
}

CompletableFuture

CompletableFuture 是 Java 8 引入的异步编程工具。

用途:支持异步任务的链式调用、组合和异常处理,是 Future 的增强版。

核心特性

  • 非阻塞回调:通过 thenApplythenAccept 等方法链式处理结果。
  • 任务组合:支持 thenCombineallOfanyOf 等多任务协作。
  • 异常处理:通过 exceptionallyhandle 方法捕获异常。
  • 手动完成:可通过 complete()completeExceptionally() 主动干预任务状态。

适用场景

  1. 并行执行多个任务(如调用多个微服务)。
  2. 异步任务编排(如 A 完成后触发 B)。
  3. 超时控制orTimeout, completeOnTimeout)。
  4. 异常恢复exceptionally, handle)。

常用方法:

方法 用途
supplyAsync / runAsync 创建异步任务
thenApply / thenAccept / thenRun 链式回调
thenCompose 链式嵌套 Future
thenCombine / allOf / anyOf 组合多个 Future
exceptionally / handle 异常处理
complete / completeExceptionally 手动控制 Future

示例

CompletableFuture.supplyAsync(() -> "Hello")
    .thenApply(s -> s + " World")
    .thenAccept(System.out::println)
    .exceptionally(ex -> { ex.printStackTrace(); return null; });

执行异步任务

1、supplyAsync - 执行有返回值的异步任务

// 异步获取用户信息
CompletableFuture<String> getUserInfo = CompletableFuture.supplyAsync(() -> {
    // 模拟网络请求
    try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); }
    return "User: Alice, Age: 25";
});

System.out.println(getUserInfo.get()); // 阻塞获取结果
  • 默认使用 ForkJoinPool.commonPool()

  • 可指定自定义线程池:

    ExecutorService executor = Executors.newFixedThreadPool(4);
    CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> "Task", executor);
    

2、runAsync - 执行无返回值的异步任务

// 异步记录日志
CompletableFuture<Void> logTask = CompletableFuture.runAsync(() -> {
    System.out.println("Logging operation completed at " + System.currentTimeMillis());
});

logTask.join(); // 等待任务完成

链式调用(回调处理)

链式调用(Chaining Calls)允许以流水线(Pipeline)的方式将多个异步操作串联起来,形成清晰的任务处理流程。

将多个操作通过方法连续调用的方式连接在一起,前一个操作的输出作为后一个操作的输入,形成”任务流水线”。

1、thenApply - 转换结果,对前一个任务的结果进行转换。

// 异步获取用户名后转为大写
CompletableFuture<String> upperCaseName = CompletableFuture.supplyAsync(() -> "alice")
    .thenApply(name -> name.toUpperCase());

System.out.println(upperCaseName.get()); // "ALICE"

2、thenAccept - 消费结果,获取前一个任务的结果并进行消费(无返回值)。

// 异步查询订单后发送邮件
CompletableFuture.supplyAsync(() -> "Order#12345")
    .thenAccept(order -> System.out.println("Sending confirmation for " + order));

3、thenRun - 任务完成后执行操作,前一个任务完成后执行某个操作,不关心结果。

// 数据库备份完成后通知管理员
CompletableFuture.runAsync(() -> backupDatabase())
    .thenRun(() -> System.out.println("Notifying admin: Backup complete"));

4、thenCompose - 链式嵌套 Future,用于链式调用多个 CompletableFuture,避免回调地狱。

// 先查用户ID,再查用户详情
CompletableFuture<Integer> userIdFuture = CompletableFuture.supplyAsync(() -> 1001);

CompletableFuture<String> userDetailFuture = userIdFuture.thenCompose(id -> 
    CompletableFuture.supplyAsync(() -> "User Details for ID: " + id)
);

System.out.println(userDetailFuture.get()); // "User Details for ID: 1001"

Q1:thenApplythenCompose 的区别?

  • thenApply:接收前序结果,返回普通值

    .thenApply(v -> v.toUpperCase()) // 返回 String
    
  • thenCompose:接收前序结果,返回新的 CompletableFuture

    .thenCompose(v -> queryFromDB(v)) // 返回 CompletableFuture<String>
    

Q2:电商订单处理链

// 1. 查询用户信息 → 2. 查询商品 → 3. 计算价格 → 4. 生成订单
CompletableFuture.supplyAsync(() -> getUser(userId))
    .thenApply(user -> getProduct(user, productId))
    .thenApply(product -> calculatePrice(product))
    .thenAccept(price -> createOrder(price))
    .exceptionally(ex -> {
        log.error("Order failed", ex);
        return null;
    });

组合多个Future

1、thenCombine - 合并两个 Future 的结果,等待两个任务完成,并对结果进行合并。

// 并行查询商品价格和库存,然后计算总价值
CompletableFuture<Double> priceFuture = CompletableFuture.supplyAsync(() -> 19.99);
CompletableFuture<Integer> stockFuture = CompletableFuture.supplyAsync(() -> 100);

CompletableFuture<BigDecimal> totalValue = priceFuture.thenCombine(stockFuture,
        (price, stock) -> new BigDecimal(String.valueOf(price)).multiply(new BigDecimal(stock))
);

System.out.println(totalValue.get().toString()); // 1999.0

2、allOf - 等待所有 Future 完成。并行执行多个任务,并在所有任务完成后触发回调。

// 批量调用多个API,全部完成后处理数据
CompletableFuture<String> api1 = CompletableFuture.supplyAsync(() -> "Data1");
CompletableFuture<String> api2 = CompletableFuture.supplyAsync(() -> "Data2");
CompletableFuture<String> api3 = CompletableFuture.supplyAsync(() -> "Data3");

CompletableFuture<Void> allApis = CompletableFuture.allOf(api1, api2, api3);

allApis.thenRun(() -> {
    System.out.println("All APIs completed");
    // 获取各个任务的结果
    try {
        String result1 = api1.get();
        String result2 = api2.get();
        String result3 = api3.get();
        System.out.println(result1 + ", " + result2 + ", " + result3);
    } catch (Exception e) {
        e.printStackTrace();
    }
});

3、anyOf - 任意一个 Future 完成时触发。多个任务竞争,只要有一个完成就触发回调。

// 多个服务提供相同数据,取最先返回的
CompletableFuture<String> serviceA = CompletableFuture.supplyAsync(() -> {
    try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); }
    return "Service A Result";
});

CompletableFuture<String> serviceB = CompletableFuture.supplyAsync(() -> {
    try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); }
    return "Service B Result";
});

CompletableFuture<Object> fastestResult = CompletableFuture.anyOf(serviceA, serviceB);

System.out.println(fastestResult.get()); // "Service B Result"(因为B更快)

异常处理

1、exceptionally - 捕获异常并返回默认值。类似于 try-catch,在异常时提供备用值。

// 查询用户信息失败时返回默认用户
CompletableFuture<String> userFuture = CompletableFuture.supplyAsync(() -> {
    if (Math.random() > 0.5) {
        throw new RuntimeException("Failed to fetch user");
    }
    return "User Data";
}).exceptionally(ex -> {
    System.err.println("Error: " + ex.getMessage());
    return "Default User";
});

System.out.println(userFuture.get());

2、handle - 无论成功或失败都处理。无论任务成功还是失败,都会执行回调。

// 处理任务结果或异常
CompletableFuture<Integer> parsedFuture = CompletableFuture.supplyAsync(() -> "123")
    .handle((str, ex) -> {
        if (ex != null) {
            return -1; // 解析失败返回默认值
        }
        try {
            return Integer.parseInt(str);
        } catch (NumberFormatException e) {
            return -1;
        }
    });

System.out.println(parsedFuture.get()); // 123 或 -1(如果异常)

手动控制Future

1、complete - 手动设置结果。强制完成任务并设置结果。

// 超时手动完成任务
CompletableFuture<String> future = new CompletableFuture<>();

// 模拟超时控制
new Thread(() -> {
    try {
        Thread.sleep(2000);
        future.complete("Fallback Data");
    } catch (InterruptedException e) {
        future.completeExceptionally(e);
    }
}).start();

System.out.println(future.get()); // "Fallback Data"(如果超时)

2、completeExceptionally - 手动设置异常。强制任务失败并抛出异常。

// 任务验证失败时主动终止
CompletableFuture<String> future = new CompletableFuture<>();

if (invalidInput) {
    future.completeExceptionally(new IllegalArgumentException("Invalid input"));
} else {
    future.complete("Valid Result");
}

对比总结

核心场景 是否支持链式调用 任务类型 底层机制
FutureTask 手动包装异步任务 一次性任务 线程池或单独线程执行
ScheduledFuture 延迟/周期性任务 单次或周期性 基于时间调度
ForkJoinTask 分治任务(递归、并行) 可分解的子任务 工作窃取线程池
CompletableFuture 异步回调、组合多任务 ✔️ 链式或组合任务 回调链 + 线程池

如何选择?

  1. 需要延迟/周期任务ScheduledFuture
  2. 简单异步任务封装FutureTask
  3. 递归或并行计算ForkJoinTask
  4. 复杂异步流程(如回调、组合)CompletableFuture

YOLO