GreensnoWorld
记录点滴,分享乐趣,一块凝固的时间
1010.C++并发编程-生产者消费者模式
C/C++  2023年4月13日

1. 单生产者-单消费者模型

#include <iostream>
#include <condition_variable>
#include <mutex>
#include <thread>

// ---- Sizing -----------------------------------------------------------------
// Number of slots in the ring buffer.
static const int repository_size{10};
// Number of items the producer emits and the consumer takes before stopping.
static const int item_total{20};

// ---- Ring buffer state (every access happens with mtx held) ------------------
int item_buffer[repository_size];
static std::size_t write_position{0};	// slot the next produced item goes into
static std::size_t read_position{0};	// slot the next consumed item comes from

// ---- Synchronisation ---------------------------------------------------------
std::mutex mtx;
std::condition_variable repo_not_empty;	// notified after an item is stored
std::condition_variable repo_not_full;	// notified after an item is removed

// One-second duration constant.
std::chrono::seconds t{1};

// Stores `i` in the shared ring buffer, blocking while the buffer is full.
//
// The buffer is treated as full when advancing write_position would make it
// equal to read_position, so one slot always stays unused. Every thread
// waiting on repo_not_empty is notified before mtx is released.
void produce_item(int i)
{
	std::unique_lock<std::mutex> guard(mtx);

	// The full-check is redone after every wake-up because the indices are
	// only meaningful while mtx is held.
	for(;;)
	{
		const std::size_t slot_after_write = (write_position + 1) % repository_size;
		if(slot_after_write != read_position)
		{
			break;
		}
		std::cout << "Producer is waiting for an empty slot..." << std::endl;
		repo_not_full.wait(guard);
	}

	item_buffer[write_position] = i;
	// Advance with wrap-around to slot 0.
	write_position = (write_position + 1) % repository_size;

	// Notified while mtx is still held; `guard` unlocks when it goes out of scope.
	repo_not_empty.notify_all();
}

// Removes and returns the oldest item in the shared ring buffer, blocking
// while the buffer is empty (read_position == write_position).
//
// Every thread waiting on repo_not_full is notified before mtx is released.
int consume_item()
{
	std::unique_lock<std::mutex> lck(mtx);
	while(write_position == read_position)
	{
		// Message spelling fixed ("Consume" -> "Consumer").
		std::cout << "Consumer is waiting for items..." << std::endl;
		repo_not_empty.wait(lck);
	}

	const int data = item_buffer[read_position];
	read_position++;

	// Wrap around to slot 0 after the last slot.
	if(read_position >= repository_size)
	{
		read_position = 0;
	}

	// Notified while mtx is still held; `lck` unlocks when it goes out of scope.
	repo_not_full.notify_all();

	return data;
}

// Producer entry point: announces and stores the values 1..item_total, in order.
void Producer_thread()
{
	int next_item = 1;
	while(next_item <= item_total)
	{
		std::cout << "Producer product #" << next_item << std::endl;
		produce_item(next_item);
		++next_item;
	}
}

// Consumer entry point: takes item_total items from the buffer and prints each.
void Consumer_thread()
{
	// The counter is a plain local so every invocation starts from zero and no
	// state is shared between threads. The bounded loop also terminates
	// immediately when item_total is 0.
	for(int consumed = 0; consumed < item_total; ++consumed)
	{
		const int item = consume_item();
		std::cout << "Consumer consume #" << item << std::endl;
	}
}

int main()
{
	std::thread producer(Producer_thread);
	std::thread consumer(Consumer_thread);
	producer.join();
	consumer.join();
}

2. 单生产者-多消费者模型

#include <iostream>
#include <condition_variable>
#include <mutex>
#include <thread>
#include <vector>

// ---- Sizing -----------------------------------------------------------------
// Number of slots in the ring buffer.
static const int repository_size{10};
// Total number of items produced, and consumed across all consumers.
static const int item_total{20};

// ---- Ring buffer state (every access happens with mtx held) ------------------
int item_buffer[repository_size];
static std::size_t write_position{0};	// slot the next produced item goes into
static std::size_t read_position{0};	// slot the next consumed item comes from

// ---- Shared consumer progress (accessed with mtx_counter held) ---------------
static std::size_t item_counter{0};

// ---- Synchronisation ---------------------------------------------------------
std::mutex mtx;				// protects the ring buffer and its indices
std::mutex mtx_counter;			// protects item_counter
std::condition_variable repo_not_empty;	// notified after an item is stored
std::condition_variable repo_not_full;	// notified after an item is removed

// One-second pause used by the consumer loop.
std::chrono::seconds t{1};

