背景
我们自己创建的线程其只能start()执行一次,一旦执行完毕或被中断,即走terminated终止状态结束线程了,你难道没有这样的疑问为何线程池中的线程却可以一直执行?核心及非核心线程是如何实现的呢?
线程池回顾
- 如何创建一个线程池
//实际调用对象
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
- corePoolSize:核心线程池
- maxinumPoolSize:表示最大允许被创建的线程数
- keepAliveTime,unit: 非核心线程数的存活时间
- workQueue: 用来暂时保存任务的工作队列
- threadFactory:用来创建线程,可定义名称及优先级
- handler: 任务拒绝策略
- 当调用shutdown 等方法关闭线程池后,这时候即使线程池内部还有没执行完的任务正在执行,但是由于线程池已经关闭,我们再继续想线程池提交任务就会遭到拒绝
- 当达到最大线程数,线程池已经没有能力继续处理新提交的任务时,这是也就拒绝。
-
参考此分享文章:深入理解线程池线程池任务执行流程如下:
-
对应线程池运行流程图如下:
任务调度
- 当用户提交一个任务会通过Executor.execute()方法执行,他的步骤上方已经总结过了,我们直接看新增线程执行任务的addWorker(Runnable firstTask, boolean core)方法,参数Runnable:运行任务,core是否为核心线程
private final HashSet<Worker> workers = new HashSet<>(); //线程池中的所有工作线程
private boolean addWorker(Runnable firstTask, boolean core) {
//根据当前状态,判断是否添加成功,上方执行方法中的addWorker两个参数firstTask = null ,core = true /false 具体分析
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c); //获取运行状态
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN && //状态 > shutDown 表示此时已经不再接受任务
//shutdown状态不接受新任务,但可以执行已经加入队列中的任务,所以当进入shutdown状态,且传进来的任务为null时,并且任务队列不为null时,是允许添加新线程的,把这个条件取反就是不允许
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) { //使用CAS操作避免加锁
int wc = workerCountOf(c); //获取工作线程
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false; //大于线程最大容量2的29次方量(所以newCacheExecutor并不能得到Integer.MAX_Value的),或者大于最大允许线程量则不能添加啦
if (compareAndIncrementWorkerCount(c)) //可添加就CAS操作线程数+1,成功说明可添加
break retry; //break跳出retry对应的循环,执行循环后面的添加worker逻辑
c = ctl.get(); // Re-read ctl 重新读取状态
if (runStateOf(c) != rs)
continue retry; //状态改变了,跳到外层循环继续重新执行循环
// else CAS failed due to workerCount change; retry inner loop
//在内存层循环中不停的尝试CAS操作增加线程数
}
}
//找了上方break retry可以正常使用CAS新增线程数
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask); //通过Worker包装runnable任务,稍后我们分析
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock(); //加锁
try {
int rs = runStateOf(ctl.get());
//如果线程池状态rs < Shutdown即只能是Running
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) { //或者shutDown状态但是没有新任务
if (t.isAlive()) // 线程已经启动,并且当前没有任何异常的话,则是true,否则为false
throw new IllegalThreadStateException(); //我还没有启动呢
workers.add(w); //正常添加到线程池中workers工作线程
int s = workers.size();
if (s > largestPoolSize) //largestPoolSize:记录着线程池中出现过最大线程数量
largestPoolSize = s;
workerAdded = true; //可以正常工作的标记
}
} finally {
mainLock.unlock();
}
if (workerAdded) { //如果正常工作,则开启线程任务
t.start();
workerStarted = true; //开始工作标记
}
}
} finally {
if (! workerStarted) //该任务没有开始,则添加到失败
addWorkerFailed(w);
}
return workerStarted;
}
- 真正的执行工作交给了Worker(firstTask)类完成的
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable //实现了Runnable接口,因此t.start()执行的就是worker的run方法啊
{
final Thread thread;
Runnable firstTask;
volatile long completedTasks;
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this); //创建thread(this:Worker) ,则t.start()调用worker的run,同时原来的Runnable被封装为Worker的属性firstTask
}
/** Delegates main run loop to outer runWorker */
public void run() {
runWorker(this);
}
//getThreadFactory即为ThreadPoolExecutor创建thread工厂(实现ThreadFactory)可修改Thread名称,优先级等操作实现的
public ThreadFactory getThreadFactory() {
return threadFactory;
}
- 创建线程交由线程池设定的ThreadFactory
- 当线程执行thread.start()其实就是执行worker.run() 调用runWorker
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask; //这个就是我们执行线程池executor.execute()方法时候的runnable
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
//如果task不为null,并且从workQueue中获取任务不为null,则会一直执行下去
while (task != null || (task = getTask()) != null) { //task是需要执行的任务,不一定是刚刚添加的那个了,这样其实worker线程并没有完成工作,自然也就不会销毁了
w.lock();
if ((runStateAtLeast(ctl.get(), STOP) || //检查线程状态,若线程池处于中断状态,调用interrupt将线程中断
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt(); //中断线程
try {
beforeExecute(wt, task); //可以在任务真正执行之前做点啥,空实现
Throwable thrown = null;
try {
task.run(); //执行execute()方法中的run方法,在t.start()线程内,这只是一个方法执行哈!
} catch (Throwable x) {
} finally {
afterExecute(task, thrown); //线程之后可以做啥,空实现
}
} finally {
task = null;
w.completedTasks++; //该线程执行完成任务+1
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
- runWorker执行过程如下:
- while循环调取getTask()获取task,若不为null,则一直执行下去
- 检查线程是否被中断
- beforeExecute:可以在任务真正执行之前做点啥,空实现
- 执行task.run() 即执行execute()方法中的run方法,在t.start()线程内,这只是一个方法执行哦!
- afterExecute:任务执行之后可以做啥,空实现
- 重点逻辑是while循环,其是线程池中线程能够一直运行的原因,当我们第一次创建worker并执行任务后,并没有结束线程,而是通过while循环调用getTask()方法从阻塞队列中去task继续调用task.run()执行任务,注意这里run()只是一个普通的方法调用,并不是start()哦!运行线程就是Worker线程中
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// 对应ShutDown虽然不添加任务,但是可以执行阻塞队列中的,Stop以后就不能子在执行任务了
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null; //返回null,停止执行任务
}
int wc = workerCountOf(c);
// allowCoreThreadTimeOut 表示是否允许核心线程超时销毁,默认false不销毁.若设置成true,核心线程也会销毁的
//只有正在工作的线程数大于核心线程数才会为true,佛足额返回false
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; //
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
//如果timed为true(wx > 核心线程),通过poll取任务,如果为false,通过take取任务
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : //这两个参数就是创建线程池中保存时间量
workQueue.take();
if (r != null) //如果有任务就退出死循环,返回任务交给上方的worker线程运行
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
- 通过以上代码分析:根据wc记录已运行线程数与核心线程数比较
- 若wc > 核心线程数,则通过poll()从队列中取任务
- 若wc <= 核心线程数,则通过take()取任务
- 那么poll()与take()区别是什么呢?workQueue创建线程池时设置的阻塞队列,即实现BlockingQueue接口常用的ArrayBlockingQueue,LinkedBlockingQueue,PriorityQueue,SynchronizedQueue;
- 以ArrayBlockingQueue为例查看其poll和take方法
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
long nanos = unit.toNanos(timeout);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == 0) { //是否队列中的元素个数为0,说明空队列
if (nanos <= 0L) //等待时间到了,队列中还未有数据加入,则返回null,
return null;
/**
* 调用该方法的前提是,当前线程已经成功获得与该条件对象绑定的重入锁,否* * 则调用该方法时会抛出IllegalMonitorStateException。
* nanosTimeout指定该方法等待信号的的最大时间(单位为纳秒)。若指定时间* * 内收到signal()或signalALL()则返回nanosTimeout减去已经等待的时间;
*若指定时间内有其它线程中断该线程,则抛出InterruptedException并清除当前线程的打断状态;
* 若指定时间内未收到通知,则返回0或负数。
*/
nanos = notEmpty.awaitNanos(nanos); //每次signal唤醒重新等待
}
return dequeue(); //如果有元素取出
} finally {
lock.unlock();
}
}
//如果poll超时返回null,则回调到
f ((wc > maximumPoolSize || (timed && timedOut)) //true
&& (wc > 1 || workQueue.isEmpty())) { //队列也是空的,走进去
if (compareAndDecrementWorkerCount(c)) //CAS可以减少c的个数
return null; //返回了null,该线程不能再上方的while循环中继续获取就结束线程啦,非核心线程就over啦,嘿嘿!
continue;
}
- 分析以上流程
- poll()中参数表示超时等待时间
- 若超过该时间阻塞队列依然为空,则返回null,退出while循环,线程执行结束,非核心线程被销毁
- 线程获取lock锁,执行awaitNanos最大等待nanos秒之内若收到signal()或signalALL()被唤醒,执行dequeue去阻塞队列中取任务执行
- take方法执行核心线程
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == 0) //不能使用if,避免虚假唤醒
notEmpty.await(); //一旦count队列为空,会一致await阻塞在这里的,直到workQueue.offer()添加元素时唤醒
return dequeue(); //取出队头元素
} finally {
lock.unlock();
}
}
- await不设置超时等待时间,notEmpty.await()一直阻塞,那这个阻塞又是何时被唤醒的呢?
- 当然是下一个任务达到的时候也就是调用execute的时候添加一个新的任务Task;
//这个就是调用当前核心线程已经满了,则添加到阻塞队列中,
//刚刚上方的核心线程在等待任务,添加以后肯定就调用notEmpty.signal()唤醒等待线程取任务执行啦
if (isRunning(c) && workQueue.offer(command))
- 我们来验证一下我们的想法:workQueue就是选择的队列,这里看ArrayBlockingQueue,当然对于其他队列也是相同的
public boolean offer(E e) {
checkNotNull(e);
final ReentrantLock lock = this.lock; //获取锁,跟上方加锁时同一把锁
lock.lock();
try {
if (count == items.length)
return false; //如果当前队列已满,不能再加入了false
else {
enqueue(e); //正常添加到队列中
return true;
}
} finally {
lock.unlock();
}
}
//enqueue添加到数组循环队列中后调用notEmpty.signal()唤醒一个await线程取任务开始工作啦!
private void enqueue(E x) {
// assert lock.getHoldCount() == 1;
// assert items[putIndex] == null;
final Object[] items = this.items;
items[putIndex] = x;
if (++putIndex == items.length)
putIndex = 0;
count++;
notEmpty.signal();
}
- 通过生成-消费者模式,将execute加入队列的任务通知等待的核心线程取阻塞队列中的任务开始执行!
总结
- 线程池核心线程一直在运行,不会终止的原因是由于使用while循环轮训阻塞队列中是否存在任务,若没有CPU不会空转而是调用await()等待函数,当阻塞队列中添加任务时会被唤醒,去取任务继续执行while循环;
- 核心和非核心是由等待函数决定的,设置是否有等待超时时间,若超时后返回null退出while循环,线程执行结束被销毁;