FutureTask 源码解析

在这篇文章中,我将采用 “自顶向下” 的方式来讨论 FutureTask 的源码。在深入分析每个 FutureTask 的方法的源码之前,我都会先概述这个方法的整体执行逻辑,以便提供一个更清晰的理解框架。在大家对方法的执行逻辑有了整体了解之后,我再以注释的形式对源码进行讨论,并尽可能指出某行源码对应的是方法整体执行逻辑中的哪一步。

通常情况下,我们不会直接创建 FutureTask 实例。当我们通过 ExecutorService 接口的 submit() 方法向线程池提交任务时,线程池会将任务封装成一个 FutureTask 实例,从而间接创建一个 FutureTask 实例。需要注意的是,如果使用 execute() 方法提交任务,任务是不会被封装成 FutureTask 实例的。

在深入讨论 FutureTask 源码之前,为了帮助大家更好地理解,我们需要先了解几个基本前提。

  • FutureTask 的 get() 方法可能会被多个线程同时调用,也就是说 get() 方法存在并发。
  • FutureTask 的 run() 方法也同样有可能会被多个线程同时调用,但最终只会有一个线程在 run() 方法的内部成功抢夺到执行权,其它抢夺执行权失败的线程会直接从 run() 方法中返回。
  • FutureTask 的 cancel() 方法也存在并发,可能会有多个线程同时调用 cancel() 方法去尝试撤销任务,与 run() 方法一样,最终只会有一个线程成功获取到 cancel() 方法的执行权。
get()、run()、cancel() 方法均存在并发

图 1 get()run()、以及 cancel() 方法均存在并发

了解完这些基本前提,才能更好地理解为什么在 FutureTask 的源码中,有些地方要使用 CAS(compare and swap)方式赋值。

FutureTask 的字段

我们先来看看和 FutureTask 的状态有关的字段。

代码清单 1 FutureTask 的字段
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
public class FutureTask<V> implements RunnableFuture<V> {
/**
* {@link FutureTask} 的当前状态。
*/
private volatile int state;

/**
* 表示当前任务尚未开始执行。
*/
private static final int NEW = 0;

/**
* 表示当前任务已经执行完成,但还没有将任务执行结果赋值给 {@link FutureTask} 的 {@code outcome} 字段。
* 此时任务虽然已经执行完成,但任务的执行结果还保存在执行任务的线程的方法栈中。
* 在执行任务的线程把该结果赋值给 {@code outcome} 字段后,其它等待结果的线程才能看到这个结果。
* 而 {@code COMPLETING} 字段表示的就是这样一种临界状态,或者说中间状态。
*/
private static final int COMPLETING = 1;

/**
* 表示任务成功执行完成,期间没有抛出任何异常。
*/
private static final int NORMAL = 2;

/**
* 表示任务执行期间抛出了异常。
*/
private static final int EXCEPTIONAL = 3;

/**
* 表示任务被用户取消。
*/
private static final int CANCELLED = 4;

/**
* 表示线程正在被中断。
*/
private static final int INTERRUPTING = 5;

/**
* 表示线程已经被中断。
*/
private static final int INTERRUPTED = 6;

/**
* 如果任务成功执行完成,即任务执行期间没有抛出异常,则该字段保存的是任务的执行结果;
* 否则保存的是任务执行期间抛出的异常。
*/
private Object outcome;

/**
* 执行任务的线程(我们不妨形象地称之为执行者线程)。
*/
private volatile Thread runner;

/**
* 封装了用户提交的任务代码,执行者线程执行的就是该任务代码。
*/
private Callable<V> callable;

/**
* 正如上文所述,可能会有多个线程同时调用并执行 {@link #get} 方法来获取任务执行结果。
* 如果任务还没有执行完成,那么调用 {@link #get} 方法的线程就会被阻塞(LockSupport::park)。
* 而该 {@link #waiters} 字段表示的是等待队列,保存的正是这些被 park 的阻塞线程。
* 一旦任务执行完成,就要 unpark 等待队列中的线程。
*/
private volatile WaitNode waiters;

...
}

代码清单 1 FutureTask 的字段

