背景
自JDK1.5开始,JDK提供了ScheduledThreadPoolExecutor类来支持周期性任务的调度。在这之前的实现需要依靠Timer和TimerTask或者其它第三方工具来完成。但Timer有不少的缺陷:
Timer是单线程模式;
如果在执行任务期间某个TimerTask耗时较久,那么就会影响其它任务的调度;
Timer的任务调度是基于绝对时间的,对系统时间敏感;
Timer不会捕获执行TimerTask时所抛出的异常,由于Timer是单线程,所以一旦出现异常,则线程就会终止,其他任务也得不到执行。
实现方式
ScheduledThreadPoolExecutor继承ThreadPoolExecutor来重用线程池的功能,它的实现方式如下:
将任务封装成ScheduledFutureTask对象,ScheduledFutureTask基于相对时间,不受系统时间的改变所影响;
ScheduledFutureTask实现了java.lang.Comparable接口和java.util.concurrent.Delayed接口,所以有两个重要的方法:compareTo和getDelay。compareTo方法用于比较任务之间的优先级关系,如果距离下次执行的时间间隔较短,则优先级高;getDelay方法用于返回距离下次任务执行时间的时间间隔;
ScheduledThreadPoolExecutor定义了一个DelayedWorkQueue,它是一个有序队列,会通过每个任务按照距离下次执行时间间隔的大小来排序;
ScheduledFutureTask继承自FutureTask,可以通过返回Future对象来获取执行的结果。
可以看下与Timer实现的对比:
例子:
public class TestScheduledThreadPoolExecute {
public static void main(String[] args) throws InterruptedException {
ScheduledExecutorService scheduledExecutorService = new ScheduledThreadPoolExecutor(5);
ScheduledFuture<?> scheduledFuture =null;
for(int i = 0; i< 3; i++) {
Task worker = new Task("线程" + i);
//只执行一次的, 五秒后执行
//scheduledExecutorService.schedule(worker, 5 ,TimeUnit.SECONDS);
// 周期性执行,每五秒执行一次
scheduledExecutorService.scheduleAtFixedRate(worker, 0, 5, TimeUnit.SECONDS);
}
// ScheduledFuture<?> scheduledFuture1 = scheduledExecutorService.scheduleAtFixedRate(new Task("测试暂停线程"), 0, 3, TimeUnit.SECONDS);
//
// try {
// Thread.sleep(10000);
// } catch (InterruptedException e) {
// e.printStackTrace();
// }
//取消线程池中的定时周期任务。
// scheduledFuture1.cancel(true);
System.out.println("Shutting down executor...");
// 直接关闭线程池
// scheduledExecutorService.shutdown();
boolean isDone;
// 等待线程池终止
do {
isDone = scheduledExecutorService.awaitTermination(1, TimeUnit.MINUTES);
System.out.println("awaitTermination...");
} while(!isDone);
System.out.println("Finished all threads");
}
static class Task implements Runnable{
private String name;
public Task(String name){
this.name = name;
}
@Override
public void run() {
System.out.println("name = " + name + ", startTime = " + new Date());
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("name = " + name + ", endTime = " + new Date());
}
}
}
类图
源码分析
先看下类:
public class ScheduledThreadPoolExecutor
extends ThreadPoolExecutor
implements ScheduledExecutorService {
....
}
实现了ScheduledExecutorService,最顶层是Executor,继承了ThreadPoolExecutor。该接口定义了schedule等任务调度的方法。
同时ScheduledThreadPoolExecutor有两个重要的内部类:DelayedWorkQueue和ScheduledFutureTask。
看到,DelayeddWorkQueue是一个阻塞队列,而ScheduledFutureTask继承自FutureTask,并且实现了Delayed接口,后续会专门讲。有关FutureTask的介绍请参考另一篇文章:FutureTask源码解析(https://juejin.im/post/5d7a30f1f265da039d32fc02)。
构造方法
//设置内部核心线程数为corePoolSize, 最大工作线程数量为Integer的最大值
// threadFactory 指定创建线程的线程工厂
// RejectedExecutionHandler 指定线程池线程数量超出后的执行策略。
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue());
}
public ScheduledThreadPoolExecutor(int corePoolSize,
ThreadFactory threadFactory) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue(), threadFactory);
}
public ScheduledThreadPoolExecutor(int corePoolSize,
RejectedExecutionHandler handler) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue(), handler);
}
public ScheduledThreadPoolExecutor(int corePoolSize,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue(), threadFactory, handler);
}
几个重要的全局变量:
/**
* 如果应该在关闭时取消/禁止定期任务,则为false,用来判断是否在关闭时取消或禁止定期任务
* 默认为false
* /
private volatile boolean continueExistingPeriodicTasksAfterShutdown;
/**
* 用来判断是否在关闭时取消非周期性任务,则为false,默认为true
*/
private volatile boolean executeExistingDelayedTasksAfterShutdown = true;
/**
* 如果ScheduledFutureTask.cancel应从队列中删除,则为True
*/
private volatile boolean removeOnCancel = false;
/**
*用于中断调度关系的序列号,以及*保证绑定条目之间的FIFO顺序
*/
private static final AtomicLong sequencer = new AtomicLong();
schedule方法
使用该方法进行任务调度时,主要是执行非周期性任务调度。看源码:(这里使用正常的线程,无返回值)
public ScheduledFuture<?> schedule(Runnable command,
long delay,
TimeUnit unit) {
//校验参数
if (command == null || unit == null)
throw new NullPointerException();
// 在把任务进行调度之前,使用装饰者模式,先进行封装成RunnableScheduledFuture对象。
RunnableScheduledFuture<?> t = decorateTask(command,
new ScheduledFutureTask<Void>(command, null,
triggerTime(delay, unit)));
//加入延迟队列
delayedExecute(t);
return t;
}
//执行具备返回值的任务
public <V> ScheduledFuture<V> schedule(Callable<V> callable,
long delay,
TimeUnit unit) {
if (callable == null || unit == null)
throw new NullPointerException();
RunnableScheduledFuture<V> t = decorateTask(callable,
new ScheduledFutureTask<V>(callable,
triggerTime(delay, unit))); //返回延迟操作的触发时间戳
delayedExecute(t);
return t;
}
上面两种任务调度方法都使用decorateTask方法转换对象,一般使用默认的,也可以重写来实现。
然后,通过调用delayedExecute方法来延时执行任务。最后,返回一个ScheduledFuture对象。
//修改或替换用于执行 runnable 的任务。此方法可重写用于管理内部任务的具体类。默认实现只返回给定任务。
protected <V> RunnableScheduledFuture<V> decorateTask(Runnable runnable, RunnableScheduledFuture<V> task) {
return task;
}
//修改或替换用于执行 callable 的任务
protected <V> RunnableScheduledFuture<V> decorateTask(Callable<V>callable,RunnableScheduledFuture<V> task) {
return task;
}
decorateTask可以看出,该方法将返回一个ScheduledFutureTask对象,ScheduledThreadPoolExecutor就是通过该对象控制任务的调度执行。
ScheduledFutureTask对象
ScheduledFutureTask(Runnable r, V result, long ns) {
super(r, result); //设置执行线程和返回结果
this.time = ns; //控制在几个时间段之后开始执行
this.period = 0; //是否周期性任务
this.sequenceNumber = sequencer.getAndIncrement(); //线程入队列的index以及顺序。要保证线程安全。
}
总结:
该方法为定时执行任务调度,不是周期性执行的,可以分为两种,是否有任务结果返回值。先把进入线程池ScheduledThreadPoolExecutor的任务封装为一个内部类ScheduledFutureTask,然后调用delayedExecute,去控制执行的时间和周期。
delayedExecute()方法在后面一起讲。先来看下
对比scheduleAtFixedRate与scheduleWithFixedDelay方法
/**
* @param command 任务线程
* @param initialDelay 何时开始执行该任务
* @param period 执行周期
* @param unit 执行周期单位
*/
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
long initialDelay,
long period,
TimeUnit unit) {
if (command == null || unit == null)
throw new NullPointerException();
if (period <= 0)
throw new IllegalArgumentException();
ScheduledFutureTask<Void> sft =
new ScheduledFutureTask<Void>(command,
null,
triggerTime(initialDelay, unit),
unit.toNanos(period));
RunnableScheduledFuture<Void> t = decorateTask(command, sft);
sft.outerTask = t; //这里实际操作是执行定期重新入队操作,来实现周期性执行。
delayedExecute(t);
return t;
}
该方法设置了执行周期,下一次执行时间相当于是上一次的执行时间加上period,它是采用已固定的频率来执行任务
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
long initialDelay,
long delay,
TimeUnit unit) {
if (command == null || unit == null)
throw new NullPointerException();
if (delay <= 0)
throw new IllegalArgumentException();
ScheduledFutureTask<Void> sft =
new ScheduledFutureTask<Void>(command,
null,
triggerTime(initialDelay, unit),
unit.toNanos(-delay)); //这里把周期设置为负数来表示是相对固定的延迟执行
RunnableScheduledFuture<Void> t = decorateTask(command, sft);
sft.outerTask = t;
delayedExecute(t);
return t;
}
该方法设置了执行周期,与scheduleAtFixedRate方法不同的是,下一次执行时间是上一次任务执行完的系统时间加上period,因而具体执行时间不是固定的,但周期是固定的,是采用相对固定的延迟来执行任务:
两者的区别还可以在ScheduledFutureTask的run方法调用的setNextRunTime方法中就可以看出来:
public void run() {
boolean periodic = isPeriodic();
if (!canRunInCurrentRunState(periodic))
cancel(false);
else if (!periodic)
ScheduledFutureTask.super.run();
else if (ScheduledFutureTask.super.runAndReset()) {
setNextRunTime(); //该方法
reExecutePeriodic(outerTask);
}
}
private void setNextRunTime() {
long p = period; //默认为0,表示非周期性
if (p > 0) //是否周期性
time += p;
else
time = triggerTime(-p);
}
triggerTime
triggerTime方法用于获取下一次执行的具体时间:
private long triggerTime(long delay, TimeUnit unit) {
return triggerTime(unit.toNanos((delay < 0) ? 0 : delay));
}
long triggerTime(long delay) {
return now() +
((delay < (Long.MAX_VALUE >> 1)) ? delay : overflowFree(delay));
}
//相当于转换后的该时间单位的持续时间戳。long
public long toNanos(long duration) {...}
这里的delay < (Long.MAX_VALUE >> 1是为了判断是否要防止Long类型溢出,如果delay的值小于Long类型最大值的一半,则直接返回delay,否则需要进行防止溢出处理。
overflowFree方法
作用是限制队列中所有节点的延迟时间在Long.MAX_VALUE之内,防止在compareTo方法中溢出
private long overflowFree(long delay) {
// 获取队列中的第一个节点
Delayed head = (Delayed) super.getQueue().peek();
if (head != null) {
// 获取延迟时间
long headDelay = head.getDelay(NANOSECONDS);
// 如果延迟时间小于0,并且 delay - headDelay 超过了Long.MAX_VALUE
// 将delay设置为 Long.MAX_VALUE + headDelay 保证delay小于Long.MAX_VALUE
if (headDelay < 0 && (delay - headDelay < 0))
delay = Long.MAX_VALUE + headDelay;
}
return delay;
}
ScheduledFutureTask类
ScheduledFutureTask继承自FutureTask并实现了RunnableScheduledFuture接口
构造方法如下:
//使用给定的ns时间,触发一次任务
ScheduledFutureTask(Runnable r, V result, long ns) {
super(r, result);
this.time = ns;
this.period = 0;
this.sequenceNumber = sequencer.getAndIncrement();
}
//使用给定的ns时间,在period内周期性触发任务执行
ScheduledFutureTask(Runnable r, V result, long ns, long period) {
super(r, result);
this.time = ns;
this.period = period;
this.sequenceNumber = sequencer.getAndIncrement();
}
//使用给定的ns时间,触发一次任务,具备任务执行返回值
ScheduledFutureTask(Callable<V> callable, long ns) {
super(callable);
this.time = ns;
this.period = 0;
this.sequenceNumber = sequencer.getAndIncrement();
}
属性:
time:下次任务执行时的时间;
period:执行周期;
sequenceNumber:保存任务被添加到ScheduledThreadPoolExecutor中的序号。
delayedExecute()
在上面的几个schedule方法执行时,创建完ScheduledFutureTask对象之后,会执行delayedExecute方法来执行任务。
private void delayedExecute(RunnableScheduledFuture<?> task) {
<!-- 如果线程池已经关闭,使用拒绝策略拒绝任务-->
if (isShutdown())
reject(task);
else {
//添加到父类的阻塞队列
super.getQueue().add(task);
/**
* 1. 判断是否处于关闭状态,如果不是走else
* 2. canRunInCurrentRunState(boolean periodic) 是否在应该在关闭时取消/禁止定期任务,根据是否周期性任务。
如果可以在给定当前运行状态*和运行后关闭参数的情况下运行任务,则返回true。
这里可以设置setContinueExistingPeriodicTasksAfterShutdownPolicy方法设置在线程池关闭时,
周期任务继续执行,默认为false,也就是线程池关闭时,不再执行周期任务。
* 3. remove(task) 移除任务,从工作队列中移出任务。
*/
if (isShutdown() &&
!canRunInCurrentRunState(task.isPeriodic()) &&
remove(task))
task.cancel(false); //如果移除任务操作成功,则该任务暂停。这里false
是指在任务开始后,不尝试中断线程来停止任务。
else
//确保至少有一个线程在执行,加入工作等待队列
ensurePrestart();
}
}
ensurePrestart,对该方法有比较像了解的,可以看这篇文章的分析:
从源码来看JDK8线程池ThreadPoolExecutor的实现原理(一)
https://juejin.im/post/5d688686e51d4561ce5a1c8b
从源码来看JDK8线程池ThreadPoolExecutor的实现原理(二)
https://juejin.im/post/5d67e5b4e51d4561f64a0849
线程池中的工作线程是通过该方法来启动并执行任务的。
//在ThreadPoolExecutor 中实现,
void ensurePrestart() {
int wc = workerCountOf(ctl.get());
if (wc < corePoolSize)
addWorker(null, true);
else if (wc == 0)
addWorker(null, false);
}
ScheduledFutureTask的run方法
回顾一下线程池的执行过程:当线程池中的工作线程启动时,不断地从阻塞队列中取出任务并执行,当然,取出的任务实现了Runnable接口,所以是通过调用任务的run方法来执行任务的。
这里的任务类型是ScheduledFutureTask,所以下面看一下ScheduledFutureTask的run方法:
public void run() {
// 是否是周期性任务
boolean periodic = isPeriodic();
// 如果为周期性任务,!canRunInCurrentRunState 结果为 false
// 如果为非周期性任务 !canRunInCurrentRunState 结果为 true
// 如果当前线程池运行状态不可以执行任务,取消该任务,然后直接返回,否则执行步骤2;可以看下面分析
if (!canRunInCurrentRunState(periodic))
cancel(false);
// 如果不是周期性任务,调用FutureTask中的run方法执行
else if (!periodic) //步骤二
ScheduledFutureTask.super.run();
// 如果是周期性任务,调用FutureTask中的runAndReset方法执行
// runAndReset方法不会设置执行结果,所以可以重复执行任务
else if (ScheduledFutureTask.super.runAndReset()) {
// 计算下次执行该任务的时间
setNextRunTime();
// 重复执行任务
reExecutePeriodic(outerTask);
}
}
canRunInCurrentRunState分析
如果当前线程池运行状态不可以执行任务,取消该任务,然后直接返回,否则执行步骤2;
canRunInCurrentRunState
boolean canRunInCurrentRunState(boolean periodic) {
return isRunningOrShutdown(periodic ?
continueExistingPeriodicTasksAfterShutdown :
executeExistingDelayedTasksAfterShutdown);
}
final boolean isRunningOrShutdown(boolean shutdownOK) {
int rs = runStateOf(ctl.get());
//判断任务状态以及如果当前线程池shutdown状态下是否继续执行任务。
return rs == RUNNING || (rs == SHUTDOWN && shutdownOK);
}
cancel 移除该任务,mayInterruptIfRunning 判断任务正在进行是否打断。
public boolean cancel(boolean mayInterruptIfRunning) {
// 调用父类 futureTask的cancel方法。
boolean cancelled = super.cancel(mayInterruptIfRunning);
if (cancelled && removeOnCancel && heapIndex >= 0)
remove(this);
return cancelled;
}
//super.cancel
public boolean cancel(boolean mayInterruptIfRunning) {
if (!(state == NEW &&
UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
return false;
try { // in case call to interrupt throws exception
if (mayInterruptIfRunning) {
try {
Thread t = runner;
if (t != null)
t.interrupt();
} finally { // final state
UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
}
}
} finally {
finishCompletion();
}
return true;
}
//从工作队列中删除该任务。
public boolean remove(Runnable task) {
boolean removed = workQueue.remove(task);
tryTerminate(); // In case SHUTDOWN and now empty
return removed;
}
setNextRunTime 设置下次执行时间
private void setNextRunTime() {
long p = period;
if (p > 0)
time += p;
else
time = triggerTime(-p);
}
有关FutureTask的run方法和runAndReset方法
可以看这篇文章分析:
https://juejin.im/post/5d7a30f1f265da039d32fc02
分析一下run执行过程:
如果当前线程池运行状态不可以执行任务,取消该任务,然后直接返回,否则执行步骤2;
如果不是周期性任务,调用FutureTask中的run方法执行,会设置执行结果,然后直接返回,否则执行步骤3;
如果是周期性任务,调用FutureTask中的runAndReset方法执行,不会设置执行结果,然后直接返回,否则执行步骤4和步骤5;
计算下次执行该任务的具体时间;
重复执行任务。
ScheduledFutureTask的reExecutePeriodic方法
除非当前运行状态排除它,否则重新定时周期性任务。 *与delayedExecute相同的想法除了丢弃任务而不是拒绝。
void reExecutePeriodic(RunnableScheduledFuture<?> task) {
//判断shutdown状态下,是否能任务执行。
if (canRunInCurrentRunState(true)) {
super.getQueue().add(task);
if (!canRunInCurrentRunState(true) && remove(task))
task.cancel(false);
else
ensurePrestart();
}
}
//该方法和delayedExecute方法类似,不同的是:
由于调用reExecutePeriodic方法时已经执行过一次周期性任务了,所以不会reject当前任务;
传入的任务一定是周期性任务。
onShutdown方法
onShutdown方法是ThreadPoolExecutor中的钩子方法,在ThreadPoolExecutor中什么都没有做,参考深入理解Java线程池(https://juejin.im/post/5d67e5b4e51d4561f64a0849):ThreadPoolExecutor,该方法是在执行shutdown方法时被调用
//由于关闭策略,取消并清除不应运行*的所有任务的队列。在super.shutdown中调用
@Override void onShutdown() {
BlockingQueue<Runnable> q = super.getQueue();
// 获取在线程池已 shutdown 的情况下是否继续执行现有延迟任务
boolean keepDelayed =
getExecuteExistingDelayedTasksAfterShutdownPolicy();
// 获取在线程池已 shutdown 的情况下是否继续执行现有定期任务
boolean keepPeriodic =
getContinueExistingPeriodicTasksAfterShutdownPolicy();
// 如果在线程池已 shutdown 的情况下不继续执行延迟任务和定期任务
// 则依次取消任务,否则则根据取消状态来判断
if (!keepDelayed && !keepPeriodic) {
for (Object e : q.toArray())
if (e instanceof RunnableScheduledFuture<?>)
((RunnableScheduledFuture<?>) e).cancel(false);
q.clear();
}
else {
// Traverse snapshot to avoid iterator exceptions
for (Object e : q.toArray()) {
if (e instanceof RunnableScheduledFuture) {
RunnableScheduledFuture<?> t =
(RunnableScheduledFuture<?>)e;
// 如果有在 shutdown 后不继续的延迟任务或周期任务,则从队列中删除并取消任务
if ((t.isPeriodic() ? !keepPeriodic : !keepDelayed) ||
t.isCancelled()) { // also remove if already cancelled
if (q.remove(t))
t.cancel(false);
}
}
}
}
tryTerminate();
}
DelayedWorkQueue
ScheduledThreadPoolExecutor之所以要自己实现阻塞的工作队列,是因为ScheduledThreadPoolExecutor要求的工作队列有些特殊。
DelayedWorkQueue是一个基于堆的数据结构,类似于DelayQueue和PriorityQueue。在执行定时任务的时候,每个任务的执行时间都不同,所以DelayedWorkQueue的工作就是按照执行时间的升序来排列,执行时间距离当前时间越近的任务在队列的前面(注意:这里的顺序并不是绝对的,堆中的排序只保证了子节点的下次执行时间要比父节点的下次执行时间要大,而叶子节点之间并不一定是顺序的,下文中会说明)。
堆结构如下图所示:
可见,DelayedWorkQueue是一个基于最小堆结构的队列。堆结构可以使用数组表示,可以转换成如下的数组:
在这种结构中,可以发现有如下特性:
假设,索引值从0开始,子节点的索引值为k,父节点的索引值为p,则:
- 一个节点的左子节点的索引为:k = p * 2 + 1;
- 一个节点的右子节点的索引为:k = (p + 1) * 2;
- 一个节点的父节点的索引为:p = (k - 1) / 2。
static class DelayedWorkQueue extends AbstractQueue<Runnable>
implements BlockingQueue<Runnable> {
// 队列初始容量
private static final int INITIAL_CAPACITY = 16;
// 根据初始容量创建RunnableScheduledFuture类型的数组
private RunnableScheduledFuture<?>[] queue=newRunnableScheduledFuture<?>[INITIAL_CAPACITY];
private final ReentrantLock lock = new ReentrantLock();
private int size = 0;
// leader线程
private Thread leader = null;
// 当较新的任务在队列的头部可用时,或者新线程可能需要成为leader,则通过该条件发出信号
private final Condition available = lock.newCondition();
}
首先可以看到继承了阻塞队列的特性。
接着注意这里的leader,它是Leader-Follower模式的变体,用于减少不必要的定时等待。什么意思呢?对于多线程的网络模型来说:
所有线程会有三种身份中的一种:leader和follower,以及一个干活中的状态:proccesser。它的基本原则就是,永远最多只有一个leader。而所有follower都在等待成为leader。线程池启动时会自动产生一个Leader负责等待网络IO事件,当有一个事件产生时,Leader线程首先通知一个Follower线程将其提拔为新的Leader,然后自己就去干活了,去处理这个网络事件,处理完毕后加入Follower线程等待队列,等待下次成为Leader。这种方法可以增强CPU高速缓存相似性,及消除动态内存分配和线程间的数据交换。
想了解更多可以看这里:
https://blog.csdn.net/goldlevi/article/details/7705180
具体leader的作用在分析take方法时再详细介绍。
offer
入队的操作如add和put方法都调用了offer方法,下面查看一下offer
public boolean offer(Runnable x) {
if (x == null)
throw new NullPointerException();
RunnableScheduledFuture<?> e = (RunnableScheduledFuture<?>)x;
final ReentrantLock lock = this.lock;
lock.lock();
try {
int i = size;
// queue是一个RunnableScheduledFuture类型的数组,如果容量不够需要扩容
if (i >= queue.length)
grow();
size = i + 1;
//如果此时长度为0,设置首个节点
if (i == 0) {
queue[0] = e;
setIndex(e, 0);
} else {
//筛选元素在底部添加到其堆排序点。 *仅在持有锁时才调用
siftUp(i, e);
}
// 唤醒其他等待线程。
if (queue[0] == e) {
leader = null;
available.signal();
}
} finally {
lock.unlock();
}
return true;
}
流程:
加锁后,先判断是否需要扩容,如果需要则先扩容。
紧接着如果队列长度为空,直接设置首个节点。否则需要进行排序后加入队列。
唤醒其他的可能条件的线程。
释放锁。
重点看下这个方法siftUp(int k, RunnableScheduledFuture<?> key)
siftUp 重点
//筛选元素在底部添加到其堆排序点。 *仅在持有锁时才调用
private void siftUp(int k, RunnableScheduledFuture<?> key) {
while (k > 0) {
// 找到父节点的索引
int parent = (k - 1) >>> 1;
// 获取父节点
RunnableScheduledFuture<?> e = queue[parent];
// 如果key节点的执行时间大于父节点的执行时间,不需要再排序了
if (key.compareTo(e) >= 0) //如果返回1,则当前节点执行时间比较晚,优先级低,跳出while,不交换
break;
// 如果key.compareTo(e)<0,说明key节点的执行时间小于父节点的执行时间,需要把父节点移到后面,
queue[k] = e;
// 设置索引为k
setIndex(e, k);
k = parent; //这里是设置子节点的索引位置为原来父节点的位置。
}
// key设置为排序后的位置中
queue[k] = key;
setIndex(key, k);
}
//通过重写该方法,进行比较是是比较距离执行时间越近的优先级越大。
public int compareTo(Delayed other) {
if (other == this) // compare zero if same object
return 0;
if (other instanceof ScheduledFutureTask) {
ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other;
long diff = time - x.time; //当前节点的执行时间减去父节点的时间
if (diff < 0) //当前节点的执行时间比较早,优先级高
return -1;
else if (diff > 0) //当前节点的执行时间比较晚,优先级低
return 1;
else if (sequenceNumber < x.sequenceNumber) //如果相等,则比较队列序列号
return -1;
else
return 1;
}
//获取延迟时间
long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS);
return (diff < 0) ? -1 : (diff > 0) ? 1 : 0;
}
public long getDelay(TimeUnit unit) {
return unit.convert(time - now(), NANOSECONDS);
}
就是循环的根据key节点与它的父节点来判断,如果key节点的执行时间小于父节点,则将两个节点交换,使执行时间靠前的节点排列在队列的前面。
假设新入队的节点的延迟时间(调用getDelay()方法获得)是5,执行过程如下:
1.先将新的节点添加到数组的尾部,这时新节点的索引k为7:
2.计算新父节点的索引:parent = (k - 1) >>> 1,parent = 3,那么queue[3]的时间间隔值为8,因为 5 < 8 ,将执行queue[7] = queue[3]:
3.这时将k设置为3,继续循环,再次计算parent为1,queue[1]的时间间隔为3,因为 5 > 3 ,这时退出循环,最终k为3:
可见,每次新增节点时,只是根据父节点来判断,而不会影响兄弟节点。
另外,setIndex方法只是设置了ScheduledFutureTask中的heapIndex属性:
private void setIndex(RunnableScheduledFuture<?> f, int idx) {
if (f instanceof ScheduledFutureTask)
((ScheduledFutureTask)f).heapIndex = idx;
}
take方法
public RunnableScheduledFuture<?> take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
for (;;) {
RunnableScheduledFuture<?> first = queue[0];
if (first == null)
available.await();
else {
// 计算当前时间到执行时间的时间间隔
long delay = first.getDelay(NANOSECONDS);
//如果执行时间已经早于当前时间了,立即替换队列中的第一个,并执行
if (delay <= 0)
//finishPoll 将第一个元素替换为最后一个并向下筛选
return finishPoll(first);
first = null; // 等待时不要保留ref
// leader不为空,阻塞线程
else {
// leader为空,则把leader设置为当前线程,
Thread thisThread = Thread.currentThread();
leader = thisThread;
try {
// 阻塞到执行时间
available.awaitNanos(delay);
} finally {
// 设置leader = null,让其他线程执行available.awaitNanos(delay);
if (leader == thisThread)
leader = null;
}
}
}
}
} finally {
// 如果leader不为空,则说明leader的线程正在执行available.awaitNanos(delay);
// 如果queue[0] == null,说明队列为空
if (leader == null && queue[0] != null)
available.signal();
lock.unlock();
}
}
//将第一个元素替换为最后一个并将其向下筛选。仅在*持有锁定时调用。
private RunnableScheduledFuture<?> finishPoll(RunnableScheduledFuture<?> f) {
int s = --size;
RunnableScheduledFuture<?> x = queue[s];
queue[s] = null;
if (s != 0)
siftDown(0, x);
setIndex(f, -1);
return f;
}
take方法是什么时候调用的呢?在深入理解Java线程池:ThreadPoolExecutor中(https://juejin.im/post/5d67e5b4e51d4561f64a0849),介绍了getTask方法,工作线程会循环地从workQueue中取任务。但定时任务却不同,因为如果一旦getTask方法取出了任务就开始执行了,而这时可能还没有到执行的时间,所以在take方法中,要保证只有在到指定的执行时间的时候任务才可以被取走。
再来说一下leader的作用,这里的leader是为了减少不必要的定时等待,当一个线程成为leader时,它只等待下一个节点的时间间隔,但其它线程无限期等待。leader线程必须在从take()或poll()返回之前signal其它线程,除非其他线程成为了leader。
举例来说,如果没有leader,那么在执行take时,都要执行available.awaitNanos(delay),假设当前线程执行了该段代码,这时还没有signal,第二个线程也执行了该段代码,则第二个线程也要被阻塞。多个这时执行该段代码是没有作用的,因为只能有一个线程会从take中返回queue[0](因为有lock),其他线程这时再返回for循环执行时取的queue[0],已经不是之前的queue[0]了,然后又要继续阻塞。
所以,为了不让多个线程频繁的做无用的定时等待,这里增加了leader,如果leader不为空,则说明队列中第一个节点已经在等待出队,这时其它的线程会一直阻塞,减少了无用的阻塞(注意,在finally中调用了signal()来唤醒一个线程,而不是signalAll())。
poll
public RunnableScheduledFuture<?> poll(long timeout, TimeUnit unit)
throws InterruptedException {
long nanos = unit.toNanos(timeout);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
//自旋
for (;;) {
RunnableScheduledFuture<?> first = queue[0];
//如果该队列为null
if (first == null) {
if (nanos <= 0) //如果已经到时间超时,则返回null
return null;
else //阻塞等待
nanos = available.awaitNanos(nanos);
} else {
long delay = first.getDelay(NANOSECONDS);
// 如果delay <= 0,说明已经到了任务执行的时间,返回。
if (delay <= 0)
return finishPoll(first);
// 如果nanos <= 0,说明已经超时,返回null
if (nanos <= 0)
return null;
first = null; // don't retain ref while waiting
// nanos < delay 说明需要等待的时间小于任务要执行的延迟时间
// leader != null 说明有其它线程正在对任务进行阻塞
// 这时阻塞当前线程nanos纳秒
if (nanos < delay || leader != null)
nanos = available.awaitNanos(nanos);
else {
Thread thisThread = Thread.currentThread();
leader = thisThread;
try {
// 这里的timeLeft表示delay减去实际的等待时间
long timeLeft = available.awaitNanos(delay);
// 计算剩余的等待时间
nanos -= delay - timeLeft;
} finally {
if (leader == thisThread)
leader = null;
}
}
}
}
} finally {
if (leader == null && queue[0] != null)
available.signal();
lock.unlock();
}
}
总结
本文详细分析了ScheduedThreadPoolExecutor的实现,主要介绍了以下方面:
与Timer执行定时任务的比较,相比Timer,ScheduedThreadPoolExecutor有什么优点:
ScheduledThreadPoolExecutor继承自ThreadPoolExecutor,所以它也是一个线程池,也有coorPoolSize和workQueue,ScheduledThreadPoolExecutor特殊的地方在于,自己实现了优先工作队列DelayedWorkQueue;
ScheduedThreadPoolExecutor实现了ScheduledExecutorService,所以就有了任务调度的方法,如schedule,scheduleAtFixedRate和scheduleWithFixedDelay,同时注意他们之间的区别;
内部类ScheduledFutureTask继承自FutureTask,实现了任务的异步执行并且可以获取返回结果。同时也实现了Delayed接口,可以通过getDelay方法获取将要执行的时间间隔;
周期任务的执行其实是调用了FutureTask类中的runAndReset方法,每次执行完不设置结果和状态。参考FutureTask源码解析;
详细分析了DelayedWorkQueue的数据结构,它是一个基于最小堆结构的优先队列,并且每次出队时能够保证取出的任务是当前队列中下次执行时间最小的任务。同时注意一下优先队列中堆的顺序,堆中的顺序并不是绝对的,但要保证子节点的值要比父节点的值要大,这样就不会影响出队的顺序。
总体来说,ScheduedThreadPoolExecutor的重点是要理解下次执行时间的计算,以及优先队列的出队、入队和删除的过程,这两个是理解ScheduedThreadPoolExecutor的关键。
待续。。。。
部分摘自:
Leader/Follower多线程网络模型介绍
https://blog.csdn.net/goldlevi/article/details/7705180
ThreadPoolExecutor的实现原理
https://juejin.im/post/5d67e5b4e51d4561f64a0849
并发之异步计算任务FutureTask源码jdk1.8解读
https://juejin.im/post/5d7a30f1f265da039d32fc02
深入理解Java线程池:ScheduledThreadPoolExecutor
http://www.ideabuffer.cn/2017/04/14/%E6%B7%B1%E5%85%A5%E7%90%86%E8%A7%A3Java%E7%BA%BF%E7%A8%8B%E6%B1%A0%EF%BC%9AScheduledThreadPoolExecutor/