// Stores `i` in the shared ring buffer, blocking while no slot is free.
//
// One slot is always left unused: the buffer counts as full when the slot
// after write_position is read_position. All waiters on repo_not_empty are
// notified before mtx is released.
void produce_item(int i)
{
	std::unique_lock<std::mutex> hold(mtx);

	// Evaluated only while mtx is held.
	const auto buffer_is_full = []
	{
		return read_position == (write_position + 1) % repository_size;
	};

	while(buffer_is_full())
	{
		std::cout << "Producer is waiting for an empty slot..." << std::endl;
		repo_not_full.wait(hold);
	}

	item_buffer[write_position] = i;

	// Advance the write index, wrapping to slot 0 past the last slot.
	const std::size_t advanced = write_position + 1;
	write_position = (advanced == repository_size) ? 0 : advanced;

	// Notified while mtx is still held; `hold` unlocks on scope exit.
	repo_not_empty.notify_all();
}

// Removes and returns the oldest buffered item, blocking while the buffer is
// empty (read_position == write_position). All waiters on repo_not_full are
// notified before mtx is released.
int consume_item()
{
	std::unique_lock<std::mutex> hold(mtx);

	while(read_position == write_position)
	{
		std::cout << "Consumer is waiting for items..." << std::endl;
		repo_not_empty.wait(hold);
	}

	const int taken = item_buffer[read_position];

	// Advance the read index, wrapping to slot 0 past the last slot.
	const std::size_t advanced = read_position + 1;
	read_position = (advanced >= repository_size) ? 0 : advanced;

	// Notified while mtx is still held; `hold` unlocks on scope exit.
	repo_not_full.notify_all();
	return taken;
}

// Producer entry point: announces and stores the values 1..item_total, in order.
void Producer_thread()
{
	int value = 0;
	while(value < item_total)
	{
		++value;
		std::cout << "Producer product #" << value << std::endl;
		produce_item(value);
	}
}

// Consumer entry point, run by several threads at once.
//
// Each iteration sleeps for `t`, then claims one of the item_total items by
// incrementing item_counter under mtx_counter, and only then takes the item
// from the buffer. The thread exits once all item_total items are claimed.
void Consumer_thread()
{
	while(true)
	{
		std::this_thread::sleep_for(t);

		std::unique_lock<std::mutex> lck(mtx_counter);
		if(item_counter >= static_cast<std::size_t>(item_total))
		{
			break;	// every item is already claimed; lck unlocks on scope exit
		}
		++item_counter;	// claim one item for this thread

		// Release the counter lock before the potentially blocking buffer
		// wait so other consumers are not stalled behind this one.
		lck.unlock();
		const int item = consume_item();

		// Re-acquired only so lines printed by different consumers do not
		// interleave with each other.
		lck.lock();
		std::cout << "Consumer #" << std::this_thread::get_id() << " consume #" << item << std::endl;
	}
	std::cout << "Consumer thread " << std::this_thread::get_id() << " is exiting..." << std::endl;
}

// Starts one producer thread and five consumer threads, then waits for all.
int main()
{
	std::thread producer(Producer_thread);

	std::vector<std::thread> consumers;
	for(int remaining = 5; remaining > 0; --remaining)
	{
		consumers.emplace_back(Consumer_thread);
	}

	producer.join();
	for(std::thread &worker : consumers)
	{
		worker.join();
	}
	return 0;
}

3. 多生产者-单消费者模型

#include <iostream>
#include <condition_variable>
#include <mutex>
#include <thread>
#include <vector>

// ---- Sizing -----------------------------------------------------------------
// Number of slots in the ring buffer.
static const int repository_size{10};
// Total number of items produced across all producers, and consumed.
static const int item_total{20};

// ---- Ring buffer state (every access happens with mtx held) ------------------
int item_buffer[repository_size];
static std::size_t write_position{0};	// slot the next produced item goes into
static std::size_t read_position{0};	// slot the next consumed item comes from

// ---- Shared producer progress (accessed with mtx_counter held) ---------------
static std::size_t item_counter{0};

// ---- Synchronisation ---------------------------------------------------------
std::mutex mtx;				// protects the ring buffer and its indices
std::mutex mtx_counter;			// protects item_counter
std::condition_variable repo_not_empty;	// notified after an item is stored
std::condition_variable repo_not_full;	// notified after an item is removed

// One-second pause used by the consumer loop.
std::chrono::seconds t{1};

