
Alan


Parallel Programming: 1 Thread per Core in Rust

A tweet blew up the other day. It's about the unusual computation model of Doom Eternal, the newly released game by id Software.

The idea feels foreign and crazy because modern games typically rely on data-oriented, CPU cache-friendly code first, with a single thread handling render jobs. This game does it differently.

The game creates one thread per core to keep all CPU cores busy. Data-oriented code is still critical to performance, but the OP claims that keeping every core busy matters more.

Architecture, whether of code, systems, or information, always fascinates me, and so does this novel computation model in Doom Eternal. I'm also fascinated by the Rust language. I've been learning more and more about it for months, though I haven't had the chance to write anything serious with it.

Tempted, I wrote a similar but simpler computation model in Rust.

For the exercise, I wrote an executable that runs simple CPU-heavy calculations using that model. A bcrypt hash serves as the CPU-heavy calculation: the executable bcrypt-hashes 100 equal-length strings and writes the results to a file. I also wrote two other approaches to compare against this model.

QThread

I love naming things, so I named this QThread.

QThread is a wrapper around a thread's join handle and its mpsc communication channel. The wrapped thread has one specific job: it receives either a boxed function, which it executes, or None, which tells it to stop.

I just realized there's already a crate called QThread that does an entirely different thing. I should rename this someday.

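use std::sync::mpsc::{channel, Sender};
use std::sync::Arc;
use std::thread::{self, JoinHandle};

struct QThread {
    join_handle: JoinHandle<()>,
    // Holds no data; only the number of Weak pointers to it matters.
    active_arc: Arc<Option<()>>,
    // Some(job) queues a job, None tells the thread to stop.
    tx: Sender<Option<Box<dyn Fn() -> () + Send>>>,
}

impl QThread {
    fn new() -> QThread {
        let (tx, rx) = channel::<Option<Box<dyn Fn() -> () + Send>>>();
        let join_handle: JoinHandle<()> = thread::spawn(move || loop {
            match rx.recv() {
                Ok(Some(boxed_fn)) => {
                    boxed_fn();
                }
                // Stop on None, or when every sender has been dropped.
                Ok(None) | Err(_) => break,
            }
        });

        let active_arc: Arc<Option<()>> = Arc::new(None);
        QThread {
            tx,
            join_handle,
            active_arc,
        }
    }
}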

To queue a job on a QThread, I implemented an exec method on it.

The exec method receives a boxed function. Before passing it to the thread, it wraps the function in an intermediate closure. This closure creates a Weak, executes the inner function, and then drops the Weak. The Weak tracks the number of inner functions currently running.

The active_count method returns the number of currently running functions by counting the Weak pointers that point to active_arc.

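impl QThread {
    fn exec(&self, func: Box<dyn Fn() -> () + Send>) {
        let strong = Arc::clone(&self.active_arc);
        self.tx
            .send(Some(Box::new(move || {
                // The Weak exists only while func runs, so weak_count
                // reflects jobs in progress, not jobs waiting in the queue.
                let weak = Arc::downgrade(&strong);
                func();
                drop(weak);
            })))
            .unwrap()
    }

    // Number of jobs currently running on this thread.
    fn active_count(&self) -> usize {
        Arc::weak_count(&self.active_arc)
    }
}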
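To see why counting Weak pointers works as an activity counter, here is a minimal standalone sketch, separate from QThread. The helper name weak_counts_demo is made up for illustration; it only simulates a job starting and finishing on a single thread.

```rust
use std::sync::Arc;

/// Returns the weak count observed before, during, and after a simulated job.
/// `weak_counts_demo` is a hypothetical helper, not part of QThread.
fn weak_counts_demo() -> (usize, usize, usize) {
    let active_arc: Arc<Option<()>> = Arc::new(None);

    // No job has started yet, so no Weak points at the Arc.
    let before = Arc::weak_count(&active_arc);

    // A job "starts": it takes out a Weak, which raises the weak count.
    let weak = Arc::downgrade(&active_arc);
    let during = Arc::weak_count(&active_arc);

    // The job "finishes": dropping the Weak lowers the count again.
    drop(weak);
    let after = Arc::weak_count(&active_arc);

    (before, during, after)
}
```

The Arc never has to hold real data. It exists only so that something can be pointed at and counted.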

Remember that a QThread stops executing when it is sent None? The schedule_kill method schedules that termination. The None is queued behind any pending jobs, so the thread finishes those first.

impl QThread {
    fn schedule_kill(&self) {
        // send fails only if the thread is already gone, so the error is ignored.
        let _ = self.tx.send(None);
    }
}

Now we have a thread wrapper that receives a boxed function and executes it. What remains is to create multiple QThreads, and to balance jobs between them we need a regulator.

Let's make a QThreadsRegulator, which will contain many QThreads.

use std::rc::Rc;

struct QThreadsRegulator {
    workers: Vec<Rc<QThread>>,
}

impl QThreadsRegulator {
    // Spawns worker_num QThreads up front; they live until wait_all is called.
    fn new(worker_num: &usize) -> QThreadsRegulator {
        let mut workers: Vec<Rc<QThread>> = vec![];
        for _ in 0..*worker_num {
            workers.push(Rc::new(QThread::new()));
        }
        QThreadsRegulator { workers }
    }
}

We need QThreadsRegulator to help us find a currently available QThread. Let's write find_available_worker, which returns the first QThread whose active_count is 0.

impl QThreadsRegulator {
    fn find_available_worker(&self) -> Option<&QThread> {
        for worker in &self.workers {
            if worker.active_count() == 0 {
                return Some(worker);
            }
        }
        None
    }
}

Last, we need a way to decommission all the QThreads inside QThreadsRegulator and to wait for them. We'll introduce schedule_kill and wait_all.

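impl QThreadsRegulator {
    // Queue a stop signal on every worker.
    fn schedule_kill(&self) {
        for worker in &self.workers {
            worker.schedule_kill();
        }
    }

    // Take each worker out of the regulator and block until its thread exits.
    fn wait_all(&mut self) {
        let workers = &mut self.workers;
        while let Some(worker) = workers.pop() {
            match Rc::try_unwrap(worker) {
                Ok(worker_raw) => worker_raw.join_handle.join().unwrap(),
                // try_unwrap fails only if another Rc to this worker is still alive.
                Err(_) => eprintln!("worker is still shared, cannot join it"),
            };
        }
    }
}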

Using QThread

Now it's time to write the code that uses QThread to bcrypt-hash 100 equal-length strings. This code uses the bcrypt crate to hash the strings.

First, we open the input and output files. Then we create a lines iterator over the input file.

pub fn qthreaded() -> io::Result<()> {
    let (in_path, out_path) = get_in_out_paths()?;
    let in_file = OpenOptions::new().read(true).open(in_path.as_os_str())?;
    let out_file_mutex = Arc::new(Mutex::new(
        OpenOptions::new()
            .read(true)
            .write(true)
            .create(true)
            .open(out_path.as_os_str())?,
    ));
    let lines_iter = BufReader::new(&in_file).lines();

Create the QThreadsRegulator.

    let mut worker_regulator = QThreadsRegulator::new(&num_cpus::get());

Loop over the lines iterator. For each line, there is a nested loop. In the inner loop, QThreadsRegulator's find_available_worker is called to see whether any QThread is available. If there is one, pass a job closure to it and move on to the next line. If there is none, sleep briefly to let the main orchestration thread rest, then try again.

    let mut iteration = 0;
    for line in lines_iter {
        let pass = line?;
        println!("qthreaded count: {}", iteration);
        iteration += 1;
        if !pass.is_empty() {
            loop {
                let pass_clone = pass.clone();
                let out_file_mutex_clone = Arc::clone(&out_file_mutex);
                match worker_regulator.find_available_worker() {
                    Some(worker) => {
                        println!("worker-found");
                        worker.exec(Box::new(move || {
                            let pass_u8 = pass_clone.as_bytes();
                            // Report failures instead of silently discarding them.
                            if hash_and_write(&pass_u8, &out_file_mutex_clone).is_err() {
                                eprintln!("hash_and_write failed");
                            }
                        }));
                        break;
                    },
                    None => {
                        println!("no-worker-found");
                        std::thread::sleep(std::time::Duration::from_millis(5));
                    },
                }
            }
        }
    }

Finally, schedule the QThreads to stop and wait for all of them to finish their jobs.

    worker_regulator.schedule_kill();
    worker_regulator.wait_all();

    Ok(())
}

We're done.

The Repo

Here's the repository. You can clone it and run it with cargo run.

Kelerchian / expr-bcrypt-qthreaded

One thread per core in Rust

Inside the repo there are also two more calculation methods for comparison. Both do the same work: bcrypt-hashing the same 100 equal-length strings with the same bcrypt cost.

The first is sequential: a single thread runs every bcrypt hash. I created it to see how much faster QThread is than doing the work on a single core.

The second is batched. Batched is similar to sequential, but each iteration creates as many threads as there are CPUs available on the device and runs the bcrypt hashes concurrently. It's not quite a fair comparison with QThread, because in this one out_file is written from the main thread.
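To make the batched idea concrete, here is a rough sketch of its shape. It is not the code from the repository. The busy_hash function is a made-up stand-in for the bcrypt call so the example needs no external crate, and batched here returns the results instead of writing a file. It uses std::thread::scope, which needs Rust 1.63 or newer.

```rust
use std::thread;

/// Stand-in for a CPU-heavy job such as a bcrypt hash.
/// This is an assumption for illustration: a simple mixing loop, not real hashing.
fn busy_hash(input: &str) -> u64 {
    let mut acc: u64 = 0xcbf29ce484222325;
    for _ in 0..10_000 {
        for b in input.bytes() {
            acc ^= b as u64;
            acc = acc.wrapping_mul(0x100000001b3);
        }
    }
    acc
}

/// Processes `inputs` in batches of `batch_size`, spawning one short-lived
/// thread per item and collecting the results on the calling thread.
fn batched(inputs: &[String], batch_size: usize) -> Vec<u64> {
    let mut results = Vec::with_capacity(inputs.len());
    // max(1) guards against a zero batch size, which chunks() would reject.
    for batch in inputs.chunks(batch_size.max(1)) {
        let batch_results: Vec<u64> = thread::scope(|s| {
            // Spawn every thread of the batch first...
            let handles: Vec<_> = batch
                .iter()
                .map(|line| s.spawn(move || busy_hash(line)))
                .collect();
            // ...then wait for all of them before starting the next batch.
            handles.into_iter().map(|h| h.join().unwrap()).collect()
        });
        results.extend(batch_results);
    }
    results
}
```

Every batch pays for creating and joining its threads, and a slow item holds up the whole batch. QThread avoids both by keeping its threads alive between jobs.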

Disclaimer:
QThread and QThreadsRegulator are runnable but not perfect. There are many things to fix. A few of them:

  • No recovery from a panic in the thread.
  • No job graph like Doom Eternal's.
  • The Weak count is inaccurate because the Weak is created when the intermediate function starts running, not when the job is queued.
  • Decommissioning a QThread should use impl Drop.
  • No dynamic number of QThreads inside the regulator (imagine plug-and-play processors in the future!)

It's a POC for fun and education, but I might make it a full-fledged crate in the future.
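Two of the items in the list above, the Weak timing and impl Drop, could be addressed along these lines. This is only a sketch under my own assumptions, not the code in the repository: DropWorker and the Job alias are names invented here, and the worker takes FnOnce closures instead of Box<dyn Fn()>.

```rust
use std::sync::mpsc::{channel, Sender};
use std::sync::Arc;
use std::thread::{self, JoinHandle};

type Job = Box<dyn FnOnce() + Send + 'static>;

/// Hypothetical variant of QThread, renamed to avoid confusion with the original.
struct DropWorker {
    tx: Sender<Option<Job>>,
    join_handle: Option<JoinHandle<()>>,
    active_arc: Arc<()>,
}

impl DropWorker {
    fn new() -> DropWorker {
        let (tx, rx) = channel::<Option<Job>>();
        let join_handle = thread::spawn(move || {
            // Run jobs until None arrives or every sender is gone.
            while let Ok(Some(job)) = rx.recv() {
                job();
            }
        });
        DropWorker { tx, join_handle: Some(join_handle), active_arc: Arc::new(()) }
    }

    fn exec<F: FnOnce() + Send + 'static>(&self, func: F) {
        // The Weak is created here, at enqueue time, so queued jobs count too.
        let weak = Arc::downgrade(&self.active_arc);
        let job: Job = Box::new(move || {
            func();
            drop(weak); // job finished: the count goes back down
        });
        self.tx.send(Some(job)).expect("worker thread has stopped");
    }

    /// Number of jobs that are queued or running.
    fn active_count(&self) -> usize {
        Arc::weak_count(&self.active_arc)
    }
}

impl Drop for DropWorker {
    fn drop(&mut self) {
        // Ask the thread to stop after the jobs already queued, then wait for it.
        let _ = self.tx.send(None);
        if let Some(handle) = self.join_handle.take() {
            let _ = handle.join();
        }
    }
}
```

With this shape, letting a DropWorker go out of scope is enough to finish its queued jobs and join its thread, so separate schedule_kill and wait_all calls would no longer be needed.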

Comparing time

The QThread method is the fastest of the three. Batched and QThread take almost the same time, though, while sequential needs about 4 times as long as either of them.

As for why QThread is always faster than batched, it may be because 1.) creating a thread has a cost, or 2.) writing the file from a single thread is slow.

Results will vary between machines.
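If you want to take a similar measurement yourself, a small wall-clock helper is enough for a rough comparison. The timed function below is a hypothetical helper written for this post, not something from the repository.

```rust
use std::time::{Duration, Instant};

/// Runs `work` once and returns its result together with the elapsed wall-clock time.
/// `timed` is a hypothetical helper, not taken from the repository.
fn timed<T>(work: impl FnOnce() -> T) -> (T, Duration) {
    let start = Instant::now();
    let result = work();
    (result, start.elapsed())
}
```

For example, wrapping a call as timed(|| qthreaded()) gives back the function's result and how long it took. A single run is noisy, so it's worth running each method a few times before comparing.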

Conclusion

There's not much to conclude here. The comparison I made is not exactly fair, and time is the only metric I looked at. There's much more to explore, such as core usage.

From this experiment, I found that writing 1-thread-per-core code is not as complex as I thought it would be. I can still read the code easily despite the added complexity. There's an additional need to check for an available thread, but that's it.

This computation model may be useful in other fields of computing. For now it is a hidden treasure lying in the Doom Eternal codebase, but I hope this blog post and the tweet spark interest in the model in other games and other fields of computing. Unfortunately, multi-threading is not available in wasm right now, and that's an area I would really like to explore: making games with wasm and HTML Canvas.

Rust is really fascinating. It is surprisingly easy to write, even though it sits as close to bare metal as C++. That makes me continue to love Rust. It's been fun, and it is actually easy to write for people used to working in a higher-level language. I write TypeScript every day, and I didn't feel any difficulty moving from the TypeScript paradigm to Rust.

That's all, thanks for reading.
