DEV Community

Paweł bbkr Pabian

Dynamically pairing tokio spawns

Hi everyone!

While learning Rust I stumbled upon a problem not covered by popular online tutorials. When they talk about tokio and MPSC (multi-producer, single-consumer) channels, they usually connect spawned tasks in some fixed way. In my project, however, I have to pair asynchronous producers and consumers dynamically, in various configurations. So let me share a useful pattern I've discovered on my Rust journey.

Let's say we have a restaurant:

$ cargo init restaurant
    Creating binary (application) package
$ cd restaurant
$ cargo add tokio --features=full
    Updating crates.io index
      Adding tokio v1.38.0 to dependencies
      ...

As a manager we can assign different cooking stands to asynchronously prepare different types of food (don't worry about undefined values for now):

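async fn cooking_stand (food: char) {
    loop {
        // `somewhere` is a placeholder that gets defined later in the post;
        // `char` is `Copy`, so no `clone()` is needed
        somewhere.send(food).await;
    }
}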

Food should be delivered to tables awaiting it.

async fn table (number: u8) {
    loop {
        let food = somehow.recv().await;
        println!("Got {} at table {}", food, number);
    }
}

Now we can organize our restaurant:

use tokio::time::{sleep, Duration};

#[tokio::main]
async fn main () {

    // cooking stands
    tokio::spawn(cooking_stand('🥗')); // salad
    tokio::spawn(cooking_stand('🍔')); // burger
    ...
    // tables for guests
    tokio::spawn(table(1));
    tokio::spawn(table(2));
    ...
    // keep our restaurant open for 1s
    sleep(Duration::from_millis(1000)).await;
}

Problem

For simplicity, let's assume we accept orders through an application. So the restaurant manager (the main task) knows, for example, that table 1 is waiting for 🥗 and table 3 is waiting for 🍔. But how do we actually fulfill those orders?

Naive approach

cooking_stand -> 🥗🥗🥗🥗🥗 ->         -> table 1
cooking_stand -> 🍕🍕🍕🍕🍕 -> manager -> table 2
cooking_stand -> 🍔🍔🍔🍔🍔 ->         -> table 3

If we force the manager to do the job, he can wait for the 🥗 cooking stand to prepare a salad and then pass it to table 1, then wait for the 🍔 cooking stand to prepare a burger and carry it to table 3. This is an obviously flawed design:

  • Cooking stands produce food whether it is needed or not.
  • If a cooking stand is slow, the manager is stuck waiting for the food to be prepared.
  • The manager should not do the heavy lifting, because it hurts his responsiveness.

We need waiters

Fortunately, tokio provides the perfect tool for the job: oneshot channels. These channels are designed and optimized to pass a single value exactly once.

let (waiter_tx, waiter_rx) = oneshot::channel::<char>();

To make a waiter deliver 🥗 to table 1, we first need to modify our cooking stands:

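async fn cooking_stand (
    product: char,
    mut waiters: tokio::sync::mpsc::Receiver<oneshot::Sender<char>>
) {
    // runs until every sender of the waiter queue has been dropped
    while let Some(waiter) = waiters.recv().await {
        // `send` fails only if the table side dropped its receiver; ignored here
        let _ = waiter.send(product);
    }
}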

Here tokio::sync::mpsc::Receiver<oneshot::Sender<char>> is a queue of waiters. Yes, you read that right: you can send oneshot channels through other channels. When a waiter arrives at a cooking stand, the stand prepares the food and hands it to the waiter for delivery to a table. Let's do the same for tables, except that each table gets the receiving half of the specific waiter who will bring its food:

async fn table (
    number: u8,
    mut waiters: tokio::sync::mpsc::Receiver<oneshot::Receiver<char>>
) {
    while let Some(waiter) = waiters.recv().await {
        let food = waiter.await.unwrap();
        println!("Got {} at table {}", food, number);
    }
}

When a waiter is assigned to a table, the customer waits for that waiter to deliver the food produced by a cooking stand. To complete the puzzle, let's modify our main function. The manager, instead of doing the heavy lifting himself, can hire waiters and assign them to pairs of cooking stands and tables to fulfill food orders.

use tokio::sync::{mpsc, oneshot};

#[tokio::main]
async fn main () {

    // used by manager to send waiters to cooking stands
    let (stand_salad_tx, stand_salad_rx) = mpsc::channel::<oneshot::Sender<char>>(100);
    let (stand_pizza_tx, stand_pizza_rx) = mpsc::channel::<oneshot::Sender<char>>(100);
    let (stand_burger_tx, stand_burger_rx) = mpsc::channel::<oneshot::Sender<char>>(100);

    // set up cooking stands
    tokio::spawn(cooking_stand('🥗', stand_salad_rx));
    tokio::spawn(cooking_stand('🍕', stand_pizza_rx));
    tokio::spawn(cooking_stand('🍔', stand_burger_rx));

    // used by manager to send waiters to tables
    let mut tables: Vec<tokio::sync::mpsc::Sender<oneshot::Receiver<char>>> = Vec::new();

    // set up tables
    for number in 1..=4 {
        let (table_tx, table_rx) = mpsc::channel::<oneshot::Receiver<char>>(100);
        tables.push(table_tx);
        tokio::spawn(table(number, table_rx));
    }
}

Let's check if it works by adding the following code at the end of our main:

    // create waiter
    let (waiter_tx, waiter_rx) = oneshot::channel::<char>();
    // send him for food to salad stand
    stand_salad_tx.send(waiter_tx).await.expect("salad stand is closed");
    // send him to deliver food to table `1`
    tables[0].send(waiter_rx).await.expect("table 1 is closed");
    // manager can go back to doing his stuff

    // keep our restaurant open for 1s
    tokio::time::sleep(std::time::Duration::from_millis(1000)).await;

When run, it produces the following output:

Got 🥗 at table 1

Yay!

Conclusions

This pattern of sending the two halves of a oneshot channel through regular channels to tokio spawns can be used to implement all kinds of traffic control: passing messages in a given ratio, throttling, and so on.

  • Is it efficient? Very! I was surprised how well oneshot channels are optimized. A single core of my Ryzen 6800U processor was able to create over 5_000_000 oneshot channels per second and send them to the corresponding spawns. That's crazy fast.

  • How to scale it? There will be situations when the manager hits a full channel (the bounded mpsc channels used here have a fixed capacity) and cannot send a oneshot channel immediately. In that case you may, for example, increase the number of producers or consumers by issuing more spawns, like adding another burger stand and sending oneshots to the stands in round-robin order. Everything depends on what your spawns are actually doing.

  • What about error handling? You must keep oneshot channel behavior in mind. Quoting the oneshot documentation: "If the Receiver is closed before receiving a message which has already been sent, the message will remain in the channel until the receiver is dropped, at which point the message will be dropped immediately." So even if both halves of a oneshot channel were sent to the corresponding spawns, it still does not mean its purpose will be fulfilled. Error handling in this case depends on which scenario you implement and how you need to react to delivery issues.

Thanks for reading

This is my first Rust post and I'm still discovering the language's features. If you think something could or should be implemented better, let me know in the comments.
