互斥:进程(线程)在访问一些需要分时独占的资源或数据时,会引发进程(线程)之间的互斥关系。
同步:是指一个进程(线程)需要等待另外一个进程(线程)运行结束才可以运行,这种进程(线程)之间的运行次序关系我们称之为同步。
一.什么是生产者消费者问题?
生产者消费者问题又称有限缓冲区问题,问题描述如下:假设有一组生产者进程(线程)P1,P2,……Pk和一组消费者进程(线程)C1,C2……Cm,通过n个缓冲区组成缓冲池,共同完成"生产和消费"任务。
1)生产者将生产出的消息放入缓冲区,消费者从缓冲区中取出消息进行消费。当所有缓冲区均满时,生产者进程(线程)必须等待消费者进程(线程)消费消息以提供空缓冲区;
2)当所有缓冲区均为空时,消费者进程(线程)必须等待生产者进程(线程)生产消息以提供有消息的缓冲区;
3)对所有生产者和消费者而言,需将缓冲池看作一个整体,缓冲池是一个临界资源,任何一个进程(线程)要对缓冲池进行"存"或"取"的时候,需要与其他进程(线程)进行互斥。
二.如何实现?
1.使用非线程安全的队列作为有限缓冲区 + synchronized实现线程之间的互斥。
public class ProducerConsumerDemo{
static class MyBlockingQueue{
private Queue queue;
private final int max = 16; // 有限缓冲区
public MyBlockingQueue() {
queue = new LinkedList();
}
// 存操作互斥
public synchronized void put(Object o) throws InterruptedException {
while(queue.size() == max){
wait();
}
queue.add(o);
notifyAll();
}
// 取操作互斥
public synchronized Object take() throws InterruptedException {
while(queue.size() == 0) {
wait();
}
Object o = queue.remove();
notifyAll();
return o;
}
}
public static void main(String[] args) {
MyBlockingQueue queue = new MyBlockingQueue();
Runnable producer = () -> {
while(true) {
try{
queue.put(new Object());
} catch(Exception e) {
e.printStackTrace();
}
}
};
// 生产
new Thread(producer).start();
new Thread(producer).start();
Runnable consumer = () -> {
while(true) {
try{
queue.take();
} catch(Exception e) {
e.printStackTrace();
}
}
};
// 消费
new Thread(consumer).start();
new Thread(consumer).start();
}
}
因为生产者和消费者公用一个队列缓冲区,锁加到队列上面,可能造成生产者唤醒生产者,消费者唤醒消费者问题。
2.使用非线程安全的队列作为有限缓冲区 + ReentrantLock实现线程之间的互斥。
public class ProducerConsumerDemo {
static class MyBlockingQueue {
private Queue queue;
private final int max = 16; // 有限缓冲区
ReentrantLock lock = new ReentrantLock(); // 互斥锁
private Condition notFull = lock.newCondition();
private Condition notEmpty = lock.newCondition();
public MyBlockingQueue() {
queue = new LinkedList();
}
public void put(Object o) throws InterruptedException {
lock.lockInterruptibly(); // 存互斥
try{
while(queue.size() == max) {
notFull.await();
}
queue.add(o);
notEmpty.signal();
} finally {
lock.unlock();
}
}
public Object take() throws InterruptedException {
lock.lockInterruptibly(); // 取互斥
try{
while(queue.size() == 0) {
notEmpty.await();
}
Object o = queue.remove();
notFull.signal();
return o;
} finally {
lock.unlock();
}
}
}
public static void main(String[] args) {
MyBlockingQueue queue = new MyBlockingQueue();
// 生产者
Runnable producer = () -> {
try{
while(true) {
queue.put(new Object());
}
} catch(Exception e) {
e.printStackTrace();
}
};
new Thread(producer).start();
new Thread(producer).start();
// 消费者
Runnable consumer = () -> {
try{
while(true) {
Object o = queue.take();
}
} catch(Exception e) {
e.printStackTrace();
}
};
new Thread(consumer).start();
new Thread(consumer).start();
}
}
解决了生产者唤醒生产者、消费者唤醒消费者问题,但是需要自己加解锁。
3.使用线程安全的队列作为有限缓冲区。队列线程安,全内部已经实现线程互斥。
public class ProducerConsumerDemo {
public static void main(String[] args) {
BlockingQueue<Object> queue = new ArrayBlockingQueue<>(16); // 有限缓冲区
// 生产者
Runnable producer = () -> {
try{
while(true) {
queue.put(new Object());
}
} catch(InterruptedException e) {
e.printStackTrace();
}
};
new Thread(producer).start();
new Thread(producer).start();
// 消费者
Runnable consumer = () -> {
try{
while(true) {
Object o = queue.take();
}
} catch(InterruptedException e) {
e.printStackTrace();
}
};
new Thread(consumer).start();
new Thread(consumer).start();
}
}