我们从上述代码可以看到,FutureTask 一共有六种状态,分别是 COMPLETING、NORMAL、EXCEPTIONAL、CANCELLED、INTERRUPTING、INTERRUPTED。FutureTask 所有可能的状态转移路径如下图所示。

FutureTask 状态转移路径

图 2 FutureTask 状态转移路径

在后续小节中,我们将结合源码来深入讨论 FutureTask 的这六种状态。读者可以结合这张 FutureTask 的状态转移路径图来更好地阅读和理解本文的内容。

在 FutureTask 的六种状态中,有两种状态(INTERRUPTING 和 INTERRUPTED)与中断有关。关于中断,我们需要知道的是:在任务执行期间,如果有其它线程调用了执行者线程(执行任务的线程,请参阅代码清单 1 中的 runner 字段)的 interrupt() 方法,执行者线程并不一定会抛出中断异常。这完全取决于执行者线程正在执行的任务代码本身是否包含响应中断的代码逻辑。如果任务代码中没有相应中断的代码逻辑,执行者线程将继续执行,而不会受到任何影响。

如果需要响应中断,任务代码该如何编写呢?其实很简单,只需在任务代码中使用 try-catch 块捕获 InterruptedException 异常即可。

FutureTask 的构造函数

我们接着看看 FutureTask 的构造函数。FutureTask 有两个构造函数,分别是:

  • FutureTask(Callable callable)
  • FutureTask(Runnable rnunable, V result)

形参类型为 Callable 接口的这个构造函数其实很简单,它只是简单地把用户提交的任务保存到 callable 字段中,如代码清单 2 所示。

代码清单 2 FutureTask(Callable callable) 构造函数
1
2
3
4
5
6
public FutureTask(Callable<V> callable) {
if (callable == null)
throw new NullPointerException();
this.callable = callable;
this.state = NEW;
}

代码清单 2 FutureTask(Callable callable) 构造函数

Callable 类型的任务有返回值,而 FutureTask 的 get() 方法的返回值就是 Callable 任务的返回值,这点我们很好理解。但 Runnable 任务没有返回值,那么 FutureTask 是如何获取并返回 Runnable 任务的执行结果的呢?要解答这个问题,让我们直接看源码,如代码清单 3 所示。

代码清单 3 FutureTask(Runnable, V) 构造函数
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
/**
* {@link Runnable#run()} 方法没有返回值,所以它肯定返回不了任务的执行结果。
* 因此该版本的构造函数额外接受了一个 result 参数。
* 这个 result 参数值就是任务的返回结果。
* 若任务执行成功,则直接返回该 result 参数的值。
*/
public FutureTask(Runnable runnable, V result) {
/**
* 这里其实就是把 runnable 和 result 包装成 Callable。
* 怎么包装呢?答案是使用 {@link Executors#callable(Runnable)} 方法。
* {@link Executors#callable(Runnable)} 方法内部其实就是使用装饰者模式把 runnable 和 result 包装成一个 Runnable。
*/
this.callable = Executors.callable(runnable, result);
this.state = NEW;
}

代码清单 3 FutureTask(Runnable, V) 构造函数

run() 方法

run() 方法是执行者线程执行任务代码的入口。run() 方法是存在并发的。线程之间会通过 CAS 方式抢夺任务的执行权,哪个线程成功地通过 CAS 方式把自己设置到 FutureTask 对象的 runner 字段,就表示哪个线程就成功抢夺到了任务的执行权。

