Java中Queue、BlockingQueue和队列实现生产者消费者模式

1. Queue接口 - 队列

public interface Queue<E> 
    extends Collection<E>
  • Collection的子接口,表示队列FIFO(First In First Out)
    常用方法:
    (1)抛出异常
    boolean add(E e) // 顺序添加1个元素(到达上限后,再添加则会抛出异常)
    E remove() // 获得第1个元素并移除(如果队列没有元素时,则抛异常)
    E element() // 获得第1个元素但不移除(如果队列没有元素时,则抛异常)
    (2)返回特殊值【推荐】
    boolean offer(E e) // 顺序添加1个元素(到达上限后,再添加则会返回false)
    E poll() // 获得第1个元素并移除(如果队列没有元素时,则返回null)
    E keep() // 获得第1个元素但不移除(如果队列没有元素时,则返回null)

1.1 ConcurrentLinkedQueue类(线程安全)

public class ConcurrentLinkedQueue<E> 
    extends AbstractQueue<E> 
    implements Queue<E>, Serializable

说明:

  • 线程安全、可高效读写的队列,高并发下性能最好的队列
  • 无锁、CAS比较交换算法,修改的方法包含3个核心参数(V,E,N);
  • V:要更新的变量、E:预期值、N:新值
  • 只有当V==E时,V=N;否则表示已被更新过,则取消当前操作。

使用示例:

import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
public class TestQueue {
      public static void main(String[] args) {
            // 列表:尾部追加 - add...
            // 链表:头尾添加 - addFirst/addLast
            // 队列:先进先出(FIFO) - offer...
            // >>> 以上三种的对应成员方法,切记不能混用!会打乱已知规则。
            LinkedList<String> link = new LinkedList<String>();
            //Queue<String> link = new LinkedList<String>(); // 强制LinkedList遵循队列的规则
            link.offer("A"); // offer用的是FIFO队列方式
            link.offer("B");
            link.offer("C");
            // 用列表的方式打乱了FIFO队列的规则
            link.add(0, "D");
            System.out.println(link.peek()); // D
            
            // 线程安全的队列Queue
            // 严格遵循队列规则,线程安全,采用CAS交换算法
            Queue<String> q = new ConcurrentLinkedQueue<String>();
            // 1.抛出异常的 2.返回结果的
            q.offer("A");
            q.offer("B");
            q.offer("C");
            
            q.poll(); // 删除表头,表头更新为B
            
            System.out.println(q.peek()); // 获取表头,此时为B
      }
}

2. BlockingQueue接口 - 阻塞队列

public interface BlockingQueue<E> 
    extends Queue<E>

常用方法:
void put(E e) // 将指定元素插入此队列中,如果没有可用空间,则死等
E take() // 获取并移除此队列头部元素,如果没有可用元素,则死等
说明:

  • Queue的子接口,阻塞的队列,增加了两个线程状态为无限期等待的方法
  • 可用于解决生产者、消费者问题

2.1 ArrayBlockingQueue类(有界阻塞队列)

  • 数组结构实现,有界队列。手工固定上限
BlockingQueue<String> abq = new ArrayBlockingQueue<String>(3);

2.2 LinkedBlockingQueue类(无界阻塞队列)

  • 链表结构实现,无界队列。默认上限Integer.MAX_VALUE
BlockingQueue<String> lbq = new LinkedBlockingQueue<String>();

3. 源码:BlockingQueue实现生产者消费者模式

BlockingQueue是JDK5.0的新增内容,它是一个已经在内部实现了同步的队列,实现方式采用的是await()/signal()方法。它可以在生成对象时指定容量大小,用于阻塞操作的是put()和take()方法。

  • put()方法:类似于我们上面的生产者线程,容量达到最大时,自动阻塞
  • take()方法:类似于我们上面的消费者线程,容量为0时,自动阻塞
