Java 是如何实现 Future 模式的

一. Future是什么?

1. Future是什么?

JDK 的 Future 就类似于我们网购买东西的订单号,当我们执行某一耗时的任务时,我们可以另起一个线程异步去执行这个耗时的任务,同时我们可以干点其他事情。当事情干完后我们再根据 future 这个"订单号"去提取耗时任务的执行结果即可。因此 Future 也是多线程中的一种应用模式。

扩展: 说起多线程,那么 Future 又与 Thread 有什么区别呢?最重要的区别就是 Thread 是没有返回结果的,而 Future 模式是有返回结果的。

2. 如何使用Future

前面搞明白了什么是Future,下面我们再来举个简单的例子看看如何使用Future。

假如现在我们要打火锅,首先我们要准备两样东西:把水烧开和准备食材。因为烧开水是一个比较漫长的过程(相当于耗时的业务逻辑),因此我们可以一边烧开水(相当于另起一个线程),一边准备火锅食材(主线程),等两者都准备好了我们就可以开始打火锅了。 /

public class DaHuoGuo {
    public static void main(String[] args) throws Exception {
        FutureTask<String> futureTask = new FutureTask<>(new Callable<String>() {
            @Override
            public String call() throws Exception {
                println(Thread.currentThread().getName() + ":" + "开始烧开水...");
                // 模拟烧开水耗时
                Thread.sleep(2000);
                println(Thread.currentThread().getName() + ":"  + "开水已经烧好了...");
                return "开水";
            }
        });

        Thread thread = new Thread(futureTask);
        thread.start();

        // do other thing
        println(Thread.currentThread().getName() + ":"  + " 此时开启了一个线程执行future的逻辑(烧开水),此时我们可以干点别的事情(比如准备火锅食材)...");
        // 模拟准备火锅食材耗时
        Thread.sleep(3000);
        println(Thread.currentThread().getName() + ":"  + "火锅食材准备好了");
        String shicai = "火锅食材";

        // 开水已经稍好,我们取得烧好的开水
        String boilWater = futureTask.get();

        println(Thread.currentThread().getName() + ":"  + boilWater + "和" + shicai + "已经准备好,我们可以开始打火锅啦");
    }

    public static void println(String content){
        SimpleDateFormat sdf = new SimpleDateFormat();// 格式化时间 
        sdf.applyPattern("HH:mm:ss");// a为am/pm的标记
        Date date = new Date();// 获取当前时间 
        System.out.println("["+sdf.format(date)+"] "+content);
    }
}

// [14:46:51] main: 此时开启了一个线程执行future的逻辑(烧开水),此时我们可以干点别的事情(比如准备火锅食材)...
// [14:46:51] Thread-0:开始烧开水...
// [14:46:53] Thread-0:开水已经烧好了...
// [14:46:54] main:火锅食材准备好了
// [14:46:54] main:开水和火锅食材已经准备好,我们可以开始打火锅啦

从以上代码中可以看到,我们使用Future主要有以下步骤:

  1. 新建一个 Callable 匿名函数实现类对象,我们的业务逻辑在 Callablecall 方法中实现,其中 Callable 的泛型是返回结果类型;
  2. 然后把 Callable 匿名函数对象作为 FutureTask 的构造参数传入,构建一个 futureTask 对象;
  3. 然后再把 futureTask 对象作为 Thread 构造参数传入并开启这个线程执行去执行业务逻辑;
  4. 最后我们调用 futureTask 对象的 get 方法得到业务逻辑执行结果。

可以看到跟 Future 使用有关的 JDK 类主要有 FutureTaskCallable 两个,下面主要对 FutureTask 进行源码分析。

扩展:还有一种使用 Future 的方式是将 Callable 实现类提交给线程池执行的方式,这里不再介绍,自行百度即可。

二. FutureTask源码分析

1. FutureTask的成员变量和成员方法

  • 1.1 我们先来看下FutureTask的类结构:
    futuretask.png

