深入浅出Java(Android )线程池ThreadPoolExecutor

前言

关于线程池
在Java/Android开发中,设计到并发的请求,那基本上是离不开线程池了。用线程池的好处:

  • 1、减少线程频繁创建、销毁的开销;
  • 2、好控制并发量,降低OOM的可能,至于原因文中会说;
  • 3、提高程序的响应速度,因为可以省去部分创建的过程;

要不要深度学习线程池

  • 对于服务端的同学来说应该会比较重视这一块,因为需要做高并发;而移动端的同学可能比较容易忽略这一块。有些人觉得平时也用不到,移动端没有那么大并发量,或者说第三方框架中已经完成了,比如OkHtttp其实只能说有这种想法的同学还没有遇到大一点的项目或者说没有太多多线程优化的经验。如果你真的遇到了这种项目瓶颈,你连线程池的运行原理都不知道,那又如何解决项目问题呢?

  • 如果你要寻求一份中高级开发工程师的工作,那线程池是基本是必问题目之一,而且还要有一定深度。

如何深度学习线程池
这也是我们今天的重点,本文将从下面几点带大家快速掌握线程池的要点:

  • 1、从API使用到原码解析,基于JDK1.8版本;
  • 2、从源码阅读(深入)中总结出(浅出)线程池工作原理;
  • 3、对应用场景的分析以及异常处理

预览

线程池类图.png

先对线程池的部分核心类/接口做个简介,大家有个印象就好。
Executor接口

public interface Executor {

    /**
     * 就一个方法,用来执行线程任务的,类似于Thread的start()方法
     */
    void execute(Runnable command);
}

由于Executor是一个接口,所以execute是由具体的实现类来完成的,调用这个方法,可能会出现如下情况:

  • 1.创建一个新线程并立即启动;
  • 2.复用线程池中空闲的线程来执行任务;
  • 3.进入一个阻塞队列中排队;
  • 4.抛出异常/拒绝接收该任务,这个要看具体的拒绝策略,默认抛出异常。

ExecutorService接口
继承自Executor接口,我们常用的很多方法就是在这个接口中定义的。主要涉及到:提交任务关闭线程获取结果


public interface ExecutorService extends Executor {

    /**
     * 关闭线程池,新提交的任务会被拒绝,但是已经提交的任务会继续执行
     */
    void shutdown();

    /**
     * 关闭线程池,新提交的任务会被拒绝,并且尝试关闭正在执行的任务
     */
    List<Runnable> shutdownNow();

    /**
     * 线程池是否已关闭
    */
    boolean isShutdown();

    /**
     * 如果调用了shutdown或者shutdownNow之后,所有的任务都结束了,那么返回true,否则返回false
     */
    boolean isTerminated();

    /**
     * 当调用shutdown 或 shutdownNow之后,再调用这个方法,会
     *等待所有的任务执行完成,直到超时(超过timeout)或者说当前的线程被中断了
     */
    boolean awaitTermination(long timeout, TimeUnit unit)
        throws InterruptedException;


    /**
     * 提交一个Runnable 任务
     */
    Future<?> submit(Runnable task);

    /**
     * 执行所有任务,返回 Future 类型的一个 list
     */
    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
        throws InterruptedException;
}

AbstractExecutorService
抽象类,实现了ExecutorService接口。主要封装了通过submit方式提交任务的一些操作。

注意: 不需要获取结果,可以用 execute 方法;需要获取结果(FutureTask)用 submit 方法。

由于篇幅有限,本文只针对execute方式做讲解,想了解submit 方式的同学可以参考深度解读 java 线程池设计思想及源码实现

Executors
这是大多数人最常用的一个类,实质上就是一个工具类。可以快速的构建一个线程池对象,常见的操作有如下:

   /**
     * 创建一个固定大小的线程池,而且全是核心线程,
     * 会一直存活,除非特别设置了核心线程的超时时间
     */
    public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }

   /**
     * 创建了一个没有大小限制的线程池,全是非核心线程;如果线程
     * 空闲的时间超过60s就会被移除
     */
    public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }

   /**
     * 这个线程池只有1个唯一的核心线程
     */
    public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }

   /**
     * 创建一个定长的线程池,可以执行周期性的任务
     */
    public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
        return new ScheduledThreadPoolExecutor(corePoolSize);
    }

