1. Queue接口 - 队列
public interface Queue<E>
extends Collection<E>
- Collection的子接口,表示队列FIFO(First In First Out)
常用方法:
(1)抛出异常
boolean add(E e) // 顺序添加1个元素(到达上限后,再添加则会抛出异常)
E remove() // 获得第1个元素并移除(如果队列没有元素时,则抛异常)
E element() // 获得第1个元素但不移除(如果队列没有元素时,则抛异常)
(2)返回特殊值【推荐】
boolean offer(E e) // 顺序添加1个元素(到达上限后,再添加则会返回false)
E poll() // 获得第1个元素并移除(如果队列没有元素时,则返回null)
E keep() // 获得第1个元素但不移除(如果队列没有元素时,则返回null)
1.1 ConcurrentLinkedQueue类(线程安全)
public class ConcurrentLinkedQueue<E>
extends AbstractQueue<E>
implements Queue<E>, Serializable
说明:
- 线程安全、可高效读写的队列,高并发下性能最好的队列;
- 无锁、CAS比较交换算法,修改的方法包含3个核心参数(V,E,N);
- V:要更新的变量、E:预期值、N:新值
- 只有当V==E时,V=N;否则表示已被更新过,则取消当前操作。
使用示例:
import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
public class TestQueue {
public static void main(String[] args) {
// 列表:尾部追加 - add...
// 链表:头尾添加 - addFirst/addLast
// 队列:先进先出(FIFO) - offer...
// >>> 以上三种的对应成员方法,切记不能混用!会打乱已知规则。
LinkedList<String> link = new LinkedList<String>();
//Queue<String> link = new LinkedList<String>(); // 强制LinkedList遵循队列的规则
link.offer("A"); // offer用的是FIFO队列方式
link.offer("B");
link.offer("C");
// 用列表的方式打乱了FIFO队列的规则
link.add(0, "D");
System.out.println(link.peek()); // D
// 线程安全的队列Queue
// 严格遵循队列规则,线程安全,采用CAS交换算法
Queue<String> q = new ConcurrentLinkedQueue<String>();
// 1.抛出异常的 2.返回结果的
q.offer("A");
q.offer("B");
q.offer("C");
q.poll(); // 删除表头,表头更新为B
System.out.println(q.peek()); // 获取表头,此时为B
}
}
2. BlockingQueue接口 - 阻塞队列
public interface BlockingQueue<E>
extends Queue<E>
常用方法:
void put(E e) // 将指定元素插入此队列中,如果没有可用空间,则死等
E take() // 获取并移除此队列头部元素,如果没有可用元素,则死等
说明:
- Queue的子接口,阻塞的队列,增加了两个线程状态为无限期等待的方法
- 可用于解决生产者、消费者问题
2.1 ArrayBlockingQueue类(有界阻塞队列)
- 数组结构实现,有界队列。手工固定上限
BlockingQueue<String> abq = new ArrayBlockingQueue<String>(3);
2.2 LinkedBlockingQueue类(无界阻塞队列)
- 链表结构实现,无界队列。默认上限Integer.MAX_VALUE
BlockingQueue<String> lbq = new LinkedBlockingQueue<String>();
3. 源码:BlockingQueue实现生产者消费者模式
BlockingQueue是JDK5.0的新增内容,它是一个已经在内部实现了同步的队列,实现方式采用的是await()/signal()方法。它可以在生成对象时指定容量大小,用于阻塞操作的是put()和take()方法。
- put()方法:类似于我们上面的生产者线程,容量达到最大时,自动阻塞。
- take()方法:类似于我们上面的消费者线程,容量为0时,自动阻塞。
import java.util.concurrent.LinkedBlockingQueue;
public class TestProduceAndCustomer2 {
public static void main(String[] args) {
StorageQ s = new StorageQ();
Thread p1 = new Thread(new ProducerQ(s), "A厂");
Thread p2 = new Thread(new ProducerQ(s), "B厂");
Thread p3 = new Thread(new ProducerQ(s), "C厂");
Thread c1 = new Thread(new CustomerQ(s), "a人");
Thread c2 = new Thread(new CustomerQ(s), "b人");
Thread c3 = new Thread(new CustomerQ(s), "c人");
p1.start();
p2.start();
p3.start();
c1.start();
c2.start();
c3.start();
}
}
// 仓库 - 共享资源对象
class StorageQ {
// 仓库存储的载体 - 使用无界阻塞队列,也可指定容量大小。
private LinkedBlockingQueue<Object> lbq = new LinkedBlockingQueue<>(10);
public StorageQ() {
super();
}
public StorageQ(LinkedBlockingQueue<Object> lbq) {
super();
this.lbq = lbq;
}
public LinkedBlockingQueue<Object> getLbq() {
return lbq;
}
public void setLbq(LinkedBlockingQueue<Object> lbq) {
this.lbq = lbq;
}
// 生产
public void produce() {
try{
lbq.put(new Object());
System.out.println("【生产者" + Thread.currentThread().getName()
+ "】生产一个产品,现库存" + lbq.size());
} catch (InterruptedException e){
e.printStackTrace();
}
}
// 消费
public void custome() {
try{
lbq.take();
System.out.println("【消费者" + Thread.currentThread().getName()
+ "】消费了一个产品,现库存" + lbq.size());
} catch (InterruptedException e){
e.printStackTrace();
}
}
}
// 生产者
class ProducerQ implements Runnable {
private StorageQ s;
public ProducerQ() {}
public ProducerQ(StorageQ s) {
this.s = s;
}
public void run() {
while (true) {
try {
Thread.sleep((int) (Math.random() * 2000));
this.s.produce(); // 没满 + 可锁 = 生产+1
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
// 消费者
class CustomerQ implements Runnable {
private StorageQ s;
public CustomerQ() {}
public CustomerQ(StorageQ s) {
this.s = s;
}
public void run() {
while (true) {
try {
Thread.sleep((int) (Math.random() * 2000));
this.s.custome(); // 不空 + 可锁 = 消费-1
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
运行结果: