并发编程之Future&FutureTask深入解析

点赞再看,养成习惯,搜一搜【一角钱技术】关注更多原创技术文章。本文 GitHub org_hejianhui/JavaStudy 已收录,有我的系列文章。

Java线程实现方式主要有四种:

  • 继承Thread类
  • 实现Runnable接口
  • 实现Callable接口通过FutureTask包装器来创建Thread线程
  • 使用ExecutorService、Callable、Future实现有返回结果的多线程。

其中前两种方式线程执行完后都没有返回值,后两种是带返回值的。

Callable 和 Runnable 接口

Runnable接口

// 实现Runnable接口的类将被Thread执行,表示一个基本的任务
public interface Runnable {
    // run方法就是它所有的内容,就是实际执行的任务
    public abstract void run();
}

没有返回值

run 方法没有返回值,虽然有一些别的方法也能实现返回值得效果,比如编写日志文件或者修改共享变量等等,但是不仅容易出错,效率也不高。

不能抛出异常

public class RunThrowExceptionDemo {

    /**
     * 普通方法可以在方法签名中抛出异常
     *
     * @throws IOException
     */
    public void normalMethod() throws IOException {
        throw new IOException();
    }

    class RunnableImpl implements Runnable {

        /**
         * run 方法内无法抛出 checked Exception,除非使用 try catch 进行处理
         */
        @Override
        public void run() {
            try {
                throw new IOException();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

}

可以看到普通方法 normalMethod 可以在方法签名上抛出异常,这样上层接口就可以捕获这个异常进行处理,但是实现 Runnable 接口的类,run 方法无法抛出 checked Exception,只能在方法内使用 try catch 进行处理,这样上层就无法得知线程中的异常。

设计导致

其实这两个缺陷主要原因就在于 Runnable 接口设计的 run 方法,这个方法已经规定了 run() 方法的返回类型是 void,而且这个方法没有声明抛出任何异常。所以,当实现并重写这个方法时,我们既不能改返回值类型,也不能更改对于异常抛出的描述,因为在实现方法的时候,语法规定是不允许对这些内容进行修改的。

Runnable 为什么设计成这样?

假设 run() 方法可以返回返回值,或者可以抛出异常,也无济于事,因为我们并没有办法在外层捕获并处理,这是因为调用 run() 方法的类(比如 Thread 类和线程池)是 Java 直接提供的,而不是我们编写的。
所以就算它能有一个返回值,我们也很难把这个返回值利用到,而 Callable 接口就是为了解决这两个问题。

Callable接口

public interface Callable<V> {
    //返回接口,或者抛出异常
    V call() throws Exception;
}

可以看到 Callable 和 Runnable 接口其实比较相识,都只有一个方法,也就是线程任务执行的方法,区别就是 call 方法有返回值,而且声明了 throws Exception。

Callable 和 Runnable 的不同之处

  • 方法名 :Callable 规定的执行方法是 call(),而 Runnable 规定的执行方法是 run();
  • 返回值 :Callable 的任务执行后有返回值,而 Runnable 的任务执行后是没有返回值的;
  • 抛出异常 :call() 方法可抛出异常,而 run() 方法是不能抛出受检查异常的;

与 Callable 配合的有一个 Future 接口,通过 Future 可以了解任务执行情况,或者取消任务的执行,还可获取任务执行的结果,这些功能都是 Runnable 做不到的,Callable 的功能要比 Runnable 强大。

Future接口

Future的作用

简单来说就是利用线程达到异步的效果,同时还可以获取子线程的返回值。

比如当做一定运算的时候,运算过程可能比较耗时,有时会去查数据库,或是繁重的计算,比如压缩、加密等,在这种情况下,如果我们一直在原地等待方法返回,显然是不明智的,整体程序的运行效率会大大降低。

我们可以把运算的过程放到子线程去执行,再通过 Future 去控制子线程执行的计算过程,最后获取到计算结果。这样一来就可以把整个程序的运行效率提高,是一种异步的思想。

Future的方法

Future 接口一共有5个方法,源代码如下:


public interface Future<V> {

  /**
   * 尝试取消任务,如果任务已经完成、已取消或其他原因无法取消,则失败。
   * 1、如果任务还没开始执行,则该任务不应该运行
   * 2、如果任务已经开始执行,由参数mayInterruptIfRunning来决定执行该任务的线程是否应该被中断,这只是终止任务的一种尝试。若mayInterruptIfRunning为true,则会立即中断执行任务的线程并返回true,若mayInterruptIfRunning为false,则会返回true且不会中断任务执行线程。
   * 3、调用这个方法后,以后对isDone方法调用都返回true。
   * 4、如果这个方法返回true,以后对isCancelled返回true。
   */
    boolean cancel(boolean mayInterruptIfRunning);

   /**
    * 判断任务是否被取消了,如果调用了cance()则返回true
    */
    boolean isCancelled();

   /**
    * 如果任务完成,则返回ture
    * 任务完成包含正常终止、异常、取消任务。在这些情况下都返回true
    */
    boolean isDone();

   /**
    * 线程阻塞,直到任务完成,返回结果
    * 如果任务被取消,则引发CancellationException
    * 如果当前线程被中断,则引发InterruptedException
    * 当任务在执行的过程中出现异常,则抛出ExecutionException
    */
    V get() throws InterruptedException, ExecutionException;

   /**
    * 线程阻塞一定时间等待任务完成,并返回任务执行结果,如果则超时则抛出TimeoutException
    */
    V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
}

get方法(获取结果)

get 方法最主要的作用就是获取任务执行的结果,该方法在执行时的行为取决于 Callable 任务的状态,可能会发生以下 7 种情况。

  • 任务已经执行完,执行 get 方法可以立刻返回,获取到任务执行的结果。

  • 任务还没有开始执行,比如我们往线程池中放一个任务,线程池中可能积压了很多任务,还没轮到我去执行的时候,就去 get 了,在这种情况下,相当于任务还没开始,我们去调用 get 的时候,会当前的线程阻塞,直到任务完成再把结果返回回来。

  • 任务正在执行中,但是执行过程比较长,所以我去 get 的时候,它依然在执行的过程中。这种情况调用 get 方法也会阻塞当前线程,直到任务执行完返回结果。

  • 任务执行过程中抛出异常,我们再去调用 get 的时候,就会抛出 ExecutionException 异常,不管我们执行 call 方法时里面抛出的异常类型是什么,在执行 get 方法时所获得的异常都是 ExecutionException。

  • 任务被取消了,如果任务被取消,我们用 get 方法去获取结果时则会抛出 CancellationException。

  • 任务被中断了,如果任务被当前线程中断,我们用 get 方法去获取结果时则会抛出InterruptedException。

  • 任务超时,我们知道 get 方法有一个重载方法,那就是带延迟参数的,调用了这个带延迟参数的 get 方法之后,如果 call 方法在规定时间内正常顺利完成了任务,那么 get 会正常返回;但是如果到达了指定时间依然没有完成任务,get 方法则会抛出 TimeoutException,代表超时了。

参考示例:

package com.niuh.future;

import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class FutureDemo {
    public static void main(String[] args) {
        ExecutorService executorService = Executors.newSingleThreadExecutor();
        Future<Integer> future = executorService.submit(new FutureTask());
        try {
            Integer res = future.get(2000, TimeUnit.MILLISECONDS);
            System.out.println("Future线程返回值:" + res);
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        }
    }

    static class FutureTask implements Callable<Integer> {

        @Override
        public Integer call() throws Exception {
            Thread.sleep(new Random().nextInt(3000));
            return new Random().nextInt(10);
        }
    }
}

isDone方法(判断是否执行完毕)

isDone() 方法,该方法是用来判断当前这个任务是否执行完毕了

package com.niuh.future;

import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class FutureIsDoneDemo {
    public static void main(String[] args) {
        ExecutorService executorService = Executors.newSingleThreadExecutor();
        Future<Integer> future = executorService.submit(new FutureTask());
        try {
            for (int i = 0; i < 3; i++) {
                Thread.sleep(1000);
                System.out.println("线程是否完成:" + future.isDone());
            }
            Integer res = future.get(2000, TimeUnit.MILLISECONDS);
            System.out.println("Future 线程返回值:" + res);
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        }
    }

    static class FutureTask implements Callable<Integer> {

        @Override
        public Integer call() throws Exception {
            Thread.sleep(2000);
            return new Random().nextInt(10);
        }
    }
}

执行结果:

线程是否完成:false
线程是否完成:false
线程是否完成:true
Future 线程返回值:9

可以看到前两次 isDone 方法的返回结果是 false,因为线程任务还没有执行完成,第三次 isDone 方法的返回结果是 ture。

注意:这个方法返回 true 则代表执行完成了,返回 false 则代表还没完成。但返回 true,并不代表这个任务是成功执行的,比如说任务执行到一半抛出了异常。那么在这种情况下,对于这个 isDone 方法而言,它其实也是会返回 true 的,因为对它来说,虽然有异常发生了,但是这个任务在未来也不会再被执行,它确实已经执行完毕了。所以 isDone 方法在返回 true 的时候,不代表这个任务是成功执行的,只代表它执行完毕了。

我们将上面的示例稍作修改再来看下结果,修改 FutureTask 代码如下:

static class FutureTask implements Callable<Integer> {
    @Override
    public Integer call() throws Exception {
        Thread.sleep(2000);
        throw new Exception("故意抛出异常");
    }
}

执行结果:



虽然抛出了异常,但是 isDone 方法的返回结果依然是 ture。

这段代码说明了:

  • 即便任务抛出异常,isDone 方法依然会返回 true。
  • 虽然抛出的异常是 IllegalArgumentException,但是对于 get 而言,它抛出的异常依然是 ExecutionException。
  • 虽然在任务执行到2秒的时候就抛出了异常,但是真正要等到我们执行 get 的时候,才看到了异常。

cancel方法(取消任务的执行)

如果不想执行某个任务了,则可以使用 cancel 方法,会有以下三种情况:

  • 第一种情况最简单,那就是当任务还没有开始执行时,一旦调用 cancel,这个任务就会被正常取消,未来也不会被执行,那么 cancel 方法返回 true。

  • 第二种情况也比较简单。如果任务已经完成,或者之前已经被取消过了,那么执行 cancel 方法则代表取消失败,返回 false。因为任务无论是已完成还是已经被取消过了,都不能再被取消了。

  • 第三种情况比较特殊,就是这个任务正在执行,这个时候执行 cancel 方法是不会直接取消这个任务的,而是会根据我们传入的参数做判断。cancel 方法是必须传入一个参数,该参数叫作 mayInterruptIfRunning,它是什么含义呢?

    • 如果传入的参数是 true,执行任务的线程就会收到一个中断的信号,正在执行的任务可能会有一些处理中断的逻辑,进而停止,这个比较好理解。
    • 如果传入的是 false 则就代表不中断正在运行的任务,也就是说,本次 cancel 不会有任何效果,同时 cancel 方法会返回 false。

参考示例:

package com.niuh.future;

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class FutureCancelDemo {

    static ExecutorService executorService = Executors.newSingleThreadExecutor();

    public static void main(String[] args) {
        // 当任务还没有开始执行
        // demo1();

        // 如果任务已经执行完
        // demo2();

        // 如果任务正在进行中
        demo3();
    }

    private static void demo1() {
        for (int i = 0; i < 1000; i++) {
            executorService.submit(new FutureTask());
        }

        Future<String> future = executorService.submit(new FutureTask());
        try {
            boolean cancel = future.cancel(false);
            System.out.println("Future 任务是否被取消:" + cancel);
            String res = future.get(2000, TimeUnit.MILLISECONDS);
            System.out.println("Future 线程返回值:" + res);
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        } finally {
            executorService.shutdown();
        }
    }


    private static void demo2() {
        Future<String> future = executorService.submit(new FutureTask());
        try {
            Thread.sleep(1000);
            boolean cancel = future.cancel(false);
            System.out.println("Future 任务是否被取消:" + cancel);
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            executorService.shutdown();
        }
    }

    private static void demo3() {
        Future<String> future = executorService.submit(new FutureInterruptTask());
        try {
            Thread.sleep(1000);
            boolean cancel = future.cancel(true);
            System.out.println("Future 任务是否被取消:" + cancel);
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            executorService.shutdown();
        }
    }


    static class FutureTask implements Callable<String> {

        @Override
        public String call() throws Exception {
            return "正常返回";
        }
    }

    static class FutureInterruptTask implements Callable<String> {

        @Override
        public String call() throws Exception {
            while (!Thread.currentThread().isInterrupted()) {
                System.out.println("循环执行");
                Thread.sleep(500);
            }
            System.out.println("线程被中断");
            return "正常返回";
        }
    }
}

这里,我们来分析下第三种情况(任务正在进行中),当我们设置 true 时,线程停止

循环执行
循环执行
Future 任务是否被取消:true

当我们设置 false 时,任务虽然也被取消成功,但是线程依然执行。

循环执行
循环执行
Future 任务是否被取消:true
循环执行
循环执行
循环执行
循环执行
......

那么如何选择传入 true 还是 false 呢?

  • 传入 true 适用的情况是,明确知道这个任务能够处理中断。
  • 传入 false 适用于什么情况呢?
    • 如果我们明确知道这个线程不能处理中断,那应该传入 false。
    • 我们不知道这个任务是否支持取消(是否能响应中断),因为在大多数情况下代码是多人协作的,对于这个任务是否支持中断,我们不一定有十足的把握,那么在这种情况下也应该传入 false。
    • 如果这个任务一旦开始运行,我们就希望它完全的执行完毕。在这种情况下,也应该传入 false。

需要注意的是,虽然示例中写了 !Thread.currentThread().isInterrupted() 方法来判断中断,但是实际上并不是通过我们的代码来进行中断,而是 Future#cancel(true) 内部调用 t.interrupt 方法修改线程的状态之后,Thread.sleep 会抛出 InterruptedException 异常,线程池中会执行异常的相关逻辑,并退出当前任务。 sleep 和 interrupt 会产生意想不到的效果。

比如我们将 FutureInterruptTask 代码修改为 while(true) 形式,调用 cancel(true) 方法线程还是会被中断。

static class FutureInterruptTask implements Callable<String> {
    @Override
    public String call() throws Exception {
        while (true) {
            System.out.println("循环执行");
            Thread.sleep(500);
        }
    }
}

isCancelled方法(判断是否被取消)

isCancelled 方法,判断是否被取消,它和 cancel 方法配合使用,比较简单,可以参考上面的示例。

Callable 和 Future 的关系

Callable 接口相比于 Runnable 的一大优势是可以有返回结果,返回结果就可以用 Future 类的 get 方法来获取 。因此,Future 相当于一个存储器,它存储了 Callable 的 call 方法的任务结果。

除此之外,我们还可以通过 Future 的 isDone 方法来判断任务是否已经执行完毕了,还可以通过 cancel 方法取消这个任务,或限时获取任务的结果等,总之 Future 的功能比较丰富。

FutureTask

Future只是一个接口,不能直接用来创建对象,其实现类是FutureTask,JDK1.8修改了FutureTask的实现,JKD1.8不再依赖AQS来实现,而是通过一个volatile变量state以及CAS操作来实现。FutureTask结构如下所示:


我们来看一下 FutureTask 的代码实现:

public class FutureTask implements RunnableFuture {...}

可以看到,它实现了一个接口,这个接口叫作 RunnableFuture。

RunnableFuture接口

我们来看一下 RunnableFuture 接口的代码实现:

public interface RunnableFuture<V> extends Runnable, Future<V> {
    /**
     * Sets this Future to the result of its computation
     * unless it has been cancelled.
     */
    void run();
}

既然 RunnableFuture 继承了 Runnable 接口和 Future 接口,而 FutureTask 又实现了 RunnableFuture 接口,所以 FutureTask 既可以作为 Runnable 被线程执行,又可以作为 Future 得到 Callable 的返回值。

FutureTask源码分析

成员变量

/*
 * 当前任务运行状态
 * NEW -> COMPLETING -> NORMAL(正常结束,返回结果)
 * NEW -> COMPLETING -> EXCEPTIONAL(返回异常结果)
 * NEW -> CANCELLED(任务被取消,无结果)
 * NEW -> INTERRUPTING -> INTERRUPTED(任务被打断,无结果)
 */
private volatile int state;
private static final int NEW          = 0; // 新建 0
private static final int COMPLETING   = 1; // 执行中 1
private static final int NORMAL       = 2; // 正常 2
private static final int EXCEPTIONAL  = 3; // 异常 3
private static final int CANCELLED    = 4; // 取消 4
private static final int INTERRUPTING = 5; // 中断中 5
private static final int INTERRUPTED  = 6; // 被中断 6

/** 将要被执行的任务 */
private Callable<V> callable;
/** 存放执行结果,用于get()方法获取结果,也可能用于get()方法抛出异常 */
private Object outcome; // non-volatile, protected by state reads/writes
/** 执行任务Callable的线程; */
private volatile Thread runner;
/** 栈结构的等待队列,该节点是栈中最顶层的节点 */
private volatile WaitNode waiters;

为了后面更好的分析FutureTask的实现,这里有必要解释下各个状态。

  • NEW :表示是个新的任务或者还没被执行完的任务。这是初始状态。
  • COMPLETING :任务已经执行完成或者执行任务的时候发生异常,但是任务执行结果或者异常原因还没有保存到outcome字段(outcome字段用来保存任务执行结果,如果发生异常,则用来保存异常原因)的时候,状态会从NEW变更到COMPLETING。但是这个状态会时间会比较短,属于中间状态。
  • NORMAL :任务已经执行完成并且任务执行结果已经保存到outcome字段,状态会从COMPLETING转换到NORMAL。这是一个最终态。
  • EXCEPTIONAL :任务执行发生异常并且异常原因已经保存到outcome字段中后,状态会从COMPLETING转换到EXCEPTIONAL。这是一个最终态。
  • CANCELLED :任务还没开始执行或者已经开始执行但是还没有执行完成的时候,用户调用了cancel(false)方法取消任务且不中断任务执行线程,这个时候状态会从NEW转化为CANCELLED状态。这是一个最终态。
  • INTERRUPTING :任务还没开始执行或者已经执行但是还没有执行完成的时候,用户调用了cancel(true)方法取消任务并且要中断任务执行线程但是还没有中断任务执行线程之前,状态会从NEW转化为INTERRUPTING。这是一个中间状态。
  • INTERRUPTED :调用interrupt()中断任务执行线程之后状态会从INTERRUPTING转换到INTERRUPTED。这是一个最终态。

有一点需要注意的是,所有值大于COMPLETING的状态都表示任务已经执行完成(任务正常执行完成,任务执行异常或者任务被取消)。

构造方法

// Callable 构造方法
public FutureTask(Callable<V> callable) {
    if (callable == null)
        throw new NullPointerException();
    this.callable = callable;
    this.state = NEW;       // ensure visibility of callable
}

// Runnable 构造方法
public FutureTask(Runnable runnable, V result) {
    this.callable = Executors.callable(runnable, result);
    this.state = NEW;       // ensure visibility of callable
}

Runnable的构造器,只有一个目的,就是通过Executors.callable把入参转化为RunnableAdapter,主要是因为Callable的功能比Runnable丰富,Callable有返回值,而Runnable没有。

/**
* A callable that runs given task and returns given result
*/
static final class RunnableAdapter<T> implements Callable<T> {
    final Runnable task;
    final T result;
    RunnableAdapter(Runnable task, T result) {
        this.task = task;
        this.result = result;
    }
    public T call() {
        task.run();
        return result;
    }
}

这是一个典型的适配模型,我们要把 Runnable 适配成 Callable,首先要实现 Callable 的接口,接着在 Callable 的 call 方法里面调用被适配对象(Runnable)的方法。

内部类

static final class WaitNode {
    volatile Thread thread;
    volatile WaitNode next;
    WaitNode() { thread = Thread.currentThread(); }
}

run方法

/**
 * run方法可以直接被调用
 * 也可以开启新的线程调用
 */
public void run() {
    // 状态不是任务创建,或者当前任务已经有线程在执行了,直接返回
    if (state != NEW ||
        !U.compareAndSwapObject(this, RUNNER, null, Thread.currentThread()))
        return;
    try {
        Callable<V> c = callable;
        // Callable 不为空,并且已经初始化完成
        if (c != null && state == NEW) {
            V result;
            boolean ran;
            try {
                //调用执行
                result = c.call();
                ran = true;
            } catch (Throwable ex) {
                result = null;
                ran = false;//执行失败
                //通过CAS算法设置返回值(COMPLETING)和状态值(EXCEPTIONAL)
                setException(ex);
            }
            //执行成功通过CAS(UNSAFE)设置返回值(COMPLETING)和状态值(NORMAL)
            if (ran)
                //将result赋值给outcome
                set(result);
        }
    } finally {
        // runner must be non-null until state is settled to
        // prevent concurrent calls to run()
        //将任务runner设置为null,避免发生并发调用run()方法
        runner = null;
        // state must be re-read after nulling runner to prevent
        // leaked interrupts
        //须重新读取任务状态,避免不可达(泄漏)的中断
        int s = state;
        //确保cancle(ture)操作时,运行中的任务能接收到中断指令
        if (s >= INTERRUPTING)
            handlePossibleCancellationInterrupt(s);
    }
}

  1. run方法是没有返回值的,通过给outcome属性赋值(set(result)),get时就能从outcome属性中拿到返回值。
  2. FutureTask 两种构造器,最终都转化成了 Callable,所以在 run 方法执行的时候,只需要执行 Callable 的 call 方法即可,在执行 c.call()代码时,如果入参是 Runnable 的话, 调用路径为 c.call() -> RunnableAdapter.call() -> Runnable.run(),如果入参是 Callable 的话,直接调用。

setException(Throwable t)方法

//发生异常时,将返回值设置到outcome(=COMPLETING)中,并更新任务状态(EXCEPTIONAL)
protected void setException(Throwable t) {
    //调用UNSAFE类封装的CAS算法,设置值
    if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
        outcome = t;
    UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state
    //唤醒因等待返回值而阻塞的线程
    finishCompletion();
    }
}

set(V v)方法

//任务正常完成,将返回值设置到outcome(=COMPLETING)中,并更新任务状态(=NORMAL)
protected void set(V v) {
    if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
        outcome = v;
        UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
        finishCompletion();
    }
}

finishCompletion方法

//移除所有等待线程并发出信号,调用done(),以及将任务callable清空
private void finishCompletion() {
    // assert state > COMPLETING;
    for (WaitNode q; (q = waiters) != null;) {
        if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
            //循环唤醒阻塞线程,直到阻塞队列为空
            for (;;) {
                Thread t = q.thread;
                if (t != null) {
                    q.thread = null;
                    LockSupport.unpark(t);
                }
                WaitNode next = q.next;
                //一直到阻塞队列为空,跳出循环
                if (next == null)
                    break;
                q.next = null; // unlink to help gc   方便gc在适当的时候回收
                q = next;
            }
            break;
        }
    }

