面试官:给我讲讲线程池(中)

前景回顾

在上一篇中我们通过线程池的继承关系,具体分析了线程池的抽象父类AbstractExecutorService中的submit、invokeAll、invokeAny方法。在本篇中,我们将会把视线放在ThreadPoolExecutor具体实现当中,通过源码分析我们将会明白7个参数是如何在源码中运转的。

使用场景

我们先回顾一下在实际场景下的业务代码,下面模拟了10个线程并行处理任务,然后停止线程池接受,最后等待线程池关闭。

public static void main(String[] args) throws InterruptedException {
        // 开启线程池
        ThreadPoolExecutor executor = new ThreadPoolExecutor(10, 20,
                0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<>());
        // 开启10个任务并行处理
        for (int i = 0; i < 10; i++) {
            executor.execute(() -> {
                // 模拟业务代码
                try {
                    Thread.sleep(1000);
                    System.out.println("任务结束");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });
        }
        // 暂停线程池任务接收
        executor.shutdown();
        // 等待线程池结束
        executor.awaitTermination(1,TimeUnit.MINUTES);
    }

构造函数

总共重载了4个构造函数,设置了默认的参数,这种设计思路大家可以借鉴,下面只展示了其中两个重要的构造函数。

public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             // 设置默认工厂,工厂中返回线程优先级为普通并且为非守护的线程
             Executors.defaultThreadFactory(), 
             // 默认拒绝策略为拒绝发生时直接抛出异常
             defaultHandler);
}

public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
            // 判断参数边界
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
            // 设置安全管理器,不在本篇考虑范围内
        this.acc = System.getSecurityManager() == null ?
                null :
                AccessController.getContext();
                // 设置核心线程数
        this.corePoolSize = corePoolSize;
            // 设置最大线程数
        this.maximumPoolSize = maximumPoolSize;
            // 设置工作队列
        this.workQueue = workQueue;
            // 设置线程空闲时间
        this.keepAliveTime = unit.toNanos(keepAliveTime);
            // 设置线程工厂
        this.threadFactory = threadFactory;
            // 设置拒绝策略
        this.handler = handler;
}

execute方法

public void execute(Runnable command) {
            // 边界判断
        if (command == null)
            throw new NullPointerException();
            /**
                判断当前工作线程数是否小于核心线程数
                这里的ctl可以先认为它保存了线程池的工作线程数量和线程池状态
                为什么一个变量可以表示两种状态后面会解释到
            **/
        int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) {
            // 添加工作线程,返回是否成功创建,成功创建则返回
            if (addWorker(command, true))
                return;
            /**
                如果创建没成功,则重新获取线程池状态,关于线程池具体状态会在下文描述
                重新获取的原因在于execute是线程安全的方法
                那么就会存在多线程调用,在此期间线程池状态可能会发生变化,关闭或有新任务添加
                所以重新获取线程池状态保持最新的状态
            **/
            c = ctl.get();
        }
            /**
                运行到这,说明当前工作线程大于核心线程数或者创建工作线程不成功(线程池非Running)
                判断当前线程是否运行并且任务队列是否成功添加任务
            **/
        if (isRunning(c) && workQueue.offer(command)) {
            // 重新检查线程池状态
            int recheck = ctl.get();
            // 如非运行状态且能够删除删除,则拒绝任务
            if (! isRunning(recheck) && remove(command))
                reject(command);
            // 如工作线程数为0,则添加工作线程,此种情况发生在工作线程在空闲时间销毁时
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
            /**
                运行到此说明,线程池状态非running或添加任务队列不成功
                则尝试添加工作线程,如果添加不成功,则拒绝任务
            **/
        else if (!addWorker(command, false))
            reject(command);
}

总结:阅读完execute方法后,我们可以总结线程池会先在小于核心线程数的时添加核心工作线程,在任务队列无法添加任务时添加非核心工作线程,在线程池非running状态任务队列满且工作线程满时拒绝任务。

回顾我们上篇提出的问题:当我们创建核心线程数10个,最大线程数20个,任务队列为无界队列的线程池,并同时来了30个任务。

问题一:请问线程池中的线程数为多少?

