JDK(1.8)线程池源码探究:ThreadPoolExecutor

结合JDK(1.8)源码分析线程池(ThreadPoolExecutor)实现原理


我们平时所提的线程池,大多指的是ThreadPoolExecutor,而非ThreadPool,关于ThreadPool这里不做赘叙。集中探讨一下ThreadPoolExecutor。

  • 带着问题解读源码,先看问题:
  1. JDK中有多种线程池,我们应该使用哪一种线程池?选择的依据是什么?
  2. 线程池中的线程会一直存活着吗?
  3. 线程池饱和之后,无法接收新任务,线程池如何处理这部分任务(线程池拒绝策略)?
  4. 线程池是如何实现将任务提交与任务执行分离的?
  5. 线程池中使用了“生产者-消费者”模式,那么它是如何实现的?
  6. 线程池中的各个参数分别代表什么?并且线程池中的线程数量如何确定?

如果上述问题,很清楚的话,也请同学留步,下面是从源码角度来分析上述问题。


为了便于后续描述,先贴出使用线程池的基本代码:

/**
 * 演示线程池基本操作
 * @author Fooisart
 * Created on 2018-12-11
 */
public class ExecutorServiceDemo {
    public static void main(String[] args) {
        //1,线程池创建
        ExecutorService executor = Executors.newFixedThreadPool(2);
        //2,新建任务
        Runnable task = new Runnable() {
            @Override
            public void run() {
                System.out.println("The real task is executing!");
            }
        };
        //3,提交任务到线程池
        executor.submit(task);
        //4,线程池关闭
        executor.shutdown();
    }
}

上述代码,演示了线程池的一个简单使用方式。文首提到,线程池一般指的是ThreadPoolExecutor,而上述示例代码中,并未出现ThreadPoolExecutor。那是不是表明,在Java中是不是还有其他类表示线程池呢?其实不然,示例代码中,有两个类:

  • ExecutorService
  • Executors

分别解释一下,这哥俩与ThreadPoolExecutor的关系。

  • Executors 看一下源码注释,
Factory and utility methods for Executor

第一句话表明其真正意义,表明自己是一个工具类,进一步说是一个工厂类。这个工厂的产品是—ThreadPoolExecutor。组装不同的参数,生产出不同功能的线程池。Executors源码解读.

  • ExecutorService 不贴它源码了。从他的名字可以看出,它是一个服务(Service),为线程池服务。它是一个接口,提供了一些操作线程池的接口方法。
    ExecutorService.png

    结合示例代码,分析ThreadPoolExecutor源码。
    第1步创建线程池,已解释。第2部创建任务,线程的基础知识。看第3步,提交任务到线程池,跟进(AbstractExecutorService):
 /**
     * @throws RejectedExecutionException {@inheritDoc}
     * @throws NullPointerException       {@inheritDoc}
     */
    public Future<?> submit(Runnable task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<Void> ftask = newTaskFor(task, null);
        execute(ftask);
        return ftask;
    }

一共四行代码,第二行是任务封装(封装成FutureTask);第三行,execute,把上一步封装好的任务,在此执行。(BTW,之前读过1.7源码,这个方法是在ThreadPoolExecutor中的,1.8抽象出了一个AbstractExecutorService)。继续跟进,进入ThreadPoolExecutor.execute()方法。此时,正式进入ThreadPoolExecutor,它是线程池的核心类,先简要介绍一下ThreadPoolExecutor。

/**
     * Creates a new {@code ThreadPoolExecutor} with the given initial
     * parameters.
     *
     * @param corePoolSize the number of threads to keep in the pool, even
     *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
     * @param maximumPoolSize the maximum number of threads to allow in the
     *        pool
     * @param keepAliveTime when the number of threads is greater than
     *        the core, this is the maximum time that excess idle threads
     *        will wait for new tasks before terminating.
     * @param unit the time unit for the {@code keepAliveTime} argument
     * @param workQueue the queue to use for holding tasks before they are
     *        executed.  This queue will hold only the {@code Runnable}
     *        tasks submitted by the {@code execute} method.
     * @param threadFactory the factory to use when the executor
     *        creates a new thread
     * @param handler the handler to use when execution is blocked
     *        because the thread bounds and queue capacities are reached
     * @throws IllegalArgumentException if one of the following holds:<br>
     *         {@code corePoolSize < 0}<br>
     *         {@code keepAliveTime < 0}<br>
     *         {@code maximumPoolSize <= 0}<br>
     *         {@code maximumPoolSize < corePoolSize}
     * @throws NullPointerException if {@code workQueue}
     *         or {@code threadFactory} or {@code handler} is null
     */
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) 

