作者:王一飞老师,叩丁狼教育高级讲师
概念
并编程中,一般需要用到安全的队列,如果要自己实现安全队列,可以使用2种方式:
方式1:加锁,这种实现方式就是我们常说的阻塞队列。
方式2:使用循环CAS算法实现,这种方式实现队列称之为非阻塞队列。
先对而已,加锁队列的实现较为简单,这里就略过,我们来重点来解读一下非阻塞队列。
从点到面, 下面我们来看下非阻塞队列经典实现类:ConcurrentLinkedQueue (JDK1.8版)
ConcurrentLinkedQueue
根据API解释,ConcurrentLinkedQueue 是一个基于链接节点的无界线程安全的队列,按照先进先出原则对元素进行排序。新元素从队列尾部插入,而获取队列元素,则需要从队列头部获取。
看下ConcurrentLinkedQueue的结构图
从内图可以了解ConcurrentLinkedQueue一个大概,ConcurrentLinkedQueue内部持有2个节点:head头结点,负责出列, tail尾节点,负责入列。而元素节点Node,使用item存储入列元素,next指向下一个元素节点。
private static class Node<E> {
volatile E item;
volatile Node<E> next;
//....
}
public class ConcurrentLinkedQueue<E> extends AbstractQueue<E>
implements Queue<E>, java.io.Serializable {
private transient volatile Node<E> head;
private transient volatile Node<E> tail;
//....
}
ConcurrentLinkedQueue使用特点
ConcurrentLinkedQueue使用约定:
1:不允许null入列
2:在入队的最后一个元素的next为null
3:队列中所有未删除的节点的item都不能为null且都能从head节点遍历到
4:删除节点是将item设置为null, 队列迭代时跳过item为null节点
5:head节点跟tail不一定指向头节点或尾节点,可能存在滞后性
之所以有这奇葩约定,全因ConcurrentLinkedQueue是并发非阻塞队列决定的。
我们从源码上看一下ConcurrentLinkedQueue实现过程
入列
我们印象中链表特点:tail节点表示最后一个节点, head表示第一个节点。ConcurrentLinkedQueue 跟传统的链表有点区别,在单线程环境下符合传统链表特点,但涉及到多线程环境,ConcurrentLinkedQueue 中的tail节点不一定是最后一个节点,他可能是倒数第二个。所以ConcurrentLinkedQueue判断队尾条件是节点的next为null。
public boolean offer(E e) {
checkNotNull(e); //为空判断,e为null是抛异常
final Node<E> newNode = new Node<E>(e); //将e包装成newNode
for (Node<E> t = tail, p = t;;) { //循环cas,直至加入成功
//t = p = tail
Node<E> q = p.next;
if (q == null) { //判断p是否为尾节点
//如果是,p.next = newNode
if (p.casNext(null, newNode)) {
//首次添加时,p 等于t,不进行尾节点更新,所以所尾节点存在滞后性
//并发环境,可能存添加/删除,tail就更难保证正确指向最后节点。
if (p != t)
//更新尾节点为最新元素
casTail(t, newNode);
return true;
}
}
else if (p == q)
//当tail不执行最后节点时,如果执行出列操作,很有可能将tail也给移除了
//此时需要对tail节点进行复位,复位到head节点
p = (t != (t = tail)) ? t : head;
else
//推动tail尾节点往队尾移动
p = (p != t && t != (t = tail)) ? t : q;
}
}
分析
从图上看tail不一定执行最后一个节点,但可以确定最后节点的next节点为null。到这可能朋友问他,并发环境什么情况都有可能,ConcurrentLinkedQueue是怎么保证线程安全的?
我们观察offer方法的设计,
1:是一个死循环,就是不停使用cas判断直到添加元素入队成功。
for (Node<E> t = tail, p = t;;)
2:2个cas判断方法
p.casNext(null, newNode) 确保队列在入列时是原子操作
private boolean casTail(Node<E> cmp, Node<E> val) {
return UNSAFE.compareAndSwapObject(this, tailOffset, cmp, val);
}
casTail(t, newNode); 确保tail队尾在移动改变时是原子操作
boolean casNext(Node<E> cmp, Node<E> val) {
return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
}
而在并发环境,ConcurrentLinkedQueue入列线程安全考虑具体可分2类:
1>线程1线程2同时入列
这个好理解, 线程1,线程2不管在offer哪个位置开始并发,他们最终的目的都是入列,也即都需要执行casNext方法, 我们只需要确保所有线程都有机会执行casNext方法,并且保证casNext方法是原子操作即可。casNext失败的线程,可以进入下一轮循环,人品好的话就可以入列,衰的话继续循环
2>线程1遍历,线程2入列
ConcurrentLinkedQueue 遍历是线程不安全的, 线程1遍历,线程2很有可能进行入列出列操作, 所以ConcurrentLinkedQueue 的size是变化。换句话说,要想安全遍历ConcurrentLinkedQueue 队列,必须额外加锁。
但换一个角度想, ConcurrentLinkedQueue 的设计初衷非阻塞队列,我们更多关注入列与出列线程安全,这2点能保证就可以啦。
出列
public E poll() {
restartFromHead:
for (;;) {
for (Node<E> h = head, p = h, q;;) {
//入列折腾的tail,那出列折腾的就是head
E item = p.item;
//出列判断依据是节点的item=null
//item != null, 并且能将操作节点的item设置null, 表示出列成功
if (item != null && p.casItem(item, null)) {
if (p != h)
//一旦出列成功需要对head进行移动
updateHead(h, ((q = p.next) != null) ? q : p);
return item;
}
else if ((q = p.next) == null) {
updateHead(h, p);
return null;
}
else if (p == q)
//第一轮操作失败,下一轮继续,调回到循环前
continue restartFromHead;
else
//推动head节点移动
p = q;
}
}
}
看图, 被移动的节点(item为null的节点)会被jvm回收。但是有个问题, tail也被回收, 那ConcurrentLinkedQueue就没有tail节点了。如果此时再添加一个D元素时,会出现什么情况?
好问的朋友,又想了,ConcurrentLinkedQueue怎么保证出列线程安全?道理跟之前入列一样,cas保证原子操作即可。
总结
到这ConcurrentLinkedQueue介绍就完成了。总结下ConcurrentLinkedQueue贴点:
1>入列出列线程安全,遍历不安全
2>不允许添加null元素
3>底层使用列表与cas算法包装入列出列安全
想获取更多技术视频,请前往叩丁狼官网:http://www.wolfcode.cn/openClassWeb_listDetail.html