并发系列之 Future 框架详解

本文将主要讲解 J.U.C 中的 Future 框架,并分析结合源码分析其内部结构逻辑;

一、Future 框架概述

JDK 中的 Future 框架实际就是 Future 模式的实现,通常情况下我们会配合线程池使用,但也可以单独使用;下面我们就单独使用简单举例;

1. 应用实例

FutureTask future =newFutureTask<>(() -> {  log.info("异步任务执行...");  Thread.sleep(2000);  log.info("过了很久很久...");return"异步任务完成";});log.info("启动异步任务...");newThread(future).start();log.info("继续其他任务...");Thread.sleep(1000);log.info("获取异步任务结果:{}", future.get());

打印:

[15:38:03,231INFO ] [main]    - 启动异步任务...

[15:38:03,231INFO ] [main]    - 继续其他任务...

[15:38:03,231INFO ] [Thread-0] - 异步任务执行...

[15:38:05,232INFO ] [Thread-0] - 过了很久很久...

[15:38:05,236INFO ] [main]    - 获取异步任务结果:异步任务完成

如上面代码所示,首先我们将要执行的任务包装成 Callable,这里如果不需要返回值也可以使用 Runnable;然后构建 FutureTask 由一个线程启动,最后使用 Future.get() 获取异步任务结果;

2. Future 运行逻辑

对于 Future 模式的流程图如下:

对比上面的实例代码,大家可能会发现有些不一样,因为在 FutureTask 同时继承了 Runnable 和 Future 接口,所以再提交任务后没有返回Future,而是直接使用自身调用 get;下面我们就对源码进行实际分析;

二、源码分析

1. FutureTask 主体结构

