ThreadPoolExecutor 深度分析

一般情况下我们创建一个线程我们会直接继承一个Thread 类或者实现一个Runnable接口,然后通过new Thread().start()去启动一个线程执行相应的操作。可是这样来说的话,我们需要对于每一个操作都需要创建一个线程,比如我们在处理web请求的时候,如果我们针对每一个请求都创建一个线程来处理它的话,而且一次请求处理的时间比较短。这就意味着我们需要频繁的创建和销毁一个线程,这样也就会大大降低系统的效率,因为创建和销毁线程不但需要时间而且会消耗系统资源。那么有没有一种办法使得线程可以复用,就是执行完一个任务,并不被销毁,而是可以继续执行其他的任务?这个时候我们就需要一个线程池了。

使用线程池有什么好处?

  • 降低资源消耗:通过线程复用,可以减少线程的创建和销毁带来的消耗。
  • 提高响应速度:一般任务被submit就可以执行。
  • 提高线程的可管理性:对提交的线程进行统一的管理监控协调等等。

ThreadPoolExecutor

Java中 的线程池是通过ThreadPoolExecutor实现。它位于java.util.concurrent包下面,通过它我就可以直接使用线程池提供的好处和功能了。来看下它的简单结构类图:


ThreadPoolExecutor类图.png

来看下它的简单结构,从上面的类图中可以看到:它继承了AbstractExecutorService接口,AbstractExecutorService继承了ExecutorService接口,ExecutorService继承了Executor接口。Executor是一个顶层接口。

我们通常创建一个线程池的时候首先使用的就是newThreadPoolExecutor(),来看看ThreadPoolExecutor构造函数做了什么事情?来看看源码

    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }
  • corePoolSize:核心线程池的大小。默认情况下,一个线程池被创建后是没有线程的,只有当任务过来的时候才会创建线程。只要任务的数量没有达到corePoolSize的个数限制都会不断创建新的线程。但是当线程池被创建以后并且调用了prestartAllCoreThreads()或者prestartCoreThread()方法后就会直接创建了corePoolSize线程。
  • maximumPoolSize:最大线程池的数量。当线程池中的数量已经达到了corePoolSize后,并且有新的任务不断的提交过来,这个时候就会把新提交的任务执行的任务的阻塞队列中去。当队列满了以后,这个时候就会创建新的线程来处理任务,线程池中的线程数量将会超过corePoolSize。而maximumPoolSize就是能够创建的最大的线程数量。
  • keepAliveTime:只有当线程池中的线程数量大于corePoolSize之后才会生效。也就是当对于大于corePoolSize的线程,它在没有接受到新的处理任务的空转等待时间是keepAliveTime。超过的时候就会被销毁回收。
  • unit:keepAliveTime的单位信息。包括(TimeUnit.DAYS:天;TimeUnit.HOURS:小时;TimeUnit.MINUTES:分钟;TimeUnit.SECONDS:秒;TimeUnit.MILLISECONDS:毫秒;
    TimeUnit.MICROSECONDS:微妙;TimeUnit.NANOSECONDS:纳秒。)
  • workQueue:线程阻塞队列,用来存放待处理任务的线程队列。
    • 1.ArrayBlockingQueue:是一个基于数组结构的有界阻塞队列,此队列按 FIFO(先进先出)原则对元素进行排序。
    • 2.LinkedBlockingQueue:一个基于链表结构的阻塞队列,此队列按FIFO (先进先出) 排序元素,吞吐量通常要高于
    • 3.ArrayBlockingQueue。静态工厂方法Executors.newFixedThreadPool()使用了这个队列。
    • 4.SynchronousQueue:一个不存储元素的阻塞队列。每个插入操作必须等到另一个线程调用移除操作,否则插入操作一直处于阻塞状态,吞吐量通常要高于LinkedBlockingQueue,静态工厂方法Executors.newCachedThreadPool使用了这个队列。
    • 5.PriorityBlockingQueue:一个具有优先级的无限阻塞队列。
    • 6.DelayQueue:一个无界的阻塞队列但是只有当等待时间过了之后才能取出保存在队列中的元素。
    • 7.LinkedTransferQueue:基于链表实现的无界队列,按照FIFO
  • threadFactory:(ThreadFactory)用于创建线程的工厂,所有的线程都是通过这个工厂来创建的。
  • handler:(RejectedExecutionHandler)当任务不能被线程池所处理的时候会被调用来进行相应的处理,这是一个接口。
    • 1.CallerRunsPolicy:对于线程池无法处理的任务直接使用调用任务的线程来处理
    • 2.AbortPolicy:对于线程池无法处理的任务直接拒绝,抛出异常
    • 3.DiscardPolicy:对于线程池无法处理的任务直接丢弃不执行。
    • 4.DiscardOldestPolicy:对于线程池无法处理的任务直接将阻塞队列里面等待时间最长的任务舍弃,然后把该任务加到队列尾部。