问题二:那假如我把任务队列改为大小为20的队列,那么现在最多可以接收多少请求?

通过源码的阅读我们现在可以很简单的回答这两个问题。

  • 问题一:通过源码可知前10个任务直接去创建核心工作线程,由于任务队列是无界的因此后20个任务直接加入了任务队列等待核心工作线程消费。
  • 问题二:如把任务队列改为容量为20的队列,那么现可接受最大(最大线程数+队列容量)=40个请求。

在阅读execute方法时,我们把ctl属性、addWorker当做了黑盒,只是通过作者注释和方法命名去判断方法大致做了什么操作,并且我们都知道execute是一个线程安全的方法,它可以由不同的线程去调用,但是在源码中我们也没有发现加锁的部分,小伙伴们肯定非常好奇这些底层方法是如何做到这些的。

CTL

    // 类型为原子整数类,增删改查都是原子操作
    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
    // 代表共有32-3=29位
    private static final int COUNT_BITS = Integer.SIZE - 3;
    // 代表最大容量为2^29-1 
    private static final int CAPACITY   = (1 << COUNT_BITS) - 1;
    
    // 将线程池状态储存在整数字节的高位中,代表高3位代表线程池状态
    private static final int RUNNING    = -1 << COUNT_BITS;
    private static final int SHUTDOWN   =  0 << COUNT_BITS;
    private static final int STOP       =  1 << COUNT_BITS;
    private static final int TIDYING    =  2 << COUNT_BITS;
    private static final int TERMINATED =  3 << COUNT_BITS;

    // 因高3位代表线程池状态,此方法将低29位变为0就可以得到高3位状态
    private static int runStateOf(int c)     { return c & ~CAPACITY; }
    // 与上相同,将高3位变为0,得到工作线程数量
    private static int workerCountOf(int c)  { return c & CAPACITY; }
    // 将rs和wc的进行或运算
    private static int ctlOf(int rs, int wc) { return rs | wc; }
    // 判断c是否小于s
    private static boolean runStateLessThan(int c, int s) {
        return c < s;
    }
    // 判断c是否大于等于s
    private static boolean runStateAtLeast(int c, int s) {
        return c >= s;
    }
    // 因为只有Running小于shutdown通过此方法来判断
    private static boolean isRunning(int c) {
        return c < SHUTDOWN;
    }
    // 尝试使用CAS的方式给ctl+1
    private boolean compareAndIncrementWorkerCount(int expect) {
        return ctl.compareAndSet(expect, expect + 1);
    }
    // 尝试使用CAS的方式给ctl-1
    private boolean compareAndDecrementWorkerCount(int expect) {
        return ctl.compareAndSet(expect, expect - 1);
    }

通过上方关于ctl的源码,我们可以看出作者将一个整数变量分为了两个部分,一部分用来表示线程池状态,另一部分来表示当前工作线程数,将高3位来表示线程池当前状态,后29位表示线程池大小。通过这里骚的面试官又可以出题了,问最大线程数最大可以设置为多少,又要杀倒一片。

或许会有小伙伴不懂位运算看不懂该段逻辑,又是左移又是右移的各种位运算,但其实先把方法大致的功能了解了并不影响后面源码的阅读。

由于该篇篇幅有限,推荐想要刨根问底的小伙伴查询一下问运算的资料。

线程池状态

在ctl属性的部分,我们会发现有如下几个枚举状态,那么都代表什么意思呢?

  • RUNNING :允许接收新任务并处理在任务队列中的任务。
  • SHUTDOWN:不接收新任务但处理在任务队列中的任务。
  • STOP:不接收新任务、不处理任务队列中任务、中断在处理中的任务
  • TIDYING:所有任务已结束、工作线程数为0、并会调用terminated()钩子方法
  • TERMINATED:terminated()钩子方法成功执行

在线程池中状态是这样子流转的:

  • RUNNING -> SHUTDOWN:调用线程池的shutdown()方法。
  • (RUNNING/SHUTDOWN) -> STOP:调用线程池的shutdownNow()方法。
  • SHUTDOWN -> TIDYING:当任务队列和工作线程都为空时。
  • STOP -> TIDYING:当工作线程为空时。
  • TIDYING -> TERMINATED:当terminated()钩子方法成功执行。

