概述
生产者-消费者模型是多线程编程中的一个经典模型,主要描述的是生产者和消费者在同一时间段内共用同一块存储空间(通常也称为缓冲区,类似于仓库的概念),工作时,生产者向缓冲区去存放数据,而消费者则从缓冲区中取走数据。
特点
- 生产者在生产的时候,消费者不能消费,需要等待。
- 消费者在消费的时候,生产者不能生产,需要等待。
- 当缓冲区已满时,生产者暂停生产,自动阻塞;此时需要通知消费者去消费。
- 当缓冲区为空时,消费者暂停消费,自动阻塞;此时需要通知生产者去生产。
解决生产者/消费者问题的方法
- 采用某种机制维护生产者和消费者之间的同步。比如:
<1>. 上述特点1和特点2需要通过同步来解决,比如synchronize,Lock等 <2>. 上述特点3和特点4则需要涉及到线程间的通信了,生产者线程生产数据放入缓冲区后,通知消费者 线程取出数据;消费者线程取出数据后,通知生产者生产数据,比如可以利用wait()/notify()机制来实现。
- 在生产者和消费者之间建立一个管道
以上第一种方式比较常用,第二种方式见得不多。
实现方式
- wait()/notify()方法
- await()/signal()方法
- BlockingQueue阻塞队列
- PipedInputStream / PipedOutputStream 管道流
以上前三个是同步机制实现,第四个是管道方式,目前不太清楚。
1. wait()/notify()方法
wait()方法:当缓冲区已满/空时,生产者/消费者线程停止自己的执行,并且释放锁资源,使得自己处于等待(阻塞)状态,让其他线程执行。
notify()方法:当生产者/消费者向缓冲区中放入/取出一个产品时,向其他等待的线程发出可执行的通知,同时释放锁资源,使得自己处于等待状态。
Storage.java仓库类:
package com.feizi.java.concurrency.model.one;
import java.util.LinkedList;
/**
* wait()方法:当缓冲区已满/空时,生产者/消费者线程停止自己的执行,并且释放锁资源,使得自己处于等待(阻塞)状态,让其他线程执行。
* notify()方法:当生产者/消费者向缓冲区中放入/取出一个产品时,向其他等待的线程发出可执行的通知,同时释放锁资源,使得自己处于等待状态。
* Created by feizi on 2018/5/28.
*/
public class Storage {
//仓库最大存储量
private static final int MAX_COUNT = 10;
//仓库存储的载体
private LinkedList<Object> list = new LinkedList<>();
/**
* 生产产品-同步方法
* @param producer
*/
public synchronized void produce(String producer){
while (list.size() == MAX_COUNT){
System.out.println("【仓库已满】," + producer + ":暂时不能执行生产任务...");
try {
//仓库已满,暂不生产,生产阻塞
wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
//生产产品
list.add(new Object());
System.out.println( producer + ":=====》生产了一个产品\t,【仓库储量】为:" + list.size());
notifyAll();
}
/**
* 消费产品-同步方法
* @param consumer
*/
public synchronized void consume(String consumer){
while (list.size() == 0){
System.out.println("【仓库已空】, " + consumer + ":暂不消费...");
try {
//仓库已空,暂不消费,消费阻塞
wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
//消费产品
list.remove();
System.out.println(consumer + ":================》消费了一个产品\t,【仓库储量】为:" + list.size());
notifyAll();
}
/**
* 生产产品-同步块
* @param producer
*/
public void produce1(String producer){
synchronized (list){
while (list.size() == MAX_COUNT){
try {
System.out.println("【仓库已满】," + producer + ":暂时不能执行生产任务...");
//仓库已满,暂不生产,生产阻塞
list.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
//生产产品
list.add(new Object());
System.out.println( producer + ":=====》生产了一个产品\t,【仓库储量】为:" + list.size());
list.notifyAll();
}
}
/**
* 消费产品-同步块
* @param consumer
*/
public void consume1(String consumer){
synchronized (list){
while (list.size() == 0){
try {
System.out.println("【仓库已空】, " + consumer + ":暂不消费...");
//仓库已空,暂不消费,消费阻塞
list.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
//消费产品
list.remove();
System.out.println(consumer + ":================》消费了一个产品\t,【仓库储量】为:" + list.size());
list.notifyAll();
}
}
}
Producer.java生产者类:
package com.feizi.java.concurrency.model.one;
import java.util.concurrent.TimeUnit;
/**
* Created by feizi on 2018/5/28.
*/
public class Producer implements Runnable {
/*生产者名称*/
private String name;
/*仓库*/
private Storage storage;
public Producer(String name, Storage storage) {
this.name = name;
this.storage = storage;
}
@Override
public void run() {
while (true){
try {
storage.produce(name);
// storage.produce1(name);
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
Consumer.java消费者类:
package com.feizi.java.concurrency.model.one;
import java.util.concurrent.TimeUnit;
/**
* Created by feizi on 2018/5/28.
*/
public class Consumer implements Runnable {
/*消费者名称*/
private String name;
/*仓库*/
private Storage storage;
public Consumer(String name, Storage storage) {
this.name = name;
this.storage = storage;
}
@Override
public void run() {
while (true){
try {
TimeUnit.MILLISECONDS.sleep(2500);
storage.consume(name);
// storage.consume1(name);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
TestOne.java测试类:
package com.feizi.java.concurrency.model.one;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
* Created by feizi on 2018/5/28.
*/
public class TestOne {
public static void main(String[] args) {
Storage storage = new Storage();
/*for (int i = 0; i < 2; i++){
new Thread(new Producer("生产者【" + i + "】", storage)).start();
}
for (int i = 0; i < 5; i++){
new Thread(new Consumer("消费者【" + i + "】" , storage)).start();
}*/
ExecutorService threadPool = Executors.newCachedThreadPool();
for (int i = 0; i < 2; i++){
threadPool.execute(new Producer("生产者【" + i + "】", storage));
}
for (int i = 0; i < 5; i++){
threadPool.execute(new Consumer("消费者【" + i + "】" , storage));
}
threadPool.shutdown();
}
}
控制台输出结果:
生产者【1】:=====》生产了一个产品 ,【仓库储量】为:1
生产者【0】:=====》生产了一个产品 ,【仓库储量】为:2
生产者【1】:=====》生产了一个产品 ,【仓库储量】为:3
生产者【0】:=====》生产了一个产品 ,【仓库储量】为:4
消费者【0】:================》消费了一个产品 ,【仓库储量】为:3
消费者【5】:================》消费了一个产品 ,【仓库储量】为:2
消费者【3】:================》消费了一个产品 ,【仓库储量】为:1
消费者【4】:================》消费了一个产品 ,【仓库储量】为:0
【仓库已空】, 消费者【1】:暂不消费...
【仓库已空】, 消费者【2】:暂不消费...
生产者【0】:=====》生产了一个产品 ,【仓库储量】为:1
消费者【2】:================》消费了一个产品 ,【仓库储量】为:0
【仓库已空】, 消费者【1】:暂不消费...
生产者【1】:=====》生产了一个产品 ,【仓库储量】为:1
消费者【1】:================》消费了一个产品 ,【仓库储量】为:0
2.await() / signal()方法
await()和signal()的功能基本上和wait()/notify()类似,完全可以取代它们,但是它们和新引入的锁定机制Lock直接挂钩,具有更强的灵活性,通过在Lock对象上调用newCondition()方法,将条件变量和一个锁对象进行绑定,进而控制并发程序访问竞争资源的安全。
Storage.java仓库类:
package com.feizi.java.concurrency.model.two;
import java.util.LinkedList;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
/**
* await()和signal()的功能基本上和wait()/notify()类似,完全可以取代它们,但是它们和新引入的锁定机制Lock直接挂钩,具有更强的灵活性,
* 通过在Lock对象上调用newCondition()方法,将条件变量和一个锁对象进行绑定,进而控制并发程序访问竞争资源的安全
* Created by feizi on 2018/5/28.
*/
public class Storage {
//仓库最大存储量
private static final int MAX_COUNT = 10;
//仓库存储的载体
private LinkedList<Object> list = new LinkedList<>();
//可重入锁
private final Lock lock = new ReentrantLock();
//仓库满的条件变量
private final Condition full = lock.newCondition();
//仓库空的条件变量
private final Condition empty = lock.newCondition();
/**
* 生产产品
* @param producer
*/
public void produce(String producer){
try {
//获得锁
lock.lock();
while (list.size() == MAX_COUNT){
try {
System.out.println("仓库已满,【" + producer + "】:暂时不能执行生产任务...");
//由于仓库已满,暂不生产,生产阻塞
full.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
//生产产品
list.add(new Object());
System.out.println("===>【" + producer + "】:生产了一个产品,\t【仓库储量】为:" + list.size());
empty.signalAll();
} finally {
//释放锁
lock.unlock();
}
}
/**
* 消费产品
* @param consumer
*/
public void consume(String consumer){
try {
//获得锁
lock.lock();
while (list.size() == 0){
try {
System.out.println("仓库已空,【" + consumer + "】:暂时不能执行消费任务...");
//由于仓库已空,暂不消费,消费阻塞
empty.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
//消费产品
list.remove();
System.out.println("=============>【" + consumer + "】:消费了一个产品,\t【仓库储量】为:" + list.size());
full.signalAll();
}finally {
//释放锁
lock.unlock();
}
}
}
Producer.java生产者类:
package com.feizi.java.concurrency.model.two;
import java.util.concurrent.TimeUnit;
/**
* Created by feizi on 2018/5/28.
*/
public class Producer implements Runnable {
private String name;
private Storage storage;
public Producer(String name, Storage storage) {
this.name = name;
this.storage = storage;
}
@Override
public void run() {
while (true){
try {
TimeUnit.SECONDS.sleep(1);
storage.produce(name);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
Consumer.java消费者类:
package com.feizi.java.concurrency.model.two;
import java.util.concurrent.TimeUnit;
/**
* Created by feizi on 2018/5/28.
*/
public class Consumer implements Runnable {
private String name;
private Storage storage;
public Consumer(String name, Storage storage) {
this.name = name;
this.storage = storage;
}
@Override
public void run() {
while (true){
try {
TimeUnit.SECONDS.sleep(2);
storage.consume(name);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
TestTwo.java测试类:
package com.feizi.java.concurrency.model.two;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
* Created by feizi on 2018/5/28.
*/
public class TestTwo {
public static void main(String[] args) {
Storage storage = new Storage();
ExecutorService threadPool = Executors.newCachedThreadPool();
for (int i = 0; i < 2; i++){
threadPool.execute(new Producer("生产者" + i, storage));
}
for (int i = 0; i < 4; i++){
threadPool.execute(new Consumer("消费者" + i, storage));
}
threadPool.shutdown();
}
}
控制台输出结果:
===>【生产者3】:生产了一个产品, 【仓库储量】为:1
===>【生产者0】:生产了一个产品, 【仓库储量】为:2
===>【生产者4】:生产了一个产品, 【仓库储量】为:3
===>【生产者2】:生产了一个产品, 【仓库储量】为:4
===>【生产者1】:生产了一个产品, 【仓库储量】为:5
===>【生产者1】:生产了一个产品, 【仓库储量】为:6
=============>【消费者0】:消费了一个产品, 【仓库储量】为:5
===>【生产者3】:生产了一个产品, 【仓库储量】为:6
===>【生产者4】:生产了一个产品, 【仓库储量】为:7
===>【生产者2】:生产了一个产品, 【仓库储量】为:8
===>【生产者0】:生产了一个产品, 【仓库储量】为:9
=============>【消费者1】:消费了一个产品, 【仓库储量】为:8
===>【生产者1】:生产了一个产品, 【仓库储量】为:9
===>【生产者4】:生产了一个产品, 【仓库储量】为:10
仓库已满,【生产者0】:暂时不能执行生产任务...
仓库已满,【生产者3】:暂时不能执行生产任务...
3.BlockingQueue阻塞队列
BlockingQueue是一个已经在内部实现了同步的阻塞队列(由链表结构组成的有界阻塞队列),实现方式采用的是await()/signal()方法,可以在生成对象时指定容量大小,用于阻塞操作的是put()和take()方法。
put()方法:类似于前面的生产者线程,容量达到最大时,自动阻塞
take()方法:类似于前面的消费者线程,容量为0时,启动阻塞
Storage.java仓库类:
package com.feizi.java.concurrency.model.three;
import java.util.concurrent.LinkedBlockingQueue;
/**
* BlockingQueue是一个已经在内部实现了同步的阻塞队列,实现方式采用的是await()/signal()方法,
* 可以在生成对象时指定容量大小,用于阻塞操作的是put()和take()方法
* put()方法:类似于前面的生产者线程,容量达到最大时,自动阻塞
* take()方法:类似于前面的消费者线程,容量为0时,启动阻塞
* Created by feizi on 2018/5/28.
*/
public class Storage {
//仓库最大存储量
private static final int MAX_COUNT = 10;
//仓库存储的载体
private LinkedBlockingQueue<Object> list = new LinkedBlockingQueue<>(10);
/**
* 生产产品
* @param producer
*/
public void produce(String producer){
//如果仓库已满
if(list.size() == MAX_COUNT){
System.out.println("仓库已满,【" + producer + "】:暂时不能执行生产任务...");
}
try {
//生产产品
list.put(new Object());
System.out.println("===>【" + producer + "】:生产了一个产品,\t【仓库储量】为:" + list.size());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
/**
* 消费产品
* @param consumer
*/
public void consume(String consumer){
//如果仓库已空
if (list.size() == 0){
System.out.println("仓库已空,【" + consumer + "】:暂时不能执行消费任务...");
}
try {
//消费产品
list.take();
System.out.println("================>【" + consumer + "】:消费了一个产品,\t【仓库储量】为:" + list.size());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
Producer.java生产者类:
package com.feizi.java.concurrency.model.three;
import java.util.concurrent.TimeUnit;
/**
* Created by feizi on 2018/5/28.
*/
public class Producer implements Runnable {
private String name;
private Storage storage;
public Producer(String name, Storage storage) {
this.name = name;
this.storage = storage;
}
@Override
public void run() {
while (true){
try {
TimeUnit.SECONDS.sleep(1);
storage.produce(name);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
Consumer.java消费者类:
package com.feizi.java.concurrency.model.three;
import java.util.concurrent.TimeUnit;
/**
* Created by feizi on 2018/5/28.
*/
public class Consumer implements Runnable {
private String name;
private Storage storage;
public Consumer(String name, Storage storage) {
this.name = name;
this.storage = storage;
}
@Override
public void run() {
while (true){
try {
TimeUnit.SECONDS.sleep(2);
storage.consume(name);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
TestThree.java测试类:
package com.feizi.java.concurrency.model.three;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
* Created by feizi on 2018/5/28.
*/
public class TestThree {
public static void main(String[] args) {
Storage storage = new Storage();
ExecutorService threadPool = Executors.newCachedThreadPool();
for (int i = 0; i < 3; i++){
threadPool.execute(new Producer("生产者" + i, storage));
}
for (int j = 0; j < 5; j++){
threadPool.execute(new Consumer("消费者" + j, storage));
}
}
}
控制台输出结果:
===>【生产者2】:生产了一个产品, 【仓库储量】为:1
===>【生产者1】:生产了一个产品, 【仓库储量】为:2
===>【生产者0】:生产了一个产品, 【仓库储量】为:3
===>【生产者1】:生产了一个产品, 【仓库储量】为:4
===>【生产者2】:生产了一个产品, 【仓库储量】为:5
===>【生产者0】:生产了一个产品, 【仓库储量】为:6
================>【消费者4】:消费了一个产品, 【仓库储量】为:5
================>【消费者0】:消费了一个产品, 【仓库储量】为:4
================>【消费者1】:消费了一个产品, 【仓库储量】为:3
================>【消费者2】:消费了一个产品, 【仓库储量】为:2
================>【消费者3】:消费了一个产品, 【仓库储量】为:1
===>【生产者1】:生产了一个产品, 【仓库储量】为:2
===>【生产者0】:生产了一个产品, 【仓库储量】为:3
===>【生产者2】:生产了一个产品, 【仓库储量】为:4
===>【生产者2】:生产了一个产品, 【仓库储量】为:5
===>【生产者1】:生产了一个产品, 【仓库储量】为:6
===>【生产者0】:生产了一个产品, 【仓库储量】为:7
================>【消费者4】:消费了一个产品, 【仓库储量】为:6
================>【消费者0】:消费了一个产品, 【仓库储量】为:5
原文参考
感谢以下作者的分享,受益匪浅~~