// Stores `i` in the shared ring buffer, blocking while no slot is free.
//
// The buffer counts as full when the slot after write_position equals
// read_position, so one slot is never used. All waiters on repo_not_empty are
// notified before mtx is released.
void produce_item(int i)
{
	std::unique_lock<std::mutex> buffer_lock(mtx);

	while(true)
	{
		// Recomputed on every pass: the indices may have changed while
		// this thread was waiting.
		const bool has_room = ((write_position + 1) % repository_size) != read_position;
		if(has_room)
		{
			break;
		}
		std::cout << "Producer is waiting for an empty slot..." << std::endl;
		repo_not_full.wait(buffer_lock);
	}

	item_buffer[write_position] = i;

	// Step forward and wrap to slot 0 past the last slot.
	if(++write_position == repository_size)
	{
		write_position = 0;
	}

	// Notified while mtx is still held; buffer_lock unlocks on scope exit.
	repo_not_empty.notify_all();
}

// Removes and returns the oldest buffered item, blocking while the buffer is
// empty (read_position == write_position). All waiters on repo_not_full are
// notified before mtx is released.
int consume_item()
{
	std::unique_lock<std::mutex> buffer_lock(mtx);

	while(true)
	{
		if(read_position != write_position)
		{
			break;	// at least one item is available
		}
		std::cout << "Consumer is waiting for items..." << std::endl;
		repo_not_empty.wait(buffer_lock);
	}

	const int value = item_buffer[read_position];

	// Step forward and wrap to slot 0 past the last slot.
	if(++read_position >= repository_size)
	{
		read_position = 0;
	}

	// Notified while mtx is still held; buffer_lock unlocks on scope exit.
	repo_not_full.notify_all();
	return value;
}

// Producer entry point, run by several threads at once.
//
// Each iteration claims the next item number by incrementing item_counter
// under mtx_counter, then stores that number in the buffer. The thread exits
// once item_total numbers have been claimed. Because the counter lock is not
// held while storing, numbers may enter the buffer out of order.
void Producer_thread()
{
	while(true)
	{
		std::unique_lock<std::mutex> lck(mtx_counter);
		if(item_counter >= static_cast<std::size_t>(item_total))
		{
			break;	// every item is already claimed; lck unlocks on scope exit
		}
		// Claim the next item number; the cast makes the size_t -> int
		// conversion explicit (the value never exceeds item_total).
		const int item = static_cast<int>(++item_counter);

		// Release the counter lock before the potentially blocking buffer
		// wait so other producers are not stalled behind this one.
		lck.unlock();
		produce_item(item);

		// Re-acquired only so lines printed by different producers do not
		// interleave with each other.
		lck.lock();
		std::cout << "Producer #" << std::this_thread::get_id() << " produce #" << item << std::endl;
	}
	std::cout << "Producer thread #" << std::this_thread::get_id() << " is exiting..." << std::endl;
}

// Consumer entry point: takes item_total items from the buffer, sleeping for
// `t` before each one, and prints every item taken.
void Consumer_thread()
{
	// The counter is a plain local so every invocation starts from zero and no
	// state is shared between threads. The bounded loop also terminates
	// immediately when item_total is 0.
	for(int consumed = 0; consumed < item_total; ++consumed)
	{
		std::this_thread::sleep_for(t);
		const int item = consume_item();
		std::cout << "Consumer consume #" << item << std::endl;
	}
}

// Starts five producer threads and one consumer thread, then waits for all.
int main()
{
	std::vector<std::thread> producers;
	for(int remaining = 5; remaining > 0; --remaining)
	{
		producers.emplace_back(Producer_thread);
	}
	std::thread consumer(Consumer_thread);

	for(std::thread &worker : producers)
	{
		worker.join();
	}
	consumer.join();
	return 0;
}

4. 多生产者-多消费者模型

#include <iostream>
#include <condition_variable>
#include <mutex>
#include <thread>
#include <vector>
#include <chrono>

// ---- Sizing -----------------------------------------------------------------
// Number of slots in the ring buffer.
static const int repository_size{10};
// Total number of items produced and consumed across all threads.
static const int item_total{20};

// ---- Ring buffer state (every access happens with mtx held) ------------------
int item_buffer[repository_size];
static std::size_t write_position{0};	// slot the next produced item goes into
static std::size_t read_position{0};	// slot the next consumed item comes from

// ---- Shared progress counters ------------------------------------------------
static std::size_t produced_item_counter{0};	// accessed with producer_count_mtx held
static std::size_t consumed_item_counter{0};	// accessed with consumer_count_mtx held

// ---- Synchronisation ---------------------------------------------------------
std::mutex mtx;				// protects the ring buffer and its indices
std::mutex producer_count_mtx;		// protects produced_item_counter
std::mutex consumer_count_mtx;		// protects consumed_item_counter
std::condition_variable repo_not_empty;	// notified after an item is stored
std::condition_variable repo_not_full;	// notified after an item is removed

// Duration constants: one second, and 1000 microseconds (the consumer pause).
std::chrono::seconds t{1};
std::chrono::microseconds t1{1000};