ThreadPoolExecutor中有多个构造函数,其中有一个最基础的构造函数,贴出的是它的说明。其中有7个参数,分别介绍,先从简单地讲起:

  • threadFactory : 用来创建线程,一般是用默认即可。除非你想把你线程池中的线程编号,或者有规律地命名。
  • handler : 拒绝策略。所谓的拒绝策略,即当我们自己所定义的线程池无法再添加任务时(什么时候无法添加一会说),那么会使用拒绝策略。看一下源码中对拒绝策略的说明:
1,In the default ThreadPoolExecutor.AbortPolicy, the handler throws a runtime
 RejectedExecutionException upon rejection.
简介:此为默认拒绝策略,会抛异常

2,In ThreadPoolExecutor.CallerRunsPolicy, the thread that invokes execute 
itself runs the task. This provides a simple feedback control mechanism that will 
slow down the rate that new tasks are submitted.
简介:把并行改串行,显然会降低线程池的执行效率。与策略1相比,优点是不
会抛异常,已不会丢任务,缺点也很明细,太慢啦!

3,In ThreadPoolExecutor.DiscardPolicy, a task that cannot be executed is simply dropped.
简介:这个很痛快,直接抛弃任务,也没有异常。效率是兼顾了,正确性与完整
性丢失了,或者起码得让我知道,任务被抛弃了

4,In ThreadPoolExecutor.DiscardOldestPolicy, if the executor is not shut down, 
the task at the head of the work queue is dropped, and then execution is retried 
(which can fail again, causing this to be repeated.)
简介:无
  • corePoolSize : 核心线程数,即一直存活在线程池中。当有新任务提价时,开始创建,直至创建的线程数达到corePoolSize值。
  • workQueue : 工作队列,如果说线程数已经达到了corePoolSize,此时,又有新任务进入,咋办?那么这些任务会进入到等待队列中。
  • maximumPoolSize : 最大线程数。前面提到了工作队列,既然是队列,就会有满的那一天。满了之后,咋办?则会接着创建临时线程(有点像快递公司在大促期间找的临时工),临时处理一下这些任务。
  • unit : 时间单位。
  • keepAliveTime : 存活时间。既然是存活时间,那代表什么存活时间呢?代表的是,上面提到的临时线程存活时间,配合时间单位(unit)参数,确定临时线程干完活之后,还能活多久(快递公司大促期间雇的临时工,雇佣时间)。

介绍完线程池各个参数的意义,回到ThreadPoolExecutor.execute方法:

 /**
     * Executes the given task sometime in the future.  The task
     * may execute in a new thread or in an existing pooled thread.
     * 介绍:该方法是任务执行,从注释中看出任务会在某个时间点执行。所有这个方法,只是保证了任务
     *      提交。提交了之后,具体如何执行,还得参照线程池中线程的实际情况。是立即执行还是排队,
     *      是用已有线程执行,还是创建新线程处理它,都是未知的。
     *
     * If the task cannot be submitted for execution, either because this
     * executor has been shutdown or because its capacity has been reached,
     * the task is handled by the current {@code RejectedExecutionHandler}.
     *
     * @param command the task to execute
     * @throws RejectedExecutionException at discretion of
     *         {@code RejectedExecutionHandler}, if the task
     *         cannot be accepted for execution
     * @throws NullPointerException if {@code command} is null
     */
    public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        /*
         * Proceed in 3 steps:
         * 以下为线程池的核心三步,仔细看上文中7个参数的介绍,下面三步也就容易理解了
         *
         * 1. If fewer than corePoolSize threads are running, try to
         * start a new thread with the given command as its first
         * task.  The call to addWorker atomically checks runState and
         * workerCount, and so prevents false alarms that would add
         * threads when it shouldn't, by returning false.
         * 第一步,如果线程池中线程数量小于核心线程数,则创建线程,然后调用执行。
         *
         * 2. If a task can be successfully queued, then we still need
         * to double-check whether we should have added a thread
         * (because existing ones died since last checking) or that
         * the pool shut down since entry into this method. So we
         * recheck state and if necessary roll back the enqueuing if
         * stopped, or start a new thread if there are none.
         *
         * 第二步,达到核心线程数之后,加入队列。加入队列成功后,等待被取出执行。
         * 3. If we cannot queue task, then we try to add a new
         * thread.  If it fails, we know we are shut down or saturated
         * and so reject the task.
         * 第三步,如果队列满了(加入队列失败),继续创建线程,直至达到max
         * 线程数。如果达到max线程数,开始使用某一种拒绝策略,拒绝任务
         */

          /**
           * ctl这个参数使用了二进制方式存了线程池的两个信息:
           * ①当下线程池中线程数量;②线程池状态,比如线程池关闭了等。
           * 
           * workerCountOf,获取线程池线程数量;
           * isRunning() 判断线程池是否处于运行态
         */
        int c = ctl.get();
        //第一步
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        //第二步
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        //第三步
        else if (!addWorker(command, false))
            reject(command);
    }