可以看出这几种方式最后都是通过ThreadPoolExecutor来实现的,所以下面就来研究一下今天的主角ThreadPoolExecutor,等理解了这个类,也就可以掌握线程池等工作原理,甚至可以根据自己的策略来自定义线程池。

ThreadPoolExecutor(关键类)

继承与抽象方法AbstractExecutorService,也就间接实现了ExecutorServiceExecutor等接口。

从构造方法谈起

public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }
  • corePoolSize:
    核心线程数量,所谓核心线程就是一直保留在线程池中,及时处于空闲状态也不会销毁等线程,除非手动调用allowCoreThreadTimeOut才可以在超时销毁。
  • maximumPoolSize:
    线程池允许创建的最大线程数量。
  • keepAliveTime:
    线程池中除了有核心线程之外,还有非核心线程,非核心线程处于空闲的时候会在一定时间范围内被关闭,而这个空闲的时间就是keepAliveTime。
  • unit:
    keepAliveTime的时间单位,比如秒、分、时等
  • workQueue:
    保存待执行任务的阻塞队列。如果一个任务进入线程池之后,如果核心线程满了的话,就会先尝试添加到队列中,当然未必添加成功,而且队列也有多种实现,具体的后面再说,先简单理解为排队即可。
  • threadFactory:
    如果没有设置的话,使用默认的ThreadFactory来创建线程;当然你也可以通过ThreadFactory自己创建线程,比如设置线程名称,优先级等
  • handler:
    当达到线程池的最大容量时的拒绝策略。当线程池饱和,继续提交任务,需要一种策略来处理该任务。线程池提供了4种策略:

    AbortPolicy:直接抛出异常,这是默认策略;
    CallerRunsPolicy:用调用者所在的线程来执行任务;
    DiscardOldestPolicy:丢弃阻塞队列中靠最前的任务,并执行当前任务;
    DiscardPolicy:直接丢弃任务;

一些重要属性和方法

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
    private static final int COUNT_BITS = Integer.SIZE - 3;
    private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

    // runState is stored in the high-order bits
    private static final int RUNNING    = -1 << COUNT_BITS;
    private static final int SHUTDOWN   =  0 << COUNT_BITS;
    private static final int STOP       =  1 << COUNT_BITS;
    private static final int TIDYING    =  2 << COUNT_BITS;
    private static final int TERMINATED =  3 << COUNT_BITS;

    // Packing and unpacking ctl
    private static int runStateOf(int c)     { return c & ~CAPACITY; }
    private static int workerCountOf(int c)  { return c & CAPACITY; }
    private static int ctlOf(int rs, int wc) { return rs | wc; }

这里很关键,一定要认真看,后面分析任务执行execute()方法就需要用到这些基础。

Integer.SIZE =32,这代表了java中,int最大是32位,所以COUNT_BITS等于29;CAPACITY等于1*2^29 -1,用它来表示线程池的最大容量是足够了的。

从线程池的生命周期来看,线程池有5种状态:

  • RUNNING: 能接受新的任务,也能处理队列中的任务;
  • SHUTDOWN:不接受新的任务,但是会处理队列中的任务;
  • STOP: 不接受新的任务,也不处理队列中的任务,而且会中断正在进行的任务;
  • TIDYING: 所有的任务都完成了,workCount等于 0。线程池的状态在转换为 TIDYING 状态时,会执行钩子方法 terminated()
  • TERMINATED: terminated() 方法结束后,线程池的状态就会变成这个

关于状态转换

  • RUNNING -> SHUTDOWN:
    当调用shutdown()方法后,会发生这个状态转换;
  • (RUNNING or SHUTDOWN) -> STOP:
    当调用 shutdownNow() 后,会发生这个状态转换;
  • SHUTDOWN -> TIDYING:
    当队列和线程池都变成空的时候,会发生这个状态转换;
  • STOP -> TIDYING:
    当线程池是空的时候,会发生这个状态转换;
  • TIDYING -> TERMINATED:
    当terminated() 方法结束后,会发生这个状态转换。
线程状态转换.png