// Stores `i` in the shared ring buffer, blocking while no slot is free.
//
// The buffer counts as full when the slot after write_position equals
// read_position, so one slot is never used. All waiters on repo_not_empty are
// notified before mtx is released.
void produce_item(int i)
{
	std::unique_lock<std::mutex> ring_lock(mtx);

	// Index of the slot that follows the current write slot, with wrap-around.
	const auto following_slot = []
	{
		return (write_position + 1) % repository_size;
	};

	while(following_slot() == read_position)
	{
		std::cout << "Producer is waiting for an empty slot..." << std::endl;
		repo_not_full.wait(ring_lock);
	}

	item_buffer[write_position] = i;
	write_position = following_slot();

	// Notified while mtx is still held; ring_lock unlocks on scope exit.
	repo_not_empty.notify_all();
}

// Removes and returns the oldest buffered item, blocking while the buffer is
// empty (read_position == write_position). All waiters on repo_not_full are
// notified before mtx is released.
int consume_item()
{
	std::unique_lock<std::mutex> ring_lock(mtx);

	for(;;)
	{
		const bool is_empty = (write_position == read_position);
		if(!is_empty)
		{
			break;
		}
		std::cout << "Consumer is waiting for items..." << std::endl;
		repo_not_empty.wait(ring_lock);
	}

	const int fetched = item_buffer[read_position];

	// Advance the read index, wrapping to slot 0 past the last slot.
	read_position = (read_position + 1 >= repository_size) ? 0 : read_position + 1;

	// Notified while mtx is still held; ring_lock unlocks on scope exit.
	repo_not_full.notify_all();
	return fetched;
}

// Producer entry point, run by several threads at once.
//
// Each iteration claims the next item number by incrementing
// produced_item_counter under producer_count_mtx, then stores that number in
// the buffer. The thread exits once item_total numbers have been claimed.
// Because the counter lock is not held while storing, numbers may enter the
// buffer out of order.
void Producer_thread()
{
	while(true)
	{
		std::unique_lock<std::mutex> lock(producer_count_mtx);
		if(produced_item_counter >= static_cast<std::size_t>(item_total))
		{
			break;	// every item is already claimed; lock unlocks on scope exit
		}
		// Claim the next item number; the cast makes the size_t -> int
		// conversion explicit (the value never exceeds item_total).
		const int item = static_cast<int>(++produced_item_counter);

		// Release the counter lock before the potentially blocking buffer
		// wait so other producers are not stalled behind this one.
		lock.unlock();
		produce_item(item);

		// Re-acquired only so lines printed by different producers do not
		// interleave with each other.
		lock.lock();
		std::cout << "Producer #" << std::this_thread::get_id() << " produce #" << item << std::endl;
	}
	std::cout << "Producer thread " << std::this_thread::get_id() << " is exiting..." << std::endl;
}

// Consumer entry point, run by several threads at once.
//
// Each iteration sleeps for `t1`, then claims one of the item_total items by
// incrementing consumed_item_counter under consumer_count_mtx, and only then
// takes the item from the buffer. The thread exits once all item_total items
// are claimed.
void Consumer_thread()
{
	while(true)
	{
		std::this_thread::sleep_for(t1);

		std::unique_lock<std::mutex> lck(consumer_count_mtx);
		if(consumed_item_counter >= static_cast<std::size_t>(item_total))
		{
			break;	// every item is already claimed; lck unlocks on scope exit
		}
		++consumed_item_counter;	// claim one item for this thread

		// Release the counter lock before the potentially blocking buffer
		// wait so other consumers are not stalled behind this one.
		lck.unlock();
		const int item = consume_item();

		// Re-acquired only so lines printed by different consumers do not
		// interleave with each other.
		lck.lock();
		std::cout << "Consumer #" << std::this_thread::get_id() << " consume #" << item << std::endl;
	}
	std::cout << "Consumer thread " << std::this_thread::get_id() << " is exiting..." << std::endl;
}

// Starts five producer/consumer thread pairs (each producer is launched just
// before its paired consumer), then waits for all producers followed by all
// consumers.
int main()
{
	std::vector<std::thread> producers;
	std::vector<std::thread> consumers;

	for(int pair = 0; pair != 5; ++pair)
	{
		producers.emplace_back(Producer_thread);
		consumers.emplace_back(Consumer_thread);
	}

	for(std::thread &worker : producers)
	{
		worker.join();
	}
	for(std::thread &worker : consumers)
	{
		worker.join();
	}
	return 0;
}

 

LIJG
余本顽劣,生于紫云下,长于汝水滨。早年求学,兴趣广泛,好高骛远,学无所成,仓皇入世。兴趣所致,投身互联网,求知未证,而立已至,始悟光阴荏苒,终需务实钻研。故有此站,记录时光,积累点滴,验证所学,分享愚见。指舞方寸间,心系万千年。
留言