Prerequisite Knowledge
I highly recommend having some knowledge of C++, threads, and programming in general; otherwise, researching and understanding all of this will be tedious.
Thread Pools
What are Thread Pools?
A thread pool is a group of threads that are created up front and then wait for jobs or tasks to execute.
The idea is to keep the threads alive and reuse them for many tasks, which avoids the cost of creating and destroying a thread for every task.
To do this in C++ we'll have to look at some of the features provided by the language. I give a short explanation of each concept below, but I recommend you also read about them yourself in the documentation.
std::thread, std::mutex and std::condition_variable
Threads allow multiple functions to execute concurrently.
- std::thread represents a single thread of execution.
- If you have ever used threads in your program, you must have heard of thread safety and mutexes. std::mutex is a synchronization primitive that can be used to protect shared data from being simultaneously accessed by multiple threads.
- std::condition_variable is used together with a mutex. Its purpose is to wait for some condition to become true: it blocks one or more threads until another thread does two things:
  - Modifies a shared variable
  - Notifies the condition_variable
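The two-step handshake described above can be sketched in isolation. This is a minimal illustration, not part of the thread pool itself; the function name wait_for_signal and the variables ready and observed are hypothetical names chosen for the example.

```cpp
#include <condition_variable>
#include <mutex>
#include <thread>

// Hypothetical helper: a worker thread sleeps on a condition variable until
// the calling thread sets a shared flag and notifies it.
int wait_for_signal() {
    std::mutex m;
    std::condition_variable cv;
    bool ready = false;  // shared state, protected by m
    int observed = 0;    // written by the worker after it wakes up

    std::thread worker([&] {
        std::unique_lock<std::mutex> lock(m);
        // The predicate guards against spurious wakeups and against a
        // notification that is sent before this wait begins.
        cv.wait(lock, [&] { return ready; });
        observed = 42;
    });

    {
        std::lock_guard<std::mutex> lock(m);
        ready = true;    // step 1: modify the shared variable under the lock
    }
    cv.notify_one();     // step 2: notify the condition variable

    worker.join();       // join synchronizes, so reading observed is safe
    return observed;
}
```

Calling wait_for_signal() returns only after the worker has been woken and has finished, which is the same wait-then-wake pattern the pool's worker threads rely on.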
std::forward, perfect forwarding and std::bind
std::forward is used to implement perfect forwarding. Perfect forwarding means passing arguments on to another function in a way that preserves their original value category, whether each one is an lvalue or an rvalue.
std::bind is a function template used for partial function application: given a function, say f, and some pre-specified arguments, it generates a forwarding call wrapper. Calling this wrapper is equivalent to calling f with those arguments already bound.
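A small sketch may make both ideas concrete. The names category, relay, add and add_ten are hypothetical and exist only for this illustration.

```cpp
#include <functional>
#include <string>
#include <utility>

// Hypothetical overloads that report which value category they received.
inline std::string category(const std::string&) { return "lvalue"; }
inline std::string category(std::string&&) { return "rvalue"; }

// relay takes a forwarding reference; std::forward passes the argument on
// with the same value category the caller used.
template <typename T>
std::string relay(T&& arg) {
    return category(std::forward<T>(arg));
}

inline int add(int a, int b) { return a + b; }

// std::bind fixes the first argument to 10; the placeholder _1 is filled in
// with whatever is passed when the wrapper is called.
inline int add_ten(int x) {
    auto bound = std::bind(add, 10, std::placeholders::_1);
    return bound(x);
}
```

Passing a named std::string to relay selects the lvalue overload, while passing a temporary selects the rvalue overload. The pool's enqueue function combines both tools: it forwards the callable and its arguments into std::bind.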
std::future and std::packaged_task
- std::future provides a way to access the result of an asynchronous operation. A future is associated with a shared state and can be obtained from a std::packaged_task.
- std::packaged_task wraps any callable target so it can be called asynchronously.
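As a rough sketch of how the two fit together, the example below wraps a function in a packaged_task, hands the task to another thread, and reads the result through the future. The names square and run_on_other_thread are hypothetical.

```cpp
#include <future>
#include <thread>
#include <utility>

inline int square(int x) { return x * x; }

// Hypothetical helper: run square(x) on a separate thread and fetch the
// result through the future tied to the packaged_task.
int run_on_other_thread(int x) {
    std::packaged_task<int(int)> task(square);
    // Obtain the future before the task is moved away.
    std::future<int> result = task.get_future();
    // packaged_task is move-only, so it has to be moved into the thread.
    std::thread t(std::move(task), x);
    t.join();
    return result.get();  // the value stored when the task ran
}
```

The thread pool uses the same mechanism, except that the task is pushed onto a queue and picked up by an already running thread instead of a freshly created one.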
Program Logic
We will create a packaged_task in our main thread and call get_future on it to obtain a std::future. Because the two share the same state, we can move the packaged_task into another thread and run the job there, while the main thread reads the result through the future.
Using the condition_variable we can put the worker threads to sleep while there is nothing to run and wake them up when tasks arrive.
Before we get into coding, let's look at the things we're going to need:
- a vector of threads
- a mutex and a condition_variable
- a queue of the tasks or functions to be executed
- a bool variable so we can tell the threads to stop
- a worker function that every thread runs to process tasks
We will also need an enqueue function, where the actual logic for submitting work to the thread pool is implemented.
Here is the code so far:
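#include <condition_variable>
#include <cstddef>
#include <functional>
#include <future>
#include <memory>
#include <mutex>
#include <queue>
#include <thread>
#include <vector>

class ThreadPool {
private:
    std::vector<std::thread> workers;           // the pooled threads
    std::mutex mutex;                           // protects queue and stop
    std::condition_variable cv;                 // wakes idle workers
    std::queue<std::function<void()>> queue;    // pending tasks
    void worker();                              // loop run by every thread
    bool stop;                                  // set when shutting down
public:
    ThreadPool(std::size_t nr_threads = std::thread::hardware_concurrency());
    ~ThreadPool();
    template<typename F, typename... Args>
    auto enqueue(F&& f, Args&&... args) -> std::future<decltype(f(args...))>;
    // A pool owns running threads, so it is neither copyable nor movable.
    ThreadPool(ThreadPool&&) = delete;
    ThreadPool(const ThreadPool&) = delete;
    ThreadPool& operator=(ThreadPool&&) = delete;
    ThreadPool& operator=(const ThreadPool&) = delete;
};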
std::thread::hardware_concurrency returns the number of concurrent threads supported by the implementation. The value is only a hint and may be 0 if it cannot be determined.
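Because the hint can be 0, a pool built with the default argument could end up with no threads at all. One possible guard, shown here as a sketch with a hypothetical helper name and an arbitrary fallback of 2, is to substitute a fixed count when no hint is available.

```cpp
#include <cstddef>
#include <thread>

// Hypothetical helper: use the hardware hint when it is available and a
// caller-supplied fallback otherwise. The default fallback of 2 is an
// arbitrary choice for this sketch.
inline std::size_t default_thread_count(std::size_t fallback = 2) {
    const unsigned int hint = std::thread::hardware_concurrency();
    return hint != 0 ? hint : fallback;
}
```

A helper like this could be used as the constructor's default argument in place of the raw hardware_concurrency call.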
Now let's write the constructor. We simply need to fill the vector with threads that each run the worker function:
ThreadPool::ThreadPool(std::size_t nr_workers) {
    stop = false;
    // Start nr_workers threads, each running ThreadPool::worker on this pool.
    for (std::size_t i = 0; i < nr_workers; i++) {
        workers.emplace_back(&ThreadPool::worker, this);
    }
}
We use emplace_back because, instead of taking a value_type, it takes a variadic list of arguments and perfectly forwards them to the element's constructor, so the std::thread object is constructed directly inside the container.
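The same emplace_back pattern can be tried on its own with a small stand-in class. Counter, bump and run_bumpers are hypothetical names used only for this sketch.

```cpp
#include <atomic>
#include <thread>
#include <vector>

// Hypothetical stand-in for the pool: a class with a member function that
// several threads call.
struct Counter {
    std::atomic<int> hits{0};
    void bump(int by) { hits += by; }
};

// Start n threads that each call counter.bump(1), then wait for all of them.
int run_bumpers(int n) {
    Counter counter;
    std::vector<std::thread> threads;
    for (int i = 0; i < n; ++i) {
        // The arguments go straight to std::thread's constructor: a member
        // function pointer, the object to call it on, and the call argument.
        threads.emplace_back(&Counter::bump, &counter, 1);
    }
    for (auto& t : threads) {
        t.join();
    }
    return counter.hits.load();
}
```

In the pool's constructor the member function is ThreadPool::worker and the object is this, with no further arguments.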
Thread Worker
The ThreadPool::worker function is the loop each thread runs; it processes whatever is inside the queue. Let's see how we will implement it:
- We loop until shutdown/stop is requested.
- In each iteration we first create a lock to acquire the mutex.
- The thread then sleeps on the condition_variable until it is woken up and the predicate is true, that is, until the queue is not empty or stop is true.
- We grab the function from the queue and execute it.
We use a unique_lock inside a nested scope so the mutex is released as soon as we have grabbed the function and is not held while the task runs.
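void ThreadPool::worker() {
    for (;;) {
        std::function<void()> cur_task;
        {
            std::unique_lock<std::mutex> lock(mutex);
            // Sleep until there is work to do or the pool is shutting down.
            cv.wait(lock, [this]() {
                return stop || !queue.empty();
            });
            // Exit only once stop is set and every queued task has been taken.
            if (stop && queue.empty())
                break;
            // grab the fx from queue; it is non-empty here because the predicate held
            cur_task = std::move(queue.front());
            queue.pop();
        }
        // The lock is released here, so the task runs without blocking other workers.
        cur_task();
    }
}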
ThreadPool::enqueue
This is the "add task" function of our ThreadPool. It returns a future through which we can get the result of the task. Let's see what steps we need to build the function:
- Make the function a template that takes a callable and variadic arguments.
- Bind the function and its arguments using std::bind.
- std::function requires its target to be copy constructible, but std::packaged_task is move-only, so we wrap the task inside a shared pointer and push a small copyable lambda that captures that pointer.
- Acquire a lock, since the queue might be accessed by other threads, and put the wrapped function inside the queue so that a worker can execute it.
- Use cv.notify_one() to wake one thread so it processes the task.
- Return the future object.
template<typename F, typename... Args>
auto ThreadPool::enqueue(F&& f, Args&&... args)
    -> std::future<decltype(f(args...))> {
    using return_type = decltype(f(args...));
    // Bind the callable to its arguments so it can be invoked with no parameters.
    auto func = std::bind(std::forward<F>(f), std::forward<Args>(args)...);
    // packaged_task is move-only, so share it through a pointer the lambda can copy.
    auto encapsulated_ptr =
        std::make_shared<std::packaged_task<return_type()>>(std::move(func));
    std::future<return_type> future_object = encapsulated_ptr->get_future();
    {
        std::unique_lock<std::mutex> lock(mutex);
        queue.emplace([encapsulated_ptr]() {
            (*encapsulated_ptr)(); // execute the fx
        });
    }
    cv.notify_one();
    return future_object;
}
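The reason for the shared pointer in enqueue can be checked in isolation. The sketch below uses a hypothetical function name, run_through_function, and stores a packaged_task in a std::function by capturing a shared_ptr to it.

```cpp
#include <functional>
#include <future>
#include <memory>
#include <type_traits>

// std::packaged_task cannot be copied, while std::function needs a copyable
// target, so the task cannot be stored in a std::function directly.
static_assert(!std::is_copy_constructible<std::packaged_task<int()>>::value,
              "packaged_task is move-only");

// Hypothetical helper: store the task behind a shared_ptr so that the lambda
// capturing it is copyable.
int run_through_function() {
    auto task = std::make_shared<std::packaged_task<int()>>(
        [] { return 21 * 2; });
    std::future<int> result = task->get_future();

    std::function<void()> job = [task] { (*task)(); };
    std::function<void()> copy = job;  // allowed: both copies share one task

    copy();               // runs the task exactly once
    return result.get();
}
```

Copying the std::function only copies the pointer, so every copy refers to the same task and the same future.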
The return type depends on the template parameters, so we write auto with a trailing return type and spell out the result with decltype; at that point in the declaration the parameters f and args are already in scope.
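A trailing return type of this shape can be tried outside the pool. In this sketch the names call, half and length are hypothetical; call simply invokes its first argument with the rest.

```cpp
#include <cstddef>
#include <string>
#include <type_traits>
#include <utility>

// The return type is whatever f(args...) yields; it is written after the
// parameter list because f and args are only in scope there.
template <typename F, typename... Args>
auto call(F&& f, Args&&... args) -> decltype(f(args...)) {
    return std::forward<F>(f)(std::forward<Args>(args)...);
}

inline double half(int x) { return x / 2.0; }
inline std::size_t length(const std::string& s) { return s.size(); }

// The compiler works the type out at compile time for each instantiation.
static_assert(std::is_same<decltype(call(half, 3)), double>::value,
              "call(half, 3) yields a double");
```

The enqueue function wraps the same expression in std::future, so the caller gets a future of exactly the type the task returns.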
Okay, now 90% of our task is done. Let us create the destructor of the class, which sets stop, notifies all threads through the condition variable, and joins them.
ThreadPool::~ThreadPool() {
    {
        std::unique_lock<std::mutex> lock(mutex);
        stop = true;
    }
    cv.notify_all();
    for (auto& worker : workers) {
        worker.join();
    }
}
Testing our ThreadPool
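#include <iostream>

// Assumes the ThreadPool class above is visible in this file.
int main() {
    ThreadPool pool(4);
    std::vector<std::future<int>> results;
    for (int i = 0; i < 8; ++i)
    {
        auto future = pool.enqueue([i] {
            return i + i;
        });
        results.emplace_back(std::move(future));
    }
    // get() blocks until the corresponding task has finished.
    for (auto& result : results)
        std::cout << result.get() << ' ';
    std::cout << std::endl;
}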
We create a pool of 4 threads and a vector that holds the future of each task, then use enqueue to submit the functions for execution.
This gives us:
0 2 4 6 8 10 12 14
If you made it to the end, thank you for reading.