今天来说一说线程池ThreadPoolExecutor,线程池主要解决两个问题:一是当执行大量异步任务时线程池能够提供较好的性能。在不使用线程池时,每当需要执行异步任务时直接new一个线程来运行,而线程的创建和销毁都需要开销。线程池中的线程是可以复用的,不需要每次执行异步任务都进行创建线程,从而减少了开销。二是线程池提供了一种资源限制和管理的手段,例如限制线程的个数、动态增加线程的个数、缓存异步任务等。
ThreadPoolExecutor内部有一个原子性变量ctl,用来存储线程池的状态以及线程的个数,变量ctl高3位存储线程池状态,低29位存储线程个数。其中线程池状态有如下5中:
RUNNING:接受新任务并且处理阻塞队列里的任务。
SHUTDOWN:拒绝新任务但是处理阻塞队列里的任务。
STOP:拒绝新任务并抛弃阻塞队列中的任务,并同时中断正在处理的任务。
TIDYING:所有任务执行完(包括阻塞队列里的任务)后当前线程池活动线程数位0,线程状态变为次TIDYING,之后将要调用terminated方法。
TERMINATED:调用terminated()方法后的状态。
线程池之间状态变化如下所示:
RUNNING -> SHUTDOWN:显式调用了shutdown()方法,或者隐式调用finalize()方法里面的shutdown()。
(RUNNING or SHUTDOWN) -> STOP:显式调用shutdownNow()方法。
SHUTDOWN -> TIDYING:当阻塞队列和线程池都为空时。
STOP -> TIDYING:当线程池为空时。
TIDYING -> TERMINATED:当terminated()方法执行完成时。
下面通过线程池构造函数的参数来说说线程池的执行步骤。构造函数如下
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
corePoolSize:线程池核心线程个数。
maximumPoolSize:线程池最大线程个数。
keepAliveTime:存活时间,当非核心线程处于闲置状态,这些闲置的非核心线程能存活的最大时间。
unit:存活时间的时间单位。
workQueue:任务队列,用来保存等待执行任务的阻塞队列。
threadFactory:创建线程的工厂。
handler:饱和策略。当线程个数已达到最大线程个数,并且任务队列也已满,继续添加任务则会指定该饱和策略,比如:AbortPolicy(抛出异常)、CallerRunsPolicy(使用调用则所在线程来运行任务)、DiscardOldestPolicy(调用poll丢弃一个任务,执行当前任务)和DiscardPolicy(默默丢弃,不抛异常)。
当调用线程池的executor方法执行任务的时候都会创建一个core线程(即使之前创建的core线程处于空闲状态),直到core线程数达到corePoolSize,当core线程数达到corePoolSize并且所有的core线程都在执行任务,这时再添加任务就会往BlockingQueue阻塞队列中放,当BlockingQueue阻塞队列也达到设置的容量之后,就会再创建线程来执行新添加的任务,直到创建的线程达到maximumPoolSize,如果线程达到maximumPoolSize还在添加任务,这时就会执行饱和策略(抛弃、报错或当前执行executor方法的线程去执行该任务)。
创建ThreadPoolExecutor 时会指定一个keepAliveTime的参数,用来指定非core线程在空闲keepAliveTime的时候之后就会被回收,这个非core线程keepAliveTime超时之后被回收是通过在获取BlockingQueue阻塞队列中任务时添加一个超时时间(BlockingQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS)),从而实现了非core线程 空闲时间超过keepAliveTime之后被回收。
下面对ThreadPoolExecutor内部主要方法源码进行讲解。
execute(Runnable command)
提交任务。
public void execute(Runnable command) {
//(1)检查提交的任务是否为null,是则抛出 NullPointerException异常。
if (command == null)
throw new NullPointerException();
//(2)获取原子变量ctl,高3位表示线程池状态, 低29位表示线程个数
int c = ctl.get();
//(3)如果当前线程池的线程小于核心线程数,则尝试添加一个核心work线程到线程池中,并将当前任务作为新创建线程的第一个任务。
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
//(4)如果线程池处于RUNNING状态,则添加任务到阻塞队列。
if (isRunning(c) && workQueue.offer(command)) {
//(5)因为是向阻塞队列中插入元素,这个操作有可能阻塞,在阻塞的这段时间有可能线程池状态发生了变化,所以这里需要再进行一次状态检查。
int recheck = ctl.get();
//(6)如果当前线程状态不是RUNNING则从队列中删除任务,并执行拒绝策略。
if (! isRunning(recheck) && remove(command))
reject(command);
//(7)如果当前线程为空,则尝试添加一个非核心线程,代码走到这个分支有可能就是执行的核心线程为0。
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
//(8)如果阻塞队列已满,则尝试创建一个非核心线程,并将当前任务作为该非核心线程的第一个任务。创建失败则执行拒绝策略。
else if (!addWorker(command, false))
reject(command);
}
addWorker(Runnable firstTask, boolean core)
添加工作线程,可以是核心线程也可以是非核心线程。
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
//(1)获取ctl变量的值,并获取线程池状态。
int c = ctl.get();
int rs = runStateOf(c);
//(2)检查阻塞队列是否只在必要时为空。
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
//(3)循环CAS增加线程个数。
for (;;) {
//(4)如果线程个数超限则返回false,下面3中情况都会返回false
//一:线程个数大于最大容量数。
//二:如果创建的是核心线程,线程池的核心线程个数已大于corePoolSize指定的核心线程数大小。
//三:如果创建的是非核心线程,线程池的线程数已大于 maximumPoolSize指定的线程数大小。
int wc = workerCountOf(c);
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
//(5)增加线程数,使用CAS算法为ctl变量的低29位表示的数字加1。
if (compareAndIncrementWorkerCount(c))
break retry;
//(6)CAS失败,则看线程池状态是否变化了,变化则跳到外层循环重新尝试获取线程池状态,否则在内层循环重新CAS。
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
//(7)创建worker,在获取独占锁之前进行创建,减少锁占用的时间。
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
//(8)尝试获取独占锁。
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
//(9)再次检查线程池状态,有可能在获取锁前有别的线程调用了线程池的shutdown接口。
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
//(10)释放获取的独占锁。
mainLock.unlock();
}
//(11)添加worker成功后,则调用该新增worker的start()方法开始执行任务。
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
//(12)如果添加worker失败,则通过CAS算法将ctl变量的低29位表示的数字减1,将该创建的worker移除,并检查线程池状态。
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
提交任务之后worker是如何执行任务的呢?下面来看一下worker是如何处理任务的。
runWorker(Worker w)
运行worker,从阻塞队列中持续获取任务进行执行。
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
// (1)调用getTask()方法从阻塞队列中持续获取任务进行执行。
while (task != null || (task = getTask()) != null) {
//(2)在执行任务之前获取worker中的锁,防止任务执行的时候被中断,除非线程池正在stop。
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
//(3)如果线程池正在stop,保证worker中的线程是中断状态,否则保证worker中的线程为非中断状态。
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
//(4)执行任务。
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
//(5)释放worker的锁。
w.unlock();
}
}
completedAbruptly = false;
} finally {
//(6)worker结束前执行清理任务。
processWorkerExit(w, completedAbruptly);
}
}
getTask()
从阻塞队列中获取任务,该方法实现了线程空闲过期策略。
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
for (;;) {
int c = ctl.get();
//(1)获取线程池状态。
int rs = runStateOf(c);
//(2) 检查阻塞队列是否为空,空则将worker数减一并返回null,之后在 runWorker()方法中会对worker进行回收。
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
int wc = workerCountOf(c);
//(3)worker是否设置闲置超时 ,核心worker也是可以设置进行闲置超时回收。
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
//(4)worker已经闲置超时,并且阻塞队列为空,则将worker数减一,并返回null,之后在 runWorker()方法中会对worker进行回收。
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
// (5)使用阻塞队列带超时时间的poll(long timeout, TimeUnit unit)方法来实现闲置超时,闲置超时后在下一次进到循环中会执行(4)的代码。
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
上面讲完了ThreadPoolExecutor如何进行提交任务以及worker如何获取任务并执行任务。下面讲一下ThreadPoolExecutor中的一些其他常用方法。
shutdown()
调用shutdown方法后,线程池就会不再接受新的任务,但是阻塞队列中的任务还是会执行。该方法会立刻返回,并不会等待队列中任务完成后返回。
public void shutdown() {
//(1)尝试获取独占锁。
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
//(2)检查权限。
checkShutdownAccess();
//(3)将当前线程池状态修改为SHUTDOWN状态,如果已为SHUTDOWN状态则直接返回。
advanceRunState(SHUTDOWN);
//(4)设置worker的中断标识。
interruptIdleWorkers();
onShutdown(); // hook for ScheduledThreadPoolExecutor
} finally {
//(5)释放锁。
mainLock.unlock();
}
//(6)尝试将线程池状态设置为TERMINATED,以下两种情况会将线程池状态设置为TERMINATED
//1.当线程池状态为SHUTDOWN并且线程池和阻塞队列都为空
//2.当线程池状态为STOP并且线程池为空。
tryTerminate();
}
shutdownNow()
调用shutdownNow方法后,线程池不再接受新的任务,并且会丢弃工作队列里面的任务,中断正在执行的任务(中断是通过设置中断标识完成的,如果任务没有对中断标识进行响应则不会停止该任务),该方法会立刻返回,返回值为阻塞队列里被丢弃的任务列表。
public List<Runnable> shutdownNow() {
List<Runnable> tasks;
//(1)尝试获取锁。
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
//(2)检查权限
checkShutdownAccess();
//(3)设置线程池状态为stop
advanceRunState(STOP);
//(4)设置所有worker中的线程的中断标志。
interruptWorkers();
//(5)将阻塞队列中的任务移动到tasks中。
tasks = drainQueue();
} finally {
//(6)释放锁。
mainLock.unlock();
}
//(7)尝试将线程池状态设置为TERMINATED,以下两种情况会将线程池状态设置为TERMINATED
//1.当线程池状态为SHUTDOWN并且线程池和阻塞队列都为空
//2.当线程池状态为STOP并且线程池为空。
tryTerminate();
return tasks;
}
线程池巧妙地使用了原子变量来记录线程池的状态以及线程个数。通过线程池状态来控制任务的执行,每个worker可以处理多个任务。线程池通过线程的复用减少了线程创建和销毁的开销。
今天的分享就到这,有看不明白的地方一定是我写的不够清楚,所有欢迎提任何问题以及改善方法。