
ANKUSH CHOUDHARY JOHAL

Posted on • Originally published at johal.in

Peeking inside Rust futures: a Stream implementation with manual poll tracing via tokio‑console


Async Rust’s power comes from its zero-cost futures and lightweight task system, but debugging async code can feel like peeking into a black box. Futures and Streams rely on manual polling via the Future::poll and Stream::poll_next methods, and understanding their lifecycle is key to writing correct async code. In this guide, we’ll build a custom Stream implementation, add manual poll tracing to observe its behavior, and use tokio-console to inspect its internals in real time.

Background: Rust Futures and Streams

The Future trait is the core of async Rust. It defines a single method: fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>. The Context provides a waker to notify the executor when the future is ready to make progress, and Poll<T> is an enum with two variants: Ready(T) (the future has completed) and Pending (the future cannot complete yet).
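To make the poll contract concrete, here is a minimal sketch that uses only the standard library and no runtime. Countdown, CountingWaker, and drive_countdown are hypothetical names invented for this illustration; the future wakes itself immediately, where a real future would hand its waker to a timer or I/O source.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical future: reports Pending `remaining` times, then completes.
struct Countdown {
    remaining: u32,
}

impl Future for Countdown {
    type Output = &'static str;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        if self.remaining == 0 {
            return Poll::Ready("done");
        }
        self.remaining -= 1;
        // Ask to be polled again before reporting Pending.
        cx.waker().wake_by_ref();
        Poll::Pending
    }
}

/// Waker that only counts how often it is woken.
struct CountingWaker(AtomicUsize);

impl Wake for CountingWaker {
    fn wake(self: Arc<Self>) {
        self.0.fetch_add(1, Ordering::SeqCst);
    }
}

/// Drives `Countdown` by hand and returns (polls, wakes, output).
fn drive_countdown(pending_rounds: u32) -> (usize, usize, &'static str) {
    let counter = Arc::new(CountingWaker(AtomicUsize::new(0)));
    let waker = Waker::from(counter.clone());
    let mut cx = Context::from_waker(&waker);

    let mut fut = Box::pin(Countdown { remaining: pending_rounds });
    let mut polls = 0;
    loop {
        polls += 1;
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return (polls, counter.0.load(Ordering::SeqCst), out);
        }
    }
}
```

Calling drive_countdown(3) polls the future four times: three Pending results, each preceded by a wake, and one final Ready. The busy loop stands in for an executor, which would normally wait for the wake before polling again.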

Streams are async iterators, defined by the Stream trait (from the futures crate; tokio-stream re-exports it). Its core method is fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>, where the return value is one of: Ready(Some(item)) (next item available), Ready(None) (stream is exhausted), or Pending (waiting for the next item).
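The three outcomes of poll_next can be seen in a small sketch that depends only on the standard library. MiniStream is a local stand-in trait with the same method shape as Stream, not the real trait, and Stutter, NoopWaker, and drain are hypothetical names made up for this example.

```rust
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Local stand-in with the same shape as `Stream::poll_next` (assumption: std only).
trait MiniStream {
    type Item;
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>;
}

/// Yields 0..limit, reporting Pending once before every item.
struct Stutter {
    next: u32,
    limit: u32,
    ready: bool,
}

impl MiniStream for Stutter {
    type Item = u32;

    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<u32>> {
        if self.next >= self.limit {
            return Poll::Ready(None); // stream is exhausted
        }
        if !self.ready {
            self.ready = true;
            cx.waker().wake_by_ref(); // request another poll
            return Poll::Pending; // item not available yet
        }
        self.ready = false;
        let item = self.next;
        self.next += 1;
        Poll::Ready(Some(item)) // next item available
    }
}

/// Waker that does nothing; the loop below polls unconditionally.
struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

/// Polls until Ready(None); returns the items and the number of Pending results.
fn drain(limit: u32) -> (Vec<u32>, usize) {
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    let mut stream = Stutter { next: 0, limit, ready: false };
    let (mut items, mut pendings) = (Vec::new(), 0);
    loop {
        match Pin::new(&mut stream).poll_next(&mut cx) {
            Poll::Ready(Some(item)) => items.push(item),
            Poll::Ready(None) => return (items, pendings),
            Poll::Pending => pendings += 1,
        }
    }
}
```

The match in drain is roughly what a consumer such as a while let loop over next().await does for you: collect on Ready(Some), stop on Ready(None), and wait on Pending.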

Building a Custom Counter Stream

We’ll start by implementing a simple CounterStream that yields the integers 0 through 4, with a 1-second delay before each item. First, add dependencies to Cargo.toml:

[dependencies]
tokio = { version = "1.38", features = ["full", "macros", "time"] }
tokio-stream = "0.1"
futures = "0.3"

Next, the Stream implementation. We’ll store a current count and a delay future to simulate async work between items:

use std::future::Future; // brings `poll` into scope for the boxed Sleep
use std::pin::Pin;
use std::task::{Context, Poll};
use tokio::time::{sleep, Sleep};
use tokio_stream::Stream;

struct CounterStream {
    count: u32,
    delay: Pin<Box<Sleep>>,
}

impl CounterStream {
    fn new() -> Self {
        Self {
            count: 0,
            delay: Box::pin(sleep(std::time::Duration::from_secs(1))),
        }
    }
}

impl Stream for CounterStream {
    type Item = u32;

    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        // Manual poll tracing: log every poll call
        eprintln!("[POLL] CounterStream::poll_next called, count: {}", self.count);

        if self.count >= 5 {
            eprintln!("[POLL] Stream exhausted, returning Ready(None)");
            return Poll::Ready(None);
        }

        // Poll the delay future to check if 1s has elapsed
        match self.delay.as_mut().poll(cx) {
            Poll::Ready(_) => {
                let current = self.count;
                self.count += 1;
                // Reset the delay for the next item
                self.delay = Box::pin(sleep(std::time::Duration::from_secs(1)));
                eprintln!("[POLL] Delay elapsed, returning Ready(Some({}))", current);
                Poll::Ready(Some(current))
            }
            Poll::Pending => {
                eprintln!("[POLL] Delay pending, returning Pending");
                Poll::Pending
            }
        }
    }
}

The eprintln! statements are the manual poll tracing: they log every call to poll_next, the current count, and whether the call returned Pending or Ready. They write to stderr, so the trace stays separate from the program’s normal output on stdout.
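The same tracing idea can be packaged as a reusable wrapper so the log lines need not be written into every poll method. The sketch below assumes only the standard library; Traced, YieldOnce, NoopWaker, and run_traced are hypothetical names for this illustration, and the inner future is boxed so the wrapper needs no pin projection.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical wrapper: logs and counts every poll of the inner future.
struct Traced<F> {
    label: &'static str,
    polls: Arc<AtomicUsize>,
    inner: Pin<Box<F>>, // boxed so the wrapper itself stays Unpin
}

impl<F: Future> Future for Traced<F> {
    type Output = F::Output;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<F::Output> {
        let n = self.polls.fetch_add(1, Ordering::SeqCst) + 1;
        let result = self.inner.as_mut().poll(cx);
        let state = if result.is_ready() { "Ready" } else { "Pending" };
        eprintln!("[POLL] {} poll #{} -> {}", self.label, n, state);
        result
    }
}

/// Example inner future: Pending on the first poll, Ready(value) on the second.
struct YieldOnce {
    yielded: bool,
    value: u32,
}

impl Future for YieldOnce {
    type Output = u32;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<u32> {
        if self.yielded {
            return Poll::Ready(self.value);
        }
        self.yielded = true;
        cx.waker().wake_by_ref();
        Poll::Pending
    }
}

/// Waker that does nothing; the loop below polls unconditionally.
struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

/// Polls a traced `YieldOnce` to completion; returns (output, polls recorded).
fn run_traced(value: u32) -> (u32, usize) {
    let polls = Arc::new(AtomicUsize::new(0));
    let mut fut = Traced {
        label: "yield_once",
        polls: polls.clone(),
        inner: Box::pin(YieldOnce { yielded: false, value }),
    };
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    loop {
        if let Poll::Ready(out) = Pin::new(&mut fut).poll(&mut cx) {
            return (out, polls.load(Ordering::SeqCst));
        }
    }
}
```

Boxing costs one allocation per wrapped future. That seems a fair trade for a debugging aid, though a pin-projecting wrapper would avoid it.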

Adding tokio-console Support

tokio-console is a diagnostics tool for async Rust applications built on tokio. It lets you inspect task lifecycles, poll counts, waker activity, and more. To use it, enable tokio's tracing feature, build with the tokio_unstable cfg flag (RUSTFLAGS="--cfg tokio_unstable"), and add the console-subscriber crate to export telemetry. The tokio-console terminal UI itself is a separate binary, installable with cargo install tokio-console.

Update Cargo.toml:

[dependencies]
tokio = { version = "1.38", features = ["full", "macros", "time", "tracing"] }
tokio-stream = "0.1"
futures = "0.3"
console-subscriber = "0.3"

Next, initialize the console subscriber in our main function, then run the stream:

use tokio_stream::StreamExt;

#[tokio::main]
async fn main() {
    // Initialize tokio-console telemetry
    console_subscriber::init();

    let mut stream = CounterStream::new();

    // Consume the stream
    while let Some(item) = stream.next().await {
        println!("Got item: {}", item);
    }

    println!("Stream done!");
}

Running and Observing with tokio-console

First, run the application:

RUSTFLAGS="--cfg tokio_unstable" RUST_LOG=debug cargo run

In a separate terminal, launch tokio-console:

tokio-console

You’ll see a terminal UI listing the instrumented tokio tasks. Drill into the task that drives the stream to see its poll count, busy and idle time, and wake activity; the resources view lists timers such as the Sleep behind each delay. tokio-console reports polls per task rather than per poll_next call, and the manual eprintln! lines go to the application’s stderr rather than to the console UI. The two views therefore complement each other: the logs show what each individual poll_next returned, while tokio-console adds structured telemetry about how long polls took and when wakers fired.

For example, the first time poll_next is called, the delay is pending, so it returns Pending and the waker is registered. When the 1-second delay elapses, the waker is notified, and the executor polls the stream again, this time returning Ready(Some(0)). This cycle repeats until the count reaches 5, where the stream returns Ready(None) and is exhausted.
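The register-then-wake cycle described above can be reproduced without tokio. The following is a rough sketch under stated assumptions: ThreadTimer is a hypothetical stand-in for tokio's Sleep that uses a helper thread, and block_on is a deliberately tiny executor written for this example, not tokio's.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Wake, Waker};
use std::thread::{self, Thread};
use std::time::Duration;

/// State shared between the future and its timer thread.
struct Shared {
    done: bool,
    waker: Option<Waker>,
}

/// Hypothetical stand-in for a sleep future: done once a helper thread has slept.
struct ThreadTimer {
    shared: Arc<Mutex<Shared>>,
}

impl ThreadTimer {
    fn new(delay: Duration) -> Self {
        let shared = Arc::new(Mutex::new(Shared { done: false, waker: None }));
        let for_thread = shared.clone();
        thread::spawn(move || {
            thread::sleep(delay);
            let mut state = for_thread.lock().unwrap();
            state.done = true;
            let waker = state.waker.take();
            drop(state); // release the lock before waking
            if let Some(w) = waker {
                w.wake(); // notify the executor that progress is possible
            }
        });
        Self { shared }
    }
}

impl Future for ThreadTimer {
    type Output = ();

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        let mut state = self.shared.lock().unwrap();
        if state.done {
            return Poll::Ready(());
        }
        // Register (or refresh) the waker before reporting Pending.
        state.waker = Some(cx.waker().clone());
        Poll::Pending
    }
}

/// Waker that unparks the thread running `block_on`.
struct ThreadWaker(Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

/// Minimal executor: poll, park until woken, poll again. Returns (output, polls).
fn block_on<F: Future>(fut: F) -> (F::Output, usize) {
    let mut fut = Box::pin(fut);
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    let mut polls = 0;
    loop {
        polls += 1;
        match fut.as_mut().poll(&mut cx) {
            Poll::Ready(out) => return (out, polls),
            Poll::Pending => thread::park(),
        }
    }
}
```

Running block_on(ThreadTimer::new(Duration::from_millis(20))) typically takes two polls: one that registers the waker and returns Pending, and one after the helper thread wakes the executor. The exact count is not guaranteed, because thread::park may return spuriously.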

Why This Matters

Manual poll tracing and tools like tokio-console demystify the async polling model. You can use this approach to debug stuck futures, unexpected pending states, or performance issues in production async code. Understanding how poll works under the hood makes you a better async Rust developer, and tokio-console turns that internal knowledge into actionable diagnostics.

Conclusion

In this guide, we built a custom Stream implementation, added manual poll tracing to observe its polling behavior, and used tokio-console to inspect the stream’s lifecycle in real time. The combination of manual tracing (for quick local debugging) and tokio-console (for structured, real-time diagnostics) gives you full visibility into your async Rust code. Try extending this example with more complex Stream logic, or apply the same techniques to debug your own async applications!
