本文将会对DelayQueue做一个简单的介绍,并提供部分源码的分析。
DelayQueue的特性基本上由BlockingQueue、PriorityQueue和Delayed的特性来决定的。
简而言之,DelayQueue是通过Delayed,使得不同元素之间能按照剩余的延迟时间进行排序,然后通过PriorityQueue,使得超时的元素能最先被处理,然后利用BlockingQueue,将元素处理的操作阻塞住。
基本定义如下:
public class DelayQueue<E extends Delayed> extends AbstractQueue<E>
implements BlockingQueue<E> {
private final transient ReentrantLock lock = new ReentrantLock();
private final PriorityQueue<E> q = new PriorityQueue<E>();
private Thread leader = null;
private final Condition available = lock.newCondition();
}
ReentrantLock lock = new ReentrantLock();
ReentrantLock是一个可重入的互斥锁,将由最近成功获得锁,并且还没有释放该锁的线程所拥有,当锁被其他线程获得时,调用lock的线程将无法获得锁。
在DelayQueue中,只有一个互斥锁lock。
PriorityQueue<E> q = new PriorityQueue<E>();
PriorityQueue是一个优先级队列,每次从队列中取出的是具有最高优先权的元素。
在DelayQueue中,因为E继承于Delayed,所以q表示一个按照delayTime排序的优先级队列,用于存放需要延迟执行的元素。
Thread leader = null;
这里的leader设计出来是为了minimize unnecessary timed waiting(减少不必要的等待时间),如何实现的方案会在详细解读中解释。
在DelayQueue中leader表示一个等待从队列中获取消息的线程。
Condition available = lock.newCondition();
Condition是lock对象的条件变量,只能和锁lock配合使用,用于控制并发程序访问竞争资源的安全。
一个锁lock可以有多个条件变量condition,每个条件上可以有多个线程等待,通过调用await()方法,可以让线程在该条件下等待。当调用signalAll()方法,又可以唤醒该条件下的等待的线程。
在DelayQueue中lock对象只有一个条件变量available。
以下是DelayQueue的主要方法:
public boolean offer(E e) {
final ReentrantLock lock = this.lock;
lock.lock();
try {
q.offer(e);
if (q.peek() == e) {
leader = null;
available.signal();
}
return true;
} finally {
lock.unlock();
}
}
1、执行lock.lock(),获取锁。
2、把元素e添加到优先队列q(下称队列q)中。
3、判断队列q的队首元素是否为e。
4、如果e是队首元素的话,即元素e是最近可被执行的元素,意味着延迟队列的执行顺序将被变更。
执行leader = null,否则在执行take时,所有线程就会在if(leader!=null)的判断下进入等待。
执行available.signal(),唤醒其他等待中的线程,重新去循环执行take中的操作1-8。
如果不执行signal,那么在take方法中,只有执行awaitNanos(delay)的线程在等待delay指定的时间后自动唤醒,其他执行await的线程将一直被挂起。
如果没有新的线程去执行take方法,那么等待执行awaitNanos(delay)的线程自动唤醒时,此时等待时间将超过元素e的delayTime,这不符合预期。
即便有新的线程去执行take方法,那之前挂起的线程也将一直在等待,效率很低。
5、在finally块中执行lock.unlock()。
需要注意的是,锁必须在 finally 块中释放。否则,如果代码抛出异常,那么锁就有可能永远得不到释放。如果没有释放锁,那么就会产生死锁的问题。
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
for (;;) {
E first = q.peek();
if (first == null)
available.await();
else {
long delay = first.getDelay(NANOSECONDS);
if (delay <= 0)
return q.poll();
first = null; // don't retain ref while waiting
if (leader != null)
available.await();
else {
Thread thisThread = Thread.currentThread();
leader = thisThread;
try {
available.awaitNanos(delay);
} finally {
if (leader == thisThread)
leader = null;
}
}
}
}
} finally {
if (leader == null && q.peek() != null)
available.signal();
lock.unlock();
}
}
1、执行lock.lockInterruptibly(),获取锁。
lockInterruptibly和lock的区别在于
lock 在锁被其他线程占有,当前线程等待锁期间(下称等待锁期间),只考虑获取锁。只有在获取锁成功后,才会去响应中断。
而lockInterruptibly 在等待锁期间,会优先考虑响应中断,而不是响应锁的获取。如果当前线程被打断(interrupt)则该方法抛出InterruptedException。该方法提供了一种解除死锁的途径。
2、E first = q.peek(),获取队列q的队首元素first(下称first)。
3、如果first为空,则执行avaliable.await()让线程进入等待。实际上就是释放锁,然后挂起线程,等待被唤醒,此时其他线程可以获得锁了。
await()和awaitNanos(nanosTimeout)区别在于
执行awaitNanos(nanosTimeout)的线程比执行await()的线程多一个唤醒条件,超过等待nanosTimeout指定的时间,线程将自动唤醒。线程唤醒时,保证该线程是持有锁的。
4、如果first不为空,则执行first.getDelay(NANOSECONDS)获取first的剩余延迟时间delayTime(下称delayTime)
5、如果first的delayTime<=0,表明该元素已经达到之前设定的延迟时间了,则调用return q.poll(),将first从队列q中的移除并且返回该元素first.
6、如果first的delayTime>0,则将first指向null,释放first的引用,避免内存泄露.
7、如果线程leader(下称leader)不为空的话,则执行avaliable.await()让线程进入等待。leader不为空的话,表明已经有其他线程在获取优先队列q的队首元素了(下称获取队首元素),此时只需要执行avaliable.await()让当前线程进入等待即可。
8、如果leader为空,则执行Thread thisThread = Thread.currentThread();leader = thisThread;将leader指向当前线程,然后执行available.awaitNanos(delay);让线程最长等待delayTime的时间。最后在finally块中,如果leader依然指向前文获取的当前线程thisThread,那么将leader指向null,释放leader引用。
这里leader为空,表明尚未有其他线程在获取队首元素,此时设置leader对象,指向当前线程(下称currentThread)。因为currentThread执行了available.awaitNanos(delay)释放了锁,所以其他线程(下称otherThread)在调用take方法时能获取锁,但是因为leader非空,所以otherThread都会进入7的那步,直接进入等待,而不需要像currentThread那样执行8的一系列操作,达到设计leader线程的初衷。
9、循环执行以上1-8步,直到first非空且first的delayTime<=0,跳出循环。
10、跳出循环后,进入finally块。
11、如果leader为空且队列q的队首元素非null(q队列中移除了上文的first元素后还有其他元素),此时执行available.signal(),调用signal唤醒其他等待中的线程。
12、执行lock.unlock(),执行解锁操作。
ok,源码分析就先讲到这里了,下一期我准备讲一下如何将DelayQueue封装成可用的组件,让使用者调用起来更加方便。