FutureTask 是 Java 并发包 JUC 中一个重要的类,它实现了 Future 接口和 Runnable 接口,能够确保在高并发环境下任务只执行一次:

classDiagram
direction BT
class FunctionalInterface
class Future~V~ {
<<Interface>>

}
class FutureTask~V~
class Runnable {
<<Interface>>

}
class RunnableFuture~V~ {
<<Interface>>

}

FutureTask~V~  ..>  RunnableFuture~V~ 
FunctionalInterface  ..  Runnable 
RunnableFuture~V~  -->  Future~V~ 
RunnableFuture~V~  -->  Runnable 

源码分析

1. 状态机设计

FutureTask内部维护了一个状态变量state,定义了任务的7种状态:

private volatile int state;
private static final int NEW          = 0; // 初始状态
private static final int COMPLETING   = 1; // 正在完成
private static final int NORMAL       = 2; // 正常完成
private static final int EXCEPTIONAL  = 3; // 异常完成
private static final int CANCELLED    = 4; // 已取消
private static final int INTERRUPTING = 5; // 正在中断
private static final int INTERRUPTED  = 6; // 已中断

2. CAS操作保证原子性

状态变更使用Unsafe类的CAS操作保证原子性:

// java.util.concurrent.FutureTask#run
public void run() {
    if (state != NEW ||
        !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                     null, Thread.currentThread()))
        return;
    // ...
}

// sun.misc.Unsafe#compareAndSwapObject
private boolean compareAndSetState(int expect, int update) {
    return UNSAFE.compareAndSwapInt(this, stateOffset, expect, update);
}

3. 执行逻辑保护

run()方法中,通过状态检查确保任务只执行一次:

public void run() {
    if (state != NEW ||
        !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread()))
        return;
    try {
        Callable<V> c = callable;
        if (c != null && state == NEW) {
            V result;
            boolean ran;
            try {
                result = c.call();
                ran = true;
            } catch (Throwable ex) {
                result = null;
                ran = false;
                setException(ex);
            }
            if (ran)
                set(result);
        }
    } finally {
        runner = null;
        int s = state;
        if (s >= INTERRUPTING)
            handlePossibleCancellationInterrupt(s);
    }
}

确保任务只执行一次的关键点

  1. 初始状态检查:只有在NEW状态时才可能执行任务
  2. CAS设置runner:确保只有一个线程能成功设置当前线程为执行者
  3. 二次状态检查:在真正执行前再次检查状态
  4. 状态转换原子性:使用CAS操作保证状态变更的原子性

丰富的测试案例

测试案例1:基本单次执行验证

@Test
public void testExecuteOnlyOnce() throws Exception {
    AtomicInteger counter = new AtomicInteger(0);
    Callable<Integer> task = () -> counter.incrementAndGet();
    FutureTask<Integer> futureTask = new FutureTask<>(task);

    new Thread(futureTask).start();
    new Thread(futureTask).start(); // 第二个线程尝试执行

    assertEquals(1, futureTask.get().intValue());
    assertEquals(1, counter.get()); // 确保只执行了一次
}

测试案例2:高并发环境测试

@Test
public void testHighConcurrency() throws Exception {
    int threadCount = 100;
    ExecutorService executor = Executors.newFixedThreadPool(threadCount);
    AtomicInteger counter = new AtomicInteger(0);
    Callable<Integer> task = () -> {
        Thread.sleep(100); // 模拟耗时操作
        return counter.incrementAndGet();
    };
    FutureTask<Integer> futureTask = new FutureTask<>(task);

    List<Future<?>> futures = new ArrayList<>();
    for (int i = 0; i < threadCount; i++) {
        futures.add(executor.submit(futureTask));
    }

    // 所有线程应该返回相同的结果
    Integer result = futureTask.get();
    for (Future<?> future : futures) {
        assertEquals(result, future.get());
    }

    assertEquals(1, counter.get()); // 确保只执行了一次
    executor.shutdown();
}

测试案例3:取消任务后的行为

@Test
public void testAfterCancellation() throws Exception {
    AtomicBoolean executed = new AtomicBoolean(false);
    FutureTask<String> futureTask = new FutureTask<>(() -> {
        executed.set(true);
        return "result";
    });

    // 先取消任务
    futureTask.cancel(true);
    // 然后尝试执行
    futureTask.run();

    assertFalse(executed.get()); // 任务不应执行
    assertTrue(futureTask.isCancelled());
}

测试案例4:异常情况处理

@Test
public void testExceptionHandling() {
    FutureTask<String> futureTask = new FutureTask<>(() -> {
        throw new RuntimeException("Test exception");
    });

    new Thread(futureTask).start();
    new Thread(futureTask).start(); // 第二次尝试执行

    try {
        futureTask.get();
        fail("Should throw exception");
    } catch (Exception e) {
        assertTrue(e.getCause() instanceof RuntimeException);
    }
}

测试案例5:混合读写操作

@Test
public void testMixedReadWriteOperations() throws Exception {
    AtomicInteger counter = new AtomicInteger(0);
    FutureTask<Integer> futureTask = new FutureTask<>(() -> {
        Thread.sleep(50);
        return counter.incrementAndGet();
    });

    // 启动多个线程,有的执行任务,有的直接获取结果
    ExecutorService executor = Executors.newFixedThreadPool(10);
    List<Future<Integer>> results = new ArrayList<>();

    for (int i = 0; i < 5; i++) {
        results.add(executor.submit(futureTask)); // 执行任务
    }
    for (int i = 0; i < 5; i++) {
        results.add(executor.submit(() -> futureTask.get())); // 只获取结果
    }

    // 所有结果应该相同
    Integer expected = results.get(0).get();
    for (Future<Integer> result : results) {
        assertEquals(expected, result.get());
    }

    assertEquals(1, counter.get()); // 确保只执行了一次
    executor.shutdown();
}

结论

FutureTask通过以下机制确保在高并发环境下任务只执行一次:

  1. 使用volatile变量保证状态可见性
  2. 使用CAS操作保证状态变更的原子性
  3. 在关键执行路径上进行多重状态检查
  4. 通过runner字段确保只有一个线程能真正执行任务

这种设计使得FutureTask非常适合作为高并发环境下的一次性任务执行器,既保证了线程安全,又避免了不必要的资源浪费。


YOLO