1010.C++并发编程-生产者消费者模式
C/C++ 2023年4月13日
1. 单生产者-单消费者模型
#include <iostream>
#include <condition_variable>
#include <mutex>
#include <thread>
// ---- Shared state for the single-producer / single-consumer demo ----

// Length of the ring-buffer array. The "full" test used by produce_item keeps
// one slot unused, so at most repository_size - 1 items are queued at once.
static constexpr int repository_size{10};
// Number of items the producer creates and the consumer removes.
static constexpr int item_total{20};

// Guards item_buffer, read_position and write_position.
std::mutex mtx;
// Notified by consume_item after it advances read_position.
std::condition_variable repo_not_full;
// Notified by produce_item after it advances write_position.
std::condition_variable repo_not_empty;

// Ring-buffer storage and the next index to read from / write to.
int item_buffer[repository_size];
static std::size_t read_position{0};
static std::size_t write_position{0};

// One-second duration constant.
std::chrono::seconds t{1};
// Stores `i` at write_position in the ring buffer and advances the cursor.
// Blocks on repo_not_full while the slot after write_position is the read
// slot (buffer full), then notifies repo_not_empty.
void produce_item(int i)
{
    std::unique_lock<std::mutex> guard(mtx);

    // wait(lock, pred) evaluates the predicate before every wait, so the
    // "waiting" line is logged each time the buffer is found full.
    repo_not_full.wait(guard, [] {
        const bool has_room = ((write_position + 1) % repository_size) != read_position;
        if (!has_room)
        {
            std::cout << "Producer is waiting for an empty slot..." << std::endl;
        }
        return has_room;
    });

    item_buffer[write_position] = i;
    // write_position is always below repository_size, so the modulo wraps it to 0
    // exactly when it reaches the end of the array.
    write_position = (write_position + 1) % repository_size;

    repo_not_empty.notify_all();
    // `guard` releases mtx on scope exit.
}
// Removes and returns the item at read_position.
// Blocks on repo_not_empty while the read and write cursors coincide
// (buffer empty), then notifies repo_not_full so a blocked producer can go on.
int consume_item()
{
    std::unique_lock<std::mutex> lck(mtx);
    while (write_position == read_position)
    {
        // Message fixed: it previously read "Consume is waiting for items...".
        std::cout << "Consumer is waiting for items..." << std::endl;
        repo_not_empty.wait(lck);
    }

    const int data = item_buffer[read_position];
    // read_position is always below repository_size, so this wraps it to 0 at the end.
    read_position = (read_position + 1) % repository_size;

    repo_not_full.notify_all();
    return data;  // lck releases mtx on scope exit
}
// Producer thread body: logs and enqueues the integers 1..item_total in order.
// The log line is written before produce_item is called, i.e. before the item
// is actually stored.
void Producer_thread()
{
    int next_item = 1;
    while (next_item <= item_total)
    {
        std::cout << "Producer product #" << next_item << std::endl;
        produce_item(next_item);
        ++next_item;
    }
}
// Consumer thread body: dequeues and logs items until item_total have been consumed.
void Consumer_thread()
{
    // Function-local static: the count persists across calls and is not
    // synchronised, so this body is only suitable for one consumer thread.
    static int consumed_count = 0;
    do
    {
        const int received = consume_item();
        std::cout << "Consumer consume #" << received << std::endl;
    } while (++consumed_count != item_total);
}
int main()
{
std::thread producer(Producer_thread);
std::thread consumer(Consumer_thread);
producer.join();
consumer.join();
}
2. 单生产者-多消费者模型
#include <iostream>
#include <condition_variable>
#include <mutex>
#include <thread>
#include <vector>
// ---- Shared state for the single-producer / multi-consumer demo ----

// Length of the ring-buffer array (one slot is kept free by the "full" test).
static constexpr int repository_size{10};
// Total number of items produced, and consumed across all consumer threads.
static constexpr int item_total{20};

// Guards item_buffer, read_position and write_position.
std::mutex mtx;
// Guards item_counter.
std::mutex mtx_counter;
// Notified by consume_item after a slot has been freed.
std::condition_variable repo_not_full;
// Notified by produce_item after an item has been stored.
std::condition_variable repo_not_empty;

// Ring-buffer storage and its read / write cursors.
int item_buffer[repository_size];
static std::size_t read_position{0};
static std::size_t write_position{0};
// Item count shared by the consumer threads.
static std::size_t item_counter{0};