run() 方法的整体执行流程如下所述:

  1. 如果任务的状态为 NEW,并且通过 CAS 的方式成功抢夺到了任务的执行权,则开始执行任务。而在执行任务代码之前,还会对任务的状态进行一次检测:如果状态依然为 NEW,则正式开始执行任务代码,否则放弃执行;
  2. 如果任务成功执行完成,则通过 set() 方法把任务执行结果赋值给 outcome 字段;如果任务执行期间抛出了异常,则通过 setException() 方法把抛出的异常赋值给 outcome 字段。
    如果在任务执行期间,任务的状态发生了变化,比如有线程调用 FutureTask 的 cancel() 方法取消了任务,那么任务的状态就会变为 CANCELLED 或 INTERRUPTING 或 INTERRUPTED。但无论是哪一种状态,只要不是 NEW,那么任务执行完成后,无论执行成功还是执行失败,都不会把任务执行结果或异常对象赋值给 outcome 字段,而是什么也不做,直接从 run() 方法返回。
  3. 当然,无论是哪种情况,即无论是执行成功,还是执行失败,抑或是任务执行期间任务的状态发生了变化,在从 run() 方法返回之前,都会检查一下当前任务是不是正处于 INTERRUPTING 状态(正在中断),如果是,则执行者线程会调用 Thread::yield 方法放弃处理器的使用权,等待其它线程完成对任务的中断工作,直到任务的状态变为 INTERRUPTED 就说明其它线程已经完成对任务的中断工作,这时执行者线程才会从 run() 方法返回。

结合上面描述的 run() 方法的总体执行流程,让我们看一下 run() 方法的源码,如代码清单 4 所示。

代码清单 4 run() 方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
/**
* 任务执行入口。
*/
public void run() {
/*
* 条件 1:结合图 2 可知:
* 如果任务状态不为 NEW,那么就说明当前的任务已经被执行过了,或者已经被其它线程调用 cancel() 方法取消掉了。
* 总之,如果任务状态不为 NEW,那么线程将不会执行该任务。
*
* 条件 2:如果任务状态还为 NEW,那么线程将以 CAS 方式争抢该任务的执行权。
* CAS 的期望旧值是 null,期望新值是线程对象本身。
*
* 如果上面两个条件都不满足,那么直接返回。
*/
if (state != NEW ||
!RUNNER.compareAndSet(this, null, Thread.currentThread()))
return;


try {
// callable 是 FutureTask 的一个字段。其中封装了用户所提交的任务代码。
Callable<V> c = callable;

/*
* 条件 1:用于防止用户提交一个空任务。
*
* 条件 2:线程在执行任务之前,会再次判断即将要执行的任务的状态是不是 NEW。
* 从上面第一个 if 语句到执行到这里的这段时间内,用户可能会改变主意,不再希望任务继续执行。
* 因此,在此期间,任务可能会被用户通过调用 cancel() 方法取消掉。
*
* 如果任务不为空,且没被取消,则开始执行任务。
*/
if (c != null && state == NEW) {

// 用于保存任务执行完成后的返回结果。
V result;
/*
* true 表示任务执行成功,期间未抛出异常;
* false 表示任务执行失败,期间抛出了异常。
*/
boolean ran;

try {
// c 表示用户提交的任务代码。
result = c.call();
/*
* 能执行到这里,说明任务执行成功且未抛出任何异常。
* 因此把 ran 设置为 true。
*/
ran = true;
} catch (Throwable ex) {
/*
* 如果抛出异常,说明用户提交的任务代码有 bug。
* 这是直接把返回结果设置为 null。
*/
result = null;
ran = false;

/*
* setException() 我会在下文再详细讨论。
* 这里我们只需要知道:如果执行任务期间抛出了异常,那么 setException() 方法将会被调用。
*
* 在 setException() 方法中,异常对象会被赋值给 FutureTask 的 outcome 字段。
*/
setException(ex);
}

/*
* set() 方法我也会在下文再详细讨论。
* 这里我们只需要知道:如果任务执行成功,那么 set() 方法将会被调用。
*
* 在 set() 方法中,任务的执行结果会被赋值给 FutureTask 的 outcome 字段。
*/
if (ran)
set(result);
}
} finally {
/*
* runner 字段存储的是执行任务的线程对象。
* 无论任务执行成功,还是执行失败(执行期间抛出了异常),都需要把 runner 字段置为 null。
*/
runner = null;

/*
* 无论任务执行成功,还是执行失败,抑或是执行期间任务的状态发生了变化(不再是 NEW),
* 都需要判断一下当前的任务状态是否为 INTERRUPTING。
* 如果是,则让权等待,直到任务的状态变为 INTERRUPTED;否则直接返回。
*/
int s = state;
if (s >= INTERRUPTING)
// 让权等待(Thread::yield)的逻辑在该方法中实现。
handlePossibleCancellationInterrupt(s);
}
}

