1.字段与构造方法
/**
* 线程池状态成为 shutdown 时,线程从任务队列内获取到 “周期执行任务” 时,是否进行执行。
* 默认值 false,不执行。
* 如果设置为true,线程读取出来 周期执行任务 时 ,还会执行一次..
*
* False if should cancel/suppress periodic tasks on shutdown.
*/
private volatile boolean continueExistingPeriodicTasksAfterShutdown;
/**
* 线程池状态成为 shutdown 时,线程从任务队列内获取到 “延迟任务” 时,是否进行执行。
* 默认值 true,执行。
*
* False if should cancel non-periodic tasks on shutdown.
*/
private volatile boolean executeExistingDelayedTasksAfterShutdown = true;
/**
* 任务被取消时 是否 需要从“任务队列”内移除,默认false,不移除,等到线程拿到任务之后 抛弃..
* 设置为 true 则 取消任务时,就主动的从 任务队列 内移除走了..
*
* True if ScheduledFutureTask.cancel should remove from queue
*/
private volatile boolean removeOnCancel = false;
/**
* 生成任务序列号的一个字段
* 为什么需要序列号呢?因为调度线程池 它 有独特的任务队列,该任务队列 是 优先级任务队列,
* 优先级任务队列 内的 任务 都是需要实现 Compare 接口的,如果两个 任务 他们 的 compare 比较之后
* 得到的 结果 是 0, 说明 这俩任务的 优先级一致,此时 就需要 再根据 序列号 进行 对比,看到底谁优先。
*
* Sequence number to break scheduling ties, and in turn to
* guarantee FIFO order among tied entries.
*/
private static final AtomicLong sequencer = new AtomicLong();
/* corePoolSize 核心线程数
* threadFactory 线程工厂
* handler 拒绝策略
*/
public ScheduledThreadPoolExecutor(int corePoolSize,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
// super -> ThreadPoolExecutor 构造方法
// 参数2:maximumPoolSize,传的 int 最大值,非核心线程 不受限制
// 参数3:keepAliveTime,0 说明非核心线程 一旦空闲 就立马销毁去..(线程是不分 核心 和 非核心的,只不过线程池会维护 一个 corePoolSize 内的线程数量)
// 参数5:new DelayedWorkQueue() ,创建了一个 任务队列 ,这是一个 延迟任务队列,它是一个 优先级 任务队列,在优先级 基础之上提供了 hold 线程的事情。
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue(), threadFactory, handler);
}
ScheduledThreadPoolExecutor的主要特点:
- 对Runnable任务进行包装,封装成ScheduledFutureTask,该类任务支持任务的周期执行、延迟执行;
- 采用DelayedWorkQueue作为任务队列。该队列是无界队列,所以任务一定能添加成功,但是当工作线程尝试从队列取任务执行时,只有最先到期的任务会出队,如果没有任务或者队首任务未到期,则工作线程会阻塞;
- ScheduledThreadPoolExecutor的任务调度流程与ThreadPoolExecutor略有区别,最大的区别就是,先往队列添加任务,然后创建工作线程执行任务。
- 另外,maximumPoolSize这个参数对ScheduledThreadPoolExecutor其实并没有作用,因为除非把corePoolSize设置为0,这种情况下ScheduledThreadPoolExecutor只会创建一个属于非核心线程池的工作线程;否则,ScheduledThreadPoolExecutor只会新建归属于核心线程池的工作线程,一旦核心线程池满了,就不再新建工作线程。
2.提交任务的接口
2.1 提交延迟任务
/**
* 提交的任务被称为“延迟任务”,不是周期执行的任务
* 延迟多久呢? delay 参数 和 unit 参数控制。
* 注意:只执行一次
*
* @throws RejectedExecutionException {@inheritDoc}
* @throws NullPointerException {@inheritDoc}
*/
public ScheduledFuture<?> schedule(Runnable command,
long delay,
TimeUnit unit) {
if (command == null || unit == null)
throw new NullPointerException();
RunnableScheduledFuture<?> t = decorateTask(command,
new ScheduledFutureTask<Void>(command, null,
triggerTime(delay, unit)));
delayedExecute(t);
return t;
}
/**
* @throws RejectedExecutionException {@inheritDoc}
* @throws NullPointerException {@inheritDoc}
*/
public <V> ScheduledFuture<V> schedule(Callable<V> callable,
long delay,
TimeUnit unit) {
if (callable == null || unit == null)
throw new NullPointerException();
RunnableScheduledFuture<V> t = decorateTask(callable,
new ScheduledFutureTask<V>(callable,
triggerTime(delay, unit)));
delayedExecute(t);
return t;
}
2.2 按照固定时间间隔执行的周期性任务
/**
* 提交的任务是“周期任务”,什么是周期任务?按照一个周期去反复执行的任务。
* initialDelay:任务初次执行时的延迟时间。
* period:周期时长
*
* 特点:该接口提交的任务,它不考虑执行的耗时。
* 举个例子:有一个任务 10秒钟 执行一次,假设它每次执行耗时2秒。在 03:00:00 执行了一次,耗时了2秒,那它下一次的执行时间节点是 03:00:10.
* 如果它执行耗时超过了 10 秒,那它下一次的执行时间 就是 立马执行。
*
* @throws RejectedExecutionException {@inheritDoc}
* @throws NullPointerException {@inheritDoc}
* @throws IllegalArgumentException {@inheritDoc}
*/
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
long initialDelay,
long period,
TimeUnit unit) {
if (command == null || unit == null)
throw new NullPointerException();
if (period <= 0)
throw new IllegalArgumentException();
ScheduledFutureTask<Void> sft =
new ScheduledFutureTask<Void>(command,
null,
triggerTime(initialDelay, unit),
unit.toNanos(period));
RunnableScheduledFuture<Void> t = decorateTask(command, sft);
sft.outerTask = t;
delayedExecute(t);
return t;
}
2.3 按照固定延迟时间执行的周期性任务
/**
* 提交的任务是“周期任务”,什么是周期任务?按照一个周期去反复执行的任务。
* initialDelay:任务初次执行时的延迟时间。
* period:周期时长
*
* 特点:它考虑到执行耗时。
* 例子:有一个任务 每10秒执行一次,假设它本次执行的时间点是 03:00:00 ,耗时了5秒钟,那它下一次的执行时间点 03:00:05 + 10s => 03:00:15 这个时间点。
*
* 任务执行【等待10s】任务执行【等待10s】任务执行【等待10s】...
*
* @throws RejectedExecutionException {@inheritDoc}
* @throws NullPointerException {@inheritDoc}
* @throws IllegalArgumentException {@inheritDoc}
*/
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
long initialDelay,
long delay,
TimeUnit unit) {
if (command == null || unit == null)
throw new NullPointerException();
if (delay <= 0)
throw new IllegalArgumentException();
ScheduledFutureTask<Void> sft =
new ScheduledFutureTask<Void>(command,
null,
triggerTime(initialDelay, unit),
unit.toNanos(-delay));
RunnableScheduledFuture<Void> t = decorateTask(command, sft);
sft.outerTask = t;
delayedExecute(t);
return t;
}
private void delayedExecute(RunnableScheduledFuture<?> task) {
if (isShutdown())
reject(task);
else {
super.getQueue().add(task);
if (isShutdown() &&
!canRunInCurrentRunState(task.isPeriodic()) &&
remove(task))
task.cancel(false);
else
ensurePrestart();
}
}
3.ScheduledFutureTask
3.1 字段
// 序列号,用于对比任务优先级,当任务的 time 字段 比较不出来 优先级时,就使用该字段进行比较。
// 该字段一定不会一致。
/** Sequence number to break ties FIFO */
private final long sequenceNumber;
// 任务下一次执行的时间节点,交付时间。当任务执行结束之后,会再次将任务加入到 queue,加入到queue之前
// 会修改该字段 为 下一次 执行的时间节点。
/** The time the task is enabled to execute in nanoTime units */
private long time;
// 周期时间长度..
// 1. period > 0 的情况,说明当前任务 是 由 scheduleAtFixedRate(..)接口提交的任务,该任务它不考虑执行耗时。
// 2. period < 0 的情况,说明当前任务 是 由 scheduleWithFixedDelay(..)接口提交的任务,该任务考虑执行耗时,下次执行节点为 本次结束时间 + period 时间。
// 3. perod == 0 的情况,说明当前任务 是 由 schedule(..)接口提交的任务,该任务是一个延迟执行的任务,只执行一次
/**
* Period in nanoseconds for repeating tasks. A positive
* value indicates fixed-rate execution. A negative value
* indicates fixed-delay execution. A value of 0 indicates a
* non-repeating task.
*/
private final long period;
// 指向自身..后面看源码再说..
/** The actual task to be re-enqueued by reExecutePeriodic */
RunnableScheduledFuture<V> outerTask = this;
// 因为DelayedWorkQueue 底层使用的数据结构是 堆(最小堆),记录当前任务 在 堆 中的索引。
// 其实就是 数组中的索引,堆是一个满二叉树,满二叉树 一般是使用 数组表示,看过 netty 内存管理的同学 应该清楚 满二叉树 数组表示..
/**
* Index into delay queue, to support faster cancellation.
*/
int heapIndex;
/**
* 周期任务执行完成之后,再次提交到 DelayedWorkQueue 之前调用的。
* 设置下一次 任务的执行时间节点。
*
* Sets the next time to run for a periodic task.
*/
private void setNextRunTime() {
// p 表示 period
long p = period;
if (p > 0)
// 该条件成立,说明 任务 是由 scheduleAtFixedRate(..)接口提交的任务,该任务它不考虑执行耗时。
// 直接拿 上一次 任务的执行节点 + 执行周期 作为下一次 执行节点。
time += p;
else
// 该条件成立,说明 任务 是由 scheduleWithFixedDelay(..)接口提交的任务,该任务考虑执行耗时 (period < 0 , -period 就是正数了 )
// 拿系统当前时间 + 执行周期,作为下一次执行 时间节点。
time = triggerTime(-p);
}
/**
* 取消任务
* @param mayInterruptIfRunning (当任务运行中时,是否需要先中断它..true 会先去中断任务,false 则直接设置任务状态为 取消状态)
* @return cancelled bool值,表示是否取消成功
*/
public boolean cancel(boolean mayInterruptIfRunning) {
// super -> FutureTask 父类,cancelled 表示任务是否取消成功
// 取消失败有哪些情况?1. 任务已经执行完成 2. 任务出现异常 3.任务被其它线程取消..
boolean cancelled = super.cancel(mayInterruptIfRunning);
// 条件1:cancelled ...
// 条件2:removeOnCancel 设置为 true 则 取消任务时,就主动的从 任务队列 内移除走了..
// 条件3:heapIndex > = 0 ,条件成立,说明 任务仍然在 堆 结构中,从 堆 结构中移除的任务 它的 heapIndex 会先设置为 -1
if (cancelled && removeOnCancel && heapIndex >= 0)
// remove 是线程池的方法,最终触发的是 延迟任务队列(DelayedWorkQueue.remove(..) 方法)
remove(this);
return cancelled;
}
/**
* 线程池中的线程 从 queue 内获取到 task 之后,调用 task.run() 方法 执行任务逻辑。
* Overrides FutureTask version so as to reset/requeue if periodic.
*/
public void run() {
// periodic 表示当前任务是否为 周期执行 的任务
boolean periodic = isPeriodic();
// 1. periodic == true 时,当前任务是周期任务,并且 continue...参数值为 true,则会执行当前任务,否则 continue.. 值为 false,则不执行周期任务。
// 2. periodic == false 时,当前任务是 延迟任务,并且 executeExistingDelayedTasksAfterShutdown 参数值为 true(默认),则执行当前 延迟任务,
// 否则 executeExistingDelayedTasksAfterShutdown 参数值为 false,则不执行 从queue内取出的 延迟任务。
if (!canRunInCurrentRunState(periodic))
// 取消任务
cancel(false);
else if (!periodic)// 条件成立,说明任务是 延迟任务,不是 周期任务..
// 调用父类的run方法,即FutureTask.run() 逻辑,普通线程池没啥区别
ScheduledFutureTask.super.run();
// 执行到 else if 条件,就说明当前这个任务 肯定 是一个 “周期执行的任务”,此时调用 父类的 runAndReset() 方法去执行 任务逻辑。
// 该方法一般情况下 返回 true
else if (ScheduledFutureTask.super.runAndReset()) {
// 上面 ScheduledFutureTask.super.runAndReset() 执行,代表 任务 周期性 执行了 一次,执行完一次之后 该干什么呢?
// 1. 设置当前任务的 下一次执行时间
// 2. 再加入到 延迟任务队列
setNextRunTime();
reExecutePeriodic(outerTask);
}
}
上面FutureTask#run和runAndReset区别是,runAndReset没有设置结果的步骤,因为是周期执行。
void reExecutePeriodic(RunnableScheduledFuture<?> task) {
if (canRunInCurrentRunState(true)) {
// 将任务再次加入到 任务队列
super.getQueue().add(task);
// 下面if 条件内的逻辑,是说 如果提交完任务之后,线程池状态变为了 shutdown 状态,并且 continue...参数值是 false的话,直接
// 将上一步 提交的任务 从 队列 移除走
if (!canRunInCurrentRunState(true) && remove(task))
// 将任务设置为 取消状态
task.cancel(false);
else
// 正常走这里..确保提交任务之后,线程池内有线程干活。
ensurePrestart();
}
}
4.DelayedWorkQueue
使用优先级队列DelayedWorkQueue,保证添加到队列中的任务,会按照任务的延时时间进行排序,延时时间少的任务首先被获取。
- DelayedWorkQueue的数据结构是基于堆实现的;
- DelayedWorkQueue采用数组实现堆,根节点出队,用最后叶子节点替换,然后下推至满足堆成立条件;最后叶子节点入队,然后向上推至满足堆成立条件;
- DelayedWorkQueue添加元素满了之后会自动扩容原来容量的1/2,即永远不会阻塞,最大扩容可达Integer.MAX_VALUE,所以线程池中至多有corePoolSize个工作线程正在运行;
- DelayedWorkQueue 消费元素take,在堆顶元素为空和delay >0 时,阻塞等待;
- DelayedWorkQueue 是一个生产永远不会阻塞,消费可以阻塞的生产者消费者模式;
- DelayedWorkQueue 有一个leader线程的变量,是Leader-Follower模式的变种。当一个take线程变成leader线程时,只需要等待下一次的延迟时间,而不是leader线程的其他take线程则需要等leader线程出队列了才唤醒其他take线程。
工作队列是高度定制化的延迟阻塞队列DelayedWorkQueue,其实现原理和DelayQueue基本一样,核心数据结构是二叉最小堆的优先队列,队列满时会自动扩容,所以offer操作永远不会阻塞,maximumPoolSize也就用不上了,所以线程池中永远会保持至多有corePoolSize个工作线程正在运行。
最小堆:
假设“第一个元素” 在数组中的索引为 0 的话,则父结点和子结点的位置关系如下:
- 索引为 的左孩子的索引是 (2∗i+1);
- 索引为 的右孩子的索引是 (2∗i+2);
- 索引为 的父结点的索引是 floor((i−1)/2);
为什么要使用DelayedWorkQueue呢?
- 定时任务执行时需要取出最近要执行的任务,所以任务在队列中每次出队时一定要是当前队列中执行时间最靠前的,所以自然要使用优先级队列。
- DelayedWorkQueue是一个优先级队列,它可以保证每次出队的任务都是当前队列中执行时间最靠前的,由于它是基于堆结构的队列,堆结构在执行插入和删除操作时的最坏时间复杂度是 O(logN)。
/**
* 先讲一个数据结构 “堆”,堆分为 最大堆 和 最小堆,堆这个结构是使用 满二叉树 表示的。
* 满二叉树结构在程序内一般是使用 数组表示的,下面这个最小堆 使用数组表示:[1,3,5,6,7,8,9]
* 公式:
* 1. 查询父节点:floor((i-1) / 2)
* 2. 查询左子节点: left = i * 2 + 1
* 3. 查询右子节点: right = i * 2 + 2
*
* 最小堆:
* 1 --- 索引 0
* / \
*索引1----3 5 ----索引 2
* / \ / \
* 6 7 8 9
*
* 最大堆:
*
* 10
* / \
* 8 9
* / \ / \
* 4 5 6 7
*
* 延迟队列采用的堆 是 最小堆,言外之意 最顶层的 元素 是优先级最高的 任务。
*
* 最小堆 插入元素的流程,咱们向下面这颗 最小堆 插入 元素 2 的流程:
*
*
* 1 | 1 | 1
* / \ | / \ | / \
* 3 5 -> 3 5 -> 2 5
* / \ / \ | / \ / \ | / \ / \
* 6 7 8 9 | 2 7 8 9 | 3 7 8 9
* / / | /
* 2 6 | 6
*
* 规则:
* 1. 将新元素 放入数组的最后一个位置,[1,3,5,6,7,8,9] -> [1,3,5,6,7,8,9, “2”]
* 2. 新元素 执行向上冒泡的逻辑,去和它的父节点 比较大小,(最小堆)父节点 值 大于 子节点,则交换它俩位置,此时相当于 向上冒泡了一层,然后持续这个过程
* 直到 新元素 成为 顶层节点 或者 碰到 某个 父节点 优先级 比他高,则停止。
*
*
* 最小堆 删除顶层元素的流程,假设删除 节点 1:
* 1 | 9 | 3 | 3
* / \ | / \ | / \ | / \
* 3 5 -> 3 5 -> 9 5 -> 6 5
* / \ / \ | / \ / | / \ / | / \ /
* 6 7 8 9 | 6 7 8 | 6 7 8 | 9 7 8
*
*
* 规则:
* 1. 将数组最后一个节点 覆盖 数组[0] 这个位置,将数组最后一个节点 设置为null,反映到树上 就是 将最后一个 节点 提升到 树顶层位置。
* 2. 上一步 提升的 这个节点,执行 向下 冒泡的逻辑。怎么向下冒泡?获取左右子节点 中 优先级 高的节点,去和优先级高的子节点比较 看 到底是 向下 冒泡节点的优先级高
* 还是 子节点优先级高,如果子节点 优先级高,则交换位置,反映到图上 就是向下 坠落了一层。
*/
4.1 字段
leader线程的设计,是Leader-Follower模式的变种,旨在于为了不必要的时间等待。当一个take线程变成leader线程时,只需要等待下一次的延迟时间,而不是leader线程的其他take线程则需要等leader线程出队列了才唤醒其他take线程。
// 初始化数组时的大小 16
private static final int INITIAL_CAPACITY = 16;
// 这个数组就是 满二叉树 的数组表示,存储提交的 优先级任务
// 这里其实存储的就是 ScheduledFutureTask 实例
private RunnableScheduledFuture<?>[] queue =
new RunnableScheduledFuture<?>[INITIAL_CAPACITY];
// 可重入锁,添加任务 和 删除任务的时候 都需要加锁操作,确保 满二叉树 线程安全
private final ReentrantLock lock = new ReentrantLock();
// 表示延迟队列内存储的任务数量
private int size = 0;
/**
* leader字段? 一种策略
* 线程池内的某个线程 去 take() 获取任务时,如果 满二叉树 顶层节点不为null(队列内有任务),但是顶层节点 这个 任务 它还不到 交付时间,
* 这个怎么办?线程就去检查 leader 字段是否被占用,如果未被占用,则当前线程占用该字段,将线程自己 保存 到 leader字段
* 再下一步,当前线程 到 available 条件队列 指定超时时间(堆顶任务.time - now()) 的挂起。
*
* 如果 满二叉树顶层节点不为null(队列内有任务),但是顶层节点 这个 任务 它还不到 交付时间,
* 线程检查 leader 字段是否被占用,如果被占用了,则当前线程怎么办?直接到available 条件队列 “不指定”超时时间的 挂起。
*
* 不指定超时时间挂起,那什么时候会被唤醒呢?..
* leader 挂起时指定了 超时时间,其实 leader 在 available 条件队列 内 它是 首元素,它超时之后,会醒过来,
* 然后再次将 堆顶元素 获取走..
* 获取走之后,take()结束之前,会释放 lock,释放lock前 会做一件事,就是 available.signal() 唤醒下一个 条件队列 内的 等待者
* 下一个等待者 收到信号之后,去哪了 ? 去到 AQS 队列了,做 acquireQueue(node) 逻辑去了。
*/
private Thread leader = null;
/**
* 条件队列,很关键的一个东西..
*
* Condition signalled when a newer task becomes available at the
* head of the queue or a new thread may need to become leader.
*/
private final Condition available = lock.newCondition();
4.2 添加任务
/**
* 添加任务的入口
* @param x 任务(ScheduledFutureTask 实例)
*/
public boolean offer(Runnable x) {
if (x == null)
throw new NullPointerException();
// 转换成RunnableScheduledFuture类型,e表示
RunnableScheduledFuture<?> e = (RunnableScheduledFuture<?>)x;
// 加锁
final ReentrantLock lock = this.lock;
lock.lock();
// 内部的代码 肯定是 串行的!
try {
// i,当前任务队列 任务数量
int i = size;
if (i >= queue.length)// 条件成立,说明 任务数组 所有的slot都已经占用完了,需要扩容了..
// 扩容..
grow();
// size + 1,因为接下来就是存储任务 到 最小堆 的逻辑了,注意:i 还是原来的 size值。
size = i + 1;
if (i == 0) {// 条件成立,说明 当前 任务 是最小堆 的第一个节点,将它存放到 数组[0] 的这个位置,即可,不需要执行 向上冒泡的逻辑。
queue[0] = e;
setIndex(e, 0);
} else {
// 执行到这里,说明 最小堆 存在其它节点,新节点 需要执行向上冒泡的逻辑..
// i : 新插元素的下标
// e : 新提交的任务
siftUp(i, e);
}
if (queue[0] == e) {
// 两种情况:
// 1. 当前任务是 第一个加入到 queue 内的任务
// 2. 当前任务不是第一个加入到 queue 内的任务,但是当前任务 优先级 高啊,它上升成为了 堆顶 节点。
// 1. 当前任务是 第一个加入到 queue 内的任务
// 第一种情况:在当前任务 加入 到 queue 之前,take()线程会直接到 available 不设置超时的挂起。并不会去占用 leader 字段。
// available.signal() 时,会唤醒一个线程 让它去消费...
// 2. 当前任务不是第一个加入到 queue 内的任务,但是当前任务 优先级 高啊,它上升成为了 堆顶 节点。
// 这种情况下,leader 可能是 有值的,因为原 堆顶 任务 可能还未到 交付时间,leader线程正在 设置超时的 在 available 挂起呢..
// 此时 需要将 leader 线程 唤醒,唤醒之后 它 会检查 堆顶,如果堆顶任务 可以被消费,则直接获取走了..否则 继续 成为 leader 等待 新堆顶..
leader = null;
available.signal();
}
} finally {
// 释放锁
lock.unlock();
}
// 返回true 表示添加任务成功..
return true;
}
public void put(Runnable e) {
offer(e);
}
public boolean add(Runnable e) {
return offer(e);
}
public boolean offer(Runnable e, long timeout, TimeUnit unit) {
return offer(e);
}
扩容是1.5倍扩容:
/**
* 数组扩容的方法,每次扩容1.5倍
* Resizes the heap array. Call only when holding lock.
*/
private void grow() {
int oldCapacity = queue.length;
int newCapacity = oldCapacity + (oldCapacity >> 1); // grow 50%
if (newCapacity < 0) // overflow
newCapacity = Integer.MAX_VALUE;
queue = Arrays.copyOf(queue, newCapacity);
}
/ * k: 新插元素的下标
* key: 新插元素
*/
private void siftUp(int k, RunnableScheduledFuture<?> key) {
// while 条件:k > 0, k 其实表示的是 新插元素的最终存储的下标,
// 隐含结束循环的条件是 “k == 0”,因为最顶层元素的下标是0,到0之后没办法再冒泡了..
while (k > 0) {
// 获取父节点 下标
int parent = (k - 1) >>> 1;
// 获取父节点 任务对象
RunnableScheduledFuture<?> e = queue[parent];
if (key.compareTo(e) >= 0)
// 条件成立:说明 父节点 优先级 是高于 key 任务的,停止向上冒泡的逻辑。
break;
// 执行到这里,说明 父节点 的优先级 是 低于 key 任务的,需要执行 冒泡逻辑..
// 父节点 占用 当前key 对象的 位置 即可,即下坠一层。
queue[k] = e;
// 设置heapIndex
setIndex(e, k);
// k 赋值为 原父节点的 下标位置
k = parent;
}
// 结束循环之后,说明当前key任务 找到 它的应许之地了
// k 这个下标表示的位置,就是 key 对象的应许之地
queue[k] = key;
// 设置任务的heapIndex
setIndex(key, k);
}
4.3 取走任务
// 线程池 线程数在 核心数量以内时(<= corePoolSize) 的时候,线程获取任务使用的 方法 是 take() 方法
public RunnableScheduledFuture<?> take() throws InterruptedException {
final ReentrantLock lock = this.lock;
// 获取锁,保证 队列线程安全
lock.lockInterruptibly();
try {
// 自旋,从该循环返回时,一定是获取到 任务了,或者 线程收到了 中断异常..
for (;;) {
// 获取堆顶任务(优先级最高的任务)
RunnableScheduledFuture<?> first = queue[0];
if (first == null)
// 该条件成立,说明队列是空队列,没办法,线程只能在 条件队列 available 不设置超时的挂起..
// 什么时候会唤醒呢? offer时 添加第一个任务时,会signal..
available.await();
else {
// 执行到该分支,说明 任务队列 内存在其它任务
// 获取堆顶任务 距离 执行的 时间长度 ,纳秒
long delay = first.getDelay(NANOSECONDS);
if (delay <= 0)// 该条件成立,说明 堆顶任务 已经到达 交付时间了,需要立马返回 给 线程 去执行了..
return finishPoll(first);
// 执行到这里,说明什么?
// 说明堆顶任务 还未到 交付时间..
first = null; // don't retain ref while waiting
if (leader != null)
// 当前线程不设置 超时时间的挂起..
// 不用担心 醒不来的事,leader 它醒来之后,会获取 堆顶元素,返回之前 会 唤醒下一个 等待者线程的..
available.await();
else {
// 因为持锁线程是 自己,没人能给你 抢 leader,所以 这里没有使用CAS
// 获取当前线程
Thread thisThread = Thread.currentThread();
// leader 设置为当前线程
leader = thisThread;
try {
// 在条件队列available使用 带超时的挂起 (堆顶任务.time - now() 纳秒值..)
// 当 到达 阻塞时间时,当前线程会从这里醒过来,或者 offer 了 优先级更高 的任务时,offer线程也会唤醒你..
// (能从这里醒来,那肯定已经拿到lock了..)
available.awaitNanos(delay);
} finally {
if (leader == thisThread)
// 释放leader字段
leader = null;
}
}
}
}
} finally {
if (leader == null && queue[0] != null)
// 队列内还有其它 任务,此时 唤醒 条件队列内 下一个 等待者,让它去尝试 获取 新的 堆顶节点。
available.signal();
// 释放锁..
lock.unlock();
}
}
private RunnableScheduledFuture<?> finishPoll(RunnableScheduledFuture<?> f) {
// --size ,堆顶任务 肯定是要返回走了,所以 减就行了。
// 将 减1之后的 size 赋值给 s,s表示 堆结构 最后一个节点的下标
int s = --size;
// 获取 堆结构 最后一个节点
RunnableScheduledFuture<?> x = queue[s];
// 将堆结构 最后一个节点占用的slot设置为null,因为 该节点 要升级成 堆顶,然后执行 下坠的逻辑了..
queue[s] = null;
if (s != 0)// s == 0 说明 当前 堆结构 只有堆顶一个节点,此时不需要做任何的事情..
// 执行if代码块内,说明 堆结构 的元素数量是 > 1 的。这个时候就需要执行 堆最后节点 下坠的逻辑了..
// k: 堆最后节点 当前开始下坠的位置
// x: 堆最后节点 (在这个时候,它已经认为自己是 堆顶了..)
siftDown(0, x);
// f 原堆顶,将它的 heapIndex 设置为 -1 ,表示从 堆内出去了..
setIndex(f, -1);
// 返回原堆顶任务..
return f;
}
// k: 堆最后节点 当前开始下坠的位置
// key: 堆最后节点 (在这个时候,它已经认为自己是 堆顶了..)
private void siftDown(int k, RunnableScheduledFuture<?> key) {
// 记住:size 在来到 siftDown 之前 已经-1过了.. k 值,最终要成为 key 对象 应该存储的 slot 下标.
int half = size >>> 1;
// half 干什么的呢?做为 while 循环条件的一个判断了..
// half 表示满二叉树 最底层 ,最左侧的节点 下标值,当 k >= half 时,说明 key 对象已经下落到 最底层了,再往下 没法再下坠了..
while (k < half) {
// child 最终表示 key 的左右子节点 中 优先级 高的 子节点 下标
// child 的初始值 是 key 的左子节点的 下标
int child = (k << 1) + 1;
// 获取左子节点任务对象
RunnableScheduledFuture<?> c = queue[child];
// 获取key的右子节点 下标
int right = child + 1;
// 条件1:right < size,成立 说明 key 存在右子节点
// 条件2:(条件1成立,key在该层时,存在右子节点),c key的左子节点 比较 右子节点
if (right < size && c.compareTo(queue[right]) > 0)
// 将优先级高的子节点 赋值给 c,优先级高的子节点的下标 赋值给 child
c = queue[child = right];
if (key.compareTo(c) <= 0)
// 条件成立,说明 key 的优先级 是高于 下层子节点的,说明已经下坠到 该到的层级了..不再继续下坠了..break 跳出循环。
break;
// 执行到这里,说明 key 优先级 是小于 下层子节点的 某个节点的。
// 与子节点 交换位置..对应就是 下坠一层..(对应c节点 向上升一层)
queue[k] = c;
// 设置 c的heapIndex..
setIndex(c, k);
// key 下坠一层,占用child 下标..
k = child;
}
// 执行完 while 循环 key对象 肯定已经找到它 自己的位置了..
// 进入自己的坐席..
queue[k] = key;
// 设置好自己的 heapIndex
setIndex(key, k);
}
5.DelayedWorkQueue与DelayQueue、PriorityBlocking区别
5.1 DelayQueue、PriorityBlocking区别
DelayQueue 与 PriorityBlockingQueue 有相似之处,它们都具有优先级队列的特性,因为它们底层都使用了二叉树数组实现。但 DelayQueue 比起 PriorityBlockingQueue 还多了一个延迟属性,可以设置延迟到某个时间再出列。
需要实现Delayed接口同时需要实现getDelay方法和compareTo方法,getDelay方法用于计算出队列时间,一旦小于0就会出队列;compareTo方法用于按触发时间从小到大排序。
对于 DelayQueue 的实现,其实也并不复杂。其直接使用了 PriorityBlockingQueue 来实现数据的维护,使用 ReentrantLock 实现并发控制。因为其使用 PriorityBlockingQueue 实现,而本质上 PriorityBlockingQueue 是有最大值限制的,所以其并不是无界队列,而是有界队列。
在 DelayQueue 的具体实现上,其使用了 leader 属性保存了第一个等待获取元素的线程,从而避免了过多线程进行 CPU 自旋浪费资源。此外,还使用了 awaitNano 方法最大限度地避免 CPU 无效空转,这些都是非常好的设计思路。
5.2 DelayedWorkQueue与DelayQueue区别
DelayedWorkQueue 类其实一个特殊的 DelayQueue 类,其唯一的不同是:DelayQueue 内部用 PriorityQueue 来维护元素的二叉树结构。而 DelayedWorkQueue 因为是专为线程池设计,所以其内部用 RunnableScheduledFuture 数组重新实现了一遍二叉树结构。
DelayedWorkQueue是一个无界队列,在队列元素满了以后会自动扩容,它并没有像DelayQueue那样,将队列操作委托给PriorityQueue,而是自己重新实现了一遍堆的核心操作——上浮、下沉。
static class DelayedWorkQueue extends AbstractQueue<Runnable>
implements BlockingQueue<Runnable> {
/*
* A DelayedWorkQueue is based on a heap-based data structure
* like those in DelayQueue and PriorityQueue, except that
* every ScheduledFutureTask also records its index into the
* heap array. This eliminates the need to find a task upon
* cancellation, greatly speeding up removal (down from O(n)
* to O(log n)), and reducing garbage retention that would
* otherwise occur by waiting for the element to rise to top
* before clearing. But because the queue may also hold
* RunnableScheduledFutures that are not ScheduledFutureTasks,
* we are not guaranteed to have such indices available, in
* which case we fall back to linear search. (We expect that
* most tasks will not be decorated, and that the faster cases
* will be much more common.)
*
* All heap operations must record index changes -- mainly
* within siftUp and siftDown. Upon removal, a task's
* heapIndex is set to -1. Note that ScheduledFutureTasks can
* appear at most once in the queue (this need not be true for
* other kinds of tasks or work queues), so are uniquely
* identified by heapIndex.
*/
DelayedWorkQueue类似于DelayQueue和PriorityQueue,是基于“堆”的一种数据结构。区别就在于ScheduledFutureTask记录了它在堆数组中的索引,这个索引的好处就在于:取消任务时不再需要从数组中查找任务,极大的加速了remove操作,时间复杂度从O(n)降低到了O(log n),同时不用等到元素上升至堆顶再清除从而降低了垃圾残留时间。但是由于DelayedWorkQueue持有的是RunnableScheduledFuture接口引用而不是ScheduledFutureTask的引用,所以不能保证索引可用,不可用时将会降级到线性查找算法(我们预测大多数任务不会被包装修饰,因此速度更快的情况更为常见)。
所有的堆操作必须记录索引的变化 ————主要集中在siftUp和siftDown两个方法中。一个任务删除后他的headIndex会被置为-1。注意每个ScheduledFutureTask在队列中最多出现一次(对于其他类型的任务或者队列不一定只出现一次),所以可以通过heapIndex进行唯一标识。
public boolean remove(Object x) {
final ReentrantLock lock = this.lock;
lock.lock();
try {
// 因为在heapIndex中存储了索引
// indexOf的时间复杂度从线性搜索的O(n)
// 降低到了常量O(1)
int i = indexOf(x);
if (i < 0)
return false;
// heapIndex标记为-1,表示已删除
setIndex(queue[i], -1);
int s = --size;
RunnableScheduledFuture<?> replacement = queue[s];
queue[s] = null;
// siftUp和siftDown操作完全二叉树时间复杂度为O(log n)
// 综合前面的O(1)+O(log n) ==> O(log n)
if (s != i) {
siftDown(i, replacement);
if (queue[i] == replacement)
siftUp(i, replacement);
}
return true;
} finally {
lock.unlock();
}
}
private int indexOf(Object x) {
if (x != null) {
if (x instanceof ScheduledFutureTask) {
// 如果是ScheduledFutureTask,可用heapIndex直接索引
int i = ((ScheduledFutureTask) x).heapIndex;
if (i >= 0 && i < size && queue[i] == x)
return i;
} else {
// 否则使用线性查找
for (int i = 0; i < size; i++)
if (x.equals(queue[i]))
return i;
}
}
return -1;
}