// Pause each consumer takes before every attempt.
std::chrono::seconds t{1};
// Appends `i` to the ring buffer, blocking on repo_not_full while the buffer
// is full, and notifies repo_not_empty once the item is stored.
void produce_item(int i)
{
    std::unique_lock<std::mutex> guard(mtx);

    for (;;)
    {
        // Full when advancing the write cursor would land on the read cursor.
        const std::size_t slot_after_write = (write_position + 1) % repository_size;
        if (slot_after_write != read_position)
        {
            break;
        }
        std::cout << "Producer is waiting for an empty slot..." << std::endl;
        repo_not_full.wait(guard);
    }

    item_buffer[write_position] = i;
    write_position = (write_position + 1 == repository_size) ? 0 : write_position + 1;

    repo_not_empty.notify_all();
    // mtx is released when `guard` is destroyed.
}
// Takes the item at read_position out of the ring buffer and returns it.
// Blocks on repo_not_empty while the buffer is empty; notifies repo_not_full
// after the read cursor has moved.
int consume_item()
{
    std::unique_lock<std::mutex> guard(mtx);

    // The predicate runs before every wait, logging once per empty check.
    repo_not_empty.wait(guard, [] {
        const bool has_item = write_position != read_position;
        if (!has_item)
        {
            std::cout << "Consumer is waiting for items..." << std::endl;
        }
        return has_item;
    });

    const int taken = item_buffer[read_position];
    // read_position stays below repository_size, so modulo wraps it to 0 at the end.
    read_position = (read_position + 1) % repository_size;

    repo_not_full.notify_all();
    return taken;
}
// Producer thread body: announces and enqueues items numbered 1..item_total.
void Producer_thread()
{
    int item_id = 0;
    while (item_id < item_total)
    {
        ++item_id;
        std::cout << "Producer product #" << item_id << std::endl;
        produce_item(item_id);
    }
}
// Consumer thread body, run concurrently by several threads.
//
// Each round sleeps for `t`, claims one of the item_total items by bumping
// item_counter under mtx_counter, and then dequeues an item. The thread exits
// once every item has been claimed.
//
// The counter lock is released before consume_item() is called. Previously it
// was held across that blocking call, so only one consumer at a time could
// wait on (or drain) the buffer and the others queued up on mtx_counter.
// Because at most item_total claims are handed out and each claim is followed
// by exactly one consume_item(), the number of dequeues is unchanged.
void Consumer_thread()
{
    for (;;)
    {
        std::this_thread::sleep_for(t);

        std::unique_lock<std::mutex> lck(mtx_counter);
        if (item_counter >= item_total)
        {
            break;  // every item is already claimed; lck unlocks on scope exit
        }
        ++item_counter;  // claim one item for this thread
        lck.unlock();    // never block on the buffer while holding the counter lock

        const int item = consume_item();

        // Re-acquire the lock only for the log line so that, as before,
        // output from different consumers is not interleaved.
        lck.lock();
        std::cout << "Consumer #" << std::this_thread::get_id() << " consume #" << item << std::endl;
    }
    std::cout << "Consumer thread " << std::this_thread::get_id() << " is exiting..." << std::endl;
}
int main()
{
std::thread producer(Producer_thread);
std::vector<std::thread> thread_vector;
for(int i=0; i<5; i++)
{
thread_vector.push_back(std::thread(Consumer_thread));
}
producer.join();
for(auto &thr: thread_vector)
{
thr.join();
}
}
3. 多生产者-单消费者模型
#include <iostream>
#include <condition_variable>
#include <mutex>
#include <thread>
#include <vector>
// ---- Shared state for the multi-producer / single-consumer demo ----

// Length of the ring-buffer array (one slot is kept free by the "full" test).
static constexpr int repository_size{10};
// Total number of items produced across all producer threads.
static constexpr int item_total{20};

// Guards item_buffer, read_position and write_position.
std::mutex mtx;
// Guards item_counter.
std::mutex mtx_counter;
// Notified by consume_item after a slot has been freed.
std::condition_variable repo_not_full;
// Notified by produce_item after an item has been stored.
std::condition_variable repo_not_empty;

// Ring-buffer storage and its read / write cursors.
int item_buffer[repository_size];
static std::size_t read_position{0};
static std::size_t write_position{0};
// Number of items produced so far; also supplies the value of each new item.
static std::size_t item_counter{0};

