DEV Community

toyster
toyster

Posted on

Using blocking code in async

An example of Tokio's `block_in_place`. It is discussed here, and the same thing in Go is here.

// source - https://gist.github.com/miquels/8576d1394d3b26c6811f4fc1e7886a1c
use std::{
fs::File,
io::{Read, Write},
time::Instant,
};
use tokio::task::{self, JoinHandle};

/// Spawns 1000 tasks and waits for all of them to finish.
///
/// Each task fills a 10-byte buffer from `/dev/urandom` and then writes that
/// buffer to `/dev/null`. Both file operations are synchronous, so each one is
/// wrapped in `task::block_in_place` instead of being run directly inside the
/// async block.
///
/// # Panics
///
/// Panics if either device file cannot be opened, if the read or write fails,
/// or if a spawned task itself panicked.
async fn compute() {
    let handles: Vec<JoinHandle<_>> = (0..1000)
        .map(|_| {
            tokio::spawn(async move {
                let mut buffer = [0u8; 10];

                // Borrow `buffer` here instead of using a `move` closure: arrays of
                // `Copy` elements are themselves `Copy`, so `move` would hand the
                // closure its own copy and the bytes read would never reach the
                // buffer that is written out below.
                task::block_in_place(|| {
                    let mut dev_urandom =
                        File::open("/dev/urandom").expect("failed to open /dev/urandom");
                    // `read_exact` keeps reading until the buffer is full; a plain
                    // `read` is allowed to return after fewer bytes.
                    dev_urandom
                        .read_exact(&mut buffer)
                        .expect("failed to read from /dev/urandom");
                });

                task::block_in_place(|| {
                    let mut dev_null =
                        File::create("/dev/null").expect("failed to open /dev/null");
                    // `write_all` retries on short writes; a plain `write` may
                    // accept only part of the buffer.
                    dev_null
                        .write_all(&buffer)
                        .expect("failed to write to /dev/null");
                });
            })
        })
        .collect();

    for handle in handles {
        handle.await.unwrap();
    }
}

/// Benchmark driver: runs `compute` once untimed, then times 1000 further
/// runs and prints the total and the per-run average.
#[tokio::main]
async fn main() {
    // Number of timed runs; also the divisor for the average.
    const ROUNDS: u32 = 1000;

    // Untimed first pass so one-off start-up costs stay out of the measurement.
    compute().await;

    let started = Instant::now();
    for _ in 0..ROUNDS {
        compute().await;
    }
    let total = started.elapsed();
    let average = total / ROUNDS;

    println!("{:?} total, {:?} avg per iteration", total, average);
}
Enter fullscreen mode Exit fullscreen mode

I like async-std better here

Parallel Hacker News scraping using Tokio's `block_in_place`

use serde_json::Value;
use tokio::task::{self, JoinHandle};

/// Hacker News Firebase API endpoint for the top-stories list.
const URL: &str = "https://hacker-news.firebaseio.com/v0/topstories.json";
/// Base URL for a single item; callers append `/<id>.json` to address one item.
const ITEMURLBASE: &str = "https://hacker-news.firebaseio.com/v0/item";

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {

    let resp = reqwest::get(URL).await?.json::<Vec<i32>>().await?;

    let total_stories = resp.len();
    let concurrency = 50;

    // Create a task group, get 10 story titles at a time
    let only_some = resp[0..total_stories].to_vec();

    for chunk in only_some.chunks(concurrency) {

        let handles: Vec<JoinHandle<_>> = chunk
            .iter()
            .map(|&i| {
                let item_url = format!("{}/{}.json", ITEMURLBASE, i);
                tokio::spawn(async move {
                    // Using block_in_place to avoid context switching
                    task::block_in_place(move || {
                        let r = reqwest::blocking::get(&item_url).unwrap().text().unwrap();
                        let json: Value = serde_json::from_str(&r).unwrap();
                        let title = json["title"].clone();
                        (item_url, title)
                    })
                })
            })
            .collect();

        for handle in handles {
            let (_urls, titles) = handle.await.unwrap();
            println!("{}", titles.as_str().unwrap());
        }

    }

    Ok(())
}
Enter fullscreen mode Exit fullscreen mode

Discussion (0)