可以看到 FutureTask 实现了 RunnableFuture 接口,而RunnableFuture接口又继承了 FutureRunnable 接口。因为FutureTask间接实现了Runnable接口,因此可以作为任务被线程Thread执行;此外,最重要的一点就是FutureTask还间接实现了Future接口,因此还可以获得任务执行的结果。

  • 1.2 成员变量
    我们首先来看下FutureTask的成员变量有哪些,理解这些成员变量对后面的源码分析非常重要。
    /** 封装的Callable对象,其call方法用来执行异步任务 */
    private Callable<V> callable;
    /** 用来装异步任务的执行结果 */
    private Object outcome;
    /** 执行callable任务的线程 */
    private volatile Thread runner;
    /** 线程等待节点,reiber stack的一种实现 */
    private volatile WaitNode waiters;
    /** 任务执行状态 */
    private volatile int state;
    
    private static final sun.misc.Unsafe UNSAFE;
    // 使用 Unsafe 执行 cas 修改成员变量时, 用到的字段偏移量
    private static final long stateOffset;
    private static final long runnerOffset;
    private static final long waitersOffset;
    
    // 静态块
    static {
        try {
            UNSAFE = sun.misc.Unsafe.getUnsafe();
            Class<?> k = FutureTask.class;
            stateOffset = UNSAFE.objectFieldOffset
                (k.getDeclaredField("state"));
            runnerOffset = UNSAFE.objectFieldOffset
                (k.getDeclaredField("runner"));
            waitersOffset = UNSAFE.objectFieldOffset
                (k.getDeclaredField("waiters"));
        } catch (Exception e) {
            throw new Error(e);
        }
    }
    

2. FutureTask的状态变化

前面讲了FutureTask的成员变量,有一个表示状态的成员变量state我们要重点关注下,state变量表示任务执行的状态。

private volatile int state;
/** 任务新建状态 */
private static final int NEW          = 0;
/** 任务正在完成状态,是一个瞬间过渡状态 */
private static final int COMPLETING   = 1;
/** 任务正常结束状态 */
private static final int NORMAL       = 2;
/** 任务执行异常状态 */
private static final int EXCEPTIONAL  = 3;
/** 任务被取消状态,对应cancel(false) */
private static final int CANCELLED    = 4;
/** 任务中断状态,是一个瞬间过渡状态 */
private static final int INTERRUPTING = 5;
/** 任务被中断状态,对应cancel(true) */
private static final int INTERRUPTED  = 6;

可以看到任务状态变量state有以上7种状态,0-6分别对应着每一种状态。任务状态一开始是NEW,然后由FutureTask的三个方法set,setExceptioncancel来设置状态的变化,其中状态变化有以下四种情况:

  • NEW -> COMPLETING -> NORMAL:
    这个状态变化表示异步任务的正常结束,其中COMPLETING是一个瞬间临时的过渡状态,由set方法设置状态的变化;
  • NEW -> COMPLETING -> EXCEPTIONAL:
    这个状态变化表示异步任务执行过程中抛出异常,由setException方法设置状态的变化;
  • NEW -> CANCELLED:
    这个状态变化表示被取消,即调用了cancel(false),由cancel方法来设置状态变化;
  • NEW -> INTERRUPTING -> INTERRUPTED:
    这个状态变化表示被中断,即调用了cancel(true),由cancel方法来设置状态变化。

3. run() 方法