import java.util.concurrent.LinkedBlockingQueue;
public class TestProduceAndCustomer2 {
    public static void main(String[] args) {
        StorageQ s = new StorageQ();
        Thread p1 = new Thread(new ProducerQ(s), "A厂");
        Thread p2 = new Thread(new ProducerQ(s), "B厂");
        Thread p3 = new Thread(new ProducerQ(s), "C厂");

        Thread c1 = new Thread(new CustomerQ(s), "a人");
        Thread c2 = new Thread(new CustomerQ(s), "b人");
        Thread c3 = new Thread(new CustomerQ(s), "c人");
        p1.start();
        p2.start();
        p3.start();
        c1.start();
        c2.start();
        c3.start();
    }
}

// 仓库 - 共享资源对象
class StorageQ {
    // 仓库存储的载体 - 使用无界阻塞队列,也可指定容量大小。
    private LinkedBlockingQueue<Object> lbq = new LinkedBlockingQueue<>(10);
    public StorageQ() {
        super();
    }
    public StorageQ(LinkedBlockingQueue<Object> lbq) {
        super();
        this.lbq = lbq;
    }
    public LinkedBlockingQueue<Object> getLbq() {
        return lbq;
    }
    public void setLbq(LinkedBlockingQueue<Object> lbq) {
        this.lbq = lbq;
    }

    // 生产
    public void produce() {
        try{
            lbq.put(new Object());
            System.out.println("【生产者" + Thread.currentThread().getName()
                    + "】生产一个产品,现库存" + lbq.size());
        } catch (InterruptedException e){
            e.printStackTrace();
        }
    }

    
    // 消费
    public void custome() {
        try{
            lbq.take();
            System.out.println("【消费者" + Thread.currentThread().getName()
                    + "】消费了一个产品,现库存" + lbq.size());
        } catch (InterruptedException e){
            e.printStackTrace();
        }
    }
}

// 生产者
class ProducerQ implements Runnable {
    private StorageQ s;
    public ProducerQ() {}
    public ProducerQ(StorageQ s) {
        this.s = s;
    }
    public void run() {
        while (true) {
            try {
                Thread.sleep((int) (Math.random() * 2000));
                this.s.produce();  // 没满 + 可锁 = 生产+1
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

// 消费者
class CustomerQ implements Runnable {
    private StorageQ s;
    public CustomerQ() {}
    public CustomerQ(StorageQ s) {
        this.s = s;
    }
    public void run() {
        while (true) {
            try {
                Thread.sleep((int) (Math.random() * 2000));
                this.s.custome(); // 不空 + 可锁 = 消费-1
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

运行结果:


Java队列实现生产者与消费者
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 203,098评论 5 476
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,213评论 2 380
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 149,960评论 0 336
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,519评论 1 273
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,512评论 5 364
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,533评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,914评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,574评论 0 256
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,804评论 1 296
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,563评论 2 319
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,644评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,350评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,933评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,908评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,146评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,847评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,361评论 2 342

推荐阅读更多精彩内容

  • 阻塞队列提供了可阻塞的put 和take方法, 以及支持定时的offer和poll方法。如果队列已经满了, 那么p...
    好好学习Sun阅读 1,658评论 0 4
  • 引言 JDK中除了上文提到的各种并发容器,还提供了丰富的阻塞队列。阻塞队列统一实现了BlockingQueue接口...
    小刀爱编程阅读 552评论 0 0
  • 1、简介 Queue(队列):一种特殊的线性表,它只允许在表的前端(front)进行删除操作,只允许在表的后端(r...
    瑜小贤阅读 531评论 1 0
  • java笔记第一天 == 和 equals ==比较的比较的是两个变量的值是否相等,对于引用型变量表示的是两个变量...
    jmychou阅读 1,483评论 0 3
  • 一直都知道自己是个没有智商的人,活到今天才发觉我连情商都少得可怜;从小是个伶牙俐齿的人,谁知现在说话都不...
    猪麦兜阅读 119评论 0 0