代码清单 4 run() 方法

set() 方法

如果任务执行成功且任务状态没有发生变化(即依然为 NEW),则会通过 set() 方法将任务的执行结果保存到 outcome 字段,如代码清单 5 所示。

代码清单 5 set() 方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
protected void set(V v) {
/*
* 通过 CAS 方式将任务的状态设置为 COMPLETING。
* 这里对应的就是上文描述的 run() 方法的整体执行流程的第 2 步。
* 也就是说如果在任务执行期间,任务的状态发生了变化(任务被取消了),则什么也不做。
* 但这是小概率事件,大部分时候都会成功进入到 if 语句块。
*/
if (STATE.compareAndSet(this, NEW, COMPLETING)) {
outcome = v;

// 将任务执行结果保存到 outcome 字段之后,马上将当前的任务状态修改为 NORMAL。
STATE.setRelease(this, NORMAL);

// 这个方法将在下文再详细讨论。
finishCompletion();
}
}

代码清单 5 set() 方法

setException() 方法

若在任务执行期间抛出了异常,则会执行 setException() 方法把抛出的异常赋值给 outcome 字段,如代码清单 6 所示。

代码清单 6 setException() 方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
protected void setException(Throwable t) {
/*
* 通过 CAS 方式将任务的状态设置为 COMPLETING。
* 这里对应的就是上文描述的 run() 方法的整体执行流程的第 2 步。
* 也就是说如果在任务执行期间,任务的状态发生了变化(任务被取消了),则什么也不做。
* 但这是小概率事件,大部分时候都会成功进入到 if 语句块。
*/
if (STATE.compareAndSet(this, NEW, COMPLETING)) {
/*
* 进入到这个方法,说明任务执行期间一定抛出了异常。
* 不同于 set() 方法的是,这里保存到 outcome 字段的不是任务的执行结果,因为任务都没有执行成功。
* 保存到 outcome 字段的是任务执行期间所抛出的异常。
*/
outcome = t;

// 同样地,将异常对象赋值给 outcome 字段之后,马上将当前的任务状态修改为 EXCEPTIONAL。
STATE.setRelease(this, EXCEPTIONAL); // final state

// 下文会详细讨论这个方法。
finishCompletion();
}
}

代码清单 6 setException()

finishCompletion() 方法

代码清单 7 finishCompletion() 方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
/**
* 哪些方法会调用 finishCompletion() 方法呢?
* 1. cancel() -> finishCompletion()。
* 2. run() --执行成功--> set() -> finishCompletion()。
* 3. run() --执行期间抛出异常--> setException() -> finishCompletion()。
* 所以可能会有多个线程同时执行 finishCompletion() 方法。
*/
private void finishCompletion() {
for (WaitNode q; (q = waiters) != null;) {

// 存在竞争,所以要通过 CAS 方式去争抢等待队列的操作权。
if (WAITERS.weakCompareAndSet(this, q, null)) {
for (;;) {
// 当前结点的 thread 字段。
Thread t = q.thread;

// 若不为 null,说明该线程还在等待执行结果,那么使用 unpark() 唤醒该线程。
if (t != null) {
q.thread = null;
LockSupport.unpark(t);
}

// next 表示后继结点。
WaitNode next = q.next;
// 若后继为 null,说明当前结点是链尾结点,则直接返回。
if (next == null)
break;

// 非必要代码,只是为了 help GC。
q.next = null;

// 前进到后继结点。
q = next;
}
break;
}
}

// 钩子方法。
done();
// help GC
callable = null;
}

代码清单 7 finishCompletion() 方法

handlePossibleCancellationInterrupt() 方法

最后我们再看一下 handlePossibleCancellationInterrupt() 方法,这个方法对应的就是上文描述的 run() 方法的整体执行流程的第 3 步,即无论任务执行成功与否,都会调用这个方法来检查一下任务状态是否为 INTERRUPTING,并据此决定要不要让权等待,如代码清单 8 所示。

代码清单 8 handlePossibleCancellationInterrupt() 方法
1
2
3
4
5
6
private void handlePossibleCancellationInterrupt(int s) {
// 如果任务处于 INTERRUPTING 状态,则让权等待,直到任务状态变为 INTERRUPTED。
if (s == INTERRUPTING)
while (state == INTERRUPTING)
Thread.yield();
}

