原创文章,转载请注明出处。
java.util.concurrent.Future是一个表示异步计算结果的接口,经典的实现子类有:
- JDK自己的实现类:java.util.concurrent.FutureTask
- Netty扩展的实现类:io.netty.util.concurrent.DefaultPromise
我们对比看一下两者的区别,更好的理解Future的实现。
1. java.util.concurrent.FutureTask
-
状态说明
/** * 初始化时为NEW,只有在set,setException,cancel三个方法中才会有最终的状态改变。 * 可能的状态转换: * NEW -> COMPLETING -> NORMAL * NEW -> COMPLETING -> EXCEPTIONAL * NEW -> CANCELLED * NEW -> INTERRUPTING -> INTERRUPTED */ //使用UNSAFE.putOrderedInt来set值,还要保证可见性,这两个原因所以为volatile。一共有7个状态值。 private volatile int state; //初始化状态,新建 private static final int NEW = 0; //返回结果或者异常后,set,setException方法使用的瞬时状态, //表示已经结束了,正在set结果值,还没有set完结果值 private static final int COMPLETING = 1; //正常结束 private static final int NORMAL = 2; //异常结束 private static final int EXCEPTIONAL = 3; //已取消。state必须是NEW,才可以取消,如果取消成功,状态改为INTERRUPTING或者CANCELLED。 private static final int CANCELLED = 4; //已取消-线程中断中。调用了cancel(true),才可能有这个状态值。 private static final int INTERRUPTING = 5; //已取消-线程已中断。调用了cancel(true),才可能有这个状态值。 private static final int INTERRUPTED = 6; //从结果返回来看,即future.get(),分成三种情况 //1.NORMAL正常结束,返回outcome //2.EXCEPTIONAL异常结束,抛new ExecutionException(outcome)异常 //3.CANCELLED,INTERRUPTING,INTERRUPTED已取消,抛new CancellationException()异常
-
执行逻辑
public void run() { //可能出现过取消等情况 if (state != NEW || //保证并发调用时,只有一个线程获得执行权 !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread())) return; try { Callable<V> c = callable; if (c != null && state == NEW) { V result; boolean ran; try { result = c.call(); ran = true; } catch (Throwable ex) { result = null; ran = false; setException(ex); } if (ran) set(result); } } finally { // runner must be non-null until state is settled to // prevent concurrent calls to run() runner = null; // state must be re-read after nulling runner to prevent // leaked interrupts int s = state; if (s >= INTERRUPTING) //这里有意思,可以先看下cancel、set、setException方法的内部逻辑 //很有可能在run的执行过程中,调其他线程调用了cancel(true)方法, //将state由NEW改为INTERRUPTING,这样set和setException内部都不会执行 //此时cancel方法中t.interrupt();还没有执行,如果这时就返回了, //那么很有可能在这个线程执行其他代码时,t.interrupt()正好被执行了,导致线程中断。 //所以这里一定要处理state为INTERRUPTING的情况,保证结束以后不会再调用t.interrupt()。 handlePossibleCancellationInterrupt(s); } } /** * Ensures that any interrupt from a possible cancel(true) is only * delivered to a task while in run or runAndReset. */ private void handlePossibleCancellationInterrupt(int s) { // It is possible for our interrupter to stall before getting a // chance to interrupt us. Let's spin-wait patiently. if (s == INTERRUPTING) while (state == INTERRUPTING) //逻辑很简单,就是一直等着state状态改变,可能改为INTERRUPTED。 //让出执行权 Thread.yield(); // wait out pending interrupt // assert state == INTERRUPTED; // We want to clear any interrupt we may have received from // cancel(true). However, it is permissible to use interrupts // as an independent mechanism for a task to communicate with // its caller, and there is no way to clear only the // cancellation interrupt. // // Thread.interrupted(); }
2. io.netty.util.concurrent.DefaultPromise
- 状态说明
```java
//先说几个变量
//表示异步计算正常结束,但是返回结果类型为void,统一使用该对象表示这种情况。
private static final Object SUCCESS = new Object();
//uncancellable:无法取消,表示该future被设置为不可取消的状态。
private static final Object UNCANCELLABLE = new Object();
//包装调用cancel取消异常。包装类、方法名、异常堆栈信息
private static final CauseHolder CANCELLATION_CAUSE_HOLDER = new CauseHolder(ThrowableUtil.unknownStackTrace(
new CancellationException(), DefaultPromise.class, "cancel(...)"));
private static final StackTraceElement[] CANCELLATION_STACK = CANCELLATION_CAUSE_HOLDER.cause.getStackTrace();
/**
* 异步计算的结果,可以类比JUC.FutureTask中的outcome
* 不像FutureTask,这个Promise类型没有设置专门的状态变量,而是使用result的值来判断当前promise处于什么状态。
* 一共有6种状态:
* 1. null:初始化时为null,以及处于计算中状态时为null。分别对应FutureTask的NEW 和 COMPLETING。
* 2. UNCANCELLABLE:表示该future不可以被取消。netty扩展的一个状态,FutureTask没有。
* 3. CANCELLATION_CAUSE_HOLDER:CauseHolder类型静态常量,封装了取消成功时,即调用cancel返回true时的取消异常。此对象的属性字段Throwable cause的值一定是CancellationException类型。对应FutureTask的CANCELLED,INTERRUPTING,INTERRUPTED。
* 4. CauseHolder类型对象:异常结束时,set的异常包装对象,此对象的属性字段Throwable cause的值不会为CancellationException类型。对应FutureTask的EXCEPTIONAL。
* 5. SUCCESS:表示计算正常结束,但是因为返回类型为void,又不能使用null,此时统一使用该值表示这种情况。对应FutureTask的NORMAL。
* 6. 其他对象:表示计算正常结束,返回类型不为void时的结果对象。对应FutureTask的NORMAL。
*/
private volatile Object result;
```
- 执行逻辑
```java
//异步转同步,阻塞等待异步计算结果,类似于future.get()
public Promise<V> sync() throws InterruptedException {
await();
rethrowIfFailed();
return this;
}
public V get() throws InterruptedException, ExecutionException {
Object result = this.result;
if (!isDone0(result)) {
//阻塞等待
await();
result = this.result;
}
if (result == SUCCESS || result == UNCANCELLABLE) {
return null;
}
Throwable cause = cause0(result);
if (cause == null) {
//只有是null 或者 成功结果对象时返回null
return (V) result;
}
if (cause instanceof CancellationException) {
//取消异常
throw (CancellationException) cause;
}
//计算异常
throw new ExecutionException(cause);
}
```