手写线程池只需了解7个线程池核心参数
参数名 | 中文名 | 说明 |
---|---|---|
corePoolSize | 核心线程数 | 默认不会销毁,需要设置allowCoreThreadTimeOut为true时会销毁 |
maximumPoolSize | 最大线程数量 | 线程数量要大于核心线程数,且不能小于等于0 |
keepAliveTime | 空闲时间 | 最终存入的是nanos的值 |
unit | 空闲时间单位 | unit.tonanos(long time),最终转为纳秒值存入成员变量 |
workQueue | 存放任务的队列 | jdk实现的queue在初始化时需要设置队列长度 |
threadFactory | 线程创建的工厂 | 默认会提供工厂,Executors.DefaultThreadFactory |
handler | 拒绝策略 | 默认拒绝策略为AbortPolicy,直接抛出异常 |
文章开头直接表格丢出七个核心参数,以及简单的说明。
如果已经是老司机,那么看到之后就能直接唤醒深处已经淡忘的记忆。
如果还是小萌新,不妨先简单的有个印象,等看完示例之后再深刻记忆。无论是对面试还是源码走读,或是线程池二开优化都会有帮助。
前导
手写线程池之前,你得知道线程池是什么,如何工作的。这样踩在Doug Lea
的肩膀上你才能看的更远,走的更高。
如果你还不知道什么是线程池,可能本文不太适合你,看看我其他文章如线程池的前世今生、池化技术之后再来吧。
JDK中ThreeadPoolExecutor线程池的运行原理借用我生活中的例子来说明,帮助理解记忆。
故事
第零章
我是隶属于一个包工头(ThreadPoolExecutor
)下的一个小员工,是一名Worker
。曾经他手下一个员工都没有(默认线程数为0,没有线程启动),他自己也好吃懒作,从来不干活。
一次酒后,我们头儿吐露出他计划要招4个长工(coolPoolSize
为4),而且以他的能力最多也只能管理6个下属(maximumPoolSize
为6),那么也就说他最多也还会招2个临时工。他老婆兼职着Hr(threadFactory
,理解为每一个线程都是经过她的手创建),每一个人入职都会发给我们一个安全帽,安全帽上标着编号,而我的编号是0x11。
第一章:第一个下属
有一天,一个老板遇见头儿,问他100块搬一块砖干不干,头儿说干。我们都知道的,他是不干活的。在下雨的公交站,只有我们两个人,他神秘的问我,你也是搬砖的吧,我有一个1块钱一块砖的活,你敢不敢?我想到我们老板以前只给一毛一块,我二话没说,成了他第一个员工(此时,核心线程数为1)。
第二章:干活
看着涨了10倍的工资,我的工作积极性被调动了起来。从来没有一块砖需要等我,一来砖我就搬完了,只有我等砖,没有砖等我。
第三章:两块砖
有一天,工地门口一下子来了两块砖。
我站在门口呆了,毕竟我一次只能搬一块砖,还有一块砖怎么办?工头来了,不知道他使出了什么技巧,一眨眼的时间,又拉到了一个工友,和我一起搬砖。不知道他是不是也是1块钱一块砖,我也没敢问。
第四章:两个人
活来的不是那么稳定。我和小王,对了忘记说了,那个新来的同事叫小王。小王和我一样,也是手脚麻利,干活卖力,一次来两块砖时,我们一人一块,如果只到了一块砖,我们看谁先抢到,谁就能赚这一块钱。幸好,我的手脚挺快,基本上一块砖的时候,我也能和他抢个55开。
第五章:平淡的生活
砖是一天天搬,工头的生意也越做越大,最多的时候能有一次四块砖。和小王来的那次一样,在门口岗亭那,工头和他们聊不到一支烟的功夫,我又多了两个工友。
第六章:门口的沙地
门口有一块沙地,我一直都没有注意到它。但是今天例外,因为今天一下来了五块砖,工头也没向往常一样出来解决。我知道他躺在那一定是看见了,因为我看到他故意的压了压帽檐,应该是想让我们以为他睡得比较死,帽子盖得严。
我们四个人一人搬走了一块砖,还剩下一块砖,它就静静的摆在沙地上(中间插一句,workQueue
中插入了第一个任务,这毕竟还是一篇技术向的文章,别忘了正事)。它就躺在那块沙地上,我从来没有注意到的沙地,仿佛以前从未存在,但是今天它在了。沙地上还摆着我的罪恶感,不知道其他的工友是怎么想的,至少我是。我内心里从来不能忍受有砖是在等我搬的。明明只有我能等砖啊。
第七章:砖块
想聊的太多了,我好像都忘记介绍我搬的砖块了。这可不是普通的砖块,长宽高我不太有概念,只知道很重,那天工头跟我们说,这砖不耐压,最底下的砖要是再压上三块(共4块,workQueue
的长度为4),如果再加一块就会把最底下的砖压坏。我甚至还听到一个比较恐怖的传说,门口那块沙地风水也不太好,要是沙地上一共压了五块砖,能把下面挖的地下室压塌,甚至整个工地也会倒掉。(队列中积压大对象过多,导致OOM)
这就是为什么我一次只能搬一块砖,这砖可不好搬呀。你也别说你来你也行。要是没有经过长期的搬砖训练,这砖你应该是搬不动的。
第八章:摇人
沙地上偶然会出现已经积压四块砖的情况,不过一般很快危险就被排除了。毕竟我们四个工友都是好样的,基本上不会让这种情况出现太久。
但是危险的事情总会出现,两辆自卸车带着第五块砖、第六块砖来了。门口也没有其他人,不管怎么样,第五块砖都会压上去吧。
然而,这件事我没有亲眼所见,是我工友告诉我的。
工友告诉我,这些自卸车到的几分钟前,工头就打了一个电话。也就等自卸车刚停稳,边上一辆小面包就过来了。面包车里跳下来两个人,一人搬起一块砖,就开始工作。
工头原来还会摇人,从没有见过,还刚好摇了两个人来搬超额的两块砖。我们私下都认为老板应该是事先知道今天肯定会来不及搬。
第九章:门房大爷
原来我不知道的事情还有这么多。门房大爷(RejectedExecutionHandler
)是早在我之前,就已经和工头谈好的,帮忙看着那些砖,防止转压的过多。事情是这样的。
不是我们不勤快,是砖越来越多。即便我们六个人,也搬不过来,堆在沙地上的砖也一直到了四块。
我们一直都以为门房大爷和蔼可亲,和我们关系也老好了,不像是个社会人。但是那天自卸车到了,大家都没注意,第五块砖要压上来了,唯独大爷看到了。只见到大爷抄起菜刀脱掉安全帽带起钢盔冲到门口,大声嚷嚷挥舞菜刀赶走了自卸车和带着的砖块(默认AbortPolicy策略,直接抛异常,任务不处理)。原来大爷是这么一个大爷,有他在,这辈子是验证不了都市传说到底是不是真的了。
第十章:后疫情时代
疫情来了,送砖头的车也少了。我们知道,裁员肯定也是来了。
工头说他很人性化,他不做决断,让我们自己说了算。但是他定下规矩如果三天(时间单位TimeUnit.DAYS
,时间keepAliveTime
为3)一块砖都没搬,让我们自己选择离开吧。我还以为我们最开始四个不是临时工,不会被裁,原来我们都是。
工头只保证最后会留四个。
第十一章:内卷还在继续
虽然我们不拿底薪,按照搬砖的数量计薪。不过工头还是嫌我们人多活少碍事,在工地里瞎晃悠(线程还是存在进程中,多少占用了系统资源)。工头调整了规则(allowsCoreThreadTimeOut
为true
,允许核心线程销毁),我们四个常任搬砖工也变成临时工,根据之前的规则要被淘汰了。
每一个人都为了一口饭,互相内卷着,希望不被开除。
手写线程池实现
终于开始自己个儿实现一个线程池了,思路就是模仿jdk中的线程池。或者说的再直白点,那就是默写jdk线程池的核心代码。
定义成员变量
/**
* 线程池中用了一个ctl来实现, 不过自己实现的时候并不想写的这么复杂, 因此还是拆成两个变量, state与threadNum
* 线程池状态,简单理解为-1是正常,大于0线程池已经异常, 简单起见,就没有做判断.
*/
private AtomicInteger state = new AtomicInteger(0);
/**
* 线程数量
*/
private AtomicInteger threadNum = new AtomicInteger(0);
/**
* 完成任务量
*/
private volatile int compliteNum = 0;
/**
* 持有所有的线程
*/
private HashSet<Worker> workers = new HashSet<>();
/**
* 缓存工作任务
*/
private BlockingQueue<Runnable> taskQueue;
/**
* 线程池创建的工厂
*/
private volatile ThreadFactory threadFactory;
/**
* 核心线程数
*/
private volatile int corePoolSize;
/**
* 最大线程数
*/
private volatile int maximumPoolSize;
/**
* 拒绝策略
*/
private MyRejectedExecutionHandler handler;
/**
* 锁, 处理一些公用变量时需要加锁
*/
private final ReentrantLock mainLock = new ReentrantLock();
构造方法
只提供了一个三个参数的构造方法,其他参数都不是关键,所以直接设置了默认值。
/**
* 提供一种构造方法, 只需要配置线程数和queue的容量
* @param corePoolSize
* @param maximumPoolSize
* @param taskQueue
*/
public MyPool(Integer corePoolSize, Integer maximumPoolSize, BlockingQueue taskQueue) {
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.taskQueue = taskQueue;
this.threadFactory = Executors.defaultThreadFactory();
this.handler = new MyRejectedExecutionHandler();
}
核心内部类,worker
/**
* 内部类, worker,持有线程,执行任务
*/
private class Worker implements Runnable {
Thread thread;
Runnable task;
/**
* 构造方法, 构造出work
* @param task
*/
public Worker( Runnable task) {
this.task = task;
this.thread = getThreadFactory().newThread(this);
}
@Override
public void run() {
System.out.println("覆盖run方法,对传入的任务做增强");
runWorker(this);
}
}
获取线程工厂的方法
线程工厂用的是jdk中自带的DefaultThreadFactory
,在worker
初始化时获取线程。
/**
* 通过线程工厂创建线程
* @return
*/
private ThreadFactory getThreadFactory() {
return this.threadFactory;
}
自定义拒绝策略
自定义拒绝策略。
/**
* 自定义拒绝策略, 并没有去继承jdk中的接口
* 直接打印, 不做其他处理
*/
private class MyRejectedExecutionHandler {
public void rejectedExecution(Runnable r, MyPool p) {
System.out.println("任务过载,丢弃任务");
}
}
在后续使用拒绝策略时,调用的方法:
private void reject(Runnable command) {
handler.rejectedExecution(command, this);
}
向线程池提交任务
向线程池提交任务,在自己实现的线程池中,是为数不多的对外暴露的接口。
而在官方的线程池中,也几乎没有什么能直接操作到线程或者任务的API,对外暴露的多为管理线程池方面的API。
这个方法需要好好掌握,也是面试中问的最多的部分。
/**
* 对外暴露的方法, 执行任务
* @param r
*/
public void execute(Runnable r) {
if (r == null) throw new NullPointerException();
int n = threadNum.get();
// 小于核心线程数
if (n < corePoolSize) {
addWorker(r, true);
return;
}
// 队列未满, 未满时offer返回true
if (taskQueue.offer(r)) {
addWorker(null, false);
}
// 队列已满, 增加线程直到最大线程数为止
else if (!addWorker(r, false)){
// 执行拒绝策略
reject(r);
}
}
将任务交给worker
内部类
execute
中基本上只是一个中转,决定任务到底是由核心线程接收,还是队列接收,还是非核心线程接收或是拒绝策略接收。
在这里是要把任务添加给work
,加入到works
中,并且调用了线程的t.start()
方法启动一个新的线程。
/**
* 将任务添加到执行过程中
* @param r
* @param core
*/
private boolean addWorker(Runnable r, boolean core) {
int num = threadNum.get();
// 乐观锁
for(;;) {
// 线程数是否超出
if (num >= (core ? corePoolSize : maximumPoolSize)) {
return false;
}
if (threadNum.compareAndSet(num, num + 1)) {
// 添加成功
break;
}
}
Worker w = null;
boolean workerStarted = false;
try {
w = new Worker(r);
Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
workers.add(w);
} finally {
mainLock.unlock();
}
t.start();
workerStarted = true;
}
} finally {
}
return workerStarted;
}
真正执行任务
/**
* 线程由addWorker中t.start已启动, 启动之后将执行Worker中的run方法
* 这个run方法传入的是worker对象,
* 对普通的任务做增强, 也用于对于所执行的任务做前置后置的管理
* @param w
*/
private void runWorker(Worker w) {
Runnable task = w.task;
try {
// 一直获取任务
while (task != null || (task = getTask()) != null) {
try {
beforeExecute();
try {
task.run();
} finally {
afterExecute();
}
} finally {
// 演示用, 直接加锁, 统计任务完成数
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
compliteNum += 1;
} finally {
mainLock.unlock();
task = null;
}
}
}
} finally {
// 线程退出的后置操作
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
workers.remove(w);
threadNum.decrementAndGet();
} finally {
mainLock.unlock();
}
if (threadNum.get() < corePoolSize) {
addWorker(null, false);
}
}
}
获取任务,不让线程闲下来
/**
* 从队列中获取任务
* @return
*/
private Runnable getTask() {
try {
Runnable r = taskQueue.take();
if (r != null) {
return r;
}
} catch (InterruptedException e) {
e.printStackTrace();
}
return null;
}
前后置增强
private void afterExecute() {
//System.out.println("操作后置增强");
}
private void beforeExecute() {
//System.out.println("操作前置增强");
}
获取总共完成的任务数
/**
* 获取总共完成的任务数
* @return
*/
public int getCompliteNum() {
return this.compliteNum;
}
运行示例
全部代码
public class MyPool {
/**
* 线程池中用了一个ctl来实现, 不过自己实现的时候并不想写的这么复杂, 因此还是拆成两个变量, state与threadNum
* 线程池状态,简单理解为-1是正常,大于0线程池已经异常, 简单起见,就没有做判断.
*/
private AtomicInteger state = new AtomicInteger(0);
/**
* 线程数量
*/
private AtomicInteger threadNum = new AtomicInteger(0);
/**
* 完成任务量
*/
private volatile int compliteNum = 0;
/**
* 持有所有的线程
*/
private HashSet<Worker> workers = new HashSet<>();
/**
* 缓存工作任务
*/
private BlockingQueue<Runnable> taskQueue;
/**
* 线程池创建的工厂
*/
private volatile ThreadFactory threadFactory;
/**
* 核心线程数
*/
private volatile int corePoolSize;
/**
* 最大线程数
*/
private volatile int maximumPoolSize;
/**
* 拒绝策略
*/
private MyRejectedExecutionHandler handler;
/**
* 锁, 处理一些公用变量时需要加锁
*/
private final ReentrantLock mainLock = new ReentrantLock();
/**
* 提供一种构造方法, 只需要配置线程数和queue的容量
* @param corePoolSize
* @param maximumPoolSize
* @param taskQueue
*/
public MyPool(Integer corePoolSize, Integer maximumPoolSize, BlockingQueue taskQueue) {
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.taskQueue = taskQueue;
this.threadFactory = Executors.defaultThreadFactory();
this.handler = new MyRejectedExecutionHandler();
}
/**
* 内部类, worker,持有线程,执行任务
*/
private class Worker implements Runnable {
Thread thread;
Runnable task;
/**
* 构造方法, 构造出work
* @param task
*/
public Worker( Runnable task) {
this.task = task;
this.thread = getThreadFactory().newThread(this);
}
@Override
public void run() {
System.out.println("覆盖run方法,对传入的任务做增强");
runWorker(this);
}
}
/**
* 通过线程工厂创建线程
* @return
*/
private ThreadFactory getThreadFactory() {
return this.threadFactory;
}
/**
* 自定义拒绝策略, 并没有去继承jdk中的接口
* 直接打印, 不做其他处理
*/
private class MyRejectedExecutionHandler {
public void rejectedExecution(Runnable r, MyPool p) {
System.out.println("任务过载,丢弃任务");
}
}
/**
* 对外暴露的方法, 执行任务
* @param r
*/
public void execute(Runnable r) {
if (r == null) throw new NullPointerException();
int n = threadNum.get();
// 小于核心线程数
if (n < corePoolSize) {
addWorker(r, true);
return;
}
// 队列未满, 未满时offer返回true
if (taskQueue.offer(r)) {
addWorker(null, false);
}
// 队列已满, 增加线程直到最大线程数为止
else if (!addWorker(r, false)){
// 执行拒绝策略
reject(r);
}
}
/**
* 将任务添加到执行过程中
* @param r
* @param core
*/
private boolean addWorker(Runnable r, boolean core) {
int num = threadNum.get();
// 乐观锁
for(;;) {
// 线程数是否超出
if (num >= (core ? corePoolSize : maximumPoolSize)) {
return false;
}
if (threadNum.compareAndSet(num, num + 1)) {
// 添加成功
break;
}
}
Worker w = null;
boolean workerStarted = false;
try {
w = new Worker(r);
Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
workers.add(w);
} finally {
mainLock.unlock();
}
t.start();
workerStarted = true;
}
} finally {
}
return workerStarted;
}
/**
* 线程由addWorker中t.start已启动, 启动之后将执行Worker中的run方法
* 这个run方法传入的是worker对象,
* 对普通的任务做增强, 也用于对于所执行的任务做前置后置的管理
* @param w
*/
private void runWorker(Worker w) {
Runnable task = w.task;
try {
// 一直获取任务
while (task != null || (task = getTask()) != null) {
try {
beforeExecute();
try {
task.run();
} finally {
afterExecute();
}
} finally {
// 演示用, 直接加锁, 统计任务完成数
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
compliteNum += 1;
} finally {
mainLock.unlock();
task = null;
}
}
}
} finally {
// 线程退出的后置操作
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
workers.remove(w);
threadNum.decrementAndGet();
} finally {
mainLock.unlock();
}
if (threadNum.get() < corePoolSize) {
addWorker(null, false);
}
}
}
/**
* 从队列中获取任务
* @return
*/
private Runnable getTask() {
try {
Runnable r = taskQueue.take();
if (r != null) {
return r;
}
} catch (InterruptedException e) {
e.printStackTrace();
}
return null;
}
private void afterExecute() {
//System.out.println("操作后置增强");
}
private void beforeExecute() {
//System.out.println("操作前置增强");
}
private void reject(Runnable command) {
handler.rejectedExecution(command, this);
}
/**
* 获取总共完成的任务数
* @return
*/
public int getCompliteNum() {
return this.compliteNum;
}
}
后记
工作中,手写线程池属实没什么用。jdk已经在底层实现了线程池的功能,我们只需要在此基础上封装即可。手写的意义在于能更细致的去精读里面的代码。获取你直接阅读,甚至是debug之后,你以为你已经掌握了。但是你用自己的语言,自己的理解再去实现一遍时。你会发现,还是有很多地方跟你想象中的不一样,有很多细节遗漏。因此即便到最后实现的线程池在一般流程下能运行,但是各种异常情况,各种容错都没有实现,这些异常的处理,已经边界条件的选择仍然需要精进。