AbstractQueuedSynchronizer为锁机制维护了一个队列,需要获取锁的线程们排在队列中,只有排在队首的线程才有资格获取锁。
ConditionObject是AbstractQueuedSynchronizer的内部类,它为锁机制维护了另一个队列,如果线程排在了该队列中,说明这个线程需要在某种条件满足后,才被唤醒。
前一个队列是用于锁的争用的,称之为syn queue。后一个队列是用于条件等待的,称之为condition queue。这两个队列之间是这样协作的:当线程拿到锁后,发现条件未满足,便释放锁并挂到condition queue中去;当条件满足后,线程会被唤醒,并挂到syn queue中去重新获取锁。具体的应用场景可见Java--Lock&Condition的理解中提到的生产者和消费者模型。
本文主要是对ConditionObject的实现做简单的介绍。
Condition queue
ConditionObject主要是维护了一个condition queue,代码如下所示
<pre>
public class ConditionObject implements Condition, java.io.Serializable {
//First node of condition queue.
private transient Node firstWaiter;
//Last node of condition queue.
private transient Node lastWaiter;
......
}
</pre>
condition queue也是一个Node队列,这和syn queue同样,不过syn queue是通过Node的prev和next指针形成的双向队列,而condition queue则是通过Node的nextWaiter形成的单向队列。ConditionObject仅是记录了condition queue的队首和队尾。
下面结合代码简述一下ConditionObject中几个方法。
awaitUninterruptibly()
该方法就是当前线程要在某个条件上等待,要加入condition queue了。
步骤:
- 挂入condition queue;
- 释放锁;
- 挂起线程;
- 线程唤醒后重新尝试获取锁。
代码及注释如下。
<pre>
public final void awaitUninterruptibly() {
Node node = addConditionWaiter();//将当前线程挂入condition queue
int savedState = fullyRelease(node);//释放锁
boolean interrupted = false;
while (!isOnSyncQueue(node)) {//线程是不是在syn queue里
LockSupport.park(this);//挂起当前线程
if (Thread.interrupted())
interrupted = true;
}
if (acquireQueued(node, savedState) || interrupted) //被唤醒后则重新开始尝试获取锁
selfInterrupt();
}
</pre>
这里面有个while,用来判断线程是不是在syn queue里。针对这个循环可做两点说明:
- node由addConditionWaiter()返回,是一个waitStatus=Node.CONDITION的node,所以,第一次执行判断时,必入循环,当前线程被挂起;
- 线程被唤醒,从挂起处继续执行,此时,会继续执行while内的判断。直到确认当前线程已经在syn queue队列上,才会尝试获取锁。那么,node又是被谁放到syn queue中的呢?是和await()方法对应的signal()方法。
其中
addConditionWaiter()是ConditionObject的私有方法
fullyRelease()和isOnSyncQueue()是AbstractQueuedSynchronizer为Conditions实现的方法。
addConditionWaiter()
将当前线程挂入condition queue,代码及注释如下。
<pre>
private Node addConditionWaiter() {
Node t = lastWaiter;
//如果队尾已经被cancel了,就清理一次condition queue,将所有的cancelled node出队
// If lastWaiter is cancelled, clean out.
if (t != null && t.waitStatus != Node.CONDITION) {
unlinkCancelledWaiters();
t = lastWaiter;
}
//新建node,关联到当前线程,并加入condition queue
Node node = new Node(Thread.currentThread(), Node.CONDITION);
if (t == null)
firstWaiter = node;
else
t.nextWaiter = node;
lastWaiter = node;
return node;
}
</pre>
该方法分为两步。因为Node是从队尾加入condition queue的,所以第一步是判断condition queue的队尾是否已经被cancel了,如果是,就调用unlinkCancelledWaiters()从队头开始将所有的cancelled node都出队。清理完cancelled node后,队尾就是有效的node了,此时,新建一个关联到当前线程的node,将该node添加到队列中,并设置为新的队尾。
unlinkCancelledWaiters()
清除队列中所有的cancelled node。
<pre>
private void unlinkCancelledWaiters() {
Node t = firstWaiter;//当前节点(就好比for循环中的i)
Node trail = null;//记录当前节点前面最近的一个有效节点(未被取消的节点)
while (t != null) {
Node next = t.nextWaiter;
if (t.waitStatus != Node.CONDITION) {//如果当前节点被取消了,就将当前节点出队
t.nextWaiter = null;
if (trail == null)//如果tiral为空,说明当前节点前面没有有效节点,而当前节点又被取消了
//说明从当前节点往前的所有节点都被取消了,队首自然要往后更新
firstWaiter = next;
else
trail.nextWaiter = next;
if (next == null)
lastWaiter = trail;
}
else
trail = t;//没有取消,则更新所谓“最近的有效节点”
t = next;//当前节点更新为下一个(就好比for循环中的++i)
}
}
</pre>
signal()
唤醒condition queue的队首,主要的代码其实就是对doSignal()的调用。
doSignal()
唤醒队首,代码及注释如下。
<pre>
private void doSignal(Node first) {
do {
if ( (firstWaiter = first.nextWaiter) == null)//队首的nextWaiter是不是指向空(也即队列里是不是只有一个node,即队首)
//这一步同时更新了队首,相当于将原先的队首出队了
lastWaiter = null;
first.nextWaiter = null;
} while (!transferForSignal(first) &&//将队首迁移到syn queue
(first = firstWaiter) != null);//如果迁移失败了,说明原先的队首被取消了,尝试处理更新后的队首
} //如果更新后的队首为空,说明队列已经被清空了,就无需再处理了
</pre>
doSignal()在源码中有这么一句注释"Split out from signal in part to encourage compilers to inline the case of no waiters".这句话的含义如下:
这里单独实现doSignal()接口的意义在于,使得signal()的代码看起来十分简单,不会直接包括循环体,编译器在编译的时候,将更倾向于将signal()当做inline function。这样,在没有任何waiters(即condition queue为空,也即firstWaiter == null)的情况下, signal()作为inline function,性能将得到更明显的提升。
transferForSignal()
将node从condition queue迁移到syn queue。
代码及注释如下。
<pre>
final boolean transferForSignal(Node node) {
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))//如果node被取消了,就无需再进行什么迁移操作了
return false;//迁移失败,返回后doSignal()会去处理下一个node
Node p = enq(node);//将node加入syn queue
int ws = p.waitStatus;
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))//尝试唤醒node关联的线程
//没唤醒也没关系,已经加入syn queue了,总会被syn queue前面的node唤醒的
LockSupport.unpark(node.thread);
return true;
}
</pre>
ConditionObject的方法介绍就到这里了,下面是对CAS和inline function的一些解释。
对condition queue的操作未用任何CAS操作?
比如说addConditionWaiter()和singal()方法在修改队首和队尾,或是在修改nextWaiter指针时,都未使用任何CAS操作。这是因为,一个线程如果正在调用ConditionObject的方法的话,说明它一定获得了ConditionObject所隶属的锁。此时,能够保证一次性只有一个线程正在修改该锁对应的condition queue。
在上文解释的代码中,只有transferForSignal()使用到了CAS方法。因为该方法是想要改变其他线程的状态,而其他线程的状态还可能因为其他原因改变,所以其中使用了CAS方法。
inline function
inline function提升性能之处在于,编译器在编译的时候,会将代码整个替换到函数调用所在位置,省去了函数调用的耗时。
函数调用的耗时我倒是知道些,调用时需要保存现场信息,开辟新的堆栈,返回时还要恢复现场信息。
但是,为何只有简短的函数适合内联呢?这是因为内联增大了代码的体积。代码在执行的时候是要被加载到内存的。若函数A采取调用的方式,不论被引用了多少次,代码本身就只占一份A的内存空间。若A采取内联的方式,若被引用了两次,代码本身就要占两份的内存空间。一旦A被更多的地方引用,代码占用的内存就会显著增大,从而影响到运行时的性能。
这里有篇针对inline function的问答,感觉挺好:
http://www.learncpp.com/cpp-tutorial/75-inline-functions/
为防止链接失效,特截图一张吧。