关于状态转换就讲完了,特别是前2个状态转换,更是常用。还有一个关键的属性ctl需要讲一下,初学者可能不太好理解,需要一点计算机基础。

首先ctl是一个AtomicInteger类型的对象,它其实是对int的包装,可以在多线程并发的情况下保证原子性,它传入的参数就是它表示的值。这里是通过ctlOf()方法来计算的。

在计算之前先补充2个小知识点:

1、 <<:是移位运算符,具体俩说是左移;右移用>>表示。左移的意思是将一个二进制数向左边移动1位,那么左移1位就等于这个数乘以2,左移n位的话就是乘以2^n;右移的话就是除以;

2、 由于10进制数有正负之分,所以转换成二进制数的时候,需要在最高位加上0/1来表示正负,正数用0表示;负数用1来表示。

3、原码:加上符号为之后的二进制数;反码:正数的反码是其本身,负数的反码:符号位不变,其余各位取反;补码:正数的补码就是其本身;负数的补码:即在反码的基础上+1。

4、针对二进制数的&、|、~。与(&)运算:2个都是1才为1,否则为0;或(|)运算:只要有1个为1就是1,否则为0;非(~)运算:取反,1变0,0变1。

首先RUNNING = -1 << COUNT_BITS;其中-1的二进制数1001,那转换成补码就是1111,然后左移29位就变成1110 0000 0000 0000 0000 0000 0000 0000,因为int最多32位,所以高位的1没了;然后再和0做或运算,所以结果还是它本身,所以ctl初始值为1110 0000 0000 0000 0000 0000 0000 0000,其中高3位存放线程状态,后面29位存放线程数量。

还有几个方法

  • runStateOf:获取运行状态;
  • workerCountOf:取出ctl低29位,来表示当前线程数;
  • ctlOf:获取运行状态和活动线程数的值。

execute(关键方法)

这是线程池的关键方法,用来提交任务的。

public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        //表示 “线程池状态” 和 “线程数量” 的整数
        int c = ctl.get();
        /*
         * 如果当前活跃线程数小于核心线程数,就会添加一个worker来执行任务;
         * 具体来说,新建一个核心线程放入线程池中,并把任务添加到该线程中。
         */
        if (workerCountOf(c) < corePoolSize) {
            /*
             * addWorker()如果返回true表示添加成功,线程池会执行这个任务,那么本方法可以结束了,返回 false 代表线程池不允许提交任务,那么就会执行后面的方法。
             */
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }

      //程序执行到这里,说明要么活跃线程数大于核心线程数;要么addWorker()失败

        /*
         * 如果当前线程池是运行状态,会把任务添加到队列
         */
        if (isRunning(c) && workQueue.offer(command)) {
            /*
            *这里的逻辑比较有意思,又重新检查了线程状态和数量;
            *如果线程不处于 RUNNING 状态,就会移除刚才添加到队列中的任务;
            *如果线程池还是 RUNNING 状态,并且线程数为 0,那么开启新的线程;
            * addWorker(null, false)参数分析:
            * 1. 第一个参数为null,表示在线程池中创建一个线程,但不去启动;
            * 2. 第二个参数为false,将线程池的有限线程数量的上限设置为maximumPoolSize,
            * 添加线程时根据maximumPoolSize来判断; 
          */

            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }

 //程序执行到这里,说明要么线程状态不是RUNNING;要么workQueue队列已经满了

         /*
          * 这时,再调用addWorker方法去创建线程,
          * 会把线程池的线程 数量的上限设置为maximum;
          * 如果失败,说明当前线程数已经达到 maximumPoolSize,执行拒绝策略
          */
        else if (!addWorker(command, false))
            reject(command);
    }

为什么当任务添加到队列后,内部还执行了那么复杂的判断?

因为担心任务提交到队列中了,但是线程池却关闭了。

当执行execute方法提交一个任务的时候,如果线程池一直处于RUNNING状态,那流程如下:

  • 1、当工作线程数量 < 核心线程数量,会尝试创建一个核心线程并提交任务;
  • 2、当工作线程数量 >= 核心线程数量,如果阻塞队列没有满,则把任务添加到队列中;如果队列满了,则尝试启动一个新的非核心线程来提交任务;
  • 3、当工作线程数量 > maximumPoolSize,则根据拒绝策略来处理该任务, 默认的处理方式是直接抛异常。