代码清单 8 handlePossibleCancellationInterrupt() 方法

get() 方法

get() 方法的总体执行逻辑相对比较简单,可概括如下:

为了方便叙述,任务状态用 state 表示。

  1. state <= COMPLETING 时,表示如下三种情况之一:

    • 任务还没有开始执行;
    • 任务已经开始执行了,但还没有执行完;
    • 任务已经执行完了,但还没有将结果保存到 outcome 字段。

    无论是哪种情况,线程都会阻塞等待(阻塞等待的执行逻辑对应于 awaitDone() 方法)。

  2. 如果 state > COMPLETING,表示任务执行结果已经保存到 outcome 字段。但此时还不能确定任务到底是执行成功(NORMAL),还是执行失败(EXCEPTIONAL),异或是被取消(CANCELLED、INTERRUPTED)了。
    如果执行成功,则直接返回任务执行结果,否则抛出异常(这一步的执行逻辑对应于 report() 方法)。

代码清单 9 get() 方法
1
2
3
4
5
6
7
8
9
10
11
public V get() throws InterruptedException, ExecutionException {
// 线程的状态。
int s = state;

// 对应于上述执行流程的第 1 步。
if (s <= COMPLETING)
s = awaitDone(false, 0L);

// 对应于上述执行流程的第 2 步。
return report(s);
}

代码清单 9 get() 方法

可以看到,其实 get() 方法把核心的代码逻辑都放在了 awaitDone()report() 这两个方法中。所以接下来让我们直接看看这两个方法的源码。

awaitDone() 方法

awaitDone() 方法的整体执行逻辑可概括如下。

state 表示任务状态。

  1. 当前线程正是因为 state <= COMPLETING 才进入到 awaitDone() 方法,所以当前线程会先检查 state 是否为 COMPLETING。如果是,则调用 Thread::yield 方法进行让权等待;

    为什么不调用 LockSupport::park 进行阻塞等待,而是调用 Thread::yield 进行让权等待呢?

    从上文对 set()setException() 方法的讨论可知,执行者线程在将执行结果保存到 outcome 字段之后,就会马上将 state 修改为 NORMAL 或者 EXCEPTIONAL。也就是说 COMPLETING 状态的持续时间不会太长,而线程从调用 LockSupport::park 开始阻塞直至被 LockSupport::unpark 唤醒,期间的执行开销还是挺大的,所以没必要调用 LockSupport::park 进行阻塞等待。只需要调用 Thread::yield 放弃处理器使用权,等到下一次再被调度上处理器运行,state 大概率已经大于 COMPLETING。

  2. 如果 state 不是 COMPLETING,而是 NEW,则当前线程会先把自己包装成一个 WaitNode 结点,然后以 CAS 方式自旋入队(等待队列);

  3. 一旦入队成功,则调用 LockSupport::park 进行阻塞等待。

调用 LockSupport::park 阻塞等待任务执行结果的线程有两种被唤醒的方式:

  • 一种是被其它线程中断唤醒(LockSupport::park 是响应中断的);
  • 另一种唤醒方式是被其它线程使用 LockSupport::unpark 唤醒。

但不管是被哪种方式唤醒,阻塞线程被唤醒后都会再次检查任务状态。

  1. state > COMPLETING,则先出队(把 WaitNode 结点的 thread 字段置为 null),然后从 awaitDone() 方法返回并进入 report() 方法的代码逻辑中处理任务执行结果;
  2. state == COMPLETING,则调用 Thread::yield 进行让权等待;
  3. state == NEW,并且线程是被中断方式唤醒的,则出队(会调用 removeWaiter() 方法清理一下等待队列中 thread 字段为 null 的结点),然后直接抛出 InterruptedException 中断异常。