public void run() {
        // 为了确保只有1个线程在执行futureTask, 需要确保两个提交同时满足, 否则直接从run()方法返回
        //  (1) futureTask 的状态是 new
        //  (2) futureTask 此时的执行线程为 null, 即还没有线程执行该 futureTask
        // 什么样的调用方式会让多个线程执行痛经一个 futureTask 呢? 
        //  答: 实例化了一个 futureTask 对象, 然后调用了多次 new Thread(futureTask).start()
        if (state != NEW ||
            !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                         null, Thread.currentThread()))
            return;
        try {
            // 代码执行到这里, 已经确保只有1个线程可以执行 futureTask, 
            // 所以直接在当前线程中调用 callable.call() 即可; 调用中: 
            //  (1) 如果发生异常: 更新状态为 EXCEPTIONAL , 通过方法 setException()?
            //  (2) 如果没有发生异常, 更新状态为 NORMAL, 通过方法 set()
            Callable<V> c = callable;
            if (c != null && state == NEW) {
                V result;
                boolean ran;
                try {
                    result = c.call();
                    ran = true;
                } catch (Throwable ex) {
                    result = null;
                    ran = false;
                    setException(ex);
                }
                if (ran)
                    set(result);
            }
        } finally {
            // 代码执行到这里, 还是已经确保了只有1个线程可以执行 futureTask
            // 无论当前线程执行是否抛出异常, 执行后都应该把 futureTask 的 runner 属性置 null
            // 表示当前线程已执行完毕
            runner = null;
            // 后面3行是在处理执行过程中被 interrupt 的情况, 因为 run() 方法并不能实时响应中断,
            // 只是通过代码逻辑检测中断(参考while(!Thread.currentThread.isInterrupted())循环),
            // 因此, 在代码执行后响应中断, s >= INTERRUPTING 的情形, 处理方法为: 
            //  private void handlePossibleCancellationInterrupt(int s) {
            //      if (s == INTERRUPTING)
            //          while (state == INTERRUPTING)
            //              Thread.yield();
            //  }
        if (s == INTERRUPTING)
            while (state == INTERRUPTING)
                Thread.yield();
            int s = state;
            if (s >= INTERRUPTING)
                handlePossibleCancellationInterrupt(s);
        }
    }

这里值得注意的是判断线程满不满足执行异步任务条件时, runner 是否为 null 是调用 UNSAFE 的 CAS 方法 compareAndSwapObject 来判断和设置的,同时 compareAndSwapObject 是通过成员变量 runner 的偏移地址 runnerOffset 来给 runner 赋值的,此外,成员变量 runner 被修饰为 volatile 是在多线程的情况下, 一个线程的 volatile 修饰变量的设值能够立即刷进主存,因此值便可被其他线程可见。

4. FutureTask的状态更改方法: set()setException()

protected void set(V v) {
    if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
        // 将run()最后的执行结果保存到 outcome 成员
        outcome = v;
        UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state NORMAL
        finishCompletion();
    }
}

protected void setException(Throwable t) {
    if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
        // 将run()最后的执行结果保存到 outcome 成员
        outcome = t;
        UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state EXCEPTIONAL
        finishCompletion();
    }
}

5. FutureTask的唤醒等待线程方法

因为 set(V v)setException(Throwable t) 方法最后都调用了 finishCompletion() , 就是表示异步任务不管正常还是异常结束, 都要执行一部分统一的操作, 这些操作主要是来唤醒所有因为 "调用 get() 方法时因异步任务还未执行完而阻塞" 的线程. 这些阻塞线程会被包装成 WaitNode 类形成栈存储. 因此唤醒(移除)的顺序是"后进先出"即后面先来的线程先被先唤醒(移除),关于这个线程等待链表是如何成链的,后面再继续分析。

private void finishCompletion() {
    // waiters 是 FutureTask 的成员变量, 每个因调用 get() 而阻塞的线程, 都会被
    // 包装为 WaitNode 对象(定义见下方), 所有的阻塞线程会组成一个链表存储. 首先看到的这个外层
    // for 循环其实是一种 "彻底清空所有WaitNode" 的保证, 真正遍历链表进行唤醒的是
    // 内部的 for (;;) 循环; 需要这个保证是因为动作的起点是: 
    //     if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) 
    // 判断, 这个判断只能确保当时没有新的线程因get()被加入等待队列, 所以需要加上外层的for检测
    for (WaitNode q; (q = waiters) != null;) {
        // 判断没有新线程加入get()的等待队列
        if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
            // 如下所有代码都是普通的遍历链表, 执行唤醒 WaitNode 内部线程的操作
            for (;;) {
                Thread t = q.thread;
                if (t != null) {
                    q.thread = null;
                    LockSupport.unpark(t);
                }
                WaitNode next = q.next;
                if (next == null)
                    break;
                q.next = null; // unlink to help gc
                q = next;
            }
            break;
        }
    }
    // 无意义, done()的方法体1.8版本中为空 {}
    done();  
    // 因为异步任务已经执行完且结果已经保存到outcome中,因此此时可以将callable对象置空了
    callable = null;
}