publicinterface RunnableFuture<V> extends Runnable, Future<V> {}publicclass FutureTask<V> implements RunnableFuture<V> {privatevolatileintstate;// 任务运行状态privateCallable callable;// 异步任务privateObject outcome;// 返回结果privatevolatileThread runner;// 异步任务执行线程privatevolatileWaitNode waiters;// 等待异步结果的线程栈(通过Treiber stack算法实现)public FutureTask(Callable<V> callable) {// 需要返回值if(callable ==null)thrownewNullPointerException();this.callable = callable;this.state = NEW;// ensure visibility of callable}public FutureTask(Runnable runnable, V result) {this.callable = Executors.callable(runnable, result);this.state = NEW;// ensure visibility of callable}  ...}

另外在代码中还可以看见有很多地方都是用了 CAS 来更新变量,而 JDK1.6 中甚至使用了 AQS 来实现;其原因就是同一个 FutureTask 可以多个线程同时提交,也可以多个线程同时获取; 所以代码中有很多的状态变量:

// FutureTask.state 取值privatestaticfinalintNEW          =0;// 初始化到结果返回前privatestaticfinalintCOMPLETING  =1;// 结果赋值privatestaticfinalintNORMAL      =2;// 执行完毕privatestaticfinalintEXCEPTIONAL  =3;// 执行异常privatestaticfinalintCANCELLED    =4;// 任务取消privatestaticfinalintINTERRUPTING =5;// 设置中断状态privatestaticfinalintINTERRUPTED  =6;// 任务中断

同时源码的注释中也详细给出了可能出现的状态转换:

NEW -> COMPLETING -> NORMAL // 任务正常执行

NEW -> COMPLETING -> EXCEPTION // 任务执行异常

NEW ->CANCELLED // 任务取消

NEW -> INITERRUPTING -> INTERRUPTED // 任务中断

注意这里的 COMPLETING 状态是一个很微妙的状态,正因为有他的存在才能实现无锁赋值;大家先留意这个状态,然后在代码中应该能体会到;另外这里还有一个变量需要注意,WaitNode ;使用 Treiber stack 算法实现的无锁栈;其原理说明可以参考下面第三节;

2. 任务执行

public void run() {if(state != NEW ||// 确保任务执行完成后,不再重复执行!UNSAFE.compareAndSwapObject(this, runnerOffset,null, Thread.currentThread()))// 确保只有一个线程执行return;try{    Callable c = callable;if(c !=null&& state == NEW) {      V result;booleanran;try{        result = c.call();        ran =true;      }catch(Throwable ex) {        result =null;        ran =false;        setException(ex);// 设置异常结果}if(ran) set(result);// 设置结果}  }finally{    runner =null;ints = state;if(s >= INTERRUPTING) handlePossibleCancellationInterrupt(s);// 确保中断状态已经设置}}

// 设置异步任务结果protected void set(V v) {if(UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {// 保证结果只能设置一次outcome = v;    UNSAFE.putOrderedInt(this, stateOffset, NORMAL);// final statefinishCompletion();// 唤醒等待线程}}

protected void setException(Throwable t) {if(UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {// 保证结果只能设置一次outcome = t;    UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL);// final statefinishCompletion();  }}

3. 任务取消

public boolean cancel(boolean mayInterruptIfRunning) {if(!(state == NEW &&// 只有在任务执行阶段才能取消UNSAFE.compareAndSwapInt(this, stateOffset, NEW,// 设置取消状态mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))returnfalse;try{// in case call to interrupt throws exceptionif(mayInterruptIfRunning) {try{        Thread t = runner;if(t !=null)          t.interrupt();      }finally{// final stateUNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);      }    }  }finally{    finishCompletion();  }returntrue;}

注意 cancel(false) 也就是仅取消,并没有打断;异步任务会继续执行,只是这里首先设置了 FutureTask.state = CANCELLED ,所以最后在设置结果的时候会失败,UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING) ;

4. 获取结果

public V get() throws InterruptedException, ExecutionException {ints = state;if(s <= COMPLETING)    s = awaitDone(false,0L);// 阻塞等待returnreport(s);}private V report(int s) throws ExecutionException {// 根据最后的状态返回结果Object x = outcome;if(s == NORMAL)return(V)x;if(s >= CANCELLED)thrownewCancellationException();thrownewExecutionException((Throwable)x);}

private int awaitDone(boolean timed, long nanos)  throws InterruptedException {finallongdeadline = timed ? System.nanoTime() + nanos :0L;  WaitNode q =null;booleanqueued =false;for(;;) {if(Thread.interrupted()) {      removeWaiter(q);// 移除等待节点thrownewInterruptedException();    }ints = state;if(s > COMPLETING) {// 任务已完成if(q !=null)        q.thread =null;returns;    }elseif(s == COMPLETING)// 正在赋值,直接先出让线程Thread.yield();elseif(q ==null)// 任务还未完成需要等待q =newWaitNode();elseif(!queued)      queued = UNSAFE.compareAndSwapObject(this, waitersOffset,                        q.next = waiters, q);// 使用 Treiber stack 算法elseif(timed) {      nanos = deadline - System.nanoTime();if(nanos <=0L) {        removeWaiter(q);returnstate;      }      LockSupport.parkNanos(this, nanos);    }elseLockSupport.park(this);  }}

三、Treiber stack

在《Java 并发编程实战》中讲了, 创建非阻塞算法的关键在于,找出如何将原子修改的范围缩小到单个变量上,同时还要维护数据的一致性 。

@ThreadSafepublicclass ConcurrentStack <E> {  AtomicReference> top =newAtomicReference<>();privatestaticclass Node <E> {publicfinalE item;publicNode next;public Node(E item) {this.item = item;    }  }public void push(E item) {    Node newHead =newNode<>(item);    Node oldHead;do{      oldHead = top.get();      newHead.next = oldHead;    }while(!top.compareAndSet(oldHead, newHead));  }public E pop() {    Node oldHead;    Node newHead;do{      oldHead = top.get();if(oldHead ==null)returnnull;      newHead = oldHead.next;    }while(!top.compareAndSet(oldHead, newHead));returnoldHead.item;  }}

总结

总体来讲源码比较简单,因为其本身只是一个 Future 模式的实现

但是其中的状态量的设置,还有里面很多无锁的处理方式,才是 FutureTask 带给我们的精华!

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 194,457评论 5 459
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 81,837评论 2 371
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 141,696评论 0 319
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 52,183评论 1 263
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 61,057评论 4 355
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 46,105评论 1 272
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 36,520评论 3 381
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 35,211评论 0 253
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 39,482评论 1 290
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 34,574评论 2 309
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 36,353评论 1 326
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 32,213评论 3 312
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 37,576评论 3 298
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 28,897评论 0 17
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,174评论 1 250
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 41,489评论 2 341
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 40,683评论 2 335

推荐阅读更多精彩内容