Executor接口

我这边就按照从上到下的顺序来分析ThreadPoolExecutor的实现和功能,首先从顶层的Executor接口来看:

public interface Executor {

    /**
     * 在将来某一个时候来运行给定的任务,这个任务可以是一个新的线程也可以是一个线程池线程
     */
    void execute(Runnable command);
}

很简单就是一个接口,接受一个Runnable参数,用来接受提交的任务,没有返回值。

ExecutorService接口

public interface ExecutorService extends Executor {

    /**
     * 关闭线程池,不再接受新的任务,但是此时线程池不会立刻退出,直到添加到线程池中的任务都已经处理完成,才会退出
     */
    void shutdown();

    /**
     *  这个会尝试停止正在执行的任务,并且暂停等待运行的任务,并且吧没有机会执行的任务列表返回。它是通过通过调用Thread.interrupt()方法来终止线程的,所以ShutdownNow()并不代表线程池就一定立即就能退出,它可能必须要等待所有正在执行的任务都执行完成了才能退出。
     */
    List<Runnable> shutdownNow();

    /**
     返回线程池是否关闭的状态
     */
    boolean isShutdown();

    /**
     * 当所有的任务都完成了才会返回true.
     */
    boolean isTerminated();

    /**
     * 在给定的时间内,都会被阻塞,除非任务完成或者到达超时时间
     */
    boolean awaitTermination(long timeout, TimeUnit unit)
        throws InterruptedException;

    /**
     * 提交一个可以有返回值的任务,返回值的结果保存在Future中
     */
    <T> Future<T> submit(Callable<T> task);

    /**
     * 提交一个Runnable任务,并且通过制定的Result 返回结果到Future中
    <T> Future<T> submit(Runnable task, T result);

    /**
     * 提交一个Runnable任务
     */
    Future<?> submit(Runnable task);

    /**
     * 执行给定的多个任务集合,并且把他们的执行都成功执行完成之后将状态和结果保存的Future列表中返回。
     */
    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
        throws InterruptedException;

    /**
     *  相对于上面的一个方法,增加了一个超时时间
     */
    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
                                  long timeout, TimeUnit unit)
        throws InterruptedException;

    /**
     * 运行给定的方法,可是只要有一个任务执行成功就会立即返回结果
     */
    <T> T invokeAny(Collection<? extends Callable<T>> tasks)
        throws InterruptedException, ExecutionException;

    /**
     *  和上面的方法类似,指定了一个超时时间
     */
    <T> T invokeAny(Collection<? extends Callable<T>> tasks,
                    long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
}

如上ExecutorService丰富了线程池接受任务的形式,还可以提交Callable类型的任务,可以有返回值的任务,还提供了关闭线程池的任务接口定义。

AbstractExecutorService抽象类

public abstract class AbstractExecutorService implements ExecutorService {