[注]: WaitNode 定义:

static final class WaitNode {
    volatile Thread thread;   // 包装线程
    // 成链表的标志 (实际为栈, 用栈顶元素执行cas判断, 确定是否有新线程加入get()等待队列)
    volatile WaitNode next;   
    WaitNode() { thread = Thread.currentThread(); }
}

6. FutureTask.get方法,获取任务执行结果

前面我们起一个线程在其run方法中执行异步任务后,此时我们可以调用FutureTask.get方法来获取异步任务执行的结果。

public V get() throws InterruptedException, ExecutionException {
    int s = state;
    // (1) 如果任务状态state<=COMPLETING,说明异步任务正在执行过程中,
    //     此时会调用awaitDone方法阻塞等待
    if (s <= COMPLETING)
        s = awaitDone(false, 0L);
    // (2) 代码执行到这里, 说明等待的线程已被唤醒, 任务执行完毕: 
    //     任务可能执行成功也可能执行失败, report() 会根据执行的状态
    //     选择正常返回还是抛异常. 定义详见下面
    return report(s);
}
  • 6.1 awaitDone( )方法
// 2个参数出现的原因是: 有的线程指调用 get() 只想等待有限时间
// 等到任务结束返回的普通 get(), timed 参数为false   
private int awaitDone(boolean timed, long nanos)
        throws InterruptedException {
        // 计算最大等待的时间点. 不限制等待时长的时间点取0
        final long deadline = timed ? System.nanoTime() + nanos : 0L;
        WaitNode q = null;
        // 还未入栈
        boolean queued = false;
        for (;;) {
            // (1) 等待线程被执行中断, 抛异常退出
            if (Thread.interrupted()) {
                removeWaiter(q);
                throw new InterruptedException();
            }
            // 任务执行状态
            int s = state;
            // (2) s > COMPLETING 表示任务执行完毕, 返回最终状态退出. 
            //     任务可能正常结束(NORMAL),可能抛出异常(EXCEPTIONAL) ,
            //     或任务被取消(CANCELLED,INTERRUPTING或INTERRUPTED状态的一种)
            if (s > COMPLETING) {
                // 【问】run()方法在任务结束时,也会调用finishCompletion(), 诸个将等待栈中的
                //      WaitNode节点的thread置空,这里为什么又要再调用一次 q.thread = null 清空呢?
                // 【答】因为若很多线程来获取任务执行结果,在任务执行完的那一刻,此时获取任务的线程
                //      要么已经在线程等待链表中; 要么此时还是一个孤立的WaitNode节点。
                //      (1)在线程等待链表中的的所有WaitNode节点将由finishCompletion来移除(同时唤醒)所有
                //         等待的WaitNode节点,以便垃圾回收;
                //      (2)而孤立的线程WaitNode节点此时还未阻塞,因此不需要被唤醒,此时只要把其属性置为
                //         null,然后其有没有被谁引用,因此可以被GC。
                if (q != null)
                    q.thread = null;
                return s;
            }
            // 任务还在执行中, 继续等待
            else if (s == COMPLETING) 
                Thread.yield();
            // 如果节点还未构造, 构造节点
            else if (q == null)
                q = new WaitNode();
            // 将构造的节点加入该线程等待栈的头部
            // [问]: 为什么节点加入栈的动作要写在循环内呢? 
            // [答]: 这是多线程下cas节点入栈的标准写法. 因为入栈动作可能失败, 所以写在
            //       死循环内持续入栈; 这也是循环内判断 else if (q == null) 分支的
            //       原因: 这个分支是保证节点只构造一次, 但入栈动作可执行无数次知道成功
            else if (!queued)
                queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
                                                     q.next = waiters, q);
            // 处理get()线程限时等待的情况: 
            else if (timed) {
                nanos = deadline - System.nanoTime();
                // 等待已超时
                if (nanos <= 0L) {
                    removeWaiter(q);
                    return state;
                }
                // 等待未超时, 继续等待预期时间
                LockSupport.parkNanos(this, nanos);
            }
            // 处理不限时get()的情况: 
            else
                // 线程进入阻塞等待状态
                LockSupport.park(this);
        }
    }

