DEV Community

Ishan Tripathi
Ishan Tripathi

Posted on

Making a Thread Pool in C++ from scratch

Prerequistic Knowledge


I highly recommend to have some knowledge of c++, threads and programming in general, otherwise it would be tedious for you to research and understand all of this.

Thread Pools

What are Thread Pools?

They are a group of threads that are initially created, while waiting for a job or task to execute.
The idea is to have threads that are always executing without being destroyed, hence avoiding the hassle to create or destroy threads when necessary.

To do this in C++ we'll have to look at some of the features provided by the language. I am providing a small explaination of what the concepts do below, but i recommend you to have a look at them by yourself through the documentation.

std::thread, std::mutex and std::condition_variable

  • Threads allow multiple functions to execute concurrently.
    std::thread represents a single thread of execution.

  • If you have ever used threads in your program, you must've heard of thread safety and mutexes. The std::mutex is a synchronization primitive that can be used to protect shared data from being simutaneously accessed by multiple threads.

  • The purpose of this class is to wait for some condition to become true. The std::condition_variable is something used with mutexes - To block one or more threads, until another thread does two things

    • Modify a shared variable
    • Notifies the condition_variable.

std::forward, perfect forwarding and std::bind

  • std::forward is used to implement perfect forwarding. Perfect forwarding is the process of forwarding arguments in such a way that contains it's original value type - whether it's an rvalue or an lvalue.

  • std::bind class is used for partial function application, meaning if the arguments are pre-specified, it generates a forwarding call wrapper for the function say 'f'. Calling this wrapper is equivalent to calling the original function 'f' with some arguments already bound to list of arguments to bind.

std::future and std::packaged_task

  • std::future provides a method to access the results of asynchronous operations. They are associated with a shared state and can be constructed by using std::packaged_task.
  • std::packaged_task wraps any callable target so it can be called asynchronously.

Program Logic

We will be creating a packaged_task in our main thread, which uses get_future to obtain std::future. As they use shared resource, we can move the packaged_task into the other threads and start the job or task as it wraps a callable function.

Using the condition_variable we can put the thread to no task running mode or wake them up if a task piles up.

Before we get into coding, let's look at things we're gonna need

  • a vector of threads
  • mutex and condition_variable
  • a queue of the tasks or functions to be executed
  • a bool variable so we can destroy threads
  • a worker function that acts as our task

We will be using an enqueue function where our actual logic for threadpool will be implemented.

Let's look at our code until yet:

 class ThreadPool {
private:
    std::vector<std::thread> workers;
    std::mutex mutex;
    std::condition_variable cv;
    std::queue<std::function<void()>> queue;
    void worker();
    bool stop;

public:
    ThreadPool(std::size_t nr_threads = std::thread::hardware_concurrency());
    ~ThreadPool();

    template<typename F, typename... Args>
    auto enqueue(F&& f, Args&&... args) -> std::future<decltype(f(args...))>;

    ThreadPool(ThreadPool&) = delete;
    ThreadPool(const ThreadPool&) = delete;
    ThreadPool& operator=(ThreadPool&&) = delete;
    ThreadPool& operator=(const ThreadPool&) = delete;
};
Enter fullscreen mode Exit fullscreen mode

std::thread::hardware_concurrency returns the amount of concurrent threads supported by the implementation.

Now, let's initialize our constructor first, we would need to simply fill the vector with a worker function:

ThreadPool::ThreadPool(std::size_t nr_workers) {
    stop = false;
    for (auto i{ 0 }; i < nr_workers; i++) {
        workers.emplace_back(&ThreadPool::worker, this);
    }
}
Enter fullscreen mode Exit fullscreen mode

We use emplace_back as instead of taking a value_type, it takes a variadic list of arguments, so that means that you can now perfectly forward the arguments and construct directly an object into a container.

Thread Worker

The ThreadPool::worker function is our ThreadWorker, and it's necessary to process what's inside the queue, let's see how we will implement it:

  • First we will create a lock to acquire mutex.
  • We would want to loop until shutdown/stop is requested.
  • The thread stops until it's woken up again and the condition_variable will be checked if it's true, which will check if the queue is empty or stop = true.
  • We will grab the function from queue and execute it.

We will use unique lock with a scope until we grab the function.

void ThreadPool::worker() {
    for (;;) {
        std::function<void()> cur_task;
        {
            std::unique_lock<std::mutex> lock(mutex);
            cv.wait(lock, [this]() {
                return stop || !queue.empty();
            });

            if (stop && queue.empty()) 
                break;
            if (queue.empty())
                continue; 

            cur_task = queue.front();
            queue.pop();
            // grab the fx from queue
        }
        cur_task();
    }
} 
Enter fullscreen mode Exit fullscreen mode

ThreadPool::enqueue

It is the "add task" function of our ThreadPool, returning a future through which we can get the result of the task, let's see what steps we need to build the function:

  • Templatize function and variadic arguments.
  • Bind the function itself and it's arguments using std::bind.
  • Since the function we are gonna push inside the queue needs to be copyable, as std::function target must be a copy constructor, hence we will wrap it inside a shared pointer.
  • We will acquire a lock, since the queue might be accessed by other, put the encapsulated or wrapped function inside the queue, hence executing it.
  • Use cv.notify_one() to notify one thread and process the task.
  • Return the future object.
template<typename F, typename... Args>
inline auto ThreadPool::enqueue(F&& f, Args&&... args)
-> std::future<decltype(f(args...))> {
    auto func = std::bind(std::forward<F>(f), std::forward<Args>(args)...);
    auto encapsulated_ptr =
        std::make_shared<std::packaged_task<decltype(f(args...))()>>
        (func);

    std::future<std::result_of_t<F(Args...)>> future_object = encapsulated_ptr->get_future();
    {
        std::unique_lock<std::mutex> lock(mutex);
        queue.emplace([encapsulated_ptr]() {
            (*encapsulated_ptr)(); // execute the fx
            });
    }
    cv.notify_one();
    return future_object;
}
Enter fullscreen mode Exit fullscreen mode

We cannot deduce the return type at compile time hence we use the auto keyword and provide the return hint using decltype.

Okay now 90% of our task is done, let us create the destructor of the class which will notify all using the condition variable and join all the threads.

ThreadPool::~ThreadPool() {
    {
        std::unique_lock<std::mutex> lock(mutex);
        stop = true;
    }

    cv.notify_all();
    for (auto& worker : workers) {
        worker.join();
    }
}
Enter fullscreen mode Exit fullscreen mode

Testing our ThreadPool

int main() {
    ThreadPool pool(4);
    std::vector<std::future<int>> results;

    for (int i = 0; i < 8; ++i)
    {
        auto future = pool.enqueue([i] {
            return i + i;
            });
        results.emplace_back(std::move(future));
    }

    for (auto& result : results)
        std::cout << result.get() << ' ';
    std::cout << std::endl;
}
Enter fullscreen mode Exit fullscreen mode

We create a pool of 4 threads, a vector containing the future of the threads and hence using the enqueue to execute the function.

This gives us:

0 2 4 6 8 10 12 14

If you reached till the end, thankyou for reading.

Top comments (4)

Collapse
 
dhananjay_jsr profile image
Dhananjay-JSR

Great blog! The content is insightful, well-researched, and presented in a user-friendly manner. I appreciate the in-depth analysis of current tech trends and the clear explanations provided. Keep up the excellent work!

Collapse
 
eliza502 profile image
Elizabeth

Quite daunting man, appreciate the research 🙌

Collapse
 
coderfek profile image
Praveen

Great work!

Collapse
 
raj_ profile image
Rajvardhan Agrawal

Finally some good fucking blog