前言
在之前的文章中,已经对ArrayBlockingQueue、LinkedBlockingQueue这两个比较常用的阻塞队列做了源码分析,我们知道其内部都是通过ReentrantLock来保证数据读写的线程安全,通过Condition来完成线程等待和唤醒,只不过ArrayBlockingQueue在读写时使用了一把锁所完成,而LinkedBlockingQueue对于读和写分别使用了两把锁来进行处理,从而达到读写分离的效果。
然而,通过锁机制来实现一个线程安全的队列,在并发不是特别高的情况下并不是非常合适,因为在大多数情况下都只有几个线程同时访问,而每次执行都需要去加一次锁,从而导致线程进行上下文切换,影响整体性能。因此,JDK还为我们提供了一个无锁线程安全的队列——ConcurrentLinkedQueue,其底层使用CAS来实现无阻塞的并发控制。本文将该队列的实现机制和源码做一个分析,让我们共同看看Doug Lea大神是如何巧妙地通过无锁机制来实现一个线程安全的队列。
1.ConcurrentLinkedQueue介绍
首先让我们看看JDK文档对该类的描述:
ConcurrentLinkedQueue是一个基于链表、无界、线程安全的队列。这个队列将元素按照先进先出的顺序进行存储。队列的头节点是在队列中存在时间最久的节点,队列的尾节点是在队列中存在时间最短的节点。新的元素会被插入到队列的尾部,而队列的元素的获取操作则会从队列的头部去获取元素。ConcurrentLinekedQueue适合作为多个线程共享访问的集合。与大多数并发集合的实现类似,该类也不允许添加null元素,该类的实现使用了一个高效的无锁算法,其算法的是基于podc-1996.pdf所改进的。
除了上面所描述的基本特性之外,ConcurrentLinkedQueue中还有一些其他的特点:
- 队列中的最后一个元素的next属性总是为null,并且最后一个节点可以通过tail节点以时间复杂度为o(1)的方式到达,也可以通过head以时间复杂度o(n)的方式到达。即队列中的最后一个元素,其总是可达的。
- 节点出队列,是通过执行CAS操作,将其item设置为null。这里与我们传统想象的节点删除不太一样,它是通过将一个节点的item标记为null,标记这个节点已经出队列,在下次操作的时候,如果一旦发现节点的item为null时,就将该节点的next设置为自己,从而真正完成节点从队列的移除,因此节点的删除是延迟处理的。 (下面我们在分析节点出队列的过程时可以更加清楚地看到其实现细节)
- 如果队列中的节点其item不为null,它们从是可以从head开始,逐个查找到对应的节点。
- ConcurrentLinkedQueue的size方法不是一个常量级别的操作,每一次获取队列的大小,都需要整体地对队列做一次遍历操作,并且得到值也是一个非精确的值,因为在遍历的过程中,队列的结构可能也会被其他线程所改变。
2. ConcurrentLinkedQueue内部结构分析
由于是基于链表的实现方式,与其他的并发队列类似,都会在内部定义一个节点类,ConcurrentLinedQueu亦是如此,首先我们看一下节点的定义:
//该节点是一个静态内部类,因此其只能作用于该队列内部
private static class Node<E> {
/*
* 当前节点存储的元素,注意到这里使用volatile关键字来对节点进行
* 修饰,其目的是在并发读的时候保证内存的可见性
*/
volatile E item;
//当前节点的下一个节点
volatile Node<E> next;
/**
* 构建新的节点,这里没有使用volatile的方式来对节点的元素值进行设置,而是使用普通的写方式
* 因为对于一个新增的节点,只有在其被成功插入到队列尾部才对外可见,因此在这里没有对数据可见性的强制要求
*/
Node(E item) {
UNSAFE.putObject(this, itemOffset, item);
}
/*
* 通过Unsafe来完成对当前节点元素的CAS操作
*/
boolean casItem(E cmp, E val) {
return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val);
}
/*
* 使用普通的方式来设置当前节点的下一个节点
*/
void lazySetNext(Node<E> val) {
UNSAFE.putOrderedObject(this, nextOffset, val);
}
/*
* 通过Unsafe来完成对当前节点的下一个节点的CAS操作
*/
boolean casNext(Node<E> cmp, Node<E> val) {
return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
}
// Unsafe mechanics
private static final sun.misc.Unsafe UNSAFE;
private static final long itemOffset;
private static final long nextOffset;
static {
try {
UNSAFE = sun.misc.Unsafe.getUnsafe();
Class k = Node.class;
//通过Unsafe来获取一个Node节点的item属性在内存中相对该对象的位置偏移量
itemOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("item"));
//通过Unsafe来获取一个Node节点的next节点属性在内存中相对该对象的位置偏移量
nextOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("next"));
} catch (Exception e) {
throw new Error(e);
}
}
}
上面节点的定义与之前分析的SynchronousQueue中的内部定义的节点非常类似,这里不再过多阐述。
队列的属性定义
为了提高快速查找队列中第一个节点和最后一个节点,因此ConcurrentLinkedQueue中分别定义了一个head节点和tail节点来快速定位。
/**
* 不变性:
* - 队列中所有未删除的节点都可以通过head节点的succ方法查找到
* - head节点一定不可能等于null
* - (tmp = head).next != tmp,即head的next不能指向自己。
*
* 可变性:
* - head的item可能为null,也可能不为null
* - tail节点可能会滞后于head节点,因此从head节点未必一定可以找到tail节点
*
*/
private transient volatile Node<E> head;
/**
* 不变性:
* - 节点中的最后一个元素总是可以通过tail的succ方法来获取
* - tail节点不等于null
*
* 可变性:
* - head的item可能为null,也可能不为null
* - tail 节点的next可能指向自己,也可能不指向自己
* - tail节点可能会滞后于head节点,因此从head节点未必一定可以找到tail节点
*/
private transient volatile Node<E> tail;
public ConcurrentLinkedQueue() {
head = tail = new Node<E>(null);
}
ConcurrentLinkedQueue源码分析
通过上面的描述我们知道了该队列是通过一个头节点和一个尾节点,然后将中间链接节点之间两两相连接构成一个队列,下面让我们分析一下ConcurrentLinkedQueue的内部的具体实现。(在这里需要说明一下,由于其节点是完全地基于Unsafe来完成CAS的操作,如果你对该内容还不是很熟悉的话,可以参考我的深入分析Java中的原子操作这篇文章,里面对原子操作有一个比较细致的描述。
入队列源码分析
在分析源码之前,我们先通过几张图来说明一下ConcurrentLinkedQueue的入队列的整体过程。在了解完整体的过程后,再结合源码去分析细节会更加容易理解:
队列初始化
在队列刚创建时,head和tail同时指向空节点,我们也称其为dummy
节点。
添加元素a
向队列中添加元素a,此时只是新节点追加到第一个节点的后面,但是tail节点并未发生改变。
添加元素b
向队列中添加元素b,此时新节点与原先的tail节点之间的距离大于1,因此tail节点在这个时候会更新,真正的指向了最后一个节点
添加元素c
由于新节点与tail节点的距离没有大于1,因此此时tail节点同样不会发生更新。
添加元素d
通过上面的图示,我们可以看到ConcurrentLinkedQueue在入队列过程中非常明显的一个特点就是tail指针不是实时更新的,即tail节点可能会滞后于队列中真正的最后一个节点,只有当最后的一个节点与tail节点之前的距离大于1时才会更新,而这样设计的目的就是为了减少避免每增加一个节点,tail节点都需要去执行一次CAS操作的情况发生。
public boolean offer(E e) {
// 由于元素不允许为null,因此对元素做一个检查
checkNotNull(e);
// 生成待插入的节点
final Node<E> newNode = new Node<E>(e);
for (Node<E> t = tail, p = t;;) {
Node<E> q = p.next;
// 当q等于null时,则p就是队列中的最后一个元素
if (q == null) {
// 执行cas操作,如果执行成功,则newNode成为队列的最后一个元素,但它未必是tail指向的元素
if (p.casNext(null, newNode)) {
// 通过判断p!=t,从而确定当前新节点与tail指向的节点之间的距离是否大于1
// 如果大于1,则需要更新tail指针
if (p != t)
casTail(t, newNode);
return true;
}
}
// 如果p==q,则意味着当前p节点已经被从队列中移除(如果单纯从入队列看是看不出来的,后面结合出队列再回头分析)
else if (p == q)
/* 判断在执行过程中tail是否发生变化,如果未发生变化,则tail也已经脱落队列
* 因为 p = t = tail,而p已经脱离队列,从而推断出tail也脱离了队列
* 那么此时只能从head开始,重新查找队列的最后一个元素
* 如果tail发生了变化,则直接从当前队列的tail开始查找队列的最后一个元素
*/
p = (t != (t = tail)) ? t : head;
else
/**
* 由于p节点的next不为null,并且p节点并未从队列中删除,因此需要继续查找队列的最后一个节点
* 判断执行过程中tail节点是否发生了变化
* 如果发生了变化,则让p执行当前的tail,否则就让p直接指向它的next节点q
*/
p = (p != t && t != (t = tail)) ? t : q;
}
}
可能会有不少朋友对上面t != (t = tail)
的处理感到疑惑,疑惑的原因可能会觉得一个变量自己和自己比较,那不是一定为true嘛,怎么还会出现等于false的可能呢。为了理解这个问题,我们一起看一下下面的这段代码:
public class VarCompareTest {
static volatile int b = 2;
public static void main(String[] args) {
int a = b;
int c = a != (a = b) ? 5 : 4;
System.out.println(a);
System.out.println(c);
}
}
这段代码的也用到了上面类似的t != (t = tail)
,但是在编译完成后,我们通过IDEA本身的反编译工具来查看一下对应的Class文件的结果(目前由于对字节码指令不熟悉,因此不从那个角度去解读):
通过上面的代码,我们可以看到,一开始a的值就等于b的值,其值为2;紧接着a的值首先通过一个局部变量记录,然后再将b的值赋值给a,由于b可能存在多线程修改的可能,此时b的值可能被其他线程改成了3,因此a的值也会变为3,最后再拿2与3进行比较,即var10000 != b
进行比较,就会存在等于false的情况发生了。对于t != (t = tail)
的情况也是如此,相信通过这个例子说明大家应该明白其中的原理了!
出队列源码分析
在分析出队列源码之前,我们也结合上面的图来看一下出队列的整体过程:
这里假设队列是基于上面入队列之后的状态进行的。
移除第1个节点
原本指向head的节点,此时的next指向了自己,因此它从队列中真正的移除。存储a的节点,其item置为了null,并且head节点发生变更,真正指向了队列中第一个有效(真正存储数据)的节点。
移除第2个节点
此时只是仅仅将节点的item设置为了null,但是head节点没有发生变更,这样做的目的也是为了减少一次CAS操作,它会等到下一次才去变更。
移除第3个节点
移除第4个节点
看完上图的分析,相信大家对ConcurrentLinkedQueue出队列的操作应该有一个直观的理解了,下面我们看下源码的具体实现:
public E poll() {
restartFromHead:
for (;;) {
for (Node<E> h = head, p = h, q;;) {
E item = p.item;
// 如果元素的item不为null,则说明p节点是当前队列中的第一个未被删除的节点
// 此时也说明head指向的节点确实是队列中的第一个元素
// 通过CAS操作,将item设置为null,来标记其已经被删除
if (item != null && p.casItem(item, null)) {
// 判断p节点与head指向的节点是否是同一个,如果不是则需要将head节点向前移动
if (p != h)
updateHead(h, ((q = p.next) != null) ? q : p);
return item;
}
// 节点的next为null,则说明队列为空,只有一个dummy节点
else if ((q = p.next) == null) {
// 尝试将dummy节点p设置为新的head
updateHead(h, p);
return null;
}
// 如果p节点被删除,只能从队列的head从头开始再次查找
else if (p == q)
// 跳回到最外层的循环,重新执行一次Node<E> h = head, p = h, q;操作
continue restartFromHead;
else
// 由于head指向的节点其item为null,即head指向的节点不是一个有效节点,因此继续通过head的next继续查找
p = q;
}
}
}
final void updateHead(Node<E> h, Node<E> p) {
// 判断新设置的节点与原来的head节点是否是同一个,如果不是则将新节点设置为新的head节点,
// 并且将原来的head节点的next指向自己。
if (h != p && casHead(h, p))
h.lazySetNext(h);
}
tail节点滞后于head节点的场景分析
一般来说,head节点都在队列的左边,而tail节点在队列的右边。然而,在ConcurrentLinkedQueue中,可能存在tail节点在左边,而head节点却跑到了右边的情况,这种场景我们将其称为tail lag behind head
。下面我们分析一下在什么样的场景下会发生这样的情况,这对于真正理解ConcurrentLinkedQueue非常重要!
从上图可以看到,在添加完一个元素后,tail节点并有发生改变。此时,假设我们去获取队列中的元素,队列的结构就变成如下的样子。
从上面的结构我们可以看到,此时tail节点滞后于head节点,并且我们此时通过head节点也无法查找到tail节点,因为该节点已经从队列中移除。当下一次添加元素的时候,就会出现tail节点自己指向自己的情况,此时就需要重新获取到head,将新增的元素追加到head后面。
总结
至此,ConcurrentLinkedQueue的实现我们已经分析完成了。该类的核心设计就在于CAS的无阻塞以及head/tail节点的延迟更新。尽量我们在实际的开发中基本不会去实现一个如此复杂的队列,但是通过分析一个经典无阻塞队列,可以更加好地帮助我们理解并发编程。如果存在分析不对的地方,还望大神指出。