经典的生产者-消费者模式,操作流程是这样的:
有多个生产者,可以并发生产产品,把产品置入队列中,如果队列满了,生产者就会阻塞;
有多个消费者,并发从队列中获取产品,如果队列空了,消费者就会阻塞;
SynchronousQueue 也是一个队列来的,但它的特别之处在于它内部没有容器,一个生产线程,当它生产产品(即put的时候),如果当前没有人想要消费产品(即当前没有线程执行take),此生产线程必须阻塞,等待一个消费线程调用take操作,take操作将会唤醒该生产线程,同时消费线程会获取生产线程的产品(即数据传递)
package com.concurrent;
import java.util.concurrent.SynchronousQueue;
public class SynchronousQueueDemo {
public static void main(String[] args) throws InterruptedException {
final SynchronousQueue<Integer> queue = new SynchronousQueue<Integer>();
Thread putThread = new Thread(new Runnable() {
@Override
public void run() {
System.out.println("put thread start");
try {
queue.put(1);
} catch (InterruptedException e) {
}
System.out.println("put thread end");
}
});
Thread takeThread = new Thread(new Runnable() {
@Override
public void run() {
System.out.println("take thread start");
try {
System.out.println("take from putThread: " + queue.take());
} catch (InterruptedException e) {
}
System.out.println("take thread end");
}
});
putThread.start();
Thread.sleep(1000);
takeThread.start();
}
}
一种输出结果如下:
put thread start
take thread start
take from putThread: 1
put thread end
take thread end
从结果可以看出,put线程执行queue.put(1) 后就被阻塞了,只有take线程进行了消费,put线程才可以返回。可以认为这是一种线程与线程间一对一传递消息的模型。
作为BlockingQueue中的一员,SynchronousQueue与其他BlockingQueue有着不同特性:
- SynchronousQueue没有容量。与其他BlockingQueue不同,SynchronousQueue是一个不存储元素的BlockingQueue。每一个put操作必须要等待一个take操作,否则不能继续添加元素,反之亦然。
- 因为没有容量,所以对应 peek, contains, clear, isEmpty … 等方法其实是无效的。例如clear是不执行任何操作的,contains始终返回false,peek始终返回null。
- SynchronousQueue分为公平和非公平,默认情况下采用非公平性访问策略,当然也可以通过构造函数来设置为公平性访问策略(为true即可)。
- 若使用 TransferQueue, 则队列中永远会存在一个 dummy node
SynchronousQueue数据结构
由于SynchronousQueue的支持公平策略和非公平策略,所以底层可能两种数据结构:队列(实现公平策略)和栈(实现非公平策略),队列与栈都是通过链表来实现的。具体的数据结构如下
说明:数据结构有两种类型,栈和队列;栈有一个头结点,队列有一个头结点和尾结点;栈用于实现非公平策略,队列用于实现公平策略。
类的继承关系
public class SynchronousQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable {}
说明:SynchronousQueue继承了AbstractQueue抽象类,AbstractQueue定义了对队列的基本操作;同时实现了BlockingQueue接口,BlockingQueue表示阻塞型的队列,其对队列的操作可能会抛出异常;同时也实现了Searializable接口,表示可以被序列化。
SynchronousQueue的内部类框架图如下
说明:其中比较重要的类是左侧的三个类,Transferer是TransferStack栈和TransferQueue队列的公共类,定义了转移数据的公共操作,由TransferStack和TransferQueue具体实现,WaitQueue、LifoWaitQueue、FifoWaitQueue表示为了兼容JDK1.5版本中的SynchronousQueue的序列化策略所遗留的,这里不做具体的讲解。下面着重看左侧的三个类。
SynchronousQueue实现原理
公平模式下的模型:
公平模式下,底层实现使用的是TransferQueue这个内部队列,它有一个head和tail指针,用于指向当前正在等待匹配的线程节点。
初始化时,TransferQueue的状态如下:
接着我们进行一些操作:
1、线程put1执行 put(1)操作,由于当前没有配对的消费线程,所以put1线程入队列,自旋一小会后睡眠等待,这时队列状态如下:
2、接着,线程put2执行了put(2)操作,跟前面一样,put2线程入队列,自旋一小会后睡眠等待,这时队列状态如下:
- 这时候,来了一个线程take1,执行了 take操作,由于tail指向put2线程,put2线程跟take1线程配对了(一put一take),这时take1线程不需要入队,但是请注意了,这时候,要唤醒的线程并不是put2,而是put1。为何? 大家应该知道我们现在讲的是公平策略,所谓公平就是谁先入队了,谁就优先被唤醒,我们的例子明显是put1应该优先被唤醒。
公平策略总结下来就是:队尾匹配队头出队。
执行后put1线程被唤醒,take1线程的 take()方法返回了1(put1线程的数据),这样就实现了线程间的一对一通信,这时候内部状态如下:
-
最后,再来一个线程take2,执行take操作,这时候只有put2线程在等候,而且两个线程匹配上了,线程put2被唤醒,
take2线程take操作返回了2(线程put2的数据),这时候队列又回到了起点,如下所示:
以上便是公平模式下,SynchronousQueue的实现模型。总结下来就是:队尾匹配队头出队,先进先出,体现公平原则。
非公平模式下的模型:
我们还是使用跟公平模式下一样的操作流程,对比两种策略下有何不同。非公平模式底层的实现使用的是TransferStack,
一个栈,实现中用head指针指向栈顶,接着我们看看它的实现模型:
1、线程put1执行 put(1)操作,由于当前没有配对的消费线程,所以put1线程入栈,自旋一小会后睡眠等待,这时栈状态如下:
2、接着,线程put2再次执行了put(2)操作,跟前面一样,put2线程入栈,自旋一小会后睡眠等待,这时栈状态如下:
3、这时候,来了一个线程take1,执行了take操作,这时候发现栈顶为put2线程,匹配成功,但是实现会先把take1线程入栈,然后take1线程循环执行匹配put2线程逻辑,一旦发现没有并发冲突,就会把栈顶指针直接指向 put1线程
4、最后,再来一个线程take2,执行take操作,这跟步骤3的逻辑基本是一致的,take2线程入栈,然后在循环中匹配put1线程,最终全部匹配完毕,栈变为空,恢复初始状态,如下图所示:
可以从上面流程看出,虽然put1线程先入栈了,但是却是后匹配,这就是非公平的由来。
下面我们分析一下cachedThreadPool的使用流程,通过这个过程我们来了解synchronousQueue的使用方式:先看代码
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {//1
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}
/**
* Creates a thread pool that creates new threads as needed, but
* will reuse previously constructed threads when they are
* available, and uses the provided
* ThreadFactory to create new threads when needed.
* @param threadFactory the factory to use when creating new threads
* @return the newly created thread pool
* @throws NullPointerException if threadFactory is null
*/
public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(),
threadFactory);
}
SynchronousQueue的一个使用场景是在线程池里。Executors.newCachedThreadPool()就使用了SynchronousQueue,这个线程池根据需要(新任务到来时)创建新的线程,如果有空闲线程则会重复使用,线程空闲了60秒后会被回收。
由于ThreadPoolExecutor内部实现任务提交的时候调用的是工作队列(BlockingQueue接口的实现类)的非阻塞式入队列方法(offer方法),因此,在使用SynchronousQueue作为工作队列的前提下,客户端代码向线程池提交任务时,而线程池中又没有空闲的线程能够从SynchronousQueue队列实例中取一个任务,那么相应的offer方法调用就会失败(即任务没有被存入工作队列)。此时,ThreadPoolExecutor会新建一个新的工作者线程用于对这个入队列失败的任务进行处理(假设此时线程池的大小还未达到其最大线程池大小)。
SynchronousQueue:这个队列接收到任务的时候,会直接提交给线程处理,而不保留它,如果所有线程都在工作怎么办?那就新建一个线程来处理这个任务!所以为了保证不出现<线程数达到了maximumPoolSize而不能新建线程>的错误,使用这个类型队列的时候,maximumPoolSize一般指定成Integer.MAX_VALUE,即无限大.
所以,使用SynchronousQueue作为工作队列,工作队列本身并不限制待执行的任务的数量。但此时需要限定线程池的最大大小为一个合理的有限值,而不是Integer.MAX_VALUE,否则可能导致线程池中的工作者线程的数量一直增加到系统资源所无法承受为止。
使用SynchronousQueue的目的就是保证“对于提交的任务,如果有空闲线程,则使用空闲线程来处理;否则新建一个线程来处理任务”。
源码解析
public class SynchronousQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable {
//Transferer是一个抽象类,SynchronousQueue内部有2个Transferer的子类,分别是TransferQueue和TransferStack
//
private transient volatile Transferer<E> transferer;
//默认构造方法的线程等待队列是不保证顺序的
public SynchronousQueue() {
this(false);
}
//如果fair为true,那SynchronousQueue所采用的是能保证先进先出的TransferQueue,也就是先被挂起的线程会先返回
public SynchronousQueue(boolean fair) {
transferer = fair ? new TransferQueue<E>() : new TransferStack<E>();
}
//向SynchronousQueue中添加数据,如果此时线程队列中没有获取数据的线程的话,当前的线程就会挂起等待
public void put(E e) throws InterruptedException {
//添加的数据不能是null
if (e == null) throw new NullPointerException();
//可以看到添加的方法调用的是transfer方法,如果添加失败会抛出InterruptedException异常
//后面我们可以在transfer方法的源码中调用put方法添加数据在当前线程被中断时才会返回null
//这里相当于继续把线程中断的InterruptedException向上抛出
if (transferer.transfer(e, false, 0) == null) {
Thread.interrupted();
throw new InterruptedException();
}
}
//不带超时时间的offer方法,如果此时没有线程正在等待获取数据的话transfer就会返回null,也就是添加数据失败
public boolean offer(E e) {
if (e == null) throw new NullPointerException();
return transferer.transfer(e, true, 0) != null;
}
//带超时时间的offer方法,与上面的不同的是这个方法会等待一个超时时间,如果时间过了还没有线程来获取数据就会返回失败
public boolean offer(E e, long timeout, TimeUnit unit)
throws InterruptedException {
if (e == null) throw new NullPointerException();
//添加的数据被其他线程成功获取,返回成功
if (transferer.transfer(e, true, unit.toNanos(timeout)) != null)
return true;
//如果添加数据失败了,有可能是线程被中断了,不是的话直接返回false
if (!Thread.interrupted())
return false;
//是线程被中断的话就向上跑出InterruptedException异常
throw new InterruptedException();
}
//take方法用于从队列中取数据,如果此时没有添加数据的线程被挂起,那当前线程就会被挂起等待
public E take() throws InterruptedException {
E e = transferer.transfer(null, false, 0);
//成功获取数据
if (e != null)
return e;
//没有获取到数据,同时又退出挂起状态了,那说明线程被中断了,向上抛出InterruptedException
Thread.interrupted();
throw new InterruptedException();
}
//poll方法同样用于获取数据
public E poll() {
return transferer.transfer(null, true, 0);
}
//带超时时间的poll方法,如果超时时间到了还没有线程插入数据,就会返回失败
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
E e = transferer.transfer(null, true, unit.toNanos(timeout));
//返回结果有2种情况
//e != null表示成功取到数据了
//!Thread.interrupted()表示返回失败了,且是因为超时失败的,此时e是null
if (e != null || !Thread.interrupted())
return e;
//返回失败了,并且是因为当前线程被中断了
throw new InterruptedException();
}
//可以看到SynchronousQueue的isEmpty方法一直返回的是true,因为SynchronousQueue没有任何容量
public boolean isEmpty() {
return true;
}
//同样的size方法也返回0
public int size() {
return 0;
}
<!--下面我们看看TransferQueue的具体实现,TransferQueue中的关键方法就是transfer方法了-->
//先看看TransferQueue的父类Transferer,比较简单,就是提供了一个transfer方法,需要子类具体实现
abstract static class Transferer<E> {
abstract E transfer(E e, boolean timed, long nanos);
}
//TransferQueue
static final class TransferQueue<E> extends Transferer<E> {
//内部的节点类,用于表示一个请求
//这里可以看出TransferQueue内部是一个单链表,因此可以保证先进先出
static final class QNode {
volatile QNode next; // next node in queue
volatile Object item; // CAS'ed to or from null
//请求所在的线程
volatile Thread waiter; // to control park/unpark
//用于判断是入队还是出队,true表示的是入队操作,也就是添加数据
final boolean isData;
QNode(Object item, boolean isData) {
this.item = item;
this.isData = isData;
}
//可以看到QNode内部通过volatile关键字以及Unsafe类的CAS方法来实现线程安全
//compareAndSwapObject方法第一个参数表示需要改变的对象,第二个参数表示偏移量
//第三个参数表示参数期待的值,第四个参数表示更新后的值
//下面的方法调用的意思是将当前的QNode对象(this)的next字段赋值为val,当目前的next的值是cmp时就会更新next字段成功
boolean casNext(QNode cmp, QNode val) {
return next == cmp &&
U.compareAndSwapObject(this, NEXT, cmp, val);
}
//方法的原理同上面的类似,这里就是更新item的值了
boolean casItem(Object cmp, Object val) {
return item == cmp &&
U.compareAndSwapObject(this, ITEM, cmp, val);
}
//方法的原理同上面的类似,这里把item赋值为自己,就表示取消当前节点表示的操作了
void tryCancel(Object cmp) {
U.compareAndSwapObject(this, ITEM, cmp, this);
}
//调用tryCancel方法后item就会是this,就表示当前任务被取消了
boolean isCancelled() {
return item == this;
}
//表示当前任务已经被返回了
boolean isOffList() {
return next == this;
}
// Unsafe mechanics
private static final sun.misc.Unsafe U = sun.misc.Unsafe.getUnsafe();
private static final long ITEM;
private static final long NEXT;
static {
try {
ITEM = U.objectFieldOffset
(QNode.class.getDeclaredField("item"));
NEXT = U.objectFieldOffset
(QNode.class.getDeclaredField("next"));
} catch (ReflectiveOperationException e) {
throw new Error(e);
}
}
}
//首节点
transient volatile QNode head;
//尾部节点
transient volatile QNode tail;
/**
* Reference to a cancelled node that might not yet have been
* unlinked from queue because it was the last inserted node
* when it was cancelled.
*/
transient volatile QNode cleanMe;
//构造函数中会初始化一个出队的节点,并且首尾都指向这个节点
TransferQueue() {
QNode h = new QNode(null, false); // initialize to dummy node.
head = h;
tail = h;
}
//transfer方法用于提交数据或者是获取数据
@SuppressWarnings("unchecked")
E transfer(E e, boolean timed, long nanos) {
QNode s = null; // constructed/reused as needed
//如果e不为null,就说明是添加数据的入队操作
boolean isData = (e != null);
for (;;) {
QNode t = tail;
QNode h = head;
if (t == null || h == null) // saw uninitialized value
continue; // spin
//当队列为空的时候或者新加的操作和队尾的操作是同一个操作,可能都是入队操作也可能是出队操作,说明当前没有反向操作的线程空闲
if (h == t || t.isData == isData) { // empty or same-mode
QNode tn = t.next;
//这是一个检查,确保t指向队尾
if (t != tail) // inconsistent read
continue;
//tn不为null,说明t不是尾部节点,就执行advanceTail操作,将tn作为尾部节点,继续循环
if (tn != null) { // lagging tail
advanceTail(t, tn);
continue;
}
//如果timed为true,表示带有超时参数,等待超时期间没有其他相反操作的线程提交就会直接返回null
//这里如果nanos初始值就是0,比如不带超时时间的offer和poll方法,当队尾的节点不是相反操作时就会直接返回null
if (timed && nanos <= 0L) // can't wait
return null;
//如果没有超时时间或者超时时间不为0的话就创建新的节点
if (s == null)
s = new QNode(e, isData);
//使tail的next指向新的节点
if (!t.casNext(null, s)) // failed to link in
continue;
//更新TransferQueue的tail指向新的节点,这样tail节点就始终是尾部节点
advanceTail(t, s); // swing tail and wait
//如果当前操作是带超时时间的,则进行超时等待,否则就挂起线程,直到有新的反向操作提交
Object x = awaitFulfill(s, e, timed, nanos);
//当挂起的线程被中断或是超时时间已经过了,awaitFulfill方法就会返回当前节点,这样就会有x == s为true
if (x == s) { // wait was cancelled
//将队尾节点移出,并重新更新尾部节点,返回null,就是入队或是出队操作失败了
clean(t, s);
return null;
}
//如果s还没有被
if (!s.isOffList()) { // not already unlinked
advanceHead(t, s); // unlink if head
if (x != null) // and forget fields
s.item = s;
s.waiter = null;
}
return (x != null) ? (E)x : e;
}
//提交操作的时候刚刚好有反向的操作在等待
else { // complementary-mode
QNode m = h.next; // node to fulfill
if (t != tail || m == null || h != head)
continue; // inconsistent read
Object x = m.item;
//这里先判断m是否是有效的操作
if (isData == (x != null) || // m already fulfilled
x == m || // m cancelled
!m.casItem(x, e)) { // lost CAS
advanceHead(h, m); // dequeue and retry
continue;
}
//更新头部节点
advanceHead(h, m); // successfully fulfilled
//唤醒m节点的被挂起的线程
LockSupport.unpark(m.waiter);
//返回的结果用于给对应的操作,如take、offer等判断是否执行操作成功
return (x != null) ? (E)x : e;
}
}
}
<!--下面看看执行挂起线程的方法awaitFulfill-->
Object awaitFulfill(QNode s, E e, boolean timed, long nanos) {
/* Same idea as TransferStack.awaitFulfill */
//首先获取超时时间
final long deadline = timed ? System.nanoTime() + nanos : 0L;
//当前操作所在的线程
Thread w = Thread.currentThread();
//线程被挂起或是进入超时等待之前阻止自旋的次数
int spins = (head.next == s)
? (timed ? MAX_TIMED_SPINS : MAX_UNTIMED_SPINS)
: 0;
for (;;) {
//这里首先判断线程是否被中断了,如果被中断了就取消等待,并设置s的item指向s本身作为标记
if (w.isInterrupted())
s.tryCancel(e);
Object x = s.item;
//x != e就表示超时时间到了或是线程被中断了,也就是执行了tryCancel方法
if (x != e)
return x;
//这里先判断超时的时间是否过了
if (timed) {
nanos = deadline - System.nanoTime();
if (nanos <= 0L) {
s.tryCancel(e);
continue;
}
}
//这里通过多几次循环来避免直接挂起线程
if (spins > 0)
--spins;
else if (s.waiter == null)
s.waiter = w;
else if (!timed)
//park操作会让线程挂起进入等待状态(Waiting),需要其他线程调用unpark方法唤醒
LockSupport.park(this);
else if (nanos > SPIN_FOR_TIMEOUT_THRESHOLD)
//parkNanos操作会让线程挂起进入限期等待(Timed Waiting),不用其他线程唤醒,时间到了会被系统唤醒
LockSupport.parkNanos(this, nanos);
}
}
}
}
通过上述源码我们可以看出SynchronousQueue本身没有容量存储元素,但是它是通过管理提交操作的线程队列来实现阻塞队列的
SynchronousQueue可以实现控制线程先进先出进行排序,也就是先被挂起的线程先被唤醒,这个内部是通过链表来实现的。SynchronousQueue默认是不保证证唤醒的顺序的
SynchronousQueue的不带超时时间的offer和poll方法不会挂起线程,而take和put方法可能会挂起线程。
SynchronousQueue一个典型的应用场景是线程池newCachedThreadPool,从上面的源码可以看出,如果入队操作和出队操作的处理速度相差比较大的话有可能会创建大量线程,有耗尽内存的风险.
详细分析
与其他BlockingQueue一样,SynchronousQueue同样继承AbstractQueue和实现BlockingQueue接口:
public class SynchronousQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable
SynchronousQueue提供了两个构造函数:
public SynchronousQueue() {
this(false);
}
public SynchronousQueue(boolean fair) {
// 通过 fair 值来决定公平性和非公平性
// 公平性使用TransferQueue,非公平性采用TransferStack
transferer = fair ? new TransferQueue<E>() : new TransferStack<E>();
}
TransferQueue、TransferStack继承Transferer,Transferer为SynchronousQueue的内部类,它提供了一个方法transfer(),该方法定义了转移数据的规范,如下:
abstract static class Transferer<E> {
abstract E transfer(E e, boolean timed, long nanos);
}
transfer()方法主要用来完成转移数据的,如果e != null,相当于将一个数据交给消费者,如果e == null,则相当于从一个生产者接收一个消费者交出的数据。
SynchronousQueue采用队列TransferQueue来实现公平性策略,采用堆栈TransferStack来实现非公平性策略,他们两种都是通过链表实现的,其节点分别为QNode,SNode。TransferQueue和TransferStack在SynchronousQueue中扮演着非常重要的作用,SynchronousQueue的put、take操作都是委托这两个类来实现的。
TransferQueue
TransferQueue是实现公平性策略的核心类,其节点为QNode,其定义如下:
/**
* 这是一个非常典型的 queue , 它有如下的特点
* 1. 整个队列有 head, tail 两个节点
* 2. 队列初始化时会有个 dummy 节点
* 3. 这个队列的头节点是个 dummy 节点/ 或 哨兵节点, 所以操作的总是队列中的第二个节点(AQS的设计中也是这也)
*/
/** 头节点 */
transient volatile QNode head;
/** 尾节点 */
transient volatile QNode tail;
/**
* Reference to a cancelled node that might not yet have been
* unlinked from queue because it was last inserted node
* when it was cancelled
*/
/**
* 对应 中断或超时的 前继节点,这个节点存在的意义是标记, 它的下个节点要删除
* 何时使用:
* 当你要删除 节点 node, 若节点 node 是队列的末尾, 则开始用这个节点,
* 为什么呢?
* 大家知道 删除一个节点 直接 A.CASNext(B, B.next) 就可以,但是当 节点 B 是整个队列中的末尾元素时,
* 一个线程删除节点B, 一个线程在节点B之后插入节点 这样操作容易致使插入的节点丢失, 这个cleanMe很像
* ConcurrentSkipListMap 中的 删除添加的 marker 节点, 他们都是起着相同的作用
*/
transient volatile QNode cleanMe;
TransferQueue(){
/**
* 构造一个 dummy node, 而整个 queue 中永远会存在这样一个 dummy node
* dummy node 的存在使得 代码中不存在复杂的 if 条件判断
*/
QNode h = new QNode(null, false);
head = h;
tail = h;
}
/**
* 推进 head 节点,将 老节点的 oldNode.next = this, help gc,
* 这种和 ConcurrentLinkedQueue 中一样
*/
void advanceHead(QNode h, QNode nh){
if(h == head && unsafe.compareAndSwapObject(this, headOffset, h, nh)){
h.next = h; // forget old next help gc
}
}
/** 更新新的 tail 节点 */
void advanceTail(QNode t, QNode nt){
if(tail == t){
unsafe.compareAndSwapObject(this, tailOffset, t, nt);
}
}
/** CAS 设置 cleamMe 节点 */
boolean casCleanMe(QNode cmp, QNode val){
return cleanMe == cmp && unsafe.compareAndSwapObject(this, cleanMeOffset, cmp, val);
}
在TransferQueue中定义了QNode类来表示队列中的节点,QNode节点定义如下:
static final class QNode {
// next 域
volatile QNode next;
// item数据项
volatile Object item;
// 等待线程,用于park/unpark
volatile Thread waiter; // to control park/unpark
//模式,表示当前是数据还是请求,只有当匹配的模式相匹配时才会交换
final boolean isData;
QNode(Object item, boolean isData) {
this.item = item;
this.isData = isData;
}
...
isData,该属性在进行数据交换起到关键性作用,两个线程进行数据交换的时候,必须要两者的模式保持一致。
公平模式 TransferQueue transfer方法
若队列为空 / 队列中的尾节点和自己的 类型相同, 则添加 node
到队列中, 直到 timeout/interrupt/其他线程和这个线程匹配
timeout/interrupt awaitFulfill方法返回的是 node 本身
匹配成功的话, 要么返回 null (producer返回的), 或正真的传递值 (consumer 返回的)队列不为空, 且队列的 head.next 节点是当前节点匹配的节点,
进行数据的传递匹配, 并且通过 advanceHead 方法帮助 先前 block 的节点 dequeue
/**
* Puts or takes an item
* 主方法
*
* @param e if non-null, the item to be handed to a consumer;
* if null, requests that transfer return an item
* offered by producer.
* @param timed if this operation should timeout
* @param nanos the timeout, in nanosecond
* @return
*/
@Override
E transfer(E e, boolean timed, long nanos) {
/**
* Basic algorithm is to loop trying to take either of
* two actions:
*
* 1. If queue apparently empty or holding same-mode nodes,
* try to add node to queue of waiters, wait to be
* fulfilled (or cancelled) and return matching item.
*
* 2. If queue apparently contains waiting items, and this
* call is of complementary mode, try to fulfill by CAS'ing
* item field of waiting node and dequeuing it, and then
* returning matching item.
*
* In each case, along the way, check for gurading against
* seeing uninitialized head or tail value. This never
* happens in current SynchronousQueue, but could if
* callers held non-volatile/final ref to the
* transferer. The check is here anyway because it places
* null checks at top of loop, which is usually faster
* than having them implicity interspersed
*
* 这个 producer / consumer 的主方法, 主要分为两种情况
*
* 1. 若队列为空 / 队列中的尾节点和自己的 类型相同, 则添加 node
* 到队列中, 直到 timeout/interrupt/其他线程和这个线程匹配
* timeout/interrupt awaitFulfill方法返回的是 node 本身
* 匹配成功的话, 要么返回 null (producer返回的), 或正真的传递值 (consumer 返回的)
*
* 2. 队列不为空, 且队列的 head.next 节点是当前节点匹配的节点,
* 进行数据的传递匹配, 并且通过 advanceHead 方法帮助 先前 block 的节点 dequeue
*/
QNode s = null; // constrcuted/reused as needed
boolean isData = (e != null); // 1.判断 e != null 用于区分 producer 与 consumer
for(;;){
QNode t = tail;
QNode h = head;
if(t == null || h == null){ // 2. 数据未初始化, continue 重来
continue; // spin
}
if(h == t || t.isData == isData){ // 3. 队列为空, 或队列尾节点和自己相同 (注意这里是和尾节点比价, 下面进行匹配时是和 head.next 进行比较)
QNode tn = t.next;
if(t != tail){ // 4. tail 改变了, 重新再来
continue;
}
if(tn != null){ // 5. 其他线程添加了 tail.next, 所以帮助推进 tail
advanceTail(t, tn);
continue;
}
if(timed && nanos <= 0){ // 6. 调用的方法的 wait 类型的, 并且 超时了, 直接返回 null, 直接见 SynchronousQueue.poll() 方法,说明此 poll 的调用只有当前队列中正好有一个与之匹配的线程在等待被【匹配才有返回值
return null;
}
if(s == null){
s = new QNode(e, isData); // 7. 构建节点 QNode
}
if(!t.casNext(null, s)){ // 8. 将 新建的节点加入到 队列中
continue;
}
advanceTail(t, s); // 9. 帮助推进 tail 节点
Object x = awaitFulfill(s, e, timed, nanos); // 10. 调用awaitFulfill, 若节点是 head.next, 则进行一些自旋, 若不是的话, 直接 block, 知道有其他线程 与之匹配, 或它自己进行线程的中断
if(x == s){ // 11. 若 (x == s)节点s 对应额线程 wait 超时 或线程中断, 不然的话 x == null (s 是 producer) 或 是正真的传递值(s 是 consumer)
clean(t, s); // 12. 对接点 s 进行清除, 若 s 不是链表的最后一个节点, 则直接 CAS 进行 节点的删除, 若 s 是链表的最后一个节点, 则 要么清除以前的 cleamMe 节点(cleamMe != null), 然后将 s.prev 设置为 cleanMe 节点, 下次进行删除 或直接将 s.prev 设置为cleanMe
return null;
}
if(!s.isOffList()){ // 13. 节点 s 没有 offlist
advanceHead(t, s); // 14. 推进head 节点, 下次就调用 s.next 节点进行匹配(这里调用的是 advanceHead, 因为代码能执行到这边说明s已经是 head.next 节点了)
if(x != null){ // and forget fields
s.item = s;
}
s.waiter = null; // 15. 释放线程 ref
}
return (x != null) ? (E)x :e;
}else{ // 16. 进行线程的匹配操作, 匹配操作是从 head.next 开始匹配 (注意 队列刚开始构建时 有个 dummy node, 而且 head 节点永远是个 dummy node 这个和 AQS 中一样的)
QNode m = h.next; // 17. 获取 head.next 准备开始匹配
if(t != tail || m == null || h != head){
continue; // 18. 不一致读取, 有其他线程改变了队列的结构inconsistent read
}
/** producer 和 consumer 匹配操作
* 1. 获取 m的 item (注意这里的m是head的next节点
* 2. 判断 isData 与x的模式是否匹配, 只有produce与consumer才能配成一对
* 3. x == m 判断是否 节点m 是否已经进行取消了, 具体看(QNOde#tryCancel)
* 4. m.casItem 将producer与consumer的数据进行交换 (这里存在并发时可能cas操作失败的情况)
* 5. 若 cas操作成功则将h节点dequeue
*
* 疑惑: 为什么将h进行 dequeue, 而不是 m节点
* 答案: 因为每次进行配对时, 都是将 h 是个 dummy node, 正真的数据节点 是 head.next
*/
Object x = m.item;
if(isData == (x != null) || // 19. 两者的模式是否匹配 (因为并发环境下 有可能其他的线程强走了匹配的节点)
x == m || // 20. m 节点 线程中断或者 wait 超时了
!m.casItem(x, e) // 21. 进行 CAS 操作 更改等待线程的 item 值(等待的有可能是 concumer / producer)
){
advanceHead(h, m); // 22.推进 head 节点 重试 (尤其 21 操作失败)
continue;
}
advanceHead(h, m); // 23. producer consumer 交换数据成功, 推进 head 节点
LockSupport.unpark(m.waiter); // 24. 换线等待中的 m 节点, 而在 awaitFulfill 方法中 因为 item 改变了, 所以 x != e 成立, 返回
return (x != null) ? (E)x : e; // 25. 操作到这里若是 producer, 则 x != null, 返回 x, 若是consumer, 则 x == null,.返回 producer(其实就是 节点m) 的 e
}
}
}
-
队列初始化的时候,只有一个空的Node。
-
此时,一个线程尝试 offer 或者 poll数据,都会插入一个 Node 插入到节点中。
-
假设刚刚发生的是 offer 操作,这个时候,另一个线程也来 offer,这时就会有 2 个节点。
-
这个时候,队列中有 2 个有真实数据(offer 操作)的节点了,注意,这个时候,那 2 个线程都是 wait的,因为没有人接受他们的数据。此时,又来一个线程,做 poll 操作。
从上图可以看出,poll 线程从head 开始取数据,因为它的 isData 和 tail 节点的 isData 不同,那么就会从 head 开始找节点,并尝试将自己的 null 值和节点中的真实数据进行交换。并唤醒等待中的线程
这 4 幅图就是 SynchronousQueue的精华。
既然叫做同步队列,一定是 A 线程生产数据的时候,有 B 线程在消费,否则 A 线程就需要等待,反之,如果 A 线程准备消费数据,但队列中没有数据,线程也会等待,直到有 B 线程存放数据。
而 JDK 的实现原理则是:使用一个队列,队列中的用一个 isData 来区分生产还是消费,所有新操作都根据 tail 节点的模式来决定到底是追加到 tail节点还是和 tail节点(从 head 开始)交换数据。
而所谓的交换是从head开始,取出节点的实际数据,然后使用 CAS 和匹配到的节点进行交换。从而完成两个线程直接交换数据的操作。
为什么他在某些情况下,比LinkedBlockingQueue性能高呢?其中有个原因就是没有使用锁,减少了线程上下文切换。第二则是线程之间交换数据的方式更加的高效。
好,重点部分讲完了,再看看其中线程是如何等待的。逻辑在 awaitFulfill 方法中:
// 自旋或者等待,直到填充完毕
// 这里的策略是什么呢?如果自旋次数不够了,通常是 16 次,但还有超过 1 秒的时间,就阻塞等待被唤醒。
// 如果时间到了,就取消这次的入队行为。
// 返回的是 Node 本身
// s.item 就是 e
Object awaitFulfill(QNode s, E e, boolean timed, long nanos) {
final long deadline = timed ? System.nanoTime() + nanos : 0L;
Thread w = Thread.currentThread();
int spins = ((head.next == s) ?// 如果成功将 tail.next 覆盖了 tail,如果有超时机制,则自旋 32 次,如果没有超时机制,则自旋 32 *16 = 512次
(timed ? maxTimedSpins : maxUntimedSpins) : 0);
for (;;) {
if (w.isInterrupted())// 当前线程被中断
s.tryCancel(e);// 尝试取消这个 item
Object x = s.item;// 获取到这个 tail 的 item
if (x != e) // 如果不相等,说明 node 中的 item 取消了,返回这个 item。
// 这里是唯一停止循环的地方。当 s.item 已经不是当初的哪个 e 了,说明要么是时间到了被取消了,要么是线程中断被取消了。
// 当然,不仅仅只有这2种 “意外” 情况,还有一种情况是:当另一个线程拿走了这个数据,并修改了 item,也会通过这个判断,返回被“修改”过的 item。
return x;
if (timed) {// 如果有时间限制
nanos = deadline - System.nanoTime();
if (nanos <= 0L) {// 如果时间到了
s.tryCancel(e);// 尝试取消 item,供上面的 x != e 判断
continue;// 重来
}
}
if (spins > 0)// 如果还有自旋次数
--spins;// 减一
else if (s.waiter == null)// 如果自旋不够,且 tail 的等待线程还没有赋值
s.waiter = w;// 当前线程赋值给 tail 的等待线程
else if (!timed)// 如果自旋不够,且如果线程赋值过了,且没有限制时间,则 wait,(危险操作)
LockSupport.park(this);
else if (nanos > spinForTimeoutThreshold)// 如果自旋不够,且如果限制了时间,且时间还剩余超过 1 秒,则 wait 剩余时间。
// 主要目的就是等待,等待其他线程唤醒这个节点所在的线程。
LockSupport.parkNanos(this, nanos);
}
}
该方法逻辑如下:
默认自旋 32 次,如果没有超时机制,则 512 次。
如果时间到了,或者线程被中断,则取消这次的操作,将item设置成自己。供后面判断。
如果自旋结束,且剩余时间还超过 1 秒,则阻塞等待至剩余时间。
当线程被其他的线程唤醒,说明数据被交换了。则 return,返回的是交换后的数据。
总结下来就是:
JDK 使用了队列或者栈来实现公平或非公平模型。其中,isData 属性极为重要,标识这这个线程的这次操作,决定了他到底应该是追加到队列中,还是从队列中交换数据。
每个线程在没有遇到自己的另一半时,要么快速失败,要么进行阻塞,阻塞等待自己的另一半来,至于对方是给数据还是取数据,取决于她自己,如果她是消费者,那么他就是生产者。
我们梳理一下一般性的流程:
一开始整个queue为空, 线程直接封装成QNode, 通过 awaitFulfill 方法进入自旋等待状态, 除非超时或线程中断, 不然一直等待, 直到有线程与之匹配
下个再来的线程若isData与尾节点一样, 则进行第一步, 不然进行数据转移(步骤 21), 然后 unpark 等待的线程
等待的线程被唤醒, 从awaitFulfill方法返回, 最后将结果返回
公平模式 TransferQueue awaitFulfill
/**
* Spins/blocks until node s is fulfilled
*
* 主逻辑: 若节点是 head.next 则进行 spins 一会, 若不是, 则调用 LockSupport.park / parkNanos(), 直到其他的线程对其进行唤醒
*
* @param s the waiting node
* @param e the comparsion value for checking match
* @param timed true if timed wait
* @param nanos timeout value
* @return matched item, or s of cancelled
*/
Object awaitFulfill(QNode s, E e, boolean timed, long nanos){
final long deadline = timed ? System.nanoTime() + nanos : 0L;// 1. 计算 deadline 时间 (只有 timed 为true 时才有用)
Thread w = Thread.currentThread(); // 2. 获取当前的线程
int spins = ((head.next == s) ? // 3. 若当前节点是 head.next 时才进行 spin, 不然的话不是浪费 CPU 吗, 对挖
(timed ? maxTimeSpins : maxUntimedSpins) : 0);
for(;;){ // loop 直到 成功
if(w.isInterrupted()){ // 4. 若线程中断, 直接将 item = this, 在 transfer 中会对返回值进行判断 (transfer中的 步骤 11)
s.tryCancel(e);
}
Object x = s.item;
if(x != e){ // 5. 在进行线程阻塞->唤醒, 线程中断, 等待超时, 这时 x != e,直接return 回去
return x;
}
if(timed){
nanos = deadline - System.nanoTime();
if(nanos <= 0L){ // 6. 等待超时, 改变 node 的item值, 进行 continue, 下一步就到 awaitFulfill的第 5 步 -> return
s.tryCancel(e);
continue;
}
}
if(spins > 0){ // 7. spin 一次一次减少
--spins;
}
else if(s.waiter == null){
s.waiter = w;
}
else if(!timed){ // 8. 进行没有超时的 park
LockSupport.park(this);
}
else if(nanos > spinForTimeoutThreshold){ // 9. 自旋次数过了, 直接 + timeout 方式 park
LockSupport.parkNanos(this, nanos);
}
}
}
梳理逻辑:
- 计算timeout时间(若 time = true)
- 判断 当前节点是否是 head.next 节点(queue中有个dummy node 的存在, AQS 中也是这样), 若是的话就进行 spin 的赋值, 其他的节点没有这个需要, 浪费资源
- 接下来就是自旋, 超过次数就进行阻塞, 直到有其他线程唤醒, 或线程中断(这里线程中断返回的是 Node 自己)
说一下大致的操作。在transfer中,把操作分为两种,一种就是入队put,一种是出队take,入队的时候会创建data节点,值为data。出队的时候会创建一个request节点,值为null。
put和take操作都会调用该方法,区别在于,put操作的时候e值为数据data,take操作的时候e值为null
如果h==t也就是队列为空,或者当前队列尾部的数据类型和调用该方法的数据类型一致:比如当前队列为空,第一次来了一个入队请求,这时候队列就会创建出一个data节点,如果第二次又来了一个入队请求(和第一次也就是队列尾部的数据类型一致,都是入队请求),这时候队列会创建出第二个data节点,并形成一个链表。同理,如果刚开始来了request请求,也会入队,之后如果继续来了一个reqeust请求,也会继续入队!
满足2的条件,就会进入3,中间会有一些一致性检查这也是必须的,避免产生并发冲突。3会创建出一个节点,根据e值的不同,可能是data节点或者request节点。
把3中创建的节点通过cas方式设置到队列尾部去。
把tail通过cas方式修改成3中新建立的s节点
-
调用方法awaitFulfill进行等待,如果3中创建的是data节点,那么就会等待来一个reqeust节点,反之亦然!
6.1 放入队列之后就开始进行循环判断
6.2 终止条件是节点的值被修改,具体如果是data节点,那么会被修改成null,如果是request节点,那么会被修改成data值。这个修改是在第9步中由相对的请求(如果创建的是data节点,那么就由reqeust请求来进行修改,反之亦然)来做的。如果一直没有相对的请求过来,那么节点的值就一直不会被修改,这样就跳不出循环体!
6.3 如果没有被修改,那么就需要进入park休眠,等待第9步进行修改后再通过unpark进行唤醒,唤醒之后就会判断节点值被修改从而返回。 如果在插入一个节点的时候,不满足2的条件,也就是队列不为空并且尾部节点和当前要插入节点的类型不一样(这就代表来了一个相对请求),比如上图中的尾部是data节点,如果来了一个插入reqeust节点的请求,那么就会走到7这里
由于是队列,先进先出,所以会取队列里面的第一个节点,也就是h.nex
把8中取出的节点的值通过cas的方式设置成新来节点的e值,这样就成功的满足了6-2的终止条件
将head节点往后移动,这样就把第一个节点成功的出队。
-
每个节点都保存了对应的操作线程,将8中节点对应的线程进行唤醒,这样6-3处于休眠的线程就醒来了,然后继续进行for循环,进而判断6-2终止条件满足,于是返回
example
package com.hust.grid.leesf.collections;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;
public class SynchronousQueueDemo {
public static void main(String[] args) {
SynchronousQueue<Integer> queue = new SynchronousQueue<Integer>();
Producer p1 = new Producer("p1", queue, 10);
Producer p2 = new Producer("p2", queue, 50);
Consumer c1 = new Consumer("c1", queue);
Consumer c2 = new Consumer("c2", queue);
c1.start();
try {
TimeUnit.MILLISECONDS.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
c2.start();
try {
TimeUnit.MILLISECONDS.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
p1.start();
try {
TimeUnit.MILLISECONDS.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
p2.start();
}
static class Producer extends Thread {
private SynchronousQueue<Integer> queue;
private int n;
public Producer(String name, SynchronousQueue<Integer> queue, int n) {
super(name);
this.queue = queue;
this.n = n;
}
public void run() {
System.out.println(getName() + " offer result " + queue.offer(n));
}
}
static class Consumer extends Thread {
private SynchronousQueue<Integer> queue;
public Consumer(String name, SynchronousQueue<Integer> queue) {
super(name);
this.queue = queue;
}
public void run() {
try {
System.out.println(getName() + " take result " + queue.take());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
输出
p1 offer result true
c2 take result 10
p2 offer result true
c1 take result 50
说明:该示例中,有两个生产者p1、p2和两个消费者c1、c2,按照c1、c2、p1、p2的顺序启动,并且每个线程启动后休眠100ms,则可能有如下的时序图
根据源码可知,此SynchronousQueue采用非公平策略,即底层采用栈结构。
① c1执行take操作,主要的函数调用如下
说明:其中,c1线程进入awaitFulfill后,会空旋等待,直到空旋时间消逝,会调用LockSupport.park函数,会禁用当前线程(c1),直至许可可用。
② c1执行take操作,主要的函数调用如下
③ p1线程执行offer(10)操作,主要的函数调用如下
在执行offer(10)操作后,c2线程所在的结点与头结点进行了匹配(头结点生产数据,c2线程所在的结点消费数据),c2线程被unpark,可以继续运行,而c1线程还是被park中(非公平策略)
③ c2线程被unpark后,继续运行,主要函数调用如下(由于c2线程是在awaitFulfill函数中被park的,所以,恢复也是在awaitFulfill函数中)
c2线程从unpark恢复时,结构如上图所示,先从awaitFulfill函数中返回,然后再从transfer函数中返回10,再从take函数中返回10。
④ p2线程执行offer(50)操作,主要的函数调用如下
说明:在执行offer(50)操作后,c1线程所在的结点与头结点进行了匹配(头结点生产数据,c1线程所在的结点消费数据),c1线程被unpark,可以继续运行。
⑤ c1线程被unpark后,继续运行,主要函数调用如下(由于c1线程是在awaitFulfill函数中被park的,所以,恢复也是在awaitFulfill函数中)
说明:c1线程从unpark恢复时,结构如上图所示,先从awaitFulfill函数中返回,然后再从transfer函数中返回50,再从take函数中返回50。
上述是使用非公平策略的结果(首先匹配c2线程所在的结点,之后再匹配c1线程所在结点)。
修改源码,还是使用非公平策略,只是改变c1、c2、p1、p2之间的启动顺序。更改为p1->c1->p2->c2。
package com.hust.grid.leesf.collections;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;
public class SynchronousQueueDemo {
public static void main(String[] args) {
SynchronousQueue<Integer> queue = new SynchronousQueue<Integer>();
Producer p1 = new Producer("p1", queue, 10);
Producer p2 = new Producer("p2", queue, 50);
Consumer c1 = new Consumer("c1", queue);
Consumer c2 = new Consumer("c2", queue);
p1.start();
try {
TimeUnit.MILLISECONDS.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
c1.start();
try {
TimeUnit.MILLISECONDS.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
p2.start();
try {
TimeUnit.MILLISECONDS.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
c2.start();
}
static class Producer extends Thread {
private SynchronousQueue<Integer> queue;
private int n;
public Producer(String name, SynchronousQueue<Integer> queue, int n) {
super(name);
this.queue = queue;
this.n = n;
}
public void run() {
System.out.println(getName() + " offer result " + queue.offer(n));
}
}
static class Consumer extends Thread {
private SynchronousQueue<Integer> queue;
public Consumer(String name, SynchronousQueue<Integer> queue) {
super(name);
this.queue = queue;
}
public void run() {
try {
System.out.println(getName() + " take result " + queue.take());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
p1 offer result false
p2 offer result true
c1 take result 50
说明:此时,只有c1线程得到了匹配,p1线程存放元素,直接返回的false,因为此时没有消费者线程等待,而p2线程与c1线程进行了匹配,p2线程存放元素成功,c1线程获取元素成功,并且此时,c2线程还是处于park状态,此时应用程序无法正常结束。所以,可知,必须要先有取操作,然后存操作,两者才能正确的匹配,若先是存操作,然后再是取操作,此时无法匹配成功,会阻塞,取操作期待下一个存操作进行匹配。