condition_variable
条件变量可以用来实现线程同步,它必须与互斥量mutex
配合使用。
条件变量适用场景:一个线程先对某一条件进行判断, 如果条件不满足则进入等待, 条件满足的时候, 该线程被通知条件满足, 继续执行任务
在wait()之前,必须先lock相关联的mutex, 因为假如目标条件未满足,wait()实际上会unlock该mutex, 然后block,在目标条件满足后再重新lock该mutex, 然后返回
线程同步的方式:临界区,互斥量,信号量,事件
使用条件变量实现生产者消费者的简单例子如下:
#include <iostream>
#include <queue>
#include <thread>
#include <mutex>
#include <unistd.h>
#include <condition_variable>
using namespace std;
mutex mtx;
condition_variable produce, consume; // 条件变量是一种同步机制,要和mutex以及lock一起使用
queue<int> q; // shared value by producers and consumers, which is the critical section
int maxSize = 20;
void consumer()
{
while (true)
{
//this_thread::sleep_for(chrono::milliseconds(1000));
sleep(1);//包含在unistd.h头文件中,Sleep包含在windows.h中
unique_lock<mutex> lck(mtx);
while(q.size()==0)
{
consume.wait(lck); //condition_variable.wait()锁至满足while条件不满足
}
//consume.wait(lck, [] {return q.size() != 0; }); // wait(block) consumer until q.size() != 0 is true
cout << "consumer " << this_thread::get_id() << ": ";
q.pop();
cout << q.size() << '\n';
produce.notify_all(); // nodity(wake up) producer when q.size() != maxSize is true
lck.unlock();
}
}
void producer(int id)
{
while (true)
{
//this_thread::sleep_for(chrono::milliseconds(900)); // producer is a little faster than consumer
sleep(1);//
unique_lock<mutex> lck(mtx);
while(q.size() == maxSize)
{
produce.wait(lck);
}
// produce.wait(lck, [] {return q.size() != maxSize; }); // wait(block) producer until q.size() != maxSize is true
cout << "-> producer " << this_thread::get_id() << ": ";
q.push(id);
cout << q.size() << '\n';
consume.notify_all(); // notify(wake up) consumer when q.size() != 0 is true
lck.unlock();
}
}
int main()
{
thread consumers[2], producers[2];
// spawn 2 consumers and 2 producers:
for (int i = 0; i < 2; ++i)
{
consumers[i] = thread(consumer);
producers[i] = thread(producer, i + 1); //thread:第一个参数是task任务,第二个参数是task函数的参数
}
// join them back: (in this program, never join...)
for (int i = 0; i < 2; ++i)
{
producers[i].join();
consumers[i].join();
}
system("pause");
return 0;
}
下面实现了维护了缓冲区的结构体,并每次返回相应的位置,可以循环写入的生产者消费者模型:
#include <unistd.h>
#include <cstdlib>
#include <condition_variable>
#include <iostream>
#include <mutex>
#include <thread>
static const int kItemRepositorySize = 10; // Item buffer size.
static const int kItemsToProduce = 1000; // How many items we plan to produce.
struct ItemRepository {
int item_buffer[kItemRepositorySize]; // 产品缓冲区, 配合 read_position 和 write_position 模型环形队列.
size_t read_position; // 消费者读取产品位置.
size_t write_position; // 生产者写入产品位置.
std::mutex mtx; // 互斥量,保护产品缓冲区
std::condition_variable repo_not_full; // 条件变量, 指示产品缓冲区不为满.
std::condition_variable repo_not_empty; // 条件变量, 指示产品缓冲区不为空.
} gItemRepository; // 产品库全局变量, 生产者和消费者操作该变量.
typedef struct ItemRepository ItemRepository;
void ProduceItem(ItemRepository *ir, int item)
{
std::unique_lock<std::mutex> lock(ir->mtx);
while(((ir->write_position + 1) % kItemRepositorySize)
== ir->read_position) { // item buffer is full, just wait here.
std::cout << "Producer is waiting for an empty slot...\n";
(ir->repo_not_full).wait(lock); // 生产者等待"产品库缓冲区不为满"这一条件发生.
}
(ir->item_buffer)[ir->write_position] = item; // 写入产品.
(ir->write_position)++; // 写入位置后移.
if (ir->write_position == kItemRepositorySize) // 写入位置若是在队列最后则重新设置为初始位置.
ir->write_position = 0;
(ir->repo_not_empty).notify_all(); // 通知消费者产品库不为空.
lock.unlock(); // 解锁.
}
int ConsumeItem(ItemRepository *ir)
{
int data;
std::unique_lock<std::mutex> lock(ir->mtx);
// item buffer is empty, just wait here.
while(ir->write_position == ir->read_position) {
std::cout << "Consumer is waiting for items...\n";
(ir->repo_not_empty).wait(lock); // 消费者等待"产品库缓冲区不为空"这一条件发生.
}
data = (ir->item_buffer)[ir->read_position]; // 读取某一产品
(ir->read_position)++; // 读取位置后移
if (ir->read_position >= kItemRepositorySize) // 读取位置若移到最后,则重新置位.
ir->read_position = 0;
(ir->repo_not_full).notify_all(); // 通知消费者产品库不为满.
lock.unlock(); // 解锁.
return data; // 返回产品.
}
void ProducerTask() // 生产者任务
{
for (int i = 1; i <= kItemsToProduce; ++i) {
// sleep(1);
std::cout << "Produce the " << i << "^th item..." << std::endl;
ProduceItem(&gItemRepository, i); // 循环生产 kItemsToProduce 个产品.
}
}
void ConsumerTask() // 消费者任务
{
static int cnt = 0;
while(1) {
sleep(1);
int item = ConsumeItem(&gItemRepository); // 消费一个产品.
std::cout << "Consume the " << item << "^th item" << std::endl;
if (++cnt == kItemsToProduce) break; // 如果产品消费个数为 kItemsToProduce, 则退出.
}
}
void InitItemRepository(ItemRepository *ir)
{
ir->write_position = 0; // 初始化产品写入位置.
ir->read_position = 0; // 初始化产品读取位置.
}
int main()
{
InitItemRepository(&gItemRepository);
std::thread producer(ProducerTask); // 创建生产者线程.
std::thread consumer(ConsumerTask); // 创建消费之线程.
producer.join();
consumer.join();
return 0;
}
知识点
- condition_variable条件变量线程同步与mutex互斥变量配合使用
每个线程的同步互斥控制流程如下:
A. 进入后加互斥锁
unique_lock<mutex> lck(mtx);
B.判断此时是否能进行读写,能则立刻进行生产或消费,如不能则等待且释放互斥锁,等到能够生产消费时,再加锁进行生产消费操作。操作结束后通知生产者或者消费者,然后进入D。
while(q.size() == maxSize) {produce.wait(lck);} task();consume.notify_all();
D.释放互斥锁
lck.unlock()
- c++17中新加入了scope_lock:可以设置除了该模块之后自动解锁,不必设置unlock。
- C++11中加入了新的atomic原子性,可以用来进行互斥操作。