DEV Community

Abinash Sahoo


Let's build a thread pool in Rust 🦀 that can execute multiple tasks concurrently

Imagine you have built a system that uses concurrency to stay responsive.

Your main thread receives tasks from users and spawns a new thread to execute each one, so that the system keeps accepting new tasks while earlier ones are still running.

Now imagine your user base grows and tasks start pouring in. If you keep creating a separate thread for every task, you waste time creating and destroying a thread each time a task arrives or finishes.
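For comparison, the thread-per-task approach just described might look like the minimal sketch below. The squaring "tasks" and the function name `run_thread_per_task` are hypothetical stand-ins for real user work; the point is only that every task pays for its own `thread::spawn` and teardown.

```rust
use std::thread;

// Thread-per-task: every task gets a brand-new OS thread.
// `tasks` stands in for work arriving from users (hypothetical squaring jobs).
fn run_thread_per_task(tasks: Vec<u64>) -> Vec<u64> {
    let handles: Vec<_> = tasks
        .into_iter()
        .map(|n| thread::spawn(move || n * n)) // one spawn per task
        .collect();

    // Each thread is joined once its task is done, after which it is gone;
    // the next batch of tasks pays the creation cost all over again.
    handles
        .into_iter()
        .map(|handle| handle.join().expect("task thread panicked"))
        .collect()
}

fn main() {
    let results = run_thread_per_task(vec![1, 2, 3, 4]);
    println!("{:?}", results); // prints [1, 4, 9, 16]
}
```

This works, but with thousands of short tasks most of the time can go into creating and destroying threads rather than doing the work itself.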

In that situation, you should switch to a thread pool architecture, because a thread pool reuses a fixed set of threads instead of paying the cost of creating a new one for every task.

So let's explore how you can build a thread pool to run multiple tasks concurrently.

Before starting the actual coding part, let's first understand what a thread pool is and how it works.


Thread Pool

A thread pool is a collection of worker threads that are created up front to run multiple tasks concurrently and that wait for new tasks to arrive.

In other words, several threads are created in advance, and they sit idle until there is work to do.

Whenever your system gets tasks, it can hand them to those threads immediately, saving the time it would otherwise spend creating and destroying threads over and over.

[Image: thread-pool-in-rust]

As you can see, a thread pool is a set of threads waiting to receive tasks from the main thread and execute them.

In the picture, the main thread holds 15 tasks in total, and all of them are forwarded to different worker threads to run concurrently.

Now that you have the idea of a thread pool, let's look at how it works internally.


How Does a Thread Pool Work Under the Hood?

In a thread pool architecture, the main thread has only two jobs:

  1. Receive all incoming tasks and store them in one place, such as a queue.
  2. Create multiple worker threads up front and hand them tasks as those tasks arrive.

Now let's understand how the thread pool works.

The worker threads are created before any tasks arrive, and each one is stored with an ID so we can tell them apart.

Then every thread simply waits for a task; when it gets one, it starts working on it.

After finishing, it waits again for the next task.

While one thread is busy, the main thread hands more tasks to the other threads, so no thread sits idle as long as the main thread still has tasks left.

Once all tasks are done, the main thread shuts down all the threads and closes the thread pool.
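The lifecycle above can also be sketched without channels: tasks are stored in a shared queue (a `VecDeque` behind a `Mutex`), and a `Condvar` wakes idle workers when a task arrives or when shutdown begins. This is a different technique from the channel-based pool built below, and the names `Shared`, `worker_loop`, and `run_pool` are hypothetical; it is only meant to make "store the tasks, let idle threads wait, then shut everything down" concrete.

```rust
use std::collections::VecDeque;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Condvar, Mutex};
use std::thread;

type Job = Box<dyn FnOnce() + Send + 'static>;

struct Shared {
    // (pending jobs, shutting_down flag), guarded by one Mutex.
    state: Mutex<(VecDeque<Job>, bool)>,
    // Signalled whenever a job is queued or shutdown begins.
    available: Condvar,
}

fn worker_loop(id: usize, shared: Arc<Shared>) {
    loop {
        let mut state = shared.state.lock().unwrap();
        // Sit idle until there is work or the pool is closing.
        while state.0.is_empty() && !state.1 {
            state = shared.available.wait(state).unwrap();
        }
        let job = match state.0.pop_front() {
            Some(job) => job,
            None => return, // queue drained and shutdown requested
        };
        drop(state); // release the lock before running the job
        println!("worker {id} running a job");
        job();
    }
}

fn run_pool(workers: usize, jobs: Vec<Job>) {
    let shared = Arc::new(Shared {
        state: Mutex::new((VecDeque::new(), false)),
        available: Condvar::new(),
    });

    // 1. Create the workers up front; they wait for jobs.
    let handles: Vec<_> = (0..workers)
        .map(|id| {
            let shared = Arc::clone(&shared);
            thread::spawn(move || worker_loop(id, shared))
        })
        .collect();

    // 2. The main thread stores each job in the queue and wakes one worker.
    for job in jobs {
        shared.state.lock().unwrap().0.push_back(job);
        shared.available.notify_one();
    }

    // 3. Request shutdown, wake everyone, and wait for all workers to exit.
    shared.state.lock().unwrap().1 = true;
    shared.available.notify_all();
    for handle in handles {
        handle.join().unwrap();
    }
}

fn main() {
    let done = Arc::new(AtomicUsize::new(0));
    let jobs: Vec<Job> = (0..15)
        .map(|_| {
            let done = Arc::clone(&done);
            Box::new(move || {
                done.fetch_add(1, Ordering::SeqCst);
            }) as Job
        })
        .collect();
    run_pool(4, jobs);
    println!("{} jobs done", done.load(Ordering::SeqCst)); // prints "15 jobs done"
}
```

Workers only exit when the queue is empty and shutdown has been requested, so every queued job runs before `run_pool` returns.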

Now you know how a thread pool operates, so let's write some code and implement one in Rust.


Implementing A Thread Pool in Rust

To explain the concepts, I only show small code snippets rather than the whole program, but you can access the whole main.rs file here.

Let's start with a rough structural overview of what we need. We need three things:

  1. A way to create threads.
  2. A place to store them.
  3. A way to assign them different tasks.

Now let's go through them one by one.

1. A way to create threads

We need a function that spawns a thread and returns its JoinHandle.

We also need some sort of ID, so that if something goes wrong we can log the error with the thread's ID and know which thread failed.

When two related pieces of data belong together, we use a struct. So let's create one.

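use std::thread::{self, JoinHandle};

struct Worker {
    id: usize,              // identifies the worker in log messages
    thread: JoinHandle<()>, // handle to the spawned thread
}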

Now we have to implement a constructor function that can return a new Worker.

impl Worker {
    fn new(id: usize) -> Self {
        // For now the thread does nothing; the task loop comes later.
        let thread = thread::spawn(|| {});

        Self { id, thread }
    }
}

Now we can create a Worker, which spawns its thread, and return it to the caller.

So let's create a thread pool that holds all the workers and can execute tasks on their threads.

2. A place to store them

So we need a struct that holds all the workers (and therefore all the JoinHandles), and we also want to control how many threads our thread pool has.

This means a struct with a constructor that takes the number of threads and calls the Worker constructor to create each one.

struct ThreadPool {
    workers: Vec<Worker>,
}

impl ThreadPool {
    fn new(size: usize) -> Self {
        assert!(size > 0, "Need at least 1 worker!");

        let mut workers = Vec::with_capacity(size);

        for i in 0..size {
            workers.push(Worker::new(i));
        }

        Self { workers }
    }
}
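How large should `size` be? One common starting point, assuming the tasks are CPU-bound, is one worker per available CPU. The helper below is a hypothetical sketch, not part of the article's code; it uses the standard library's `std::thread::available_parallelism` and falls back to a single worker if that information is unavailable.

```rust
use std::thread;

// Hypothetical helper: choose a pool size from the parallelism the
// standard library reports, falling back to 1 worker on error.
fn default_pool_size() -> usize {
    thread::available_parallelism()
        .map(|n| n.get()) // NonZeroUsize -> usize
        .unwrap_or(1)
}

fn main() {
    let size = default_pool_size();
    // `ThreadPool::new(size)` would then create one Worker per CPU.
    println!("pool size: {size}");
}
```

For tasks that mostly wait on I/O, a larger pool may make more sense; the right number depends on the workload.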

We now have code to create threads and to manage them; next we need a way to assign tasks to different threads.

3. A way to assign them different tasks.

Our ThreadPool needs a method that hands tasks to the workers so they execute them on their threads.

But there is a problem: how do we send a task to a thread so that it can execute it?

First, we need a type that represents the tasks we want to run.

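// A task: a closure that runs once, can move to another thread,
// and borrows nothing short-lived.
type Task = Box<dyn FnOnce() + Send + 'static>;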

You do not have to understand every keyword here, but for context: Box<dyn ...> is a trait object, a heap-allocated box that can hold any closure type as long as it implements the listed traits. This lets one channel carry closures of many different types.

Do not worry, we will cover traits in more detail in another post. For now, just know that FnOnce() means our task is a function that takes no arguments and can be called only once.

Send is required because we are sending tasks from the main thread to worker threads, and Send marks a type as safe to transfer between threads.

Finally, 'static means the task must not borrow data that could be dropped before it runs. In practice, the closure has to own everything it captures, which is why such closures are usually written with move.
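To see what the three bounds buy us, here is a small sketch. The helper `make_task` and the "task 1" string are hypothetical; the closure owns its data (so it is `'static`), its captures are `Send` (so the box can move to another thread), and it moves its `String` out when called (so it can only be `FnOnce`).

```rust
use std::sync::mpsc;
use std::thread;

// Same alias as in the article.
type Task = Box<dyn FnOnce() + Send + 'static>;

// Hypothetical helper: build a task that owns its data and reports back
// through a channel when it runs.
fn make_task(name: String, done: mpsc::Sender<String>) -> Task {
    // `move` makes the closure own `name` and `done`: nothing is borrowed,
    // so it is 'static, and String and Sender<String> are both Send.
    Box::new(move || {
        // `name + ...` consumes `name`, so this closure can run only once.
        done.send(name + " finished").unwrap();
    })
}

fn main() {
    let (tx, rx) = mpsc::channel();
    let task = make_task(String::from("task 1"), tx);

    // The boxed task moves to another thread and is called there.
    thread::spawn(move || task()).join().unwrap();

    println!("{}", rx.recv().unwrap()); // prints "task 1 finished"
}
```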

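Now it's time to send tasks to the worker threads.

To do that, we need a channel from the main thread to all the worker threads. A std::sync::mpsc channel has only one Receiver, though, so all workers must share it: we wrap it in Arc<Mutex<Receiver<Task>>>. The Arc lets every worker own a handle to the same receiver, and the Mutex ensures only one worker takes a task off the channel at a time.

Let's update both constructors.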

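use std::sync::mpsc::{self, Receiver};
use std::sync::{Arc, Mutex};
use std::thread;

struct ThreadPool {
    workers: Vec<Worker>,
    // Option so that the sender can be taken and dropped on shutdown.
    sender: Option<mpsc::Sender<Task>>,
}

impl ThreadPool {
    fn new(size: usize) -> Self {
        assert!(size > 0, "Need at least 1 worker!");

        let (sender, receiver) = mpsc::channel();
        // One receiver, shared by every worker.
        let receiver = Arc::new(Mutex::new(receiver));

        let mut workers = Vec::with_capacity(size);

        for i in 0..size {
            // Each worker gets its own clone of the Arc.
            workers.push(Worker::new(i, Arc::clone(&receiver)));
        }

        Self {
            workers,
            sender: Some(sender),
        }
    }
}

impl Worker {
    fn new(id: usize, receiver: Arc<Mutex<Receiver<Task>>>) -> Self {
        // The task loop that uses `receiver` is added in the next step.
        let thread = thread::spawn(move || {});

        Self { id, thread }
    }
}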

In the ThreadPool constructor, we create a new channel, wrap the receiver in Arc<Mutex<...>>, and give each worker its own clone of the Arc. The main thread sends tasks through the sender, and whichever worker grabs the lock first receives the next one.

We also add a sender field to the ThreadPool struct, which the main thread uses to send tasks to the workers. It is an Option so that we can drop it later when shutting the pool down.
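Here is a stand-alone sketch of the sharing pattern, separate from the pool itself. The function `share_receiver` is hypothetical: several threads pull numbers from one `Arc<Mutex<Receiver<u32>>>`, and because each message is delivered to exactly one of them, their partial sums always add up to the total.

```rust
use std::sync::mpsc::{self, Receiver};
use std::sync::{Arc, Mutex};
use std::thread;

// Hypothetical demo: `workers` threads share ONE receiver.
fn share_receiver(workers: usize, messages: u32) -> u32 {
    let (sender, receiver) = mpsc::channel::<u32>();
    let receiver: Arc<Mutex<Receiver<u32>>> = Arc::new(Mutex::new(receiver));

    let handles: Vec<_> = (0..workers)
        .map(|_| {
            let receiver = Arc::clone(&receiver);
            thread::spawn(move || {
                let mut sum = 0;
                loop {
                    // Lock, receive one message; the guard is dropped at
                    // the end of this `let` statement.
                    let msg = receiver.lock().unwrap().recv();
                    match msg {
                        Ok(n) => sum += n,
                        Err(_) => break, // every sender has been dropped
                    }
                }
                sum
            })
        })
        .collect();

    for n in 1..=messages {
        sender.send(n).unwrap();
    }
    drop(sender); // close the channel so the workers stop

    // Each message was received exactly once, by some worker.
    handles.into_iter().map(|h| h.join().unwrap()).sum()
}

fn main() {
    println!("{}", share_receiver(4, 100)); // prints 5050
}
```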

Now, let's implement the logic that executes tasks inside the worker threads.

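fn new(id: usize, receiver: Arc<Mutex<Receiver<Task>>>) -> Self {
    let thread = thread::spawn(move || {
        loop {
            // The MutexGuard is a temporary, so the lock is released at the
            // end of this statement, before the task runs.
            let message = receiver
                .lock()
                .expect("Failed to grab the lock!")
                .recv();

            match message {
                Ok(task) => {
                    println!("Thread {} got a task; executing.", id);
                    task();
                    // Optional short pause; not needed for correctness.
                    thread::sleep(std::time::Duration::from_millis(10));
                }

                // recv() fails only after every Sender has been dropped.
                Err(_) => {
                    println!("Thread {} got no task; the channel is closed.", id);
                    break;
                }
            }
        }
    });

    Self { id, thread }
}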

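Here, each worker loops forever. On every iteration it locks the mutex and calls recv() on the shared receiver, which blocks until the main thread sends a task. Because the lock guard is a temporary inside the let statement, the lock is released as soon as recv() returns, so other workers can pick up tasks while this one runs its task.

Calling recv() returns a Result, so instead of unwrapping it, we match on it to log whether the thread got a task.

recv() only returns Err once the channel is closed, that is, once every Sender has been dropped. At that point no task will ever arrive again, so we break out of the loop and let the thread finish.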
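The worker loop relies on one detail of `std::sync::mpsc`: messages already in the channel are still delivered after the last `Sender` is dropped, and only then does `recv()` return `Err`. The hypothetical function below demonstrates that ordering in isolation.

```rust
use std::sync::mpsc;

// Hypothetical demo: queued messages survive the sender being dropped;
// recv() returns Err only once the channel is empty AND disconnected.
fn drain_then_disconnect() -> Vec<u32> {
    let (sender, receiver) = mpsc::channel::<u32>();
    sender.send(1).unwrap();
    sender.send(2).unwrap();
    drop(sender); // no senders left: the channel is now disconnected

    let mut received = Vec::new();
    loop {
        match receiver.recv() {
            Ok(n) => received.push(n), // buffered messages still arrive
            Err(_) => break,           // RecvError: nothing left, channel closed
        }
    }
    received
}

fn main() {
    println!("{:?}", drain_then_disconnect()); // prints [1, 2]
}
```

This is why a worker finishes any tasks still in the channel before it shuts down.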

We use ThreadPool to store the workers and Worker to execute tasks, but we still have no way to send tasks to the pool.

Let's add an execute method to ThreadPool.

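impl ThreadPool {
    fn new(size: usize) -> Self {
        // snip
    }

    fn execute<F>(&self, job: F)
    where
        F: FnOnce() + Send + 'static,
    {
        // Boxing the closure lets it be sent as a `Task` trait object.
        let job = Box::new(job);

        self.sender
            .as_ref()
            .expect("Sender is gone; the pool is shutting down!")
            .send(job)
            .expect("Failed to send the job to workers!");
    }
}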

Now that we have all the functionality a thread pool needs, let's run it and test our first thread pool.

[Image: thread-pool-in-rust]

Oh! We have made a big mistake.

Can you tell me what it is?

You have 10 seconds to answer.

1 2 3 4 ...

Okay, let me explain.

We spawn threads and send them tasks, but we never join them. In Rust, when main returns the whole process exits, even if other threads are still running, so any tasks that have not finished (or not even started) are silently cut off.

Let's fix it.

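Since ThreadPool manages all our threads, it should also shut them down when the ThreadPool itself goes away.

In simple terms, we implement the Drop trait for ThreadPool and join every worker thread there. Because join() takes ownership of a JoinHandle while drop() only gets &mut self, Worker's thread field has to become an Option<JoinHandle<()>> (stored as Some(thread) in Worker::new) so we can take() the handle out. The sender is an Option for the same reason.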

So, let's do it.

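impl Drop for ThreadPool {
    fn drop(&mut self) {
        // Dropping the only Sender closes the channel, so each worker's
        // recv() eventually returns Err and its loop breaks.
        drop(self.sender.take());

        for worker in &mut self.workers {
            println!("Thread {} is shutting down.", worker.id);

            // Requires `thread: Option<JoinHandle<()>>` in Worker.
            if let Some(thread) = worker.thread.take() {
                thread
                    .join()
                    .unwrap_or_else(|_| panic!("Failed to join the thread {}", worker.id));
            }
        }
    }
}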

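We also have to drop the sender first. Otherwise the workers would block in recv() forever and join() would never return. Dropping the only Sender closes the channel: each worker drains any remaining tasks, gets an Err from recv(), breaks out of its loop, and its thread finishes. Then join() returns and the program can exit.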

Let's give it a run and see what happens.

[Image: thread-pool-in-rust]

Nice, we have successfully created a working thread pool.
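For reference, here is one way the snippets above might fit together into a single runnable file. This is a sketch assembled from this post, not the author's main.rs: it applies the Option changes described in the Drop section, and the main function (15 tasks on 4 workers, echoing the figure) is a hypothetical workload.

```rust
use std::sync::mpsc::{self, Receiver, Sender};
use std::sync::{Arc, Mutex};
use std::thread::{self, JoinHandle};

type Task = Box<dyn FnOnce() + Send + 'static>;

struct Worker {
    id: usize,
    // Option so that Drop can take() the handle and join it.
    thread: Option<JoinHandle<()>>,
}

impl Worker {
    fn new(id: usize, receiver: Arc<Mutex<Receiver<Task>>>) -> Self {
        let thread = thread::spawn(move || loop {
            // The lock guard is dropped at the end of this statement.
            let message = receiver.lock().expect("Failed to grab the lock!").recv();
            match message {
                Ok(task) => {
                    println!("Thread {id} got a task; executing.");
                    task();
                }
                Err(_) => {
                    println!("Thread {id} got no task; the channel is closed.");
                    break;
                }
            }
        });
        Self { id, thread: Some(thread) }
    }
}

struct ThreadPool {
    workers: Vec<Worker>,
    sender: Option<Sender<Task>>,
}

impl ThreadPool {
    fn new(size: usize) -> Self {
        assert!(size > 0, "Need at least 1 worker!");
        let (sender, receiver) = mpsc::channel();
        let receiver = Arc::new(Mutex::new(receiver));
        let workers = (0..size)
            .map(|id| Worker::new(id, Arc::clone(&receiver)))
            .collect();
        Self { workers, sender: Some(sender) }
    }

    fn execute<F>(&self, job: F)
    where
        F: FnOnce() + Send + 'static,
    {
        self.sender
            .as_ref()
            .expect("Sender is gone; the pool is shutting down!")
            .send(Box::new(job))
            .expect("Failed to send the job to workers!");
    }
}

impl Drop for ThreadPool {
    fn drop(&mut self) {
        // Closing the channel makes every worker's recv() return Err.
        drop(self.sender.take());
        for worker in &mut self.workers {
            println!("Thread {} is shutting down.", worker.id);
            if let Some(thread) = worker.thread.take() {
                thread
                    .join()
                    .unwrap_or_else(|_| panic!("Failed to join the thread {}", worker.id));
            }
        }
    }
}

fn main() {
    // Hypothetical workload: 15 tasks on 4 workers.
    let pool = ThreadPool::new(4);
    for i in 0..15 {
        pool.execute(move || println!("Task {i} done"));
    }
    // `pool` is dropped here: the channel closes and every worker is joined.
}
```

Because the workers drain the channel before `recv()` fails, every task submitted with `execute` runs before the pool finishes dropping. The order of the printed lines varies from run to run.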


Conclusion

A thread pool is an efficient way to handle many tasks concurrently, because it reuses a fixed set of threads instead of creating one per task.

Big companies also use thread pools to handle their requests, though their pools are far more advanced than this one.

I hope you found this helpful. If you have any questions, feel free to ask in the comments.

Have a great day.