// Pause the consumer takes before every dequeue.
std::chrono::seconds t{1};
// Writes `i` into the ring buffer at write_position.
// Waits on repo_not_full while the buffer is full and notifies
// repo_not_empty after the write cursor has advanced.
void produce_item(int i)
{
    // Full when the slot after the write cursor is the read cursor.
    // Only called below while mtx is held.
    const auto buffer_is_full = [] {
        return ((write_position + 1) % repository_size) == read_position;
    };

    std::unique_lock<std::mutex> slot_lock(mtx);
    while (buffer_is_full())
    {
        std::cout << "Producer is waiting for an empty slot..." << std::endl;
        repo_not_full.wait(slot_lock);
    }

    item_buffer[write_position] = i;
    // write_position stays below repository_size, so modulo wraps it to 0 at the end.
    write_position = (write_position + 1) % repository_size;

    repo_not_empty.notify_all();
    // slot_lock releases mtx on scope exit.
}
// Reads and returns the item at read_position, advancing the read cursor.
// Waits on repo_not_empty while the cursors coincide (buffer empty) and
// notifies repo_not_full afterwards.
int consume_item()
{
    std::unique_lock<std::mutex> slot_lock(mtx);

    for (;;)
    {
        if (write_position != read_position)
        {
            break;
        }
        std::cout << "Consumer is waiting for items..." << std::endl;
        repo_not_empty.wait(slot_lock);
    }

    const int fetched = item_buffer[read_position];
    read_position = (read_position + 1 >= repository_size) ? 0 : read_position + 1;

    repo_not_full.notify_all();
    return fetched;
}
// Producer thread body, run concurrently by several threads.
//
// Each round takes mtx_counter, and while fewer than item_total items have
// been produced it increments item_counter, enqueues that value and logs it.
// The counter lock stays held across produce_item(), so items are enqueued
// in increasing order and only one producer is inside produce_item at a time.
void Producer_thread()
{
    for (;;)
    {
        std::unique_lock<std::mutex> counter_guard(mtx_counter);
        if (!(item_counter < item_total))
        {
            break;  // quota reached; counter_guard unlocks as the scope is left
        }

        ++item_counter;
        produce_item(item_counter);
        std::cout << "Producer #" << std::this_thread::get_id() << " produce #" << item_counter << std::endl;
    }
    std::cout << "Producer thread #" << std::this_thread::get_id() << " is exiting..." << std::endl;
}
// Consumer thread body: sleeps for `t`, dequeues one item and logs it,
// repeating until item_total items have been consumed.
void Consumer_thread()
{
    // Function-local static: persists across calls and is not synchronised,
    // so this body is only suitable for a single consumer thread.
    static int consumed_count = 0;
    do
    {
        std::this_thread::sleep_for(t);
        const int received = consume_item();
        std::cout << "Consumer consume #" << received << std::endl;
    } while (++consumed_count != item_total);
}
int main()
{
std::vector<std::thread> thread_vector;
for(int i=0; i<5; i++)
{
thread_vector.push_back(std::thread(Producer_thread));
}
std::thread consumer(Consumer_thread);
for(auto &thr: thread_vector)
{
thr.join();
}
consumer.join();
}
4. 多生产者-多消费者模型
#include <iostream>
#include <condition_variable>
#include <mutex>
#include <thread>
#include <vector>
#include <chrono>
// ---- Shared state for the multi-producer / multi-consumer demo ----

// Length of the ring-buffer array (one slot is kept free by the "full" test).
static constexpr int repository_size{10};
// Total number of items produced, and consumed, across all threads.
static constexpr int item_total{20};

// Guards item_buffer, read_position and write_position.
std::mutex mtx;
// Guards produced_item_counter.
std::mutex producer_count_mtx;
// Guards consumed_item_counter.
std::mutex consumer_count_mtx;
// Notified by consume_item after a slot has been freed.
std::condition_variable repo_not_full;
// Notified by produce_item after an item has been stored.
std::condition_variable repo_not_empty;

// Ring-buffer storage and its read / write cursors.
int item_buffer[repository_size];
static std::size_t read_position{0};
static std::size_t write_position{0};
// Per-role item counts shared by the producer / consumer threads.
static std::size_t produced_item_counter{0};
static std::size_t consumed_item_counter{0};