addWorker

该方法将会创建工作线程,并将创建数量控制在核心线程数或最大线程数,其中的firstTask为工作线程创建成功后执行的第一个任务,第二个参数代表是否为核心工作线程,最终返回线程是否创建成功。

private final HashSet<Worker> workers = new HashSet<Worker>();

private final ReentrantLock mainLock = new ReentrantLock();

private boolean addWorker(Runnable firstTask, boolean core) {
        // 给最外层循环设置标志,且该循环为死循环
        retry:
        for (;;) {
            // 获取ctl值
            int c = ctl.get();
            // 获取线程池状态
            int rs = runStateOf(c);
            if (
                // 判断是否为SHUTDOWN、STOP、TIDYING、TERMINATED其中之一
                rs >= SHUTDOWN 
                &&
                /**
                    只有在SHUTDOWN且无任务需要执行且任务队列非空的时候该段逻辑返回true
                    代表需要继续添加工作队列执行任务队列中任务
                **/
                ! (
                   // 状态为SHUTDOWN
                   rs == SHUTDOWN &&
                   // 无第一个任务需要执行
                   firstTask == null &&
                   // 任务队列非空
                   ! workQueue.isEmpty())
                  )
                return false;
            // 死循环
            for (;;) {
                // 获取当前工作线程数
                int wc = workerCountOf(c);
                if (
                    // 如当前数量超过最大容量直接返回
                    wc >= CAPACITY 
                    ||
                    // 如创建为核心工作线程则与最大核心线程大小比较,否则与最大线程数大小比较
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                // CAS增加工作线程数,添加超过结束最外层循环
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                // CAS执行没成功,值发生改变,需要重新读取CTL的值
                c = ctl.get();
                // 如线程池状态发生改变,重新执行最外层循环
                if (runStateOf(c) != rs)
                    continue retry;
            }
        }

        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            // 创建工作线程
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
                // 因该段代码将会对HashSet进行操作,所以使用重入锁加锁
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    // 重新获取状态
                    int rs = runStateOf(ctl.get());

                    if (
                        // rs为RUNNING状态
                        rs < SHUTDOWN 
                        ||
                        // 这种情况为池中工作线程到达空时时间被销毁但任务队列还有任务时
                        (rs == SHUTDOWN && firstTask == null)) {
                        // 预检查线程是否可以启动
                        if (t.isAlive()) 
                            throw new IllegalThreadStateException();
                        // 将工作线程添加进workers集合中
                        workers.add(w);
                        // 记录工作线程数量最大到达的数量
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        // 工作线程添加表示设置为成功
                        workerAdded = true;
                    }
                } finally {
                    // 可重入锁解锁
                    mainLock.unlock();
                }
                // 如工作线程添加成功,将工作线程启动
                if (workerAdded) {
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            // 如工作线程启动失败,将工作线程从集合中去除
            if (! workerStarted)
                addWorkerFailed(w);
        }
        // 返回工作线程是否添加成功
        return workerStarted;
    }

通过该段源码,它在适当的时机把我们的task任务传递给了工作线程并创建,并将创建成功的工作线程加入集合中。其中CAS死循环的模式,是我们开发中可以借鉴学习的模式。

worker

通过源码,将task传递到worker中,并调用了start()方法,那么说明worker中肯定是一个线程并且有它自己的run方法,那么我们就很有必要探寻其中是如何进行编码的。

上图是Worker类的继承关系图,可以看出Worker继承了AQS、实现了Runnable方法,那么我们就可以大胆的猜测他实现了某种锁的机制、并且可以被线程执行。

worker构造器
        final Thread thread;
        Runnable firstTask;
        volatile long completedTasks;

    Worker(Runnable firstTask) {
            // 这里先看做设置标记
            setState(-1); 
            // 设置第一个将会执行的任务
            this.firstTask = firstTask;
            /**
                通过最开始通过线程池构造器传入的线程池工厂创建线程
                因为worker实现Runnable接口,那么它就可以通过传入新线程中
                可以推断出调用了thread.start()就会执行worker的run()方法
            **/
            this.thread = getThreadFactory().newThread(this);
}

