1.官方文档
A ThreadPoolExecutor that can additionally schedule commands to
run after a given delay, or to execute periodically. This class is
preferable to Timer when multiple worker threads are needed, or when
the additional flexibility or capabilities of ThreadPoolExecutor (which
this class extends) are required.
Delayed tasks execute no sooner than they are enabled, but without
any real-time guarantees about when, after they are enabled, they will
commence. Tasks scheduled for exactly the same execution time are
enabled in first-in-first-out (FIFO) order of submission.
When a submitted task is cancelled before it is run, execution is
suppressed. By default, such a cancelled task is not automatically
removed from the work queue until its delay elapses. While this
enables further inspection and monitoring, it may also cause
unbounded retention of cancelled tasks. To avoid this, set
setRemoveOnCancelPolicy(boolean) to true, which causes tasks to
be immediately removed from the work queue at time of cancellation.
Successive executions of a task scheduled via scheduleAtFixedRate
or scheduleWithFixedDelay do not overlap. While different executions
may be performed by different threads, the effects of prior executions
happen-before those of subsequent ones.
While this class inherits from ThreadPoolExecutor, a few of the
inherited tuning methods are not useful for it. In particular, because it
acts as a fixed-sized pool using corePoolSize threads and an
unbounded queue, adjustments to maximumPoolSize have no useful
effect. Additionally, it is almost never a good idea to set corePoolSize
to zero or use allowCoreThreadTimeOut because this may leave the
pool without threads to handle tasks once they become eligible to run.
Extension notes: This class overrides the execute and submit
methods to generate internal ScheduledFuture objects to control per-
task delays and scheduling. To preserve functionality, any further
overrides of these methods in subclasses must invoke superclass
versions, which effectively disables additional task customization.
However, this class provides alternative protected extension method
decorateTask (one version each for Runnable and Callable) that can
be used to customize the concrete task types used to execute
commands entered via execute, submit, schedule,
scheduleAtFixedRate, and scheduleWithFixedDelay. By default, a
ScheduledThreadPoolExecutor uses a task type extending
FutureTask. However, this may be modified or replaced using
subclasses of the form
可在给定延迟后执行任务,或定期执行。当需要多线程,或需要线程池额外的灵活性或功能性时,ScheduledThreadPoolExecutor优于Timer。
延迟任务的执行没有实时保证。
在提交的任务在运行之前取消时,将禁止执行。默认情况下,此类已取消的任务不会自动从工作队列中删除,直到其延迟结束。可以进行进一步检查和监控,但也可能导致取消任务的无限制滞留。要避免这种情况,请将setRemoveOnCancelPolicy(boolean)设置为true,这会使得在取消时立即从工作队列中删除任务。
通过scheduleAtFixedRate或scheduleWithFixedDelay任务调度来进行连续任务执行,不会导致任务执行重叠。虽然可能不同执行由不同的线程进行,但是先前执行happen-before后续的执行。
虽然这个类继承自ThreadPoolExecutor,但是一些继承的调整方法对它没用。特别是,因为使用corePoolSize线程和无界队列,所以对maximumPoolSize的调整没有任何有用的效果。此外,将corePoolSize设置为零或使用allowCoreThreadTimeOut不是一个好主意,因为会使线程池没有线程来处理到期的定时任务。
扩展注释:此类重写execute和submit方法生成内部ScheduledFuture对象,以控制每个任务的延迟和调度。为了保留功能,子类中这些方法的任何进一步重写必须调用超类版本。但是,此类提供了替代的保护扩展方法decorateTask(Runnable和Callable各一个版本),可用于自定义由execute、submit、schedule、scheduleAtFixedRate和scheduleWithFixedDelay提交的具体任务类型。默认情况下,ScheduledThreadPoolExecutor使用扩展FutureTask的任务类型。但是,可以使用以下形式的子类来修改或替换它:
public class CustomScheduledExecutor extends ScheduledThreadPoolExecutor {
static class CustomTask<V> implements RunnableScheduledFuture<V> { ... }
protected <V> RunnableScheduledFuture<V> decorateTask(
Runnable r, RunnableScheduledFuture<V> task) {
return new CustomTask<V>(r, task);
}
protected <V> RunnableScheduledFuture<V> decorateTask(
Callable<V> c, RunnableScheduledFuture<V> task) {
return new CustomTask<V>(c, task);
}
// ... add constructors, etc.
}
This class specializes ThreadPoolExecutor implementation by
1. Using a custom task type ScheduledFutureTask, even for tasks
that don't require scheduling because they are submitted
using ExecutorService rather than ScheduledExecutorService
methods, which are treated as tasks with a delay of zero.
2. Using a custom queue (DelayedWorkQueue), a variant of
unbounded DelayQueue. The lack of capacity constraint and
the fact that corePoolSize and maximumPoolSize are
effectively identical simplifies some execution mechanics
(see delayedExecute) compared to ThreadPoolExecutor.
3. Supporting optional run-after-shutdown parameters, which
leads to overrides of shutdown methods to remove and cancel
tasks that should NOT be run after shutdown, as well as
different recheck logic when task (re)submission overlaps
with a shutdown.
4. Task decoration methods to allow interception and
instrumentation, which are needed because subclasses cannot
otherwise override submit methods to get this effect. These
don't have any impact on pool control logic though.
1.使用自定义任务类型ScheduledFutureTask,即使对于不需要调度的任务,因为它们是使用ExecutorService而不是ScheduledExecutorService方法提交的,这些方法被视为延迟为零的任务。
2.使用自定义队列(DelayedWorkQueue),无界DelayQueue的变体。与ThreadPoolExecutor相比,缺少容量约束以及corePoolSize和maximumPoolSize实际上相同的事实简化了一些执行机制(请参阅delayedExecute)。
3.支持可选的run-after-shutdown参数,这会导致关闭方法的覆盖,以删除和取消关闭后不应运行的任务,以及任务(重新)提交与关闭重叠时的不同重新检查逻辑。
4.允许拦截和检测的任务装饰方法,因为子类不能覆盖submit方法。但这些对池控制逻辑没有任何影响。
2.构造器
public ScheduledThreadPoolExecutor(int corePoolSize,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue(), threadFactory, handler);
}
使用的是自定义的DelayedWorkQueue。
3.提交任务
3.1 schedule一次性任务
public void execute(Runnable command) {
schedule(command, 0, NANOSECONDS);
}
public Future<?> submit(Runnable task) {
return schedule(task, 0, NANOSECONDS);
}
public <T> Future<T> submit(Runnable task, T result) {
return schedule(Executors.callable(task, result), 0, NANOSECONDS);
}
public <T> Future<T> submit(Callable<T> task) {
return schedule(task, 0, NANOSECONDS);
}
核心是:
public <V> ScheduledFuture<V> schedule(Callable<V> callable,
long delay,
TimeUnit unit) {
if (callable == null || unit == null)
throw new NullPointerException();
RunnableScheduledFuture<V> t = decorateTask(callable,
new ScheduledFutureTask<V>(callable,
triggerTime(delay, unit)));
delayedExecute(t);
return t;
}
/**
* Main execution method for delayed or periodic tasks. If pool
* is shut down, rejects the task. Otherwise adds task to queue
* and starts a thread, if necessary, to run it. (We cannot
* prestart the thread to run the task because the task (probably)
* shouldn't be run yet.) If the pool is shut down while the task
* is being added, cancel and remove it if required by state and
* run-after-shutdown parameters.
*
* @param task the task
*/
private void delayedExecute(RunnableScheduledFuture<?> task) {
if (isShutdown())
reject(task);
else {
super.getQueue().add(task);
if (isShutdown() &&
!canRunInCurrentRunState(task.isPeriodic()) &&
remove(task))
task.cancel(false);
else
ensurePrestart();
}
}
- step1.如果线程池已关闭,则拒绝任务
- step2.将任务加入到队列中。不能预启动线程,因为任务可能还没有到达运行的时间。
- step3.如果任务加入后,线程池已经关闭,且状态和run-after-shutdown参数不允许运行,则删除并取消任务
*step4.ensurePrestart(),正常情况下,确保有线程可以执行任务
executeExistingDelayedTasksAfterShutdown如果为false(默认为true),则应该在关闭后取消非周期性任务。
void ensurePrestart() {
int wc = workerCountOf(ctl.get());
if (wc < corePoolSize)
addWorker(null, true);
else if (wc == 0)
addWorker(null, false);
}
3.2 scheduleAtFixedRate和scheduleWithFixedDelay重复执行的任务
scheduleAtFixedRate和scheduleWithFixedDelay任务调度的核心与 schedule一样,都是通过delayedExecute(t)进行调度。不同点在于ScheduledFutureTask构造的不同。
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
long initialDelay,
long period,
TimeUnit unit) {
if (command == null || unit == null)
throw new NullPointerException();
if (period <= 0)
throw new IllegalArgumentException();
ScheduledFutureTask<Void> sft =
new ScheduledFutureTask<Void>(command,
null,
triggerTime(initialDelay, unit),
unit.toNanos(period));
RunnableScheduledFuture<Void> t = decorateTask(command, sft);
sft.outerTask = t;
delayedExecute(t);
return t;
}
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
long initialDelay,
long delay,
TimeUnit unit) {
if (command == null || unit == null)
throw new NullPointerException();
if (delay <= 0)
throw new IllegalArgumentException();
ScheduledFutureTask<Void> sft =
new ScheduledFutureTask<Void>(command,
null,
triggerTime(initialDelay, unit),
unit.toNanos(-delay));
RunnableScheduledFuture<Void> t = decorateTask(command, sft);
sft.outerTask = t;
delayedExecute(t);
return t;
}
4.run-after-shutdown参数
/**
* Returns true if can run a task given current run state
* and run-after-shutdown parameters.
*
* @param periodic true if this task periodic, false if delayed
*/
boolean canRunInCurrentRunState(boolean periodic) {
return isRunningOrShutdown(periodic ?
continueExistingPeriodicTasksAfterShutdown :
executeExistingDelayedTasksAfterShutdown);
}
/**
* False if should cancel/suppress periodic tasks on shutdown.
*/
private volatile boolean continueExistingPeriodicTasksAfterShutdown;
/**
* False if should cancel non-periodic tasks on shutdown.
*/
private volatile boolean executeExistingDelayedTasksAfterShutdown = true;
public void setContinueExistingPeriodicTasksAfterShutdownPolicy(boolean value) {
continueExistingPeriodicTasksAfterShutdown = value;
if (!value && isShutdown())
onShutdown();
}
public void setExecuteExistingDelayedTasksAfterShutdownPolicy(boolean value) {
executeExistingDelayedTasksAfterShutdown = value;
if (!value && isShutdown())
onShutdown();
}
continueExistingPeriodicTasksAfterShutdown如果为false(默认为false),则应该在关闭后取消或者禁止周期性任务。
5.ScheduledFutureTask
5.1 域
/** Sequence number to break ties FIFO */
private final long sequenceNumber;
/** The time the task is enabled to execute in nanoTime units */
private long time;
/**
* Period in nanoseconds for repeating tasks. A positive
* value indicates fixed-rate execution. A negative value
* indicates fixed-delay execution. A value of 0 indicates a
* non-repeating task.
*/
private final long period;
/** The actual task to be re-enqueued by reExecutePeriodic */
RunnableScheduledFuture<V> outerTask = this;
/**
* Index into delay queue, to support faster cancellation.
*/
int heapIndex;
- sequenceNumber是用于打破平局的序列号。
- time任务执行的时间
- period重复执行任务的周期,正值表示以固定的速率执行,负值表示固定延迟执行,0表示非重复执行的任务。
- outerTask表示在reExecutePeriodic中重新入队的实际任务。
- heapIndex延迟队列的索引,支持更快的取消。
5.2 构造器
接下来看看区分schedule、scheduleAtFixedRate和scheduleWithFixedDelay的关键:
new ScheduledFutureTask<V>(callable, triggerTime(delay, unit))
schedule调用的是如下构造器,设置period为0:
/**
* Creates a one-shot action with given nanoTime-based trigger time.
*/
ScheduledFutureTask(Callable<V> callable, long ns) {
super(callable);
this.time = ns;
this.period = 0;
this.sequenceNumber = sequencer.getAndIncrement();
}
new ScheduledFutureTask<Void>(command,
null,
triggerTime(initialDelay, unit),
unit.toNanos(period))
new ScheduledFutureTask<Void>(command,
null,
triggerTime(initialDelay, unit),
unit.toNanos(-delay))
调用的是如下构造器:
/**
* Creates a periodic action with given nano time and period.
*/
ScheduledFutureTask(Runnable r, V result, long ns, long period) {
super(r, result);
this.time = ns;
this.period = period;
this.sequenceNumber = sequencer.getAndIncrement();
}
5.3 compareTo方法使用sequenceNumber打破time相同的平局
public int compareTo(Delayed other) {
if (other == this) // compare zero if same object
return 0;
if (other instanceof ScheduledFutureTask) {
ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other;
long diff = time - x.time;
if (diff < 0)
return -1;
else if (diff > 0)
return 1;
else if (sequenceNumber < x.sequenceNumber)
return -1;
else
return 1;
}
long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS);
return (diff < 0) ? -1 : (diff > 0) ? 1 : 0;
}
先比较time,然后比较sequenceNumber,用来打破time相等时的平局。
5.4 setNextRuntime
setNextRuntime是scheduleAtFixedRate和scheduleWithFixedDelay的关键区别:
/**
* Sets the next time to run for a periodic task.
*/
private void setNextRunTime() {
long p = period;
if (p > 0)
time += p;
else
time = triggerTime(-p);
}
long triggerTime(long delay) {
return now() +
((delay < (Long.MAX_VALUE >> 1)) ? delay : overflowFree(delay));
}
- p > 0 表示以固定速率运行任务,因此time设置为上次任务启动运行时间向后延迟p
- p < 0 表示以固定的延迟运行任务,time设置为当前任务完成时间now()向后延迟delay
5.5 heapIndex会加速cancel
public boolean cancel(boolean mayInterruptIfRunning) {
boolean cancelled = super.cancel(mayInterruptIfRunning);
if (cancelled && removeOnCancel && heapIndex >= 0)
remove(this);
return cancelled;
}
heapIndex会加速删除任务在remove(this)中的删除。
public boolean remove(Runnable task) {
boolean removed = workQueue.remove(task);
tryTerminate(); // In case SHUTDOWN and now empty
return removed;
}
调用DelayedWorkQueue中的remove(task):
public boolean remove(Object x) {
final ReentrantLock lock = this.lock;
lock.lock();
try {
int i = indexOf(x);
if (i < 0)
return false;
setIndex(queue[i], -1);
int s = --size;
RunnableScheduledFuture<?> replacement = queue[s];
queue[s] = null;
if (s != i) {
siftDown(i, replacement);
if (queue[i] == replacement)
siftUp(i, replacement);
}
return true;
} finally {
lock.unlock();
}
}
private int indexOf(Object x) {
if (x != null) {
if (x instanceof ScheduledFutureTask) {
int i = ((ScheduledFutureTask) x).heapIndex;
// Sanity check; x could conceivably be a
// ScheduledFutureTask from some other pool.
if (i >= 0 && i < size && queue[i] == x)
return i;
} else {
for (int i = 0; i < size; i++)
if (x.equals(queue[i]))
return i;
}
}
return -1;
}
5.6 核心的run
/**
* Overrides FutureTask version so as to reset/requeue if periodic.
*/
public void run() {
boolean periodic = isPeriodic();
if (!canRunInCurrentRunState(periodic))
cancel(false);
else if (!periodic)
ScheduledFutureTask.super.run();
else if (ScheduledFutureTask.super.runAndReset()) {
setNextRunTime();
reExecutePeriodic(outerTask);
}
}
protected boolean runAndReset() {
if (state != NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset,
null, Thread.currentThread()))
return false;
boolean ran = false;
int s = state;
try {
Callable<V> c = callable;
if (c != null && s == NEW) {
try {
c.call(); // don't set result
ran = true;
} catch (Throwable ex) {
setException(ex);
}
}
} finally {
// runner must be non-null until state is settled to
// prevent concurrent calls to run()
runner = null;
// state must be re-read after nulling runner to prevent
// leaked interrupts
s = state;
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
return ran && s == NEW;
}
/**
* Requeues a periodic task unless current run state precludes it.
* Same idea as delayedExecute except drops task rather than rejecting.
*
* @param task the task
*/
void reExecutePeriodic(RunnableScheduledFuture<?> task) {
if (canRunInCurrentRunState(true)) {
super.getQueue().add(task);
if (!canRunInCurrentRunState(true) && remove(task))
task.cancel(false);
else
ensurePrestart();
}
}
- 对于一次性任务,调用FutureTask.run执行
- 对于周期性任务
step1.首先调用FutureTask.runAndReset执行任务并重置future为初始化状态(与run相比,未调用set(result),因此所有状态都是初始状态);
step2.然后调用setNextRunTime()重置下次任务运行的时间time;
step3.最后将周期性任务重新入队。
6.DelayedWorkQueue
定制延迟队列,为了与TPE声明一致,该类必须声明为一个BlockingQueue<Runnable>,尽管其只能存放RunnableScheduledFutures。
A DelayedWorkQueue is based on a heap-based data structure
like those in DelayQueue and PriorityQueue, except that
every ScheduledFutureTask also records its index into the
heap array. This eliminates the need to find a task upon
cancellation, greatly speeding up removal (down from O(n)
to O(log n)), and reducing garbage retention that would
otherwise occur by waiting for the element to rise to top
before clearing. But because the queue may also hold
RunnableScheduledFutures that are not ScheduledFutureTasks,
we are not guaranteed to have such indices available, in
which case we fall back to linear search. (We expect that
most tasks will not be decorated, and that the faster cases
will be much more common.)
All heap operations must record index changes -- mainly
within siftUp and siftDown. Upon removal, a task's
heapIndex is set to -1. Note that ScheduledFutureTasks can
appear at most once in the queue (this need not be true for
other kinds of tasks or work queues), so are uniquely
identified by heapIndex.
DelayedWorkQueue是基于堆的数据结构,如同DelayQueue和PriorityQueue一样。ScheduledFutureTask会记录其在堆数组中索引,这会消除在取消时查找任务的操作,大大加快了移除操作(从O(n)到O(lgn)),并减少了垃圾滞留,否则需要等到垃圾上升到堆顶才会被清除。但是因为队列也可能包含不是ScheduledFutureTasks的RunnableScheduledFutures,所以我们不能保证有这样的索引可用,在这种情况下我们会回到线性搜索。
所有堆操作都必须记录索引更改 - 主要在siftUp和siftDown中。删除后,任务的heapIndex设置为-1。请注意,ScheduledFutureTasks最多可以出现在队列中一次(对于其他类型的任务或工作队列,不一定是这样),因此由heapIndex唯一标识。
6.1 入队和出队
入队:
public boolean offer(Runnable x) {
if (x == null)
throw new NullPointerException();
RunnableScheduledFuture<?> e = (RunnableScheduledFuture<?>)x;
final ReentrantLock lock = this.lock;
lock.lock();
try {
int i = size;
if (i >= queue.length)
grow();
size = i + 1;
if (i == 0) {
queue[0] = e;
setIndex(e, 0);
} else {
siftUp(i, e);
}
if (queue[0] == e) {
leader = null;
available.signal();
}
} finally {
lock.unlock();
}
return true;
}
在ThreadPoolExecutor.Worker.run.runWorker()中:
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) {
w.lock();
getTask会从队列中取任务:
try {
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
public RunnableScheduledFuture<?> take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
for (;;) {
RunnableScheduledFuture<?> first = queue[0];
if (first == null)
available.await();
else {
long delay = first.getDelay(NANOSECONDS);
if (delay <= 0)
return finishPoll(first);
first = null; // don't retain ref while waiting
if (leader != null)
available.await();
else {
Thread thisThread = Thread.currentThread();
leader = thisThread;
try {
available.awaitNanos(delay);
} finally {
if (leader == thisThread)
leader = null;
}
}
}
}
} finally {
if (leader == null && queue[0] != null)
available.signal();
lock.unlock();
}
}
6.2 Leader-Follower模式
参考并发容器BlockingQueue - DelayQueue及Leader-Follower模式
leader是等待队列头部元素的指定线程。Leader-Follower模式的这种变体用于最小化不必要的定时等待。
- 当一个线程称为leader时,其会定时等待下一个delay元素过期,但是其他线程会无限期等待。
- 当从take/poll返回之前,leader线程必须signal其他等待线程,除非在此期间有线程称为了新的leader。
- 每当队列头部元素被更早到期的元素替换时,leader被置为null,offer里面q.peek() == e时,会将leader=null,此时当然会signal,重新竞选leader。所以定时等待线程必须要处理失去leader时情况。
7.总结
- 首先就是线程池本身:执行任务execute及submit被覆盖了以实现周期任务,增加了run-after-shutdown参数来处理线程池关闭后怎么处理周期任务
- 线程还是沿用Worker,本身实现了AQS,在执行任务加锁,屏蔽了中断
- 阻塞队列使用的是定制的DelayedWorkQueue,优先队列,ScheduledFutureTask会记录其在堆数组中索引,这会消除在取消时查找任务的操作,大大加快了移除操作。但是在siftUp和siftDown中会增加维护索引的额外操作。
- 任务是继承自FutureTask的ScheduledFutureTask,实现了compareTo(基于time和序列号),方便放入DelayedWorkQueue。通过period区分是一次性任务还是周期性任务。通过setNextRuntime区分是scheduleAtFixedRate还是scheduleWithFixedDelay。