// Duration constants: one second, and 1000 microseconds (the consumer pause).
std::chrono::seconds t{1};
std::chrono::microseconds t1{1000};
// Puts `i` into the ring buffer at write_position and advances the cursor.
// Blocks on repo_not_full while the buffer is full; notifies repo_not_empty
// once the item is in place.
void produce_item(int i)
{
    std::unique_lock<std::mutex> buffer_lock(mtx);

    // The predicate is evaluated before every wait, so the message is logged
    // each time the buffer is found full.
    repo_not_full.wait(buffer_lock, [] {
        if (((write_position + 1) % repository_size) == read_position)
        {
            std::cout << "Producer is waiting for an empty slot..." << std::endl;
            return false;
        }
        return true;
    });

    item_buffer[write_position] = i;
    // write_position stays below repository_size, so modulo wraps it to 0 at the end.
    write_position = (write_position + 1) % repository_size;

    repo_not_empty.notify_all();
    // buffer_lock releases mtx on scope exit.
}
// Removes the item at read_position from the ring buffer and returns it.
// Blocks on repo_not_empty while the buffer is empty; notifies repo_not_full
// after the read cursor has advanced.
int consume_item()
{
    // Empty when both cursors point at the same slot. Only called under mtx.
    const auto buffer_is_empty = [] { return write_position == read_position; };

    std::unique_lock<std::mutex> buffer_lock(mtx);
    while (buffer_is_empty())
    {
        std::cout << "Consumer is waiting for items..." << std::endl;
        repo_not_empty.wait(buffer_lock);
    }

    const int value = item_buffer[read_position];
    // read_position stays below repository_size, so modulo wraps it to 0 at the end.
    read_position = (read_position + 1) % repository_size;

    repo_not_full.notify_all();
    return value;
}
// Producer thread body, run concurrently by several threads.
//
// Each round takes producer_count_mtx, and while fewer than item_total items
// have been produced it increments produced_item_counter, enqueues that value
// and logs it. The counter lock stays held across produce_item(), so items
// are enqueued in increasing order.
void Producer_thread()
{
    for (;;)
    {
        std::unique_lock<std::mutex> counter_guard(producer_count_mtx);
        if (!(produced_item_counter < item_total))
        {
            break;  // quota reached; counter_guard unlocks as the scope is left
        }

        ++produced_item_counter;
        produce_item(produced_item_counter);
        std::cout << "Producer #" << std::this_thread::get_id() << " produce #" << produced_item_counter << std::endl;
    }
    std::cout << "Producer thread " << std::this_thread::get_id() << " is exiting..." << std::endl;
}
// Consumer thread body, run concurrently by several threads.
//
// Each round sleeps for `t1`, claims one of the item_total items by bumping
// consumed_item_counter under consumer_count_mtx, and then dequeues an item.
// The thread exits once every item has been claimed.
//
// The counter lock is released before consume_item() is called. Previously it
// was held across that blocking call, so only one consumer at a time could
// wait on (or drain) the buffer and the others queued up on the counter lock.
// At most item_total claims are handed out and each is followed by exactly
// one consume_item(), so the number of dequeues is unchanged.
void Consumer_thread()
{
    for (;;)
    {
        std::this_thread::sleep_for(t1);

        std::unique_lock<std::mutex> lck(consumer_count_mtx);
        if (consumed_item_counter >= item_total)
        {
            break;  // every item is already claimed; lck unlocks on scope exit
        }
        ++consumed_item_counter;  // claim one item for this thread
        lck.unlock();             // never block on the buffer while holding the counter lock

        const int item = consume_item();

        // Re-acquire the lock only for the log line so that, as before,
        // output from different consumers is not interleaved.
        lck.lock();
        std::cout << "Consumer #" << std::this_thread::get_id() << " consume #" << item << std::endl;
    }
    std::cout << "Consumer thread " << std::this_thread::get_id() << " is exiting..." << std::endl;
}
// Starts five producer/consumer thread pairs (producer first in each pair),
// then joins all producers followed by all consumers.
int main()
{
    std::vector<std::thread> producers;
    std::vector<std::thread> consumers;

    int pairs_started = 0;
    while (pairs_started < 5)
    {
        producers.emplace_back(Producer_thread);
        consumers.emplace_back(Consumer_thread);
        ++pairs_started;
    }

    for (std::thread &worker : producers)
    {
        worker.join();
    }
    for (std::thread &worker : consumers)
    {
        worker.join();
    }
    return 0;
}