GreensnoWorld
记录点滴,分享乐趣,一块凝固的时间
1011.C++并发编程-线程池
C/C++  2023年4月13日
#include <atomic>
#include <condition_variable>
#include <functional>
#include <future>
#include <iostream>
#include <memory>
#include <mutex>
#include <queue>
#include <thread>
#include <utility>
#include <vector>

/// FIFO queue whose operations each take an internal mutex, so a single
/// instance can be pushed to and popped from by several threads.
///
/// Two families of pop are offered:
///   - wait_and_pop(): blocks on a condition variable until an element exists.
///   - try_pop():      never blocks; reports whether an element was available.
/// Each family has an out-parameter overload and a std::shared_ptr overload.
template<typename T>
class threadsafe_queue
{
private:
	mutable std::mutex mut;             // mutable so the const empty() can lock it
	std::queue<T> data_queue;           // the underlying storage, guarded by mut
	std::condition_variable data_cond;  // notified once per push()
public:
	threadsafe_queue() = default;

	/// Appends new_value to the back of the queue and wakes one waiting popper.
	void push(T new_value)
	{
		std::lock_guard<std::mutex> lk(mut);
		data_queue.push(std::move(new_value));
		data_cond.notify_one();
	}

	/// Blocks until the queue is non-empty, then moves the front element
	/// into value and removes it from the queue.
	void wait_and_pop(T& value)
	{
		std::unique_lock<std::mutex> lk(mut);
		// The predicate form of wait() re-checks emptiness after every wake-up.
		data_cond.wait(lk, [this]{ return !data_queue.empty(); });
		value = std::move(data_queue.front());
		data_queue.pop();
	}

	/// Blocks until the queue is non-empty, then removes the front element
	/// and returns it wrapped in a std::shared_ptr.
	std::shared_ptr<T> wait_and_pop()
	{
		std::unique_lock<std::mutex> lk(mut);
		data_cond.wait(lk, [this]{ return !data_queue.empty(); });
		std::shared_ptr<T> res(std::make_shared<T>(std::move(data_queue.front())));
		data_queue.pop();
		return res;
	}

	/// Non-blocking pop.
	/// @param value receives the front element when one is available;
	///              left untouched otherwise.
	/// @return true if an element was popped, false if the queue was empty.
	bool try_pop(T& value)
	{
		std::lock_guard<std::mutex> lk(mut);
		if(data_queue.empty())
			return false;
		value = std::move(data_queue.front());
		data_queue.pop();
		// Report success explicitly: flowing off the end of a non-void
		// function is undefined behaviour.
		return true;
	}

	/// Non-blocking pop.
	/// @return the front element wrapped in a std::shared_ptr, or an empty
	///         pointer if the queue was empty.
	std::shared_ptr<T> try_pop()
	{
		std::lock_guard<std::mutex> lk(mut);
		if(data_queue.empty())
			return std::shared_ptr<T>();
		std::shared_ptr<T> res(std::make_shared<T>(std::move(data_queue.front())));
		data_queue.pop();
		return res;
	}

	/// @return true if the queue held no elements at the moment of the call.
	///         Another thread may change that immediately afterwards.
	bool empty() const
	{
		std::lock_guard<std::mutex> lk(mut);
		return data_queue.empty();
	}
};

/// Scope guard over a vector of threads owned by someone else: when the
/// guard is destroyed, every thread in the vector that is still joinable
/// is joined. The guard stores only a reference to the vector.
class join_threads
{
	std::vector<std::thread>& threads;
public:

	/// Binds the guard to threads_; the vector must outlive the guard.
	explicit join_threads(std::vector<std::thread>& threads_): threads(threads_){}

	/// Joins each joinable thread, in vector order.
	~join_threads()
	{
		for(std::thread& worker : threads)
		{
			// Default-constructed or already-joined threads are skipped.
			if(!worker.joinable())
				continue;
			worker.join();
		}
	}
};

/// Minimal fixed-size thread pool.
///
/// The constructor starts a set of worker threads; each worker repeatedly
/// tries to pop a task from a shared queue and runs it, yielding when the
/// queue is empty. Destroying the pool sets the stop flag and joins the
/// workers. Workers check the flag before looking at the queue, so tasks
/// still queued at destruction time may never run.
///
/// Tasks must not let exceptions escape: an exception leaving a thread's
/// entry function terminates the program.
class thread_pool
{
	// Declaration order matters. Members are destroyed in reverse order, so
	// joiner (last) joins the workers before threads, work_queue and done
	// are destroyed.
	std::atomic<bool> done;                               // stop flag polled by workers
	threadsafe_queue<std::function<void()>> work_queue;   // pending tasks
	std::vector<std::thread> threads;                     // the worker threads
	join_threads joiner;                                  // joins threads on destruction

	/// Worker loop: run queued tasks until the stop flag is set.
	void worker_thread()
	{
		while(!done)
		{
			std::function<void()> task;
			if(work_queue.try_pop(task))
			{
				task();
			}
			else
			{
				// Nothing to do right now; let other threads run.
				std::this_thread::yield();
			}
		}
	}
public:
	/// Starts one worker per hardware thread reported by the implementation,
	/// or a single worker if that number is unavailable.
	/// If starting a thread throws, the stop flag is set so the workers
	/// already running exit, and the exception is rethrown.
	thread_pool(): done(false), joiner(threads)
	{
		// hardware_concurrency() is allowed to return 0 when the value is not
		// computable; a pool with zero workers would never run any task.
		unsigned const reported = std::thread::hardware_concurrency();
		unsigned const thread_count = reported != 0 ? reported : 1;
		try{
			threads.reserve(thread_count);
			for(unsigned i=0; i<thread_count; i++)
			{
				threads.emplace_back(&thread_pool::worker_thread, this);
			}
		}catch(...){
			done = true;
			throw;
		}
	}

	/// Signals the workers to stop; the joiner member then waits for them.
	~thread_pool()
	{
		done = true;
	}

	// Workers hold a pointer to this object, so it must not be copied.
	thread_pool(const thread_pool&) = delete;
	thread_pool& operator=(const thread_pool&) = delete;

	/// Queues f for execution on one of the worker threads.
	/// @param f a callable invocable with no arguments; its result is discarded.
	template<typename FunctionType>
	void submit(FunctionType f)
	{
		// f is already a by-value copy; move it instead of copying again.
		work_queue.push(std::function<void()>(std::move(f)));
	}
};

 

LIJG
余本顽劣,生于紫云下,长于汝水滨。早年求学,兴趣广泛,好高骛远,学无所成,仓皇入世。兴趣所致,投身互联网,求知未证,而立已至,始悟光阴荏苒,终需务实钻研。故有此站,记录时光,积累点滴,验证所学,分享愚见。指舞方寸间,心系万千年。
留言