Future 表示了一个任务的生命周期,是一个可取消的异步运算,可以把它看作是一个异步操作的结果的占位符,它将在未来的某个时刻完成,并提供对其结果的访问。在并发包中许多异步任务类都继承自Future,其中最典型的就是 FutureTask,本章我们将通过对 FutureTask 的源码解析来看一下异步任务的实现方式。
概述
FutureTask 为 Future 提供了基础实现,如获取任务执行结果(get)和取消任务(cancel)等。如果任务尚未完成,获取任务执行结果时将会阻塞。一旦执行结束,任务就不能被重启或取消(除非使用
runAndReset
执行计算)。FutureTask 常用来封装 Callable 和 Runnable,也可以作为一个任务提交到线程池中执行。除了作为一个独立的类之外,此类也提供了一些功能性函数供我们创建自定义 task 类使用。FutureTask 的线程安全由CAS来保证。
FutureTask 内部维护了一个由volatile
修饰的int
型变量—state
,代表当前任务的运行状态,state
有七种状态:
- NEW:新建
- COMPLETING:完成
- NORMAL:正常运行
- EXCEPTIONAL:异常退出
- CANCELLED:任务取消
- INTERRUPTING:线程中断中
- INTERRUPTED:线程已中断
在这七种状态中,有四种任务终止状态:NORMAL、EXCEPTIONAL、CANCELLED、INTERRUPTED。各种状态的转化如下:
数据结构及核心参数
//内部持有的callable任务,运行完毕后置空
private Callable<V> callable;
//从get()中返回的结果或抛出的异常
private Object outcome; // non-volatile, protected by state reads/writes
//运行callable的线程
private volatile Thread runner;
//使用Treiber栈保存等待线程
private volatile WaitNode waiters;
FutureTask 继承了Runnale
和Future
,本身也作为一个线程运行。并且维护了一个内部类WaitNode
,使用简单的Treiber栈(无锁并发栈)实现,用于存储等待线程。
源码解析
AbstractExecutorService.submit(Callable<T> task)
FutureTask 内部的实现方法都很简单,这里我们先从线程池的submit
方法开始分析,一步一步分析FutureTask
。submit
方法默认实现在AbstractExecutorService
中,几种实现源码如下:
public Future<?> submit(Runnable task) {
if (task == null) throw new NullPointerException();
RunnableFuture<Void> ftask = newTaskFor(task, null);
execute(ftask);
return ftask;
}
public <T> Future<T> submit(Runnable task, T result) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task, result);
execute(ftask);
return ftask;
}
public <T> Future<T> submit(Callable<T> task) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task);
execute(ftask);
return ftask;
}
protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
return new FutureTask<T>(runnable, value);
}
public FutureTask(Runnable runnable, V result) {
this.callable = Executors.callable(runnable, result);
this.state = NEW; // ensure visibility of callable
}
说明:方法比较简单,首先调用newTaskFor
方法构造FutureTask
,然后调用execute
把任务放进线程池中,返回FutureTask
。
FutureTask.run()
public void run() {
//新建任务,CAS替换runner为当前线程
if (state != NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset,
null, Thread.currentThread()))
return;
try {
Callable<V> c = callable;
if (c != null && state == NEW) {
V result;
boolean ran;
try {
result = c.call();
ran = true;
} catch (Throwable ex) {
result = null;
ran = false;
setException(ex);
}
if (ran)
set(result);//设置执行结果
}
} finally {
// runner must be non-null until state is settled to
// prevent concurrent calls to run()
runner = null;
// state must be re-read after nulling runner to prevent
// leaked interrupts
int s = state;
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);//处理中断逻辑
}
}
说明:
- 运行任务,如果任务状态为
NEW
状态,则利用CAS
修改为当前线程。执行完毕调用set(result)
方法设置执行结果。set(result)
源码如下:
protected void set(V v) {
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
outcome = v;
UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
finishCompletion();//执行完毕,唤醒等待线程
}
}
- 首先利用
cas
修改state
状态为COMPLETING
,设置返回结果,然后使用 lazySet(UNSAFE.putOrderedInt
)的方式设置state
状态为NORMAL
。结果设置完毕后,调用finishCompletion()
方法唤醒等待线程,源码如下:
private void finishCompletion() {
// assert state > COMPLETING;
for (WaitNode q; (q = waiters) != null;) {
if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {//移除等待线程
for (;;) {//自旋遍历等待线程
Thread t = q.thread;
if (t != null) {
q.thread = null;
LockSupport.unpark(t);//唤醒等待线程
}
WaitNode next = q.next;
if (next == null)
break;
q.next = null; // unlink to help gc
q = next;
}
break;
}
}
//任务完成后调用函数,自定义扩展
done();
callable = null; // to reduce footprint
}
- 回到
run
方法,如果在 run 期间被中断,此时需要调用handlePossibleCancellationInterrupt
方法来处理中断逻辑,确保任何中断(例如cancel(true)
)只停留在当前run
或runAndReset
的任务中,源码如下:
private void handlePossibleCancellationInterrupt(int s) {
//在中断者中断线程之前可能会延迟,所以我们只需要让出CPU时间片自旋等待
if (s == INTERRUPTING)
while (state == INTERRUPTING)
Thread.yield(); // wait out pending interrupt
}
FutureTask.runAndReset()
protected boolean runAndReset() {
if (state != NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset,
null, Thread.currentThread()))
return false;
boolean ran = false;
int s = state;
try {
Callable<V> c = callable;
if (c != null && s == NEW) {
try {
c.call(); // don't set result
ran = true;
} catch (Throwable ex) {
setException(ex);
}
}
} finally {
// runner must be non-null until state is settled to
// prevent concurrent calls to run()
runner = null;
// state must be re-read after nulling runner to prevent
// leaked interrupts
s = state;
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
return ran && s == NEW;
}
说明:runAndReset
是 FutureTas k的另外一个任务运行的方法,它不会返回执行结果,而且在任务执行完之后会重置stat
的状态为NEW
,使任务可以多次执行。runAndReset
的典型应用是在 ScheduledThreadPoolExecutor 中,周期性的执行任务。
FutureTask.get()
//获取执行结果
public V get() throws InterruptedException, ExecutionException {
int s = state;
if (s <= COMPLETING)
s = awaitDone(false, 0L);
return report(s);
}
说明:FutureTask 通过get()
方法获取任务执行结果。如果任务处于未完成的状态(state <= COMPLETING
),就调用awaitDone
方法(后面单独讲解)等待任务完成。任务完成后,通过report
方法获取执行结果或抛出执行期间的异常。report
源码如下:
//返回执行结果或抛出异常
private V report(int s) throws ExecutionException {
Object x = outcome;
if (s == NORMAL)
return (V)x;
if (s >= CANCELLED)
throw new CancellationException();
throw new ExecutionException((Throwable)x);
}
awaitDone(boolean timed, long nanos)
private int awaitDone(boolean timed, long nanos)
throws InterruptedException {
final long deadline = timed ? System.nanoTime() + nanos : 0L;
WaitNode q = null;
boolean queued = false;
for (;;) {//自旋
if (Thread.interrupted()) {//获取并清除中断状态
removeWaiter(q);//移除等待WaitNode
throw new InterruptedException();
}
int s = state;
if (s > COMPLETING) {
if (q != null)
q.thread = null;//置空等待节点的线程
return s;
}
else if (s == COMPLETING) // cannot time out yet
Thread.yield();
else if (q == null)
q = new WaitNode();
else if (!queued)
//CAS修改waiter
queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
q.next = waiters, q);
else if (timed) {
nanos = deadline - System.nanoTime();
if (nanos <= 0L) {
removeWaiter(q);//超时,移除等待节点
return state;
}
LockSupport.parkNanos(this, nanos);//阻塞当前线程
}
else
LockSupport.park(this);//阻塞当前线程
}
}
说明:awaitDone
用于等待任务完成,或任务因为中断或超时而终止。返回任务的完成状态。函数执行逻辑如下:
- 如果线程被中断,首先清除中断状态,调用
removeWaiter
移除等待节点,然后抛出InterruptedException
。removeWaiter
源码如下:
private void removeWaiter(WaitNode node) {
if (node != null) {
node.thread = null;//首先置空线程
retry:
for (;;) { // restart on removeWaiter race
//依次遍历查找
for (WaitNode pred = null, q = waiters, s; q != null; q = s) {
s = q.next;
if (q.thread != null)
pred = q;
else if (pred != null) {
pred.next = s;
if (pred.thread == null) // check for race
continue retry;
}
else if (!UNSAFE.compareAndSwapObject(this, waitersOffset,q, s)) //cas替换
continue retry;
}
break;
}
}
}
- 如果当前状态为结束状态(
state>COMPLETING
),则根据需要置空等待节点的线程,并返回 Future 状态; - 如果当前状态为正在完成(
COMPLETING
),说明此时 Future 还不能做出超时动作,为任务让出CPU执行时间片; - 如果
state
为NEW
,先新建一个WaitNode
,然后CAS
修改当前waiters
; - 如果等待超时,则调用
removeWaiter
移除等待节点,返回任务状态;如果设置了超时时间但是尚未超时,则park
阻塞当前线程; - 其他情况直接阻塞当前线程。
FutureTask.cancel(boolean mayInterruptIfRunning)
public boolean cancel(boolean mayInterruptIfRunning) {
//如果当前Future状态为NEW,根据参数修改Future状态为INTERRUPTING或CANCELLED
if (!(state == NEW &&
UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
return false;
try { // in case call to interrupt throws exception
if (mayInterruptIfRunning) {//可以在运行时中断
try {
Thread t = runner;
if (t != null)
t.interrupt();
} finally { // final state
UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
}
}
} finally {
finishCompletion();//移除并唤醒所有等待线程
}
return true;
}
说明:尝试取消任务。如果任务已经完成或已经被取消,此操作会失败。
如果当前Future状态为NEW
,根据参数修改Future状态为INTERRUPTING
或CANCELLED
。
如果当前状态不为NEW
,则根据参数mayInterruptIfRunning
决定是否在任务运行中也可以中断。中断操作完成后,调用finishCompletion
移除并唤醒所有等待线程。
小结
本章重点:FutureTask 结果返回机制,以及内部运行状态的转变。