run方法

public void run() {
    runWorker(this);
}

protected void beforeExecute(Thread t, Runnable r) { }

protected void afterExecute(Runnable r, Throwable t) { }

final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        // 设置标志可被中断
        w.unlock();
        boolean completedAbruptly = true;
        try {
            while (
                // firstTask不为空或可以从任务队列中获取到任务
                task != null || (task = getTask()) != null) {
                w.lock();
                // 中断判断,在下篇介绍
               if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                    // 钩子方法,子类实现
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    // 调用task的run方法,并抓住所有异常,由钩子方法处理
                    try {
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        // 钩子方法,子类实现
                        afterExecute(task, thrown);
                    }
                } finally {
                    // 将task置空
                    task = null;
                    // 完成的任务数+1
                    w.completedTasks++;
                    // 标志worker可用
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            // 执行工作线程退出
            processWorkerExit(w, completedAbruptly);
        }
    }

通过该段代码,可用分析出该段代码通过while循环一直从getTask()中获取任务,那么下面分析getTask方法。

getTask

private Runnable getTask() {
        boolean timedOut = false; 
        
        // 死循环
        for (;;) {
            // 获取当前状态
            int c = ctl.get();
            int rs = runStateOf(c);

            // Check if queue empty only if necessary.
            if (
                // SHUTDOWN、STOP、TIDYING、TERMINATED
                rs >= SHUTDOWN && 
                // STOP、TIDYING、TERMINATED或任务队列为空
                (rs >= STOP || workQueue.isEmpty())) {
                // 工作线程数量-1
                decrementWorkerCount();
                // return null之后上层runWorker将会退出while循环执行工作线程退出
                return null;
            }
            // 获取当前工作线程数量
            int wc = workerCountOf(c);

            // 判断是否允许超时:当允许核心线程超时为true或当前数量超过核心线程数
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

            if (
                // 当前工作线程数量超过最大数量或允许超时并且已经超时
                (wc > maximumPoolSize || (timed && timedOut))
                && 
                // 工作线程大于1或者任务队列为空
                (wc > 1 || workQueue.isEmpty())) {
                // 尝试CAS工作线程数量-1
                if (compareAndDecrementWorkerCount(c))
                    return null;
                // CAS不超过,继续下一次循环
                continue;
            }

            try {
                // 如果允许超时则调用poll方法等待设置定的超时时间,否则调用take方法一直阻塞等待任务获取
                Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();
                // 获取到任务直接返回
                if (r != null)
                    return r;
                // 执行到这说明获取任务超时,设置超时标记位
                timedOut = true;
            } catch (InterruptedException retry) {
                // 线程被中断,设置超时标记为false,重新下一次循环
                timedOut = false;
            }
        }
    }

通过getTask方法我们可以看出,在设置允许核心线程超时或当前线程数大于核心线程数则表示超时开启,由此开关来判断调用阻塞队列中的阻塞方法还是非阻塞方法,一旦超时则返回null那么worker的run方法就会退出循环进入worker销毁过程,由此实现线程池线程数量的动态修改。

总结

本文通过通过execute方法作为切入点,带大家认识了CAS模式、锁模式以及是如何处理线程池状态。

在阅读源码的过程中,很多人喜欢刨根问底,但其实阅读源码就是一个不求甚解的过程,在实际阅读源码过程中调用栈可能会达到5-6层甚至可能更多层,这样子阅读源码其实是非常低效的,在一直往下深挖的过程中你会发现你的时间和精力在不断的被消耗,最后只明白了源码中的一部分的逻辑分支,和我们阅读源码的初衷完全不同。

所以我推荐阅读源码先阅读调用栈的1-2层,再往深了就不要去深究了 ,等到整体逻辑都看明白了可以再回过头来去学习哪些具体细节。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 203,456评论 5 477
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,370评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,337评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,583评论 1 273
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,596评论 5 365
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,572评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,936评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,595评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,850评论 1 297
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,601评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,685评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,371评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,951评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,934评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,167评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 43,636评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,411评论 2 342

推荐阅读更多精彩内容