查看 FutureTask 的源码可知,如果线程是被其它线程通过调用 LockSupport::unpark 的方式唤醒的话,那任务状态必不可能是 NEW。因为只有 finishCompletion() 方法会调用 LockSupport::unpark 唤醒等待队列中的线程,而 finishCompletion() 方法又只会被 cancel()set()setException() 这三个方法调用,而这三个方法调用 finishCompletion() 方法之前都会先以 CAS 的方式把任务状态修改为大于 COMPLETING 的某一个状态,所以不可能出现线程被 LockSupport::unpark 方式唤醒了,任务状态却还是 NEW。

由于 awaitDone() 方法有 7 个 if-else 语句块,如果按序阅读这些 if-else 语句块可能很难领会到 awaitDone() 源码的核心思想,所以大家可以先按我标注的顺序阅读这些 if-else 语句块。

在对 awaitDone() 源码的执行逻辑有了基本的了解后,大家可以自行再按序阅读这些 if-else 语句块。同样地,我将以注释的方式讨论 awaitDone() 方法的源码,如代码清单 10 所示。

代码清单 10 get() 方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
/**
* nanos 形参表示超时时间。即如果等待时间超过了 nanos 任务还没执行完,则直接返回到调用者的业务层代码。
* 会超时和不会超时的 awaitDone() 方法的区别是:
* - 会超时的 awaitDone() 方法是通过调用带超时参数的 park() 方法实现的;
* - 不会超时的 awaitDone() 方法是通过调用不带超时参数的 park() 方法实现的。
* 所以我们只需要看不会超时的 awaitDone() 方法即可。
*
* timed 为 false 表示不会超时,为 true 则表示会超时。
*/
private int awaitDone(boolean timed, long nanos)
throws InterruptedException {

// 当前线程在进入等待队列之前,需要先把自己包装进一个等待队列结点。
WaitNode q = null;

/*
* 表示当前线程是否已经入队(等待队列)。
* false 表示尚未入队;
* true 表示已经入队。
*/
boolean queued = false;

for (;;) {
// 任务的状态。
int s = state;

// -5-
/*
* 这里对应上文讨论的唤醒逻辑的第 1 步。
* 阻塞线程被唤醒后,若发现任务状态已经大于 COMPLETING,则出队并从 awaitDone() 方法返回。
*/
if (s > COMPLETING) {
/*
* 当前线程通过把等待队列中表示自己的那个结点的 thread 字段置空即代表出队。
*
* 但问题来了,谁来负责清理等待队列中的这些 thread 字段为 null 的无效结点呢?
* 答案就是:由 finishCompletion() 方法和 removeWaiter() 方法负责清理。
* 首先 finishCompletion() 方法只会被执行者线程和调用 cancel() 方法的线程所调用。
* removeWaiter() 方法只会被以中断方式唤醒的线程所调用。
*/
if (q != null)
q.thread = null;

return s;
}
// -6- // 这里也对应唤醒逻辑的第 2 步

// -1-
/*
* 这里对应上文描述的 awaitDone() 整体执行流程的第 1 步。
* 线程刚进入这个方法时,任务状态大概率小于等于 COMPLETING。
* 如果任务状态为 COMPLETING,则说明很快就能拿到任务执行结果了,所以让权等待一下就好。
*/
else if (s == COMPLETING)
Thread.yield();

/*
* 有两种情况,等待任务结果的线程会被中断:
* 1. 从进入 get() 方法到执行到这里期间被中断掉。(小概率事件)
* 2. 在 park() 中等待着任务的执行结果时,被中断唤醒后执行到这里。(park() 方法是响应中断的)
*
* 但无论是哪种情况,都可归纳总结为一种情况,即:
* 如果在任务还没有执行完成之前(此时任务状态为 NEW),当前等待任务结果的线程发生中断,那么当前线程会向上抛出中断异常。
*/
else if (Thread.interrupted()) {
/*
* 在抛出异常之前,先把自己从等待者队列中移除(出队)。
* 在自己出队的同时也移除字段 thread 为 null 的结点。
*/
removeWaiter(q);

throw new InterruptedException();
}

// -2-
/*
* 这里对应 awaitDone() 整体执行逻辑的第 2 步。
* 先把自己包装进一个等待队列结点,为入队做准备。
*/
else if (q == null) {
/*
* 这个 if 语句块和超时逻辑相关。
* 意思是如果会超时,且已达到超时时长,则从 awaitDone() 方法返回。
*/
if (timed && nanos <= 0L)
return s;

// 把当前线程包装进一个 WaitNode 结点。
q = new WaitNode();
}

// -3-
/*
* 这里同样对应 awaitDone() 整体执行流程的第 2 步。
* 执行到这里,说明当前任务已经把自己包装成一个等待队列结点。
* 线程使用头插法,以 CAS 方式自旋入队。
* CAS 的期望旧值为队头结点,期望新值为包装了当前线程的 WaitNode 结点。
*/
else if (!queued)
queued = WAITERS.weakCompareAndSet(this, q.next = waiters, q);

// 这里是和超时相关的代码逻辑。
else if (timed) {
final long parkNanos;
if (startTime == 0L) { // first time
startTime = System.nanoTime();
if (startTime == 0L)
startTime = 1L;
parkNanos = nanos;
} else {
long elapsed = System.nanoTime() - startTime;
if (elapsed >= nanos) {
removeWaiter(q);
return state;
}
parkNanos = nanos - elapsed;
}
// nanoTime may be slow; recheck before parking
if (state < COMPLETING)
LockSupport.parkNanos(this, parkNanos);
}

// -4-
/*
* 这里对应 awaitDone() 整体执行流程的第 3 步。
* 入队成功后,调用 LockSupport::park 进行阻塞等待。
*/
else
LockSupport.park(this);
}
}