    done();

    callable = null;        // to reduce footprint
}

handlePossibleCancellationInterrupt方法

private void handlePossibleCancellationInterrupt(int s) {
    // It is possible for our interrupter to stall before getting a
    // chance to interrupt us.  Let's spin-wait patiently.
    //自旋等待cancle(true)结束(中断结束)
    if (s == INTERRUPTING)
        while (state == INTERRUPTING)
             Thread.yield(); // wait out pending interrupt

    // assert state == INTERRUPTED;

    // We want to clear any interrupt we may have received from
    // cancel(true).  However, it is permissible to use interrupts
    // as an independent mechanism for a task to communicate with
    // its caller, and there is no way to clear only the
    // cancellation interrupt.
    //
    // Thread.interrupted();
}

cancle方法

//取消任务执行
public boolean cancel(boolean mayInterruptIfRunning) {
    //对NEW状态的任务进行中断,并根据参数设置state
    if (!(state == NEW &&
          UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
              mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
        //任务已完成(已发出中断或已取消)
        return false;       
    //中断线程
    try {    // in case call to interrupt throws exception
        if (mayInterruptIfRunning) {//cancel(true)
            try {
                Thread t = runner;
                if (t != null)
                    t.interrupt();
            } finally { // final state
                //通过CAS算法,更新状态
                UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
            }
        }
    } finally {
        //唤醒阻塞线程
        finishCompletion();
    }
    return true;
}

get方法

 /**
 * 获取执行结果
 * @throws CancellationException {@inheritDoc}
 */
public V get() throws InterruptedException, ExecutionException {
    int s = state;
    if (s <= COMPLETING)
        //假如任务还没有执行完,则阻塞则塞线程,直至任务执行完成(结果已经存放到对应变量中)
        s = awaitDone(false, 0L);
    //返回结果
    return report(s);
}

/**
 * 获取任务执行结果,指定时间结束,则超时返回,不再阻塞
 * @throws CancellationException {@inheritDoc}
 */
public V get(long timeout, TimeUnit unit)
    throws InterruptedException, ExecutionException, TimeoutException {
    if (unit == null)
        throw new NullPointerException();
    int s = state;
    if (s <= COMPLETING &&
        (s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)
        throw new TimeoutException();
    return report(s);
}

awaitDone方法

/**
 * Awaits completion or aborts on interrupt or timeout.
 * 如英文注释:等待任务执行完毕或任务中断或任务超时
 *
 * @param timed true if use timed waits
 * @param nanos time to wait, if timed
 * @return state upon completion
 */
private int awaitDone(boolean timed, long nanos)
    throws InterruptedException {
    final long deadline = timed ? System.nanoTime() + nanos : 0L;
    WaitNode q = null;
    boolean queued = false;
    //循环等待
    for (;;) {
        //线程已经中断,则移除等待任务
        if (Thread.interrupted()) {
            removeWaiter(q);
            //移除当前任务后,抛出中断异常
            throw new InterruptedException();
        }

        //任务已经完成,则返回任务状态,并对当前任务清场处理
        int s = state;
        if (s > COMPLETING) {
            if (q != null) //任务不为空,则将执行线程设为null,避免并发下重复执行
                q.thread = null;
            return s;
        }
        //设置结果,很快就能完成,自旋等待
        else if (s == COMPLETING) // cannot time out yet
            Thread.yield();  //任务提前退出
        //正在执行或者还没开始,则构建新的节点
        else if (q == null)
            q = new WaitNode();
        //判断是否入队,新节点一般在下一次循环入队列阻塞
        else if (!queued)
            //没有入队列,设置q.next=waiters,并将waiters设为q
            queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
                                                     q.next = waiters, q);
        //假如有超时限制,则判断是否超时
        else if (timed) {
            nanos = deadline - System.nanoTime();
            if (nanos <= 0L) {
                //超时则将任务节点从阻塞队列中移除,并返回状态
                removeWaiter(q);
                return state;
            }
            //阻塞调用get方法的线程,有超时限制
            LockSupport.parkNanos(this, nanos);
        }
        else
            //阻塞调用get方法的线程,无超时限制
            LockSupport.park(this);
    }
}

removeWaiter方法

//移除任务节点
private void removeWaiter(WaitNode node) {
    if (node != null) {
        node.thread = null;
        retry:
        for (;;) {          // restart on removeWaiter race
            for (WaitNode pred = null, q = waiters, s; q != null; q = s) {
                s = q.next;
                if (q.thread != null)
                    pred = q;
                else if (pred != null) {
                    pred.next = s;
                    if (pred.thread == null) // check for race
                        continue retry;
                    }
                else if (!UNSAFE.compareAndSwapObject(this, waitersOffset,
                                                          q, s))
                    continue retry;
            }
            break;
        }
    }
}

done()方法

protected void done() { }

默认实现不起任何作用。子类可以重写,此方法调用完成回调或执行。注意:也可以在实现此方法来确定此任务是否已取消。

Future的使用

FutureTask可用于异步获取执行结果或取消执行任务的场景。通过传入Runnable或者Callable的任务给FutureTask,直接调用其run方法或者放入线程池执行,之后可以在外部通过FutureTask的get方法异步获取执行结果,因此,FutureTask非常适合用于耗时的计算,主线程可以在完成自己的任务后,再去获取结果。另外,FutureTask还可以确保即使调用了多次run方法,它都只会执行一次Runnable或者Callable任务,或者通过cancel取消FutureTask的执行等。

FutureTask执行多任务计算的使用场景

利用FutureTask和ExecutorService,可以用多线程的方式提交计算任务,主线程继续执行其他任务,当主线程需要子线程的计算结果时,在异步获取子线程的执行结果。

package com.niuh.future;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;

/**
 * <p>
 * FutureTask执行多任务计算的使用场景
 * </p>
 */
public class FutureTaskForMultiCompute {
    public static void main(String[] args) {

        FutureTaskForMultiCompute inst = new FutureTaskForMultiCompute();

        // 创建任务集合
        List<FutureTask<Integer>> taskList = new ArrayList<FutureTask<Integer>>();

        // 创建线程池
        ExecutorService exec = Executors.newFixedThreadPool(5);

        for (int i = 0; i < 10; i++) {
            // 传入Callable对象创建FutureTask对象
            FutureTask<Integer> ft = new FutureTask<Integer>(inst.new ComputeTask(i, "" + i));

            taskList.add(ft);
            
            // 提交给线程池执行任务,也可以通过exec.invokeAll(taskList)一次性提交所有任务;
            exec.submit(ft);
        }

        System.out.println("所有计算任务提交完毕, 主线程接着干其他事情!");

        // 开始统计各计算线程计算结果
        Integer totalResult = 0;
        for (FutureTask<Integer> ft : taskList) {
            try {
                //FutureTask的get方法会自动阻塞,直到获取计算结果为止
                totalResult = totalResult + ft.get();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (ExecutionException e) {
                e.printStackTrace();
            }
        }

        // 关闭线程池
        exec.shutdown();
        System.out.println("多任务计算后的总结果是:" + totalResult);

    }

    private class ComputeTask implements Callable<Integer> {

        private Integer result = 0;
        private String taskName = "";

        public ComputeTask(Integer iniResult, String taskName) {
            result = iniResult;
            this.taskName = taskName;
            System.out.println("生成子线程计算任务: " + taskName);
        }

        public String getTaskName() {
            return this.taskName;
        }

        @Override
        public Integer call() throws Exception {
            // TODO Auto-generated method stub

            for (int i = 0; i < 100; i++) {
                result = +i;
            }
            // 休眠5秒钟,观察主线程行为,预期的结果是主线程会继续执行,到要取得FutureTask的结果是等待直至完成。
            Thread.sleep(5000);
            System.out.println("子线程计算任务: " + taskName + " 执行完成!");
            return result;
        }
    }
}

执行结果:

生成子线程计算任务: 0
生成子线程计算任务: 1
生成子线程计算任务: 2
生成子线程计算任务: 3
生成子线程计算任务: 4
生成子线程计算任务: 5
生成子线程计算任务: 6
生成子线程计算任务: 7
生成子线程计算任务: 8
生成子线程计算任务: 9
所有计算任务提交完毕, 主线程接着干其他事情!
子线程计算任务: 0 执行完成!
子线程计算任务: 1 执行完成!
子线程计算任务: 3 执行完成!
子线程计算任务: 4 执行完成!
子线程计算任务: 2 执行完成!
子线程计算任务: 5 执行完成!
子线程计算任务: 7 执行完成!
子线程计算任务: 9 执行完成!
子线程计算任务: 8 执行完成!
子线程计算任务: 6 执行完成!
多任务计算后的总结果是:990

FutureTask在高并发环境下确保任务只执行一次

在很多高并发的环境下,往往我们只需要某些任务只执行一次。这种使用情景FutureTask的特性恰能胜任。举一个例子,假设有一个带key的连接池,当key存在时,即直接返回key对应的对象;当key不存在时,则创建连接。对于这样的应用场景,通常采用的方法为使用一个Map对象来存储key和连接池对应的对应关系,典型的代码如下面所示:

package com.niuh.future;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.ReentrantLock;

/**
 * @program: 错误示例
 * @description: 在很多高并发的环境下,往往我们只需要某些任务只执行一次。
 * 这种使用情景FutureTask的特性恰能胜任。举一个例子,假设有一个带key的连接池,
 * 当key存在时,即直接返回key对应的对象;当key不存在时,则创建连接。对于这样的应用场景,
 * 通常采用的方法为使用一个Map对象来存储key和连接池对应的对应关系,典型的代码如下
 * 在例子中,我们通过加锁确保高并发环境下的线程安全,也确保了connection只创建一次,然而却牺牲了性能。
 */
public class FutureTaskConnection1 {
    private static Map<String, Connection> connectionPool = new HashMap<>();
    private static ReentrantLock lock = new ReentrantLock();

    public static Connection getConnection(String key) {
        try {
            lock.lock();
            Connection connection = connectionPool.get(key);
            if (connection == null) {
                Connection newConnection = createConnection();
                connectionPool.put(key, newConnection);
                return newConnection;
            }
            return connection;
        } finally {
            lock.unlock();
        }
    }

    private static Connection createConnection() {
        try {
            return DriverManager.getConnection("");
        } catch (SQLException e) {
            e.printStackTrace();
        }
        return null;
    }
}

在上面的例子中,我们通过加锁确保高并发环境下的线程安全,也确保了connection只创建一次,然而却牺牲了性能。改用ConcurrentHash的情况下,几乎可以避免加锁的操作,性能大大提高。

package com.niuh.future;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.util.concurrent.ConcurrentHashMap;

/**
 * @description: 改用ConcurrentHash的情况下,几乎可以避免加锁的操作,性能大大提高。
 * <p>
 * 但是在高并发的情况下有可能出现Connection被创建多次的现象。
 * 为什么呢?因为创建Connection是一个耗时操作,假设多个线程涌入getConnection方法,都发现key对应的键不存在,
 * 于是所有涌入的线程都开始执行conn=createConnection(),只不过最终只有一个线程能将connection插入到map里。
 * 但是这样以来,其它线程创建的的connection就没啥价值,浪费系统开销。
 */
public class FutureTaskConnection2 {
    private static ConcurrentHashMap<String, Connection> connectionPool = new ConcurrentHashMap<>();

    public static Connection getConnection(String key) {
        Connection connection = connectionPool.get(key);
        if (connection == null) {
            connection = createConnection();
            //根据putIfAbsent的返回值判断是否有线程抢先插入了
            Connection returnConnection = connectionPool.putIfAbsent(key, connection);
            if (returnConnection != null) {
                connection = returnConnection;
            }
        } else {
            return connection;
        }
        return connection;
    }

    private static Connection createConnection() {
        try {
            return DriverManager.getConnection("");
        } catch (SQLException e) {
            e.printStackTrace();
        }
        return null;
    }

}

但是在高并发的情况下有可能出现Connection被创建多次的现象。
为什么呢?

因为创建Connection是一个耗时操作,假设多个线程涌入getConnection方法,都发现key对应的键不存在,于是所有涌入的线程都开始执行conn=createConnection(),只不过最终只有一个线程能将connection插入到map里。但是这样以来,其它线程创建的的connection就没啥价值,浪费系统开销。

这时最需要解决的问题就是当key不存在时,创建Connection的动作(conn=createConnection();)能放在connectionPool.putIfAbsent()之后执行,这正是FutureTask发挥作用的时机,基于ConcurrentHashMap和FutureTask的改造代码如下:

package com.niuh.future;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;

/**
 * @description: FutureTask在高并发环境下确保任务只执行一次
 * 这时最需要解决的问题就是当key不存在时,创建Connection的动作(conn=createConnection();)
 * 能放在connectionPool.putIfAbsent()之后执行,这正是FutureTask发挥作用的时机,
 * 基于ConcurrentHashMap和FutureTask的改造代码如下:
 */
public class FutureTaskConnection3 {
    private static ConcurrentHashMap<String, FutureTask<Connection>> connectionPool = new ConcurrentHashMap<String, FutureTask<Connection>>();

    public static Connection getConnection(String key) {
        FutureTask<Connection> connectionFutureTask = connectionPool.get(key);
        try {
            if (connectionFutureTask != null) {
                return connectionFutureTask.get();
            } else {
                Callable<Connection> callable = new Callable<Connection>() {
                    @Override
                    public Connection call() throws Exception {
                        return createConnection();
                    }
                };
                FutureTask<Connection> newTask = new FutureTask<>(callable);
                FutureTask<Connection> returnFt = connectionPool.putIfAbsent(key, newTask);
                if (returnFt == null) {
                    connectionFutureTask = newTask;
                    newTask.run();
                }
                return connectionFutureTask.get();
            }
        } catch (ExecutionException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return null;
    }

    private static Connection createConnection() {
        try {
            return DriverManager.getConnection("");
        } catch (SQLException e) {
            e.printStackTrace();
        }
        return null;
    }
}

FutureTask任务执行完回调

FutureTask有一个方法 void done()会在每个线程执行完成return结果时回调。
假设现在需要实现每个线程完成任务执行后主动执行后续任务。

package com.niuh.future;

import lombok.extern.slf4j.Slf4j;

import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;

/**
 * FutureTask#done()
 */
@Slf4j
public class FutureTaskDemo1 {

    public static void main(String[] args) throws InterruptedException {
        // 月饼生产者
        final Callable<Integer> productor = new Callable<Integer>() {
            @Override
            public Integer call() throws Exception {
                log.info("月饼制作中。。。。");
                Thread.sleep(5000);
                return (Integer) new Random().nextInt(1000);
            }
        };

        // 月饼消费者
        Runnable customer = new Runnable() {
            @Override
            public void run() {
                ExecutorService es = Executors.newCachedThreadPool();
                log.info("老板给我来一个月饼");
                for (int i = 0; i < 3; i++) {
                    FutureTask<Integer> futureTask = new FutureTask<Integer>(productor) {
                        @Override
                        protected void done() {
                            super.done();
                            try {
                                log.info(String.format(" 编号[%s]月饼已打包好", get()));
                            } catch (InterruptedException e) {
                                e.printStackTrace();
                            } catch (ExecutionException e) {
                                e.printStackTrace();
                            }
                        }
                    };
                    es.submit(futureTask);
                }
            }
        };
        new Thread(customer).start();
    }
}

执行结果:

11:01:37.134 [Thread-0] INFO com.niuh.future.FutureTaskDemo1 - 老板给我来一个月饼
11:01:37.139 [pool-1-thread-1] INFO com.niuh.future.FutureTaskDemo1 - 月饼制作中。。。。
11:01:37.139 [pool-1-thread-2] INFO com.niuh.future.FutureTaskDemo1 - 月饼制作中。。。。
11:01:37.139 [pool-1-thread-3] INFO com.niuh.future.FutureTaskDemo1 - 月饼制作中。。。。
11:01:42.151 [pool-1-thread-2] INFO com.niuh.future.FutureTaskDemo1 -  编号[804]月饼已打包好
11:01:42.151 [pool-1-thread-3] INFO com.niuh.future.FutureTaskDemo1 -  编号[88]月饼已打包好
11:01:42.151 [pool-1-thread-1] INFO com.niuh.future.FutureTaskDemo1 -  编号[166]月饼已打包好

参考

PS:以上代码提交在 Githubhttps://github.com/Niuh-Study/niuh-juc-final.git

文章持续更新,可以公众号搜一搜「 一角钱技术 」第一时间阅读, 本文 GitHub org_hejianhui/JavaStudy 已经收录,欢迎 Star。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 203,271评论 5 476
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,275评论 2 380
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,151评论 0 336
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,550评论 1 273
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,553评论 5 365
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,559评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,924评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,580评论 0 257
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,826评论 1 297
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,578评论 2 320
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,661评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,363评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,940评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,926评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,156评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,872评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,391评论 2 342