源码中已添加了一些我的个人理解,应该很容易理解了。回想一下,咱们使用线程池,主要是为了高效地执行任务。执行任务,才是我们最关心的。可是至今,仍未明显地看到任务执行,所以继续跟进。我已经趟平了,直接进入addWorder():

 private boolean addWorker(Runnable firstTask, boolean core) {
        //线程创建前的一些合理性校验
        retry:
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // Check if queue empty only if necessary.
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;

            for (;;) {
                int wc = workerCountOf(c);
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                c = ctl.get();  // Re-read ctl
                if (runStateOf(c) != rs)
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }
        //任务执行
        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
             
            /**
             * 其实这里的firstTask是我们最初传进来要执行的任务,这里把它封装成了一个Worker。
             * 看Worker源码可以知道,其实它是Runnable接口的一个实现类。在这里,Worker充当的
             * 是一个线程的角色,它参数中有thread。那么线程的启动肯定要调用start()方法。
             * 在下方也确实找到了start()方法,启动之后,根据线程的基本知识,肯定是进入了线程
             * 的run()方法。
             */
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    // Recheck while holding lock.
                    // Back out on ThreadFactory failure or if
                    // shut down before lock acquired.
                    int rs = runStateOf(ctl.get());

                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        workers.add(w);
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                if (workerAdded) {
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }

根据上文代码注释中,进入Worker.run()方法:

public void run() {
            runWorker(this);
        }

继续跟进runWorker(),

final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
         /**
         * firstTask是我们自己提交的任务
         * 
         * 可以看到,下午中task直接调用了run()方法。根据线程的基础知识,只用调用线程的start()方法,
         * 才会启动一个线程。而调用线程的run()方法,则会当做普通的方法执行,不会异步执行。
         * 其实在这里就解释了任务提交与执行分离。线程池帮我们创建了线程,然后使用它创建的线程串行地
         * 调用了我们的run()方法,从而执行了我们的任务。
         */
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
            //getTask()此处是不断地获取队列中的任务。
            while (task != null || (task = getTask()) != null) {
                w.lock();
                // If pool is stopping, ensure thread is interrupted;
                // if not, ensure thread is not interrupted.  This
                // requires a recheck in second case to deal with
                // shutdownNow race while clearing interrupt
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            processWorkerExit(w, completedAbruptly);
        }
    }

至此,线程池已基本分析完了。文章开头介绍的那几个问题,没有一一解释,但如果是认真读到这里的话,那几个问题应该已经了然于胸了。


As always and let me know what you think, leave the comments below. Bye :)

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 194,242评论 5 459
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 81,769评论 2 371
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 141,484评论 0 319
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 52,133评论 1 263
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 61,007评论 4 355
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 46,080评论 1 272
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 36,496评论 3 381
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 35,190评论 0 253
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 39,464评论 1 290
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 34,549评论 2 309
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 36,330评论 1 326
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 32,205评论 3 312
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 37,567评论 3 298
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 28,889评论 0 17
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,160评论 1 250
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 41,475评论 2 341
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 40,650评论 2 335

推荐阅读更多精彩内容