代码清单 10 get() 方法

至此,awaitDone() 方法的代码逻辑已讨论完毕。接下来让我们看看 report() 方法。

report() 方法

能执行到 report() 方法,说明任务执行结果肯定已经保存到 outcome 字段中。但此时还不能确定任务到底是执行成功(NORMAL),还是执行失败(EXCEPTIONAL),异或是被取消(CANCELLED、INTERRUPTED)了。

report() 方法就是要根据任务的状态进行相应的处理,如果执行成功,则返回保存在 outcome 字段中的结果,否则抛出异常。现在让我们直接来看 report() 方法的源码吧。

代码清单 11 report() 方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
private V report(int s) throws ExecutionException {
// 任务执行结果
Object x = outcome;

// 如果当前任务状态为 NORMAL,说明任务执行成功,直接返回任务执行结果
if (s == NORMAL)
return (V)x;

// 如果任务状态 >= CANCELLED,说明当前任务被取消了,抛出取消异常
if (s >= CANCELLED)
throw new CancellationException();

// 能执行到这里,说明当前任务的状态是 EXCEPTIONAL,也就是说任务执行期间抛出了异常
// 向上抛出执行异常
throw new ExecutionException((Throwable)x);
}

代码清单 11 report() 方法

可以看到,report() 方法的源码其实很简单。

removeWaiter() 方法

removeWaiter() 方法只会被 “被中断唤醒的线程” 执行。该方法用于清理等待队列中 thread 字段为 null 的结点。

代码清单 12 removeWaiter() 方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
/**
* 移除队列中的 node 结点。
* 同时顺便移除队列中 thread 字段为 null 的结点。
*/
private void removeWaiter(WaitNode node) {
if (node != null) {

node.thread = null;
retry:
for (;;) {
/*
* q 表示当前结点。
* pred 表示当前结点的前驱结点。
*/
for (WaitNode pred = null, q = waiters, s; q != null; q = s) {
// s 用于保存后继结点。
s = q.next;

// 若当前结点的 thread 字段不为 null,则设置前驱结点为当前结点,然后继续遍历下一结点。
if (q.thread != null)
pred = q;

// 能执行到这里,说明当前结点的 thread 字段为 null,则通过让非空前驱结点指向当前结点的后继结点来移除当前结点。
else if (pred != null) {
pred.next = s;
if (pred.thread == null) // check for race
continue retry;
}

// 能执行到这里,说明前驱结点和当前结点都为 null。
else if (!WAITERS.compareAndSet(this, q, s))
continue retry;
}
break;
}
}
}

代码清单 12 removeWaiter() 方法


FutureTask 源码解析
http://example.com/2024/12/14/FutureTask-源码解析/
作者
Komorebi
发布于
2024年12月14日
许可协议