注意:
addWorker(null, false);也是创建一个线程,但并没有传入任务,因为任务已经被添加到workQueue中了,当worker在执行的时候,会直接从workQueue中获取任务。在workerCountOf(recheck) == 0时执行addWorker(null, false);也是为了保证线程池在RUNNING状态下必须要有一个线程来执行任务。

关于Worker类和addWorker方法

addWorker()是尝试在线程池中创建一个线程并执行任务,firstTask表示作为新创建的线程的第一个任务,core参数为true的时候,会用核心线程数做创建线程的边界;如果为false,会用最大线程数maximumPoolSize做为边界。如果addWorker()返回true,表示创建线程成功

private boolean addWorker(Runnable firstTask, boolean core) {}
任务执行流程.png

业务场景(分析2种常用的线程池)

newFixedThreadPool

public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }

固定大小的线程池。最大线程数与核心线程数相等,keepAliveTime的设置无效,因为核心线程默认不会销毁,阻塞队列为LinkedBlockingQueue,它是无界队列。这种线程池适合CPU密集型任务.

关于CPU密集型任务和IO密集型任务可以参考这篇文章
线程池核心线程数多少最为合适(IO密集型和CPU密集型)?

工作流程:

  • 提交任务
  • 如果线程数小于核心线程数,创建核心线程并执行任务
  • 如果线程数大于核心线程,把任务添加到LinkedBlockingQueue阻塞队列
  • 如果线程执行完任务,去阻塞队列取任务,继续执行。

虽然线程数量是固定的,但是由于使用了无界队列LinkedBlockingQueue,如果线程的并发量比较大,任务的执行时间比较长,那还是可能会OOM的。适用于CPU密集型的任务,也就是那种长期的任务。

newCachedThreadPool

   public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>(),
                                      threadFactory);
    }

核心线程数为 0,最大线程数为 Integer.MAX_VALUE,所有线程空闲时间 为 60 秒,任务队列采用 SynchronousQueue。

用它去处理那种并发量很大的任务就不合适,由于空闲 60 秒的线程会被终止,长时间保持空闲的 CachedThreadPool 不会占用任何资源。适用于那种任务可以快速完成的任务。

总结

线程池的内容其实是很多的,绝不是1,2篇文章就能讲完的。本文也主要是针对提交任务之后线程池的工作原理以及线程状态变化来做讲解。核心要义如下:
1、Executors这个工具类下创建的几种线程池的工作原理。

newFixedThreadPool、newCachedThreadPool等,需要注意每一个的优缺点和使用场景。

2、ThreadPoolExecutor这个类的构造方法和一些关键成员属性

Executors创建的多种线程池都是通过它的构造方法来实现的,读者需要熟悉它的参数的意义,这样的话,就可以自定义满足个性化需求的线程池。在文中列举出的一些成员属性也很重要,后面对线程池的各种操作离不开它们。

3、理解深刻线程池中的线程创建时机
主要是那个execute()方法和addWorker()方法,主要是根据线程池状态、当前线程数、核心线程数、队列大小、线程池最大线程数来结合来判断。

4、拒绝策略

添加任务到线程池,不一定会被接受。主要看一下哪些情况会执行reject(command)方法;还有几种不同的拒绝策略,默认是抛异常。

5、异常处理

如果某个任务执行出现异常,那么执行任务的线程会被关闭。

感谢以下作者

优雅的使用Java线程池
深入理解 Java 线程池:ThreadPoolExecutor
面试必备:Java线程池解析
Java线程池中的核心线程是如何被重复利用的?
Java线程池中的核心线程是如何被重复利用的?

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 194,088评论 5 459
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 81,715评论 2 371
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 141,361评论 0 319
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 52,099评论 1 263
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 60,987评论 4 355
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 46,063评论 1 272
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 36,486评论 3 381
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 35,175评论 0 253
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 39,440评论 1 290
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 34,518评论 2 309
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 36,305评论 1 326
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 32,190评论 3 312
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 37,550评论 3 298
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 28,880评论 0 17
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,152评论 1 250
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 41,451评论 2 341
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 40,637评论 2 335

推荐阅读更多精彩内容