Peeking Inside Rust Futures: A Stream Implementation with Manual Poll Tracing via tokio-console
Async Rust’s power comes from its zero-cost futures and lightweight task system, but debugging async code can feel like peeking into a black box. Futures and Streams rely on manual polling via the Future::poll and Stream::poll_next methods, and understanding their lifecycle is key to writing correct async code. In this guide, we’ll build a custom Stream implementation, add manual poll tracing to observe its behavior, and use tokio-console to inspect its internals in real time.
Background: Rust Futures and Streams
The Future trait is the core of async Rust. It defines a single method: fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll. The Context provides a waker to notify the executor when the future is ready to make progress, and Poll is an enum with two variants: Ready(T) (the future has completed) and Pending (the future is waiting for work).
Streams are async iterators, defined by the Stream trait (from futures crate or tokio-stream). Its core method is fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll>, where Poll> means: Ready(Some(item)) (next item available), Ready(None) (stream is exhausted), or Pending (waiting for the next item).
Building a Custom Counter Stream
We’ll start by implementing a simple CounterStream that yields integers from 0 to 4, with a 1-second delay between each item. First, add dependencies to Cargo.toml:
[dependencies]
tokio = { version = "1.38", features = ["full", "macros", "time"] }
tokio-stream = "0.1"
futures = "0.3"
Next, the Stream implementation. We’ll store a current count and a delay future to simulate async work between items:
use std::pin::Pin;
use std::task::{Context, Poll};
use tokio::time::{sleep, Sleep};
use tokio_stream::Stream;
struct CounterStream {
count: u32,
delay: Pin>,
}
impl CounterStream {
fn new() -> Self {
Self {
count: 0,
delay: Box::pin(sleep(std::time::Duration::from_secs(1))),
}
}
}
impl Stream for CounterStream {
type Item = u32;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> {
// Manual poll tracing: log every poll call
eprintln!("[POLL] CounterStream::poll_next called, count: {}", self.count);
if self.count >= 5 {
eprintln!("[POLL] Stream exhausted, returning Ready(None)");
return Poll::Ready(None);
}
// Poll the delay future to check if 1s has elapsed
match self.delay.as_mut().poll(cx) {
Poll::Ready(_) => {
let current = self.count;
self.count += 1;
// Reset the delay for the next item
self.delay = Box::pin(sleep(std::time::Duration::from_secs(1)));
eprintln!("[POLL] Delay elapsed, returning Ready(Some({}))", current);
Poll::Ready(Some(current))
}
Poll::Pending => {
eprintln!("[POLL] Delay pending, returning Pending");
Poll::Pending
}
}
}
}
We’ve added manual poll tracing via eprintln! statements that log every poll call, the current count, and whether the stream is pending or ready. This is the "manual poll tracing" mentioned in the topic.
Adding tokio-console Support
tokio-console is a diagnostics tool for async Rust applications using tokio. It lets you inspect task lifecycles, poll counts, waker notifications, and more. To use it, we need to add the tokio-console feature to tokio and include the console-subscriber crate to export telemetry.
Update Cargo.toml:
[dependencies]
tokio = { version = "1.38", features = ["full", "macros", "time", "tracing", "tokio-console"] }
tokio-stream = "0.1"
futures = "0.3"
console-subscriber = "0.3"
Next, initialize the console subscriber in our main function, then run the stream:
use tokio_stream::StreamExt;
#[tokio::main]
async fn main() {
// Initialize tokio-console telemetry
console_subscriber::init();
let mut stream = CounterStream::new();
// Consume the stream
while let Some(item) = stream.next().await {
println!("Got item: {}", item);
}
println!("Stream done!");
}
Running and Observing with tokio-console
First, run the application:
RUST_LOG=debug cargo run
In a separate terminal, launch tokio-console:
tokio-console
You’ll see a terminal UI showing all active tokio tasks. Our main task will be listed, and if you drill into its details, you can see poll counts for the CounterStream future. The manual eprintln! logs will also appear in the console output, but tokio-console adds structured telemetry: you can see how many times poll_next was called, how long each poll took, and when waker notifications were triggered.
For example, the first time poll_next is called, the delay is pending, so it returns Pending and the waker is registered. When the 1-second delay elapses, the waker is notified, and the executor polls the stream again, this time returning Ready(Some(0)). This cycle repeats until the count reaches 5, where the stream returns Ready(None) and is exhausted.
Why This Matters
Manual poll tracing and tools like tokio-console demystify the async polling model. You can use this approach to debug stuck futures, unexpected pending states, or performance issues in production async code. Understanding how poll works under the hood makes you a better async Rust developer, and tokio-console turns that internal knowledge into actionable diagnostics.
Conclusion
In this guide, we built a custom Stream implementation, added manual poll tracing to observe its polling behavior, and used tokio-console to inspect the stream’s lifecycle in real time. The combination of manual tracing (for quick local debugging) and tokio-console (for structured, real-time diagnostics) gives you full visibility into your async Rust code. Try extending this example with more complex Stream logic, or apply the same techniques to debug your own async applications!
Top comments (0)