线程池是比较重要的一个知识点,情况比较复杂,包含的内容比较多,围绕线程池有很多可挖掘和注意的点,本次彻底啃一遍线程池源码,做到基本情况心中有数。
1.ThreadPoolExecutor原理简介
1.1.类图
使用IDEA生成线程池的UML类图,可以对线程池的结构有个基本的了解,左上角导航栏里的蓝色I图标代表Inner Class,可以显示或者去掉内部类
这个图可以看到顶层就是一个接口Executor
,这个接口只有一个方法用来执行任务java.util.concurrent.Executor#execute
。
顶层接口Executor
提供了一种思想,将任务提交和任务执行进行解耦。用户无需关注如何创建线程,如何调度线程来执行任务,用户只需提供Runnable对象,将任务的运行逻辑提交到执行器(Executor)中,由Executor框架完成线程的调配和任务的执行部分。
1.2.ThreadPoolExecutor总体设计
根据上面的类图来看,继承链路有2个接口,一个抽象类,一个具体的实现类,ThreadPoolExecutor实现最复杂的部份,上面的接口和抽象类定义行为以及公共内容。
下面来介绍一下ThreadPoolExecutor具体要做哪些事,以及内部的设计。
使用池化线程执行每一个提交的任务的ExecutorService
,通常使用Executors
的工厂方法进行配置
线程池解决了2个不同的问题:
- 在执行数量很大的异步任务的时候,由于每个任务的调用开销降低,会提升任务整体执行性能,并且提供了一种方法来约束和管理执行任务集合时消耗的资源(包括线程)
- 每个ThreadPoolExecutor还维护一些基本统计信息,例如已完成任务的数量
为了在广泛的上下文应用有效,ThreadPoolExecutor提供了许多可调整的参数和可扩展的hook。
Excutors提供了一些工厂方法,内置了一些线程池实现,可以很方便的使用线程池,不过内置的这几种实现,会有一些问题,一般还是建议自定义参数构造线程池
- Executors#newCachedThreadPool
- Executors#newFixedThreadPool
- Executors#newSingleThreadExecutor
1.3.自定义线程池线程参数
1.3.1.Core and maximum pool sizes
自定义线程池的话,有如下2个线程数量参数:
- corePoolSize
- maximumPoolSize
线程创建逻辑如下:
当提交一个新任务时,运行的线程数量低于corePoolSize,会新建一个线程去处理请求,即使其他工作线程处于idle(空闲)状态。
如果正在运行的线程数大于corePoolSize,小于maximumPoolSize,只有在队列满的情况下才会创建新的线程。
如果corePoolSize和maximumPoolSize设置为一样,则创建的是一个大小固定的线程池。
如果maximumPoolSize设置为一个无上限的值,例如Integer.MAX_VALUE,则线程池接受的任务也无上限。
corePoolSize和maximumPoolSize一般是在构造函数中传入的,也可以使用setCorePoolSize()和setMaximumPoolSize()进行设置。
一般只有任务到来的时候才会创建线程,不过可以使用prestartCoreThread()、prestartAllCoreThreads()提前创建。如果使用非空队列,可能会想提前创建线程。
1.3.2.ThreadFactory
一般会使用ThreadFactory创建线程,如果没有特别指定,会使用Executors#defaultThreadFactory创建线程,创建的线程属于同一个ThreadGroup,优先级也相同,NORM_PRIORITY,并且没有守护线程。通过自定义ThreadFactory,可以设置线程名称、线程组、优先级、守护线程状态等。如果由于newThread方法返回null,ThreadFactory创建线程失败,线程池可能会继续运行,不过可能会不执行任何任务。
1.3.3.Keep-alive times
如果线程池线程数量超过corePoolSize,多余的线程如果处于idle(空闲)状态,并且存活时间超过keepAliveTime(see {@link #getKeepAliveTime(TimeUnit)}),则多余的线程会被终止。
当线程池任务不繁忙时,提供了一种方法减少资源开销。
如果后面线程池变得繁忙了,会创建新的线程,这个参数也可以使用setKeepAliveTime(long, TimeUnit)方法进行动态更改。
设置keepAliveTime值为Long.MAX_VALUE、TimeUnit#NANOSECONDS,可以有效的使idle(空闲)线程永远不会在线程池关闭之前终止。
默认情况下,keep-alive策略仅仅会在线程数量超过corePoolSize时生效,但是在keepAliveTime值不为0的情况下,可以使用allowCoreThreadTimeOut(boolean)方法将keep-alive策略应用到核心线程上。
1.3.4.Queuing
任何类型的BlockingQueue均可用于传输和保留提交的任务,队列的使用随着线程池的线程数量变化。
如果运行线程数小于corePoolSize,会优先创建新线程,而不是存到队列。
如果运行线程数大于corePoolSize,会优先将请求存队列,而不是继续新增线程。
如果队列满了,则在线程数没超过maximumPoolSize的时候新建一个线程,如果线程超过maximumPoolSize,会触发拒绝策略。
排队策略有3种:
- 直接传递
工作队列的一个比较好的默认选择是SynchronousQueue,SynchronousQueue会将任务直接传递给线程,而不是持有任务。如果尝试入队一个任务,而没有线程可以立刻执行该任务,入队操作会失败,因此会创建一个线程。此策略在处理具有内部依赖性的请求集时,避免了锁定。
直接传递要求maximumPoolSize不设限,以避免拒绝新提交的请求。这可能会导致由于任务一直来,处理速度不够,线程无限增长。
2.无界队列
使用无界队列,例如LinkedBlockingQueue,没有指定队列容量,当核心线程全部繁忙的时候, 会导致任务在队列中等待。因此,最多只能创建corePoolSize个线程,并且maximumPoolSize的值会失效。
3.有界队列
使用有界队列,例如ArrayBlockingQueue,与有限的maximumPoolSize一起使用时,可以防止资源耗尽,但可能会导致线程池调整和控制更加困难。队列大小和maximumPoolSize的值是一个相互权衡的过程,如果队列长度大,而线程数量少,则可以降低CPU使用,OS资源和上下文切换开销,但是导致吞吐量低。如果任务频繁阻塞(I/O绑定),系统可能调度的线程会比允许的多,队列数量小通常线程数量会大,这会使CPU更加繁忙,也有可能遇到无法接受的调度开销这也会降低吞吐量。
1.3.5.Rejected tasks
当Executor已经shutdown,以及Executor的最大线程数和工资队列的大小都有界时,并且Executor已经饱和,新提交的任务会被拒绝。
在遇到以上情况时,会触发拒绝策略,RejectedExecutionHandler#rejectedExecution。
有以下4种预定义handler策略:
1.默认拒绝策略是ThreadPoolExecutor.AbortPolicy,这种策略会在被拒绝时抛出一个运行时异常RejectedExecutionException。
2.ThreadPoolExecutor.CallerRunsPolicy会将任务交还给调用execute()的线程执行。提供了一种降低新任务提交速度的简单反馈机制。
3.ThreadPoolExecutor.DiscardPolicy,不能被执行的任务就简单的丢弃了。
4.ThreadPoolExecutor.DiscardOldestPolicy,如果executor没有shutdown,在工作队列队头的任务会被丢弃,然后会重试执行这个任务(重试也有可能失败,导致重复执行)。
还可以自定义拒绝策略,不过在为特定容量或排队策略设计拒绝策略时需要特别小心。
1.3.6.Hook methods
这个类提供了可重写的方法,beforeExecute(),afterExecute(),可以在每个任务执行前后进行调用。
可以用来操作执行环境,重新初始化ThreadLocals,收集统计数据,或者添加日志条目。另外,terminated()方法可以重写用来在Executor完全终止的时候一些需要立刻执行的特殊处理。
如果hook或者callback方法抛出异常,内部工作线程可能继而失败并突然终止。
1.3.7.Queue maintenance
getQueue()方法允许访问工作队列,用于监视和调试。强烈建议不要将此方法用于其他目的。当大量任务被取消时,remove()和purge()这2个方法可以用来协助回收存储空间。
1.3.8.Finalization
一个线程池在一个程序中不再被引用,而且没有剩余线程,将会被自动shutdown。
如果你想保证未引用的线程池会被回收,即使忘记调用shutdown(),那么必须安排未使用的线程最终会死亡。必须设置合理的过期时间,使用零核心线程的下界,或者设置allowCoreThreadTimeOut(),以使未使用的线程最终死亡。
如上线程池参数的含义和概念清楚了,也对线程池如何执行任务有了一个相对清晰的认识,下面给出一个流程图帮助理解。
1.3.ThreadPoolExecutor成员变量
成员变量ctl,是线程池状态控制参数,类型是atomic integer包装类型,ctl包含2个字段的值
- workerCount 线程池中有效线程个数
- runState 线程池运行状态
为了在一个int变量上存放2个状态,限制workerCount为(229)-1(大概5亿)线程而不是(231)-1(20亿)。
如果这在未来会是一个问题,可以把变量的类型改为AtomicLong,并且调整shift/mask常量。不过直到这个需求出现,使用int都是更快且更方便的选择。
workerCount是被允许执行并且没有结束的worker的数量,这个值可能和真实的存活线程数量有短暂的不一致, 例如ThreadFactory创建线程失败了,并且退出线程在terminateing之前仍在执行任务。用户可见的线程池数量展示的是worker set的当前大小。
runState 提供主要的生命周期控制,有如下几种状态。
RUNNING: 接收新任务,处理队列中的任务
SHUTDOWN: 不接收新任务,会处理队列中的任务
STOP: 不接收新任务,不处理队列中的任务,并且中断处理中的任务
TIDYING: 所有任务已经终止,workerCount为0,线程状态过渡到TIDYING,将会执行terminated()这个hook方法
TERMINATED: terminated()已经执行完成
这些值之间的顺序很重要,以允许有序比较。runState 随时间单调递增,但是不是每个状态都必须达到的,状态变化有以下几种情况:
- RUNNING -> SHUTDOWN
调用shutdown()时,也许也发生在隐式调用finalize()时 - (RUNNING or SHUTDOWN) -> STOP
调用shutdownNow()时 - SHUTDOWN -> TIDYING
当queue和pool都为空时 - STOP -> TIDYING
当pool为空时 - TIDYING -> TERMINATED
当terminated()这个hook方法执行完成后
当线程状态变为TERMINATED,等待在awaitTermination()的线程将会返回。
检测线程状态从 SHUTDOWN 到 TIDYING的转变比想象中的要简单,因为在SHUTDOWN 状态期间,队列会从非空变为空,反之亦然,但是我们仅能在队列为空,且workerCount为0(有时需要重新检查)时终止。
线程池状态流转如下所示
看一下代码里相关的常量
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
// runState is stored in the high-order bits
//111000000000000000000000000000
private static final int RUNNING = -1 << COUNT_BITS;
//000000000000000000000000000000
private static final int SHUTDOWN = 0 << COUNT_BITS;
//001000000000000000000000000000
private static final int STOP = 1 << COUNT_BITS;
//01000000000000000000000000000000
private static final int TIDYING = 2 << COUNT_BITS;
//01100000000000000000000000000000
private static final int TERMINATED = 3 << COUNT_BITS;
// Packing and unpacking ctl
private static int runStateOf(int c) { return c & ~CAPACITY; }
private static int workerCountOf(int c) { return c & CAPACITY; }
private static int ctlOf(int rs, int wc) { return rs | wc; }
使用Java提供的API算一下二进制的值,ctl高3位表示线程池状态,低29位表示工作线程数量:
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
// runState is stored in the high-order bits
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
// Packing and unpacking ctl
private static int runStateOf(int c) { return c & ~CAPACITY; }
private static int workerCountOf(int c) { return c & CAPACITY; }
private static int ctlOf(int rs, int wc) { return rs | wc; }
public static void main(String[] args) {
AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
System.out.println("RUNNING: "+Integer.toBinaryString(RUNNING));
System.out.println("SHUTDOWN: "+Integer.toBinaryString(SHUTDOWN));
System.out.println("STOP: "+Integer.toBinaryString(STOP));
System.out.println("TIDYING: "+Integer.toBinaryString(TIDYING));
System.out.println("TERMINATED: "+Integer.toBinaryString(TERMINATED));
System.out.println("ctl: "+Integer.toBinaryString(new AtomicInteger(ctlOf(RUNNING, 0)).intValue()));
System.out.println("runState: "+Integer.toBinaryString(runStateOf(ctl.intValue())));
System.out.println("workerCountOf: "+Integer.toBinaryString(workerCountOf(ctl.intValue())));
}
//结果,ps:长度有的不足32位,左边补0补足32位
RUNNING: 11100000000000000000000000000000
SHUTDOWN: 0
STOP: 100000000000000000000000000000
TIDYING: 1000000000000000000000000000000
TERMINATED: 1100000000000000000000000000000
ctl: 11100000000000000000000000000000
runState: 11100000000000000000000000000000
workerCountOf: 0
2.核心方法源码
2.1.execute
线程池核心方法还是execute方法,基于上面的基础,来看execute方法
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
/*
* 处理过程有3步:
*
* 运行线程数量小于corePoolSize,尝试开启一个新线程,提交的任务作为线程的第一个任务。
* 调用addWorker会自动检查runState和workerCount,并因此通过返回false来防止在不应该添加线程的情况下产生虚假警报
* 如果一个任务可以被成功的放入队列,我们仍需要进行二次检查是否应该添加线程(因为已存在的线程在上次检查之后死掉了)或者线程池自从entry进入到这个方法线程池已经关闭。因此我们重新检查状态并且在必要时回滚排队,如果已停止的话,或者如果没有线程的话开启一个新线程
* 如果任务入队失败,那么会尝试新增一个线程。如果失败了,则知道shutdown了或者饱和了,然后拒绝了任务
*/
int c = ctl.get();
//工作线程数量小于核心线程数量,新建线程
if (workerCountOf(c) < corePoolSize) {
//新增worker,用来管理线程,并将任务交个worker处理(worker的fisrtTask)
if (addWorker(command, true))
return;
c = ctl.get();
}
//线程池处于运行状态,且任务入队成功
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
//需要再次检查线程池状态,入队之后线程池有可能调用shutdown,线程池状态变化并且移除任务,则执行拒绝策略
if (! isRunning(recheck) && remove(command))
reject(command);
// 如果线程池在 RUNNING 状态下,线程数为 0,则新建线程加速处理 workQueue 中的任务
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
//执行到这,说明线程数大于corePoolSize,且入队失败,创建线程的最大数量为maximumPoolSize,addWorker失败说明线程数量超过了maximumPoolSize,执行拒绝策略
else if (!addWorker(command, false))
reject(command);
}
2.2.Worker
上面execute方法还是比较简短,创建线程和队列等待的基本操作都封装了,继续往下看细节。
上面创建线程主要是由addWorker做的,我们先来看一下Worker
- Worker主要维护线程运行任务的中断控制状态,以及其他一些次要记录
- Worker类适当扩展了AQS,以简化每一个任务执行时锁的获取和释放
- 这样可以防止旨在唤醒工作线程等待任务的中断,而不是中断正在运行的任务
- Worker继承ASQ实现了一个简单的不可重入互斥锁,而不是使用ReentrantLock,因为不想在执行线程池控制方法,例如setCorePoolSize时,worker tasks可以再次获取锁
- 另外,线程真正开始执行任务前禁止中断,将锁的状态初始化为负值,并且一启动就清除状态(在runWorker方法)
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable
{
/**
* This class will never be serialized, but we provide a
* serialVersionUID to suppress a javac warning.
*/
private static final long serialVersionUID = 6138294804551838833L;
/**Worker实际运行的线程. 如果工厂方法创建线程失败则为null. */
final Thread thread;
/** 初始化要运行的任务. 可能为null. 核心线程创建不为null,会执行这个任务,非核心线程创建,任务为null,从队列里取任务执行*/
Runnable firstTask;
/** 每个线程的任务计数器 */
volatile long completedTasks;
/**
* 使用ThreadFactory创建的线程和提交的任务构建Worker.
* @param firstTask the first task (null if none)
*/
Worker(Runnable firstTask) {
// 初始化为 -1,这样在线程运行前(调用runWorker)禁止中断,在 interruptIfStarted() 方法中会判断 getState()>=0
setState(-1);
this.firstTask = firstTask;
//使用ThreadFactory创建线程,将worker对象本身传进去
this.thread = getThreadFactory().newThread(this);
}
/** 将主运行循环委托给外部的runWorker */
public void run() {
//thread启动后会调用该方法
runWorker(this);
}
//锁定方法,0代表未锁定,1代表锁定
protected boolean isHeldExclusively() {
return getState() != 0;
}
//尝试获取锁
protected boolean tryAcquire(int unused) {
//独占锁,更新成功之后别的线程无法拿到锁
//state初始状态设为-1,永远不可能拿到锁,worker中断的条件是先获得锁,所以也不会被中断
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
//尝试释放锁
protected boolean tryRelease(int unused) {
setExclusiveOwnerThread(null);
setState(0);
return true;
}
public void lock() { acquire(1); }
public boolean tryLock() { return tryAcquire(1); }
public void unlock() { release(1); }
public boolean isLocked() { return isHeldExclusively(); }
// 中断线程,这个方法会被 shutdowNow 调用,从中可以看出 shutdownNow 要中断线程不需要获取锁,也就是说如果线程正在运行,照样会给你中断掉,所以一般来说我们不用 shutdowNow 来中断线程,太粗暴了,中断时线程很可能在执行任务,影响任务执行
void interruptIfStarted() {
Thread t;
//状态需大于0,没启动的线程状态为-1,无法中断,线程不为null,线程还未中断
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
}
}
}
}
2.3.addWorker
上面简单介绍了一下Worker类,现在继续看一下addWorker的过程。
Worker执行流程如下
addWorker会先检查是否可以根据线程池的状态和给定的边界(core or maximum)。worker数量会相应调整,新的worker会被创建和启动,将firstTask作为worker的第一个任务执行。
如果线程池停止了或者可以shut down,addWorker方法会返回false。ThreadFactory创建线程失败addWorker也会返回false。如果线程创建失败,有可能是因为thread factory返回null,或者产生了异常,会进行回滚。
2个参数,firstTask针对核心线程和非核心线程会不同,核心线程情况下会传入firstTask执行,非核心线程firstTask为null.在新增线程时会判断当前活动线程数是否少于corePoolSize,false表示新增线程前需要判断当前活动线程数是否少于maximumPoolSize。
执行流程如下:
下面看一下addWorker的代码
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// 仅在必要时检查队列是否为空.
//线程池状态大于等于SHUTDOWN,即SHUTDOWN,STOP,TIDYING,TERMINATED 这四个状态,这种情况下只有一种情况可能会新建线程:
//即线程状态为 SHUTDOWN, 且队列非空时,firstTask == null 代表创建一个不接收新任务的线程(此线程会从 workQueue 中获取任务再执行),这种情况下创建线程是为了加速处理完 workQueue 中的任务
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
//线程数量
int wc = workerCountOf(c);
//如果线程数量超过了线程池最大容量(大概5亿,基本不可能)
//或者线程数量超过了corePoolSize (core为true),或者超过了maximumPoolSize(core为false)
//则返回false
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
//使用cas增加线程,成功则跳出双循环
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
//线程状态发生变化,跳到外部循环继续
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
//由于workerCount改变,CAS失败,内部循环重试
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
//经过前置校验,下面就可以创建worker了
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
//加锁,workers是hashSet结构,将w添加进workers是非线程安全的
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 在持有锁的时候再次check
// ThreadFactory failure退出 或者如果
// 在获取锁之前shut down了
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // 前置校验,线程是否处于启动状态
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
//记录最大的线程池大小方便线程池监控
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
//w成功添加进workers,启动线程,worker启动成功
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
创建线程时把worker自己传给了线程,启动线程后会调用Worker的run方法
/** Delegates main run loop to outer runWorker */
public void run() {
runWorker(this);
}
2.4.runWorker
我们下面再看一下runWorker方法,先看一下任务是如何执行的:
1.while循环不断地通过getTask()方法获取任务。
2.getTask()方法从阻塞队列中取任务。
3.如果线程池正在停止,那么要保证当前线程是中断状态,否则要保证当前线程不是中断状态。
4.执行任务。
5.如果getTask结果为null则跳出循环,执行processWorkerExit()方法,销毁线程。
这个方法的注释写的最多的应该是如何处理异常和中断的情况了,设计方法的时候对一些异常情况确实要考虑到位,避免异常情况考虑不到位造成损失
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
//从worker中取出任务执行
Runnable task = w.firstTask;
//取出后w.firstTask置为null,
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
//创建worker提交了线程,则从worker中取task执行
//否则从队列中取任务执行(getTask方法)
while (task != null || (task = getTask()) != null) {
w.lock();
//如果线程池正在停止(runState>=STOP),保证线程是中断的
//如果没有停止,则保证线程不被中断。这种情况下要求在第二种情况下进行二次检查,防止在清除中断标志位时调用了shutdownNow()
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
// 执行任务前,子类可实现此钩子方法作为统计之用
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
// 执行任务后,子类可实现此钩子方法作为统计之用
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
//执行到这一是抛出异常,而是队列里没有任务了
//表明线程没有核心和非核心之分,正常退出或发生异常或没有任务都会走这个方法,并会根据情况将线程数-1
processWorkerExit(w, completedAbruptly);
}
}
上面那个关于中断清理的再看一下,逻辑再清晰一些
if ( (runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) {
wt.interrupt();
}
外层共2个逻辑判断,wt.isInterrupted()是判断线程是否中断的,不会影响线程的中断状态。我们重点看一下第一个判断逻辑
(runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP)))
是由下面2个或语句组成的,
runStateAtLeast(ctl.get(), STOP)
判断线程状态是否>= STOP(即 STOP,TIDYING,TERMINATED ),如果大于,这种情况线程则应该中断
Thread.interrupted() && runStateAtLeast(ctl.get(), STOP)
走到这,则线程状态< stop,线程不应该中断。
如果中断了(Thread.interrupted() 返回 true,并清除标志位),再次判断线程池状态(防止在清除标志位时执行了 shutdownNow() 这样的方法),如果此时线程池为 STOP,执行线程中断。
Thread.interrupted()
会影响线程状态,这个方法如果连续调用2次,第二次会返回false(除非当前线程在第一次调用清除其中断的状态之后且在第二次调用检查其状态之前再次被中断)。
2.5.Worker回收
线程池中线程的销毁依赖JVM自动的回收,线程池做的工作是根据当前线程池的状态维护一定数量的线程引用,防止这部分线程被JVM回收,当线程池决定哪些线程需要回收时,只需要将其引用消除即可。
Worker被创建出来后,就会不断地进行轮询,然后获取任务去执行,核心线程可以无限等待获取任务,非核心线程要限时获取任务。
当Worker无法获取到任务,也就是获取的任务为空时,循环会结束,Worker会主动消除自身在线程池内的引用。
try {
while (task != null || (task = getTask()) != null) {
//执行任务
}
} finally {
processWorkerExit(w, completedAbruptly);//获取不到任务时,主动回收自己
}
线程回收的工作是在processWorkerExit方法完成的。
图10 线程销毁流程
事实上,在这个方法中,将线程引用移出线程池就已经结束了线程销毁的部分。但由于引起线程销毁的可能性有很多,线程池还要判断是什么引发了这次销毁,是否要改变线程池的现阶段状态,是否要根据新状态,重新分配线程。
来看一下processWorkerExit()
private void processWorkerExit(Worker w, boolean completedAbruptly) {
//因用户任务异常退出,使用CAS尝试降低ctl的workerCount字段
if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
decrementWorkerCount();
//加锁,安全移除worker
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
completedTaskCount += w.completedTasks;
workers.remove(w);
} finally {
mainLock.unlock();
}
//worker可能异常退出,那么线程池可能状态变了,尝试终止
tryTerminate();
int c = ctl.get();
//如果线程池状态小于STOP(RUNNING,SHUTDOWN)
if (runStateLessThan(c, STOP)) {
//如果不是异常退出
if (!completedAbruptly) {
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
//如果允许核心线程过期,且队列不为空
if (min == 0 && ! workQueue.isEmpty())
min = 1;
//如果workerCount > min
if (workerCountOf(c) >= min)
return; // 不需要替换worker
}
//异常退出的,新增一个worker或者workerQueue非空的情况下,确保至少有1个线程处理workerQueue里的任务
addWorker(null, false);
}
}
2.6.getTask
下面还有一个从队列里取任务的过程,看一下代码
private Runnable getTask() {
boolean timedOut = false;
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// 必要情况下检查队列是否为空,符合条件减少worker数量,返回null
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
int wc = workerCountOf(c);
// 如果 allowCoreThreadTimeOut 为 true,代表任何线程在 keepAliveTime 时间内处于 idle 状态都会被回收
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
//阻塞获取task,核心线程会阻塞在take,不会超时,会一直阻塞,非核心线程取任务会超时
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
3.小结
线程池算是比较重要的一个知识点,不论是日常写代码还是出去撸面试都是比较重要的,里面的设计思想以及一些关于边界值的考虑,自己写代码的时候还是要多多注意的。
JDK的源码注释可以帮助理解代码,不过翻译过来可能不太通顺,可能会有点难以理解,可以配合在实际生产中的使用帮助理解。
一些不错的参考文章
Java线程池实现原理及其在美团业务中的实践
2万字长文带你深入理解线程池
有的线程它死了,于是它变成一道面试题