This is part of a series of posts about building b.roll, a terminal session recorder and search tool written in Rust.
When broll records a terminal session, three things need to happen at the same time: keystrokes must flow into the shell, the shell's output must appear on screen, and that output must be processed and stored in a database. Doing all three sequentially would mean dropped keystrokes and laggy rendering. So broll uses three threads, coordinated through channels and atomics.
This post walks through the concurrency architecture of broll's recorder, explaining each primitive: std::thread::spawn, mpsc channels, Arc, AtomicBool, AtomicU16, Ordering, move closures, and signal_hook for terminal resize handling. All code is from src/recorder/mod.rs.
The Three-Thread Architecture
Here is the layout:
                   +------------------+
Real stdin ------> | Thread 1 (stdin) | ------> PTY master (writer)
                   +------------------+

                   +------------------+
PTY master ------> | Main thread      | ------> Real stdout
(reader)           | (PTY -> stdout)  | ------> mpsc channel (tx)
                   +------------------+

                   +------------------+
mpsc channel ----> | Thread 2         | ------> SQLite (via Database)
(rx)               | (storage)        |
                   +------------------+
Thread 1 (stdin) reads from the real terminal and writes to the PTY master, forwarding your keystrokes to the sub-shell.
Main thread reads from the PTY master and writes to the real terminal, displaying the shell's output. It also sends a copy of every chunk through an mpsc channel.
Thread 2 (storage) receives data from the channel, parses OSC markers, renders output through a VT100 virtual terminal, and stores the result in SQLite.
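To make the data flow concrete, here is a minimal sketch of the same three-role layout using only the standard library. It is not broll's code: the PTY is replaced by a second channel, the "screen" and the "database" are plain byte vectors, and run_pipeline is a hypothetical name chosen for illustration.

```rust
use std::sync::mpsc;
use std::thread;

/// Runs a miniature version of the three-role layout with in-memory stand-ins.
/// Returns (bytes shown on the "screen", bytes collected by the "storage" thread).
fn run_pipeline(keystrokes: Vec<Vec<u8>>) -> (Vec<u8>, Vec<u8>) {
    // Stand-in for the PTY: the input thread writes, the main thread reads.
    let (pty_tx, pty_rx) = mpsc::channel::<Vec<u8>>();
    // Channel from the main thread to the storage thread.
    let (tx, rx) = mpsc::channel::<Vec<u8>>();

    // Thread 1: forwards "keystrokes" into the stand-in PTY.
    let input = thread::spawn(move || {
        for chunk in keystrokes {
            if pty_tx.send(chunk).is_err() {
                break;
            }
        }
        // pty_tx is dropped here, which ends the main loop below.
    });

    // Thread 2: collects every chunk (the real one parses and writes to SQLite).
    let storage = thread::spawn(move || {
        let mut stored = Vec::new();
        for chunk in rx {
            stored.extend_from_slice(&chunk);
        }
        stored
    });

    // Main thread: "PTY" -> screen, plus a copy for storage.
    let mut screen = Vec::new();
    for chunk in pty_rx {
        screen.extend_from_slice(&chunk);
        let _ = tx.send(chunk);
    }
    drop(tx); // Lets the storage thread's loop end.

    input.join().unwrap();
    let stored = storage.join().unwrap();
    (screen, stored)
}

fn main() {
    let (screen, stored) = run_pipeline(vec![b"ls\n".to_vec(), b"pwd\n".to_vec()]);
    assert_eq!(screen, stored);
    print!("{}", String::from_utf8_lossy(&screen));
}
```

Each stage owns its end of a channel, so no stage ever touches another stage's buffer directly.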
Spawning Threads with move Closures
Rust's std::thread::spawn takes a closure and runs it on a new OS thread. The catch: the closure must own everything it uses (formally, it must be 'static). Rust enforces this because a spawned thread can outlive the scope that created it, so any borrowed local could be freed while the thread is still using it.
The move keyword transfers ownership of captured variables into the closure. Here is the stdin thread:
let mut pty_writer = writer;
let _stdin_handle = std::thread::spawn(move || {
    let mut stdin = std::io::stdin();
    let mut buf = [0u8; 1024];
    loop {
        match stdin.read(&mut buf) {
            Ok(0) | Err(_) => break,
            Ok(n) => {
                if pty_writer.write_all(&buf[..n]).is_err() {
                    break;
                }
            }
        }
    }
});
Without move, the compiler would reject this code: the closure would only borrow pty_writer, and Rust cannot guarantee that the spawned thread will not outlive the variable. With move, ownership of pty_writer transfers into the closure. The original scope can no longer access it, which eliminates any possibility of concurrent access.
The pattern Ok(0) | Err(_) => break handles both clean EOF and I/O errors in one arm. When stdin closes or the PTY writer breaks, the thread exits cleanly.
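The ownership transfer is easier to see in isolation. The following sketch is not from broll; sum_on_worker is a hypothetical helper that moves a Vec into a worker thread and gets the result back through join.

```rust
use std::thread;

/// Moves an owned buffer into a worker thread and returns the worker's result.
fn sum_on_worker(data: Vec<u32>) -> u32 {
    // `move` transfers ownership of `data` into the closure, so the new
    // thread does not borrow from this function's stack frame.
    let handle = thread::spawn(move || data.iter().sum::<u32>());

    // `data` is no longer usable here. Uncommenting the next line
    // makes the program fail to compile (use of moved value):
    // println!("{:?}", data);

    handle.join().expect("worker panicked")
}

fn main() {
    println!("{}", sum_on_worker(vec![1, 2, 3]));
}
```

The value comes back to the caller only through the JoinHandle, which is the same reason the stdin thread above keeps pty_writer for its whole lifetime.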
mpsc Channels: One Producer, One Consumer
The main thread and the storage thread communicate through an mpsc (multiple-producer, single-consumer) channel:
let (tx, rx) = mpsc::channel::<Vec<u8>>();
The main thread holds tx (the sender). The storage thread holds rx (the receiver). Each time the main thread reads a chunk from the PTY, it sends a copy through the channel:
// Main thread: PTY -> stdout + channel
match reader.read(&mut buf) {
    Ok(0) => break,
    Ok(n) => {
        let raw = &buf[..n];
        stdout.write_all(raw)?;
        stdout.flush()?;
        let _ = tx.send(raw.to_vec());
    }
    Err(_) => break,
}
The raw.to_vec() call creates an owned copy of the bytes. The main thread cannot send a reference because the buffer will be reused on the next read. The let _ = discards any send error, which only occurs if the receiver has been dropped (meaning the storage thread has exited).
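Both points can be checked with a small standalone sketch. The helper names send_copy and copy_and_disconnect_demo are hypothetical; only mpsc::channel, send, and recv are real standard-library calls.

```rust
use std::sync::mpsc;

/// Sends an owned copy of `chunk`; reports whether a receiver still existed.
fn send_copy(tx: &mpsc::Sender<Vec<u8>>, chunk: &[u8]) -> bool {
    tx.send(chunk.to_vec()).is_ok()
}

/// Returns (first send ok?, second send ok?, bytes the receiver got).
fn copy_and_disconnect_demo() -> (bool, bool, Vec<u8>) {
    let (tx, rx) = mpsc::channel::<Vec<u8>>();
    let mut buf = [0u8; 4];

    buf.copy_from_slice(b"abcd");
    let first = send_copy(&tx, &buf[..2]);

    // Reusing the buffer does not disturb the copy already in the channel.
    buf.copy_from_slice(b"wxyz");
    let received = rx.recv().unwrap();

    // Once the receiver is gone, send returns Err instead of blocking or panicking.
    drop(rx);
    let second = send_copy(&tx, &buf[..2]);

    (first, second, received)
}

fn main() {
    println!("{:?}", copy_and_disconnect_demo());
}
```

Because the error on a closed channel is an ordinary Result, ignoring it with let _ = keeps the display path running even if storage has already stopped.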
On the receiving side, the storage thread uses recv_timeout instead of a blocking recv:
match rx.recv_timeout(INCOMPLETE_LINE_TIMEOUT) {
    Ok(data) => {
        // Process the data, parse markers, store chunks
    }
    Err(mpsc::RecvTimeoutError::Timeout) => {
        // Flush accumulated output that has not been terminated by a marker
    }
    Err(mpsc::RecvTimeoutError::Disconnected) => {
        // Channel closed, flush remaining data and exit
        break;
    }
}
The timeout (2 seconds) serves a practical purpose. Some commands produce output without a trailing newline or marker. Without a timeout, that partial output would sit in the buffer indefinitely. The timeout flushes it to the database so it does not get lost.
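Here is a rough, self-contained sketch of that flush-on-timeout loop. It is a simplification rather than broll's storage code: FLUSH_TIMEOUT is a hypothetical 50 ms stand-in for the 2-second constant, and "flushing" just moves pending bytes into a list instead of writing to SQLite.

```rust
use std::sync::mpsc::{self, RecvTimeoutError};
use std::thread;
use std::time::Duration;

// Hypothetical short timeout so the example finishes quickly.
const FLUSH_TIMEOUT: Duration = Duration::from_millis(50);

/// Accumulates chunks and flushes them on a quiet period or on disconnect.
fn storage_loop(rx: mpsc::Receiver<Vec<u8>>) -> Vec<Vec<u8>> {
    let mut pending: Vec<u8> = Vec::new();
    let mut flushed: Vec<Vec<u8>> = Vec::new();
    loop {
        match rx.recv_timeout(FLUSH_TIMEOUT) {
            Ok(data) => pending.extend_from_slice(&data),
            Err(RecvTimeoutError::Timeout) => {
                // Nothing arrived for a while: flush what has accumulated.
                if !pending.is_empty() {
                    flushed.push(std::mem::take(&mut pending));
                }
            }
            Err(RecvTimeoutError::Disconnected) => {
                // Sender is gone: flush the remainder and stop.
                if !pending.is_empty() {
                    flushed.push(std::mem::take(&mut pending));
                }
                break;
            }
        }
    }
    flushed
}

/// Sends one chunk, stays quiet much longer than the timeout, then sends another.
fn timeout_flush_demo() -> Vec<Vec<u8>> {
    let (tx, rx) = mpsc::channel();
    let storage = thread::spawn(move || storage_loop(rx));
    tx.send(b"partial".to_vec()).unwrap();
    thread::sleep(Duration::from_millis(500));
    tx.send(b"done".to_vec()).unwrap();
    drop(tx);
    storage.join().unwrap()
}

fn main() {
    for chunk in timeout_flush_demo() {
        println!("{}", String::from_utf8_lossy(&chunk));
    }
}
```

The quiet period splits the two chunks into separate flushes; without the Timeout arm, both would be stored together only when the channel closes.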
When the main thread finishes its loop, it drops tx:
drop(tx);
Once the storage thread has drained any chunks still buffered in the channel, its next recv_timeout returns Disconnected, triggering a clean shutdown. This is the graceful teardown pattern for channel-based architectures: dropping the sender signals the receiver.
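A short sketch shows that dropping the sender does not discard data already in flight. The function name drain_after_drop is hypothetical; the behavior it demonstrates is what the standard library documents for buffered channels.

```rust
use std::sync::mpsc::{self, RecvTimeoutError};
use std::time::Duration;

/// Sends two values, drops the sender, then drains the receiver.
/// Returns the received values and whether the loop ended on Disconnected.
fn drain_after_drop() -> (Vec<u8>, bool) {
    let (tx, rx) = mpsc::channel::<u8>();
    tx.send(1).unwrap();
    tx.send(2).unwrap();
    drop(tx); // No more messages can ever arrive.

    let mut received = Vec::new();
    let saw_disconnect = loop {
        match rx.recv_timeout(Duration::from_millis(10)) {
            // Messages sent before the drop are still delivered first.
            Ok(value) => received.push(value),
            Err(RecvTimeoutError::Disconnected) => break true,
            Err(RecvTimeoutError::Timeout) => break false,
        }
    };
    (received, saw_disconnect)
}

fn main() {
    println!("{:?}", drain_after_drop());
}
```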
Arc: Shared Ownership Across Threads
Some state needs to be read by multiple threads. Rust's ownership model does not allow two threads to own the same value. Arc (Atomic Reference Counting) provides a solution: it wraps a value in a heap-allocated container with a thread-safe reference count. Cloning an Arc increments the count; dropping one decrements it. The inner value is freed when the count hits zero.
broll uses Arc for two things: the resize flag and the terminal column width.
let resize_flag = Arc::new(AtomicBool::new(false));
signal_hook::flag::register(signal_hook::consts::SIGWINCH, Arc::clone(&resize_flag))?;
let term_cols = Arc::new(AtomicU16::new(cols));
let term_cols_storage = Arc::clone(&term_cols);
Arc::clone does not clone the inner data. It creates a new pointer to the same allocation and increments the reference count. This is cheap: just an atomic increment.
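The claim that Arc::clone shares rather than copies can be verified with Arc::strong_count and Arc::ptr_eq. This is an illustrative sketch with a hypothetical function name, not code from the recorder.

```rust
use std::sync::atomic::{AtomicU16, Ordering};
use std::sync::Arc;
use std::thread;

/// Returns (reference count after one clone, same allocation?, value seen by the thread).
fn arc_sharing_demo() -> (usize, bool, u16) {
    let term_cols = Arc::new(AtomicU16::new(80));
    let term_cols_storage = Arc::clone(&term_cols);

    // Two handles, one allocation.
    let count = Arc::strong_count(&term_cols);
    let same_allocation = Arc::ptr_eq(&term_cols, &term_cols_storage);

    // A store through one handle is visible through the other.
    term_cols.store(120, Ordering::Relaxed);
    let reader = thread::spawn(move || term_cols_storage.load(Ordering::Relaxed));
    let seen = reader.join().unwrap();

    (count, same_allocation, seen)
}

fn main() {
    println!("{:?}", arc_sharing_demo());
}
```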
Why Arc and Not Rc?
Rust has Rc (Reference Counting) for single-threaded shared ownership. It is slightly faster because it uses non-atomic operations. But Rc is not Send, meaning Rust will refuse to compile any code that tries to move an Rc to another thread. Arc uses atomic operations for its reference count, making it safe to share across threads. The compiler enforces this distinction at the type level.
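One way to see the distinction without spawning anything is a helper that only accepts Send types. The helper assert_send below is a hypothetical name for a common idiom; the commented-out line is the one the compiler would reject.

```rust
use std::rc::Rc;
use std::sync::Arc;

/// Compiles only when T can be moved to another thread.
fn assert_send<T: Send>(_value: &T) {}

/// Returns the Rc reference count after one clone.
fn rc_vs_arc_demo() -> usize {
    let shared = Arc::new(5u32);
    assert_send(&shared); // Fine: Arc<u32> is Send.

    let local = Rc::new(5u32);
    // assert_send(&local); // Does not compile: Rc<u32> is not Send.

    // Rc still works for shared ownership within a single thread.
    let also_local = Rc::clone(&local);
    Rc::strong_count(&also_local)
}

fn main() {
    println!("{}", rc_vs_arc_demo());
}
```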
Atomics: Lock-Free Shared State
Inside each Arc, broll uses atomic types rather than a Mutex. Atomics provide lock-free reads and writes for primitive values. No thread ever blocks waiting for another thread to release a lock.
AtomicBool for the Resize Flag
Terminal resize events arrive as SIGWINCH signals. The signal_hook crate registers a handler that sets an AtomicBool to true when the signal fires:
let resize_flag = Arc::new(AtomicBool::new(false));
signal_hook::flag::register(signal_hook::consts::SIGWINCH, Arc::clone(&resize_flag))?;
The main thread checks the flag on every loop iteration using swap:
if resize_flag.swap(false, Ordering::Relaxed)
    && let Ok((cols, rows)) = crossterm::terminal::size()
{
    term_cols.store(cols, Ordering::Relaxed);
    let _ = pair.master.resize(PtySize {
        rows,
        cols,
        pixel_width: 0,
        pixel_height: 0,
    });
}
swap(false, Ordering::Relaxed) atomically reads the current value and replaces it with false, returning the old value. If it was true, a resize happened. This pattern ensures that even if multiple SIGWINCH signals arrive between checks, the resize logic runs at most once per check.
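The coalescing behavior can be sketched without real signals by setting the flag by hand. The names take_resize and coalescing_demo are hypothetical; the two store calls stand in for two SIGWINCH deliveries.

```rust
use std::sync::atomic::{AtomicBool, Ordering};

/// Reads the flag and clears it in a single atomic step.
fn take_resize(flag: &AtomicBool) -> bool {
    flag.swap(false, Ordering::Relaxed)
}

/// Returns the result of three consecutive checks.
fn coalescing_demo() -> [bool; 3] {
    let flag = AtomicBool::new(false);

    let before = take_resize(&flag); // Nothing happened yet.

    // Two "signals" arrive before the next check.
    flag.store(true, Ordering::Relaxed);
    flag.store(true, Ordering::Relaxed);

    let first_check = take_resize(&flag); // Sees one pending resize.
    let second_check = take_resize(&flag); // Already cleared.

    [before, first_check, second_check]
}

fn main() {
    println!("{:?}", coalescing_demo());
}
```

Since the handler only needs the latest terminal size, collapsing several signals into one resize loses nothing.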
AtomicU16 for Terminal Width
The storage thread needs the current terminal width to render output correctly through its VT100 parser. The main thread updates this value on resize:
term_cols.store(cols, Ordering::Relaxed);
The storage thread reads it when rendering:
let rendered = render_vt(&cmd_output_bytes, term_cols_storage.load(Ordering::Relaxed));
What Does Ordering::Relaxed Mean?
Atomic operations take a memory ordering parameter that controls how they interact with surrounding memory operations. Ordering::Relaxed provides the weakest guarantee: the atomic operation itself is atomic, but it places no constraints on the ordering of other reads and writes.
For broll's use case, this is sufficient. The resize flag is a simple boolean signal: if the main thread observes it late, the resize just happens on a later loop iteration. The column width is a hint for rendering, where using a slightly stale value produces output that is one render cycle behind. Neither value guards access to other shared memory, so neither case requires the stronger guarantees of Ordering::SeqCst or an Ordering::Acquire/Ordering::Release pair.
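For contrast, here is a sketch of a case where Relaxed alone would not be enough: a flag that announces that another value is ready to read. This is not something broll does; the names data, ready, and publish_and_read are made up for the illustration.

```rust
use std::sync::atomic::{AtomicBool, AtomicU32, Ordering};
use std::sync::Arc;
use std::thread;

/// A producer publishes a value, then raises a flag; the caller waits for the flag.
fn publish_and_read() -> u32 {
    let data = Arc::new(AtomicU32::new(0));
    let ready = Arc::new(AtomicBool::new(false));
    let (data_tx, ready_tx) = (Arc::clone(&data), Arc::clone(&ready));

    let producer = thread::spawn(move || {
        data_tx.store(42, Ordering::Relaxed);
        // Release: everything written before this store becomes visible
        // to a thread that observes `true` with an Acquire load.
        ready_tx.store(true, Ordering::Release);
    });

    // Acquire pairs with the Release store above.
    while !ready.load(Ordering::Acquire) {
        thread::yield_now();
    }
    // Guaranteed to see 42 because of the Release/Acquire pair.
    let value = data.load(Ordering::Relaxed);

    producer.join().unwrap();
    value
}

fn main() {
    println!("{}", publish_and_read());
}
```

broll's flag and width are each meaningful on their own, with no second value depending on them, which is why the weaker ordering is acceptable there.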
Signal Handling with signal_hook
Unix signals are tricky in any language. Signal handlers run in an interrupt context with severe restrictions on what they can do. The signal_hook crate provides a safe interface: instead of running arbitrary code in the handler, it simply sets an atomic flag.
signal_hook::flag::register(signal_hook::consts::SIGWINCH, Arc::clone(&resize_flag))?;
This registers a handler for SIGWINCH (window size change) that atomically sets resize_flag to true. The main thread polls this flag in its event loop. This is safe because the signal handler only performs a single atomic store, which is async-signal-safe.
Shutdown Sequence
The cleanup order matters:
drop(tx); // 1. Close the channel
let _ = child.wait(); // 2. Wait for the shell to exit
drop(_raw_guard); // 3. Restore terminal mode
let _ = storage_handle.join(); // 4. Wait for storage to finish
db.end_session(&session_id)?; // 5. Mark session as ended
Dropping tx signals the storage thread to flush and exit. The RawModeGuard is dropped before the final message so that eprintln! renders correctly. The storage thread is joined to ensure all data reaches SQLite before the session is marked as ended.
Note that the stdin thread is deliberately not joined. It blocks on stdin.read(), which cannot be interrupted on all platforms. Since the process is about to exit anyway, the OS will clean it up.
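The ordering can be sketched with stand-ins. Nothing below is broll's code: Guard is a hypothetical substitute for RawModeGuard that records when it is dropped, the storage thread just counts bytes, and the Mutex exists only to collect the event log.

```rust
use std::sync::mpsc;
use std::sync::{Arc, Mutex};
use std::thread;

/// Hypothetical stand-in for RawModeGuard: logs when it is dropped.
struct Guard {
    log: Arc<Mutex<Vec<&'static str>>>,
}

impl Drop for Guard {
    fn drop(&mut self) {
        self.log.lock().unwrap().push("terminal restored");
    }
}

/// Returns the order of shutdown events and the bytes the storage thread saw.
fn shutdown_order() -> (Vec<&'static str>, usize) {
    let log = Arc::new(Mutex::new(Vec::new()));
    let guard = Guard { log: Arc::clone(&log) };

    let (tx, rx) = mpsc::channel::<Vec<u8>>();
    let storage = thread::spawn(move || {
        let mut stored = 0;
        for chunk in rx {
            stored += chunk.len();
        }
        stored
    });
    tx.send(b"output".to_vec()).unwrap();

    drop(tx); // 1. Close the channel so storage can finish.
    drop(guard); // 2. Restore the terminal before printing anything.
    let stored = storage.join().unwrap(); // 3. Wait for storage to drain.
    log.lock().unwrap().push("storage joined");
    log.lock().unwrap().push("session ended"); // 4. Only now mark the end.

    let events = log.lock().unwrap().clone();
    (events, stored)
}

fn main() {
    println!("{:?}", shutdown_order());
}
```

If join came before drop(tx), the storage loop would never see the channel close and the program would hang, which is one reason the order matters.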
A Standalone Example
Here is a minimal program that demonstrates two threads sharing an Arc<AtomicBool>:
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::thread;
use std::time::Duration;

fn main() {
    let flag = Arc::new(AtomicBool::new(false));

    // Spawn a worker thread that checks the flag
    let worker_flag = Arc::clone(&flag);
    let handle = thread::spawn(move || {
        println!("Worker: waiting for signal...");
        loop {
            if worker_flag.load(Ordering::Relaxed) {
                println!("Worker: signal received, shutting down.");
                break;
            }
            thread::sleep(Duration::from_millis(100));
        }
    });

    // Main thread does some work, then signals the worker
    thread::sleep(Duration::from_secs(1));
    println!("Main: sending signal.");
    flag.store(true, Ordering::Relaxed);
    handle.join().unwrap();
    println!("Main: worker exited cleanly.");
}
This is the same polling pattern broll uses for SIGWINCH. Replace the main thread's flag.store with a signal handler that sets the flag, and you have the real architecture.
Why Not async/await?
broll uses OS threads rather than async runtimes like Tokio. The reasons are practical:
- Blocking I/O is unavoidable. stdin.read() and the PTY reader are blocking calls. An async runtime would need spawn_blocking for these, adding complexity without benefit.
- Three threads is a fixed, small number. There is no need for the thousands-of-tasks scalability that async provides.
- No additional dependencies. std::thread and std::sync are in the standard library. No runtime, no executor, no pinning.
When your concurrency model is "a few long-lived threads passing messages," the standard library gives you everything you need.
Key Takeaways
- move closures transfer ownership into threads. This is not optional: the compiler requires it to prevent data races.
- mpsc channels provide typed, safe communication. Dropping the sender signals the receiver, enabling clean shutdown.
- Arc enables shared ownership across threads. It is Rc with atomic reference counting, and the compiler enforces that you use the right one.
- Atomics provide lock-free access to simple values. Ordering::Relaxed is often sufficient for flags and counters where eventual consistency is acceptable.
- signal_hook makes Unix signals safe. Instead of running complex code in a signal handler, set a flag and check it in your event loop.
- Explicit drop ordering controls shutdown. Channel close, child wait, guard drop, thread join: the sequence determines correctness.
broll's entire concurrency model fits in one file, uses no external runtime, and relies on the compiler to verify thread safety at compile time. That is the Rust value proposition for systems programming: fearless concurrency is not a slogan, it is enforced by the type system.