续上篇所说java线程池的算法,我们知道java线程池内部维护了一个状态机以及工作线程与任务的动态调控,前者可以理解是线程池的控制信号,掌握线程池的生杀大权;后者可以理解成线程池人力与任务的权衡。
今天就从几个源码方法来看这一个状态机,一个工作线程组和一个任务组怎样工作的。这几个方法位于java.util.concurrent.ThreadPoolExecutor类中。
首先看状态机的实现,线程池一共有五种状态,分别为RUNNING(-1),SHUTDOWN(0),STOP(1),TIDYING(2)和TERMINATED(3)。线程池用一个AtomicInteger类型值ctl的高3位来存储当前线程池的状态,低28位来存储正在工作的线程数,这样存储的目的一是节省存储空间,二是减少访问内存的次数(一次性读取两个值,减少一次读内存的操作),而且只需要维护一个变量的原子操作,提高并发效率。
线程池初始化为RUNNING状态,在这个状态下,工作线程和任务有机调度;当shutdown()方法被执行时线程池状态改为SHUTDOWN,此时线程池会停掉所有空闲线程(core线程也会停止,此时线程还会执行收尾的逻辑,不是不会执行程序),并尝试判断是否符合最终停止的条件(工作线程全停止,任务队列为空),此后,线程池不接受新添加的任务,但会完成线程池已有的任务(包括正在执行的任务和任务队列里暂存的任务),这是一种比较负责的的停止线程的方式。怎样实现这种方式呢?首先得看execute(Runable)方法中添加任务的分析:
int c= ctl.get();
if (workerCountOf(c) < corePoolSize) { // 此时workQueue为empty
if (addWorker(command, true)) // 如果是SHUTDOWN状态这里会不成功
return;
c= ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
// 这里double-check 是为了避免刚添加进workQueue后,线程池就变SHUTDOWN,导致工作线程全停止,但workQueue还有任务
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))// 如果是SHUTDOWN,返回false直接拒绝
reject(command);
execute(Runable)方法配合AddWorker(...)直接从源头阻断了添加新任务的可能性,addWorker(...)代码片段如下:
// 当处于SHUTDOWN状态下,添加新任务firstTask==null就会直接返回false,如果是worker由于异常退出且workQueue里还有任务,此时是允许添加新工作线程的.
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
接下来是当workQueue为空,工作线程执行完,它是怎么退出的?顺序执行的代码片段如下:
首先执行runWorker(...) 的代码片段:
try {
while (task!= null || (task= getTask()) != null) {...}
completedAbruptly= false;
} finally {
processWorkerExit(w, completedAbruptly);
}
接着执行getTask(...)代码片段:
// 满足rs >= SHUTDOWN 和workQueue.isEmpty(),直接返回null.
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
接着执行processWorkerExit(...)方法:
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
completedTaskCount+= w.completedTasks;
workers.remove(w);// 从工作线程队列退出,然后执行一些判断后线程自然退出
} finally {
mainLock.unlock();
}
// 满足终止条件后:tryTerminate()里
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
try {
terminated();
} finally {
ctl.set(ctlOf(TERMINATED, 0));
termination.signalAll();
}
return;
}
} finally {
mainLock.unlock();
}
这里就有一个问题,为什么decrementWorkerCount()在getTask()调用,而不是在processWorkerExit(...)里调用?减少线程数在真正 workers.remove()前调用的好处是可以加快线程池的收尾工作而不用等所有的线程执行完processWorkerExit(...)才退出,只需要所有线程退出runWorker(...)的while循环就达到了线程池的终止条件了,提高并发的性能。
当shutDownNow() 被执行时,线程池状态变成STOP,此后,线程池会中断所有的工作线程,空闲线程会被直接中断掉,因为他们在getTask()里阻塞(原理是使用了Object.wait())在获取任务上,此时会响应线程的中断信号,从而最后退出线程(跟SHUTDWON状态一样方式退出);而那些active工作线程如果任务里没有对线程的中断信号做出响应的话,还是会继续执行任务直到任务结束。而那些workQueue的任务会被转存为一个list返回给调用者。
shutDown() 和shutDownNow()只是放出中断信号后就返回了,不会等线程池真正终止。而awaitTermination(...)会阻塞等待线程最终终止。
TIDYING和TERMINATED状态都在线程池满足终止条件时转化,比较简单。
上面已经描述线程池整个状态机的运转流程,主要阐述了SHUTDOWN和STOP的退出流程,而当线程池处于RUNNING状态时,这里发生是工作线程和任务的动态调节,主要受corePoolSize,maximumPoolSize,workQueue的具体实现,keepAliveTime,allowCoreThreadTimeOut和RejectedExecutionHandler调节。接下来介绍各自的作用:
corePoolSize:核心线程数,可以理解为线程池最少常驻线程,当workQueue为空时,核心线程会阻塞在getTask()方法里。
maximumPoolSize:最大线程数,这个值有个上限(1<<Integer.SIZE-3),当核心线程数和workQueue都达上限时,需要增加工作线程来执行任务,增加的线程数不得大于(maximumPoolSize-corePoolSize).
workQueue的具体实现:阻塞队列可以有不同的实现方式,有基于数组实现和基于链表实现的,具体细节后续再议。
keepAliveTime和allowCoreThreadTimeOut:这两个是针对空闲线程的处理,分别用于getTask()阻塞获取任务的时间和核心线程是否一直阻塞。
RejectedExecutionHandler:拒绝策略,当线程池线程池线程数达到maximumPoolSize时或线程处于terminating时,需要使用拒绝策略。默认是AbortPolicy(抛RejectedExecutionException),另外还有CallerRunsPolicy(调用线程里执行),DiscardPolicy(直接丢弃)和DiscardOldestPolicy(选队列里最老的任务丢弃,即队列头)。
下一篇我们讲线程池并发锁机制。