总的来说, 将本来可以写在一起的代码逻辑, 比如构造节点后入栈, 然后将节点中的线程阻塞这3个先后动作, 拉平成同一等级的分支写在死循环里的做法, 是一种兼顾 cas 操作失败的写法. 即保证无限次 cas 尝试, 又保证无需 cas 的连贯动作可以在下一次 for 循环中like执行.

  • 6.2 report( )方法
private V report(int s) throws ExecutionException {
    // 执行结果
    Object x = outcome;
    // (1) 正常返回
    if (s == NORMAL)
        return (V)x;
    // (2) 因取消任务而抛异常退出
    if (s >= CANCELLED)
        throw new CancellationException();
    // (3) 任务失败退出
    throw new ExecutionException((Throwable)x);
}

7. FutureTask.cancel方法,取消执行任务

下面可以看到, 只有当执行cancel动作时, 还没有线程执行任务时才能执行取消

public boolean cancel(boolean mayInterruptIfRunning) {
    // 状态 != NEW, 则已有现成在执行任务, 不能取消
    // cas修改状态时发现状态不是 NEW 了, 说明有新线程执行任务了, 也不能取消任务
    if (!(state == NEW &&
          UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
              mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
        return false;
    try {    
        // 如果润徐中断的话, 对线程中断
        if (mayInterruptIfRunning) {
            try {
                Thread t = runner;
                if (t != null)
                    t.interrupt();
            } finally { // final state
                UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
            }
        }
    } finally {
        // 最终唤醒阻塞栈中的等待线程
        finishCompletion();
    }
    return true;
}

三. 总结

总的来说, 最简单的实现 future 模式, 只要:

  • (1) 声明一个 volatile 的标记变量, 标记任务是否执行完毕
  • (2) 未执行完毕时, 调用 get() 的线程执行 flag.wait() 即可. 利用了jvm内部的条件等待队列
  • (3) 用线程执行run()方法

反观 javaSE 的实现, 有几方面扩展:

  • (1) 标记变量不止是 true/false ,取而代之的是一系列状态: new, completing, NORMAL, EXCEPTIONAL等. 这主要是为了配合run(),get(),cancel()在多线程下的逻辑
  • (2) get()线程阻塞的问题上, javaSE没有使用synchronized的条件等待队列, 而是用 cas 操作等待栈的方法. 当新线程执行get()阻塞时, 其它线程感值到新线程是通过 cas 查看栈顶节点是否发生变化得来的
  • (3) 对于run()方法上, 通过javaSE的实现通过设置成员变量 volatile Thread runner , 来限制同一时刻最多只有一个线程执行run()方法
  • (4) 除此之外, javaSE版本还实现了 cancel 等方法
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 202,009评论 5 474
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 84,808评论 2 378
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 148,891评论 0 335
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,283评论 1 272
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,285评论 5 363
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,409评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,809评论 3 393
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,487评论 0 256
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,680评论 1 295
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,499评论 2 318
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,548评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,268评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,815评论 3 304
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,872评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,102评论 1 258
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,683评论 2 348
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,253评论 2 341