这几天秋招面试的时候问到了线程池原理,因为线程池这块都是只了解API,当时没能很好的回答面试官提出的问题,花了整整一晚上结合别人的博客看了下源码,了解了线程池的大概执行流程,写一篇博客总结一下,这里我不细扣逻辑,网上的博客大部分都有对处理进行总结,我在这里主要通过源码来分析这些逻辑的实现。
网上随便找了篇博客,也对关键参数和拒绝策略以及线程池的优缺点有说明,这里不在复述:
https://www.cnblogs.com/spec-dog/p/11149741.html
通常我们创建线程池有两种方式,一种是通过线程池工厂Executor创建线程,另一种是通过new自己传参创建线程池
如下:
点开Executors的newCachedThreadPool()方法我们可以看到,其实本质上他也是通过new的方式传参创建线程池并返回,但是返回的引用类型为ExecutorService。
我们通过这个ExecutorService类型的引用就可以使用线程池的几乎所有API,那么我们可以推测出这个ExecutorService就是线程池的规范接口。
ThreadPoolExecutor直接继承自抽象类AbstractExecutorService
AbstractExecutorService实现了ExecutorService接口并实现了一些方法。
ExecutorService这个接口继承自Executor接口,其中定义了包括shotdownNow(),isShutDown()等我们使用线程池中可能会直接使用到的方法,从这里可以看出ExecutorService确实是线程池的规范接口,并且这些线程池的关键方法没有在AbstractExecutorService中实现,而是由ThreadPoolExecutor直接实现。
ExecutorService继承的Executor接口只定义了一个方法execute,也就是我们平时将任务添加到线程池中直接使用的方法。这个接口也是线程池的顶级接口。这里我们可以大致整理出ThreadPoolExecutor->AbstractExecutorService->ExecutorService->Executor
这里提一下,Executor和Executors虽然只差了一个s,却是完全不同的两个东西,Executor是线程池的顶级接口,Executors则是创建线程池的工厂类,是一个有很多静态方法用于创建不同类型线程池的工具类。
在了解源码之前我们要记住几个关键的变量(这里说的不是常见的一些创建线程池使用到的参数):
1.ctl
这是一个很神奇的变量,它通过一个变量,同时存储了当前线程数量以及线程池状态。Doug
lea也定义了一些方法如runStateOf,这些不同的方法对同一个变量进行操作可以返回不同的数据,如runStateOf返回的就是当前线程池的状态,workerCountOf返回的就是线程池的线程数量,而他们读取的参数为同一个参数ctl。ctl是AtomicInteger类型的变量。
点开这个类我们发现,其实这个类只有在一个int类型的参数value,只不过Doug lea在这里封装了对这个Int的许多原子操作方法。这些原子操作的方法底层使用的是Unsafe的原子操作方法。因为ctl存储的信息,可以说是线程池的每一个线程都一定会使用它来读取线程池状态,所以这里保证ctl的原子性还是很有必要的。
2.workQueue
工作队列,当核心线程数满了后,用于存储待执行任务的队列,这里要注意两件事,
一:他是BlockingQueue,这是一个线程安全的队列,也就是说不同的线程对这个队列执行添加或取出任务的时候不需要加锁。
二:他的存储类型为runable,说到这里,我们猜都可以猜到,如果需要执行任务,那么一定是工作线程将队列中的任务取出直接调用其run方法,也可以推测出我们需要添加的任务必须实现runable接口并把具体任务实现写在run方法中。
3.mainLock
一个ReentrantLock类型的锁,是线程池的内置锁,那么我们可以推测出可能线程池的许多方法都可能会使用到这把锁,虽然存储任务的队列workQueue是线程安全的,但是还是有一些变量,如下面所说的workers,HashSet类型线程不安全的集合,对这种非线程安全的集合进行操作,肯定会存在竞争情况(如多个线程尝试同时取任务),那么就需要mainlock进行加锁,以保证线程安全性。
4.workers
这里很关键,有些博客也会提到这个变量,它是存储worker的HashSet类型集合,作用是存储那些工作线程。但是那些博客写到这里就戛然而止了,我想看的东西全都没有!worker是怎么工作的呢?worker是怎么被回收的呢?worker是怎么创建的呢?
我们点开这个Worker
它实现了AbstractQueueSynchronizer接口,也就是我们常说的AQS,这里就不细扣AQS了,有时间的话我再花时间总结一个Reentrantlock的博客再具体分析。
这里我们关键看三个地方
一:他实现了runable接口
实现runable一定就有run方法,看这个类的名字Worker,我们推测他是以start的方式执行run方法,而不会是通过直接调用run方法的形式运行run方法(要说为什么这么推测,任务的run方法肯定是直接调用,工人(worker)的run方法肯定是需要启动)。那么就一定会有去start他的线程。
二:Thread thread
根据Worker的构造方法
我们可以发现,这个Thread类型的参数确实就是运行Worker run方法的启动线程。
三:firstTask
这是一个runable类型的变量,正好对应上我们之前看到的存储待执行工作workQueue的存储类型,再根据变量名,我们可以推测出他是Worker启动后第一个执行的任务,由上面的构造函数我们也可以看到他是在Worker创建时就被添加的。
最关键的run方法我们后面再说,了解了worker的基本实现,我们可以开始讲流程实现不带停了!
由上面的接口关系我们就可以大概了解executor这个方法的重要性,那么它又是怎么实现的呢?
我们直接点进源码:
这段代码其实网上的大部分博客都能看的到,这里也简单分析一下,
1.首先判断当前线程数是否小于核心线程数,小于的话addWorker创建核心线程执行任务。
2.不小于的话尝试将任务添加到任务队列。
3.如果线程池不处于运行状态,执行拒绝策略。否则通过addWorker创建非核心线程执行任务。
4.如果达到线程池最大容量即非核心线程也无法创建,执行拒绝策略。
除了第二种情况,和第四种情况,创建核心线程和非核心线程都需要调用addWorker方法。我们点开这个方法。
这个方法主要分为两个部分,
我们先看第一部分:
这里主要是对线程池状态进行判断,在两个死循环中。
先看外面的循环:外层循环首先会通过ctl获取当前线程的数量,程池被shutdownNow,那么return false,即添加失败,这里还有一个判断条件firstTask==null&&!workQueue.isEmpty()用于判断线程池被shotdown但不是shutdownNow的情况,但是怎么会有提交的任务为null呢?这里就涉及到了线程池后面的线程回收机制。我们后面再提。
再到内层循环:这时候传入的参数boolean类型的core就发挥作用了,根据添加的是否为核心线程,这里的判断条件也不一样,若为核心线程,那么会根据创建线程池设置的核心线程数量比较,若大于设置的核心线程数量,则创建失败。如果添加的不是核心线程,那么就会和创建线程池设置的最大线程数量比较,如果当前线程数已经不小于最大线程数量,那么即使你是非核心线程,也无法成功被添加了。
接着通过cas操作尝试增加线程池的当前线程数,如果成功添加,则通过goto语句跳出当前的两个死循环,执行之后的语句,如果cas操作失败,先会判断当前线程数量有没有被修改,如果已经被修改,那么通过continue跳出二重循环,并重新从最外层的循环开始执行,重新通过ctl读取线程池的最新状态。如果没有被修改,那么正常执行,即继续内层循环,继续尝试通过cas添加当前线程数。
再看第二部分:
如果第一部分我们执行成功,那么已经修改ctl,对线程池的当前线程数进行了更新,第二部分就是对worker具体添加过程的实现。
一上来就通过new Worker(firstTask)的方式创建了Worker,接着加锁,加锁是为了保证对workers操作的安全性,接着更新如largestPoolSize线程池最大线程数量、workerAdded是否添加成功这样的一些参数状态。
通过if(workerAdded)判断线程是否添加成功
添加成功后直接执行worker的内置线程,即开始运行firstTask。
这里提一下,finally下面有一个:
若添加失败,则执行addWorkerFailed
这个方法就是在addWorker添加失败后对之前的操作执行撤回的一个方法。
首先加锁对workers进行操作,remove掉之前添加的worker,接着调用decrementWorkerCount()
这个方法底层是一个CAS操作,对ctl里的线程数量进行-1操作,可以理解为撤销我们addWorker这个方法第一部分的操作。
接着调用tryTerminate()方法,这个方法会判断当前线程池的状态,并可能对线程池的状态进行修改。因为addWorker失败可能是遇到shutDownNow等修改线程池状态的操作,这里会判断当前线程池的状态是否是遇到了那样的情况,是否需要进行对应操作。
由上面的描述我们可以知道,addWorker添加完worker后会直接启动worker,那么worker的run方法又是什么样的呢。worker是怎么执行任务的呢?
上代码:
run方法直接调用了runWorker方法,我们点开runWorker:
这里代码比较长我分两次截图。
核心部分了:
while (task != null || (task = getTask()) != null)
首先会通过run方法执行firstTask,执行完毕后会将task置为null,那么task!=null的判断条件肯定不通过,它就会尝试通过getTask(),从任务队列中获取任务。
但这个getTask()可不是想拿就拿,它的功能很简单,从任务队列中取任务,但是判断条件却非常繁琐。各种特殊情况,它都会返回null。
这里的判断比较绕,建议反复理解
(如果对细节不感兴趣可以直接跳过getTask该部分)。
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
首先:
会判断线程池状态,如果线程池状态为shutdown等特殊状态,直接返回null。如果线程池处于正常状态,通过ctl拿到当前线程数量。
接着:
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
他通过||来进行判断,线程池是否允许核心线程被回收,线程池当前线程数量是否大于核心线程最大数量。只有当线程池不允许核心线程被回收且当前线程数量小于核心线程最大数时,Timed才会为false。
然后:
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount©)
return null;
continue;
}
我们来分析这段判断条件,首先判断当前线程是否大于线程池最大线程数,是的话不用看后面的直接返回null。
如果不大于,那么接着判断timed(上面已经讲过)和timedOut,timeOut在进入getTask默认为false,如果尝试取任务时队列里已经没有任务了,那么timeOut会改为true,第二次循环判断同样的条件可能出现不同的结果。
(wc > 1 || workQueue.isEmpty())
最后判断线程数是否>1或任务队列已经为空
(这个条件用于判断当前线程池是否还有线程或者任务队列已无任务)
若当前线程池没有线程并且队列中仍有任务,那么即使可能存在超时情况,也任然会继续跳过该判断执行后面的语句。
(从判断条件来看,若当前线程数已经大于核心线程数,那么就一定会返回null,可以理解为非核心线程执行完自己的firsttask后,是无法通过gettask拿到队列中的任务的。但是若设置了允许核心线程超时回收那么即使当前线程数没有达到核心线程数,gettask依旧会返回null,保底只维持一个线程执行任务)
走到这里终于通过了以上两个可能返回null的判断
第一个判断当前线程池状态。
第二个判断当前线程池是否满足创建线程池的参数设置。
走到这里如果都没问题,那么则拿任务,如果任务不为null,返回任务,如果任务为null,那么设置timeOut为true(这时候其实就是线程拿不到线程进入超时状态了),因为是在死循环中,那么会在getTask方法中再次尝试取任务,但是这次timeOut已经为true,如果timed也为true,那么就会出现timed&&timeOut的情况,根据上面第二个返回为null的判断我们可以推测出,该线程getTask会返回null。
(这里的判断条件非常繁琐,各种判断的组合太过复杂,我没办法描述出各种具体情况,有点只可意会不可言传的味道,想要理解的老哥需要自己结合代码理解情况。)
回到runWorker,我们知道在这里线程通过循环尝试从任务队列取任务,但是当getTask取任务失败之后呢?
会执行finllay里的processWorkerExit()方法,该方法可以说是线程池中worker的回收机制实现。
上代码:
我们看到它首先加锁,不管三七二十一先把当前worker从workers中remove,接着又是一顿判断
先判断线程池状态。
接着给min赋值,根据是否允许核心线程超时不被回收,min会被赋0或corePoolSize(允许的核心线程最大数量)。
如果任务队列不为空,则设置min=1
(这里我的理解是,当线程进入processWorkerExit,那么最可能的情况就是任务队列已经空了跳出了runWorker的循环,才会进入到processWorkerExit,那么在processWrokerExit中判断的workQueue.isEmpty()添加进来的任务最大的可能是刚刚添加的新任务,此时queue大概率没有满且只有一点点刚添加进来的新任务,那么线程池就会留一个线程去执行这个任务)
最关键的地方来了:
若allowCoreThreadTimeOut==true,且当前线程数不大于corePoolSize,就会执行 addWorker(null, false);
怎么样,是不是很熟悉,没有错,线程池线程回收机制就是通过processWorkerExit这样维持线程池核心线程数量 还记得addWorker的第一部分两个循环的外层循环吗?!(rs==shutdown&&first==nul&&!workqueue.isEmpty),当时判断的条件我们现在可以看出,即使线程池已经被shutdown,只要当前队列不为空,那么通过ProcessWrokerExit创建用于维持核心线程数的addWorker就不会创建失败。
但是还有一个让人疑惑的地方,既然已经知道了当线程不允许核心线程超时被回收(超时即任务队列里没任务了),且当前线程数小于创建线程时所规定的核心线程数时,就会以addWorker(null, false)的方式创建新的核心线程,但是到这里肯定有人会疑惑,如果没有达到核心线程的大小且不允许核心线程超时被回收,那么这里创建的应该是核心线程才对啊,应该为addWorker(null,true)。
其实这里true和false的差别不大,根据上面的代码分析我们可以看到,即使是核心线程,且线程池设置不允许回收超时的核心线程,执行完任务后processWorkerExit还是不管三七二十一先把worker给回收了,接着再判断是否需要添加新的worker以维持核心线程数量。
那么addWorker(null,true)和addWorker(null,false)的区别在哪呢?我们回到addWorker第一部分两个循环判断里的内循环,找到这么一段。
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
在这里根据core为true或false对线程池的线程数又进行了一次判断,但这里源码使用的时addWorker(null,false),所以哪怕超过了核心线程数,只要不大于线程池最大线程容量,也依旧会执行addWorker的后续代码。
所以我们可以得出区别:
在这么一种情况下:若两个线程同时执行processWorkerExit,若源码为addWorker(null,true),则当第一个线程addWorker成功后,第二个线程会因为第一个线程addWorker成功而导致在addWorker的时候无法通过内层循环的判断,因为为core为true,所以判断当前线程数已经等于coolPoolSize,所以添加失败,若源码为addWorker(null,false),则会因为当前线程数虽然大于coolPoolSize却小于maximumPoolSize而添加成功。
(Doug lea的设计具体为什么要这样我只能理解皮毛,但目前这么设计只能理解为任务队列里存在任务时,比coolPoolSize数多一点线程去执行任务也不是什么坏事。如果任务队列已经为空,那么下一次执行到processWorkerExit,多出来的线程一样会因为线程数大于coolPoolSize而被回收,无伤大雅)
线程池中线程在回收线程的时候首先通过getTask方法建议回收,如果允许核心线程超时被回收那么每个在任务队列里已经没有任务,即线程进入超时状态后,核心线程执行完任务,getTask只会保留一个线程工作,其他worker都会被返回null,在runWorker方法中跳出while循环,执行后面的procrsworkerexit回收机制,processworkerexit则会具体执行回收机制,再次通过是否允许回收超时核心线程这一参数来决定通过addworker维持的线程池线程数量,如果允许被回收,那么仅维持一个,如果不允许被回收,那么维持corepoolsize个。
可以看到在核心线程不允许被超时回收且当前线程数量没有大于corePoolSize时,线程池队列中没有任务(即进入超时状态)线程池的核心线程会卡在getTask中,不停的尝试获取任务,但如果设置了允许核心线程被超时回收,那么仅仅会维持一个线程卡在循环中,其余的都会通过processWorkekExit进行回收。
发现当corePoolSize设置的比较大时,其实哪怕即使不使用线程池,线程池中的线程也不是被挂起,而是卡在死循环中不断尝试获取任务,这样是十分损耗性能的!
所以初始化线程池设置参数,其实也非常讲究啊!