  /**
   *  返回一个给定Runnable任务的FutureTask。
  */
    protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
        return new FutureTask<T>(runnable, value);
    }

     /**
   *  返回一个给定Callable任务的FutureTask。
  */
    protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
        return new FutureTask<T>(callable);
    }
 
    public Future<?> submit(Runnable task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<Void> ftask = newTaskFor(task, null);
        execute(ftask);
        return ftask;
    }
 
    public <T> Future<T> submit(Runnable task, T result) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<T> ftask = newTaskFor(task, result);
        execute(ftask);
        return ftask;
    }
 
    public <T> Future<T> submit(Callable<T> task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<T> ftask = newTaskFor(task);
        execute(ftask);
        return ftask;
    }

     
    private <T> T doInvokeAny(Collection<? extends Callable<T>> tasks,
                              boolean timed, long nanos)
        throws InterruptedException, ExecutionException, TimeoutException {
        if (tasks == null)
            throw new NullPointerException();
        int ntasks = tasks.size();
        if (ntasks == 0)
            throw new IllegalArgumentException();
        ArrayList<Future<T>> futures = new ArrayList<Future<T>>(ntasks);
        ExecutorCompletionService<T> ecs =
            new ExecutorCompletionService<T>(this);

        // For efficiency, especially in executors with limited
        // parallelism, check to see if previously submitted tasks are
        // done before submitting more of them. This interleaving
        // plus the exception mechanics account for messiness of main
        // loop.

        try {
            // Record exceptions so that if we fail to obtain any
            // result, we can throw the last exception we got.
            ExecutionException ee = null;
            final long deadline = timed ? System.nanoTime() + nanos : 0L;
            Iterator<? extends Callable<T>> it = tasks.iterator();

            // Start one task for sure; the rest incrementally
            futures.add(ecs.submit(it.next()));
            --ntasks;
            int active = 1;

            for (;;) {
                Future<T> f = ecs.poll();
                if (f == null) {
                    if (ntasks > 0) {
                        --ntasks;
                        futures.add(ecs.submit(it.next()));
                        ++active;
                    }
                    else if (active == 0)
                        break;
                    else if (timed) {
                        f = ecs.poll(nanos, TimeUnit.NANOSECONDS);
                        if (f == null)
                            throw new TimeoutException();
                        nanos = deadline - System.nanoTime();
                    }
                    else
                        f = ecs.take();
                }
                if (f != null) {
                    --active;
                    try {
                        return f.get();
                    } catch (ExecutionException eex) {
                        ee = eex;
                    } catch (RuntimeException rex) {
                        ee = new ExecutionException(rex);
                    }
                }
            }

            if (ee == null)
                ee = new ExecutionException();
            throw ee;

        } finally {
            for (int i = 0, size = futures.size(); i < size; i++)
                futures.get(i).cancel(true);
        }
    }

    public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
        throws InterruptedException, ExecutionException {
        try {
            return doInvokeAny(tasks, false, 0);
        } catch (TimeoutException cannotHappen) {
            assert false;
            return null;
        }
    }

    public <T> T invokeAny(Collection<? extends Callable<T>> tasks,
                           long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException {
        return doInvokeAny(tasks, true, unit.toNanos(timeout));
    }

    public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
        throws InterruptedException {
        if (tasks == null)
            throw new NullPointerException();
        ArrayList<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
        boolean done = false;
        try {
            for (Callable<T> t : tasks) {
                RunnableFuture<T> f = newTaskFor(t);
                futures.add(f);
                execute(f);
            }
            for (int i = 0, size = futures.size(); i < size; i++) {
                Future<T> f = futures.get(i);
                if (!f.isDone()) {
                    try {
                        f.get();
                    } catch (CancellationException ignore) {
                    } catch (ExecutionException ignore) {
                    }
                }
            }
            done = true;
            return futures;
        } finally {
            if (!done)
                for (int i = 0, size = futures.size(); i < size; i++)
                    futures.get(i).cancel(true);
        }
    }

    public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
                                         long timeout, TimeUnit unit)
        throws InterruptedException {
        if (tasks == null)
            throw new NullPointerException();
        long nanos = unit.toNanos(timeout);
        ArrayList<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
        boolean done = false;
        try {
            for (Callable<T> t : tasks)
                futures.add(newTaskFor(t));

            final long deadline = System.nanoTime() + nanos;
            final int size = futures.size();

            // Interleave time checks and calls to execute in case
            // executor doesn't have any/much parallelism.
            for (int i = 0; i < size; i++) {
                execute((Runnable)futures.get(i));
                nanos = deadline - System.nanoTime();
                if (nanos <= 0L)
                    return futures;
            }

            for (int i = 0; i < size; i++) {
                Future<T> f = futures.get(i);
                if (!f.isDone()) {
                    if (nanos <= 0L)
                        return futures;
                    try {
                        f.get(nanos, TimeUnit.NANOSECONDS);
                    } catch (CancellationException ignore) {
                    } catch (ExecutionException ignore) {
                    } catch (TimeoutException toe) {
                        return futures;
                    }
                    nanos = deadline - System.nanoTime();
                }
            }
            done = true;
            return futures;
        } finally {
            if (!done)
                for (int i = 0, size = futures.size(); i < size; i++)
                    futures.get(i).cancel(true);
        }
    }

}

AbstractExecutorService抽象类增加了newTaskFor方法还有doInokeAny方法,其他的就是对ExecutorService接口中任务提交和任务调用方法的具体实现。newTaskFor主要是用于创建一个FutureTask任务,而submit方法的实现都大同小异,都是首先判断提交的任务是否为空,不为空则调用newTaskFor创建RunnableFuture任务,然后通过调用顶层的execute方法来添加任务。重点看下invokeAny和invokeAll的实现,invokeAny主要是通过调用doInvokeAny来实现业务逻辑;

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 202,723评论 5 476
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,080评论 2 379
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 149,604评论 0 335
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,440评论 1 273
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,431评论 5 364
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,499评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,893评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,541评论 0 256
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,751评论 1 296
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,547评论 2 319
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,619评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,320评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,890评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,896评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,137评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,796评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,335评论 2 342

推荐阅读更多精彩内容