Rust Async Programming: Stream Trait

The Stream trait is similar to the Future trait. While Future represents the state change of a single item, Stream, akin to the Iterator trait in the standard library, can yield multiple values before it finishes. Or simply put, a Stream is made up of a series of Futures, from which we can read each Future’s result until the Stream completes.

Definition of Stream

The Future is the most fundamental concept in asynchronous programming. If a Future represents a one-time asynchronous value, then a Stream represents a series of asynchronous values. Future is 1, while Stream is 0, 1, or N. The signature of Stream is as follows:

pub trait Stream {
    type Item;

    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>;
}

The concept of Stream corresponds to the Iterator in synchronous primitives. Recall how similar even their signatures are!

pub trait Iterator {
    type Item;

    fn next(&mut self) -> Option<Self::Item>;
}

Stream is used to abstract continuous data sources, although a stream can also end (when poll_next returns Poll::Ready(None)).

A common example of a Stream is the consumer Receiver in the futures crate's message channel. Every time the sender sends a message, the receiver gets a Some(val) value. Once the sender is closed (dropped) and there are no more messages in the channel, the receiver gets None.

use futures::channel::mpsc;
use futures::{executor::block_on, SinkExt, StreamExt};

async fn send_recv() {
    const BUFFER_SIZE: usize = 10;
    let (mut tx, mut rx) = mpsc::channel::<i32>(BUFFER_SIZE);

    println!("tx: Send 1, 2");
    tx.send(1).await.unwrap();
    tx.send(2).await.unwrap();
    drop(tx);

    // `StreamExt::next` is similar to `Iterator::next`, but instead of returning a value,
    // it returns a `Future<Output = Option<T>>`, so you need `.await` to get the actual value
    assert_eq!(Some(1), rx.next().await);
    assert_eq!(Some(2), rx.next().await);
    assert_eq!(None, rx.next().await);
}

fn main() {
    block_on(send_recv());
}

Differences Between Iterator and Stream

  • Iterator allows repeatedly calling the next() method to get new values until it returns None. Iterator is blocking: each call to next() occupies the CPU until a result is obtained. In contrast, the asynchronous Stream is non-blocking and yields the CPU while waiting.
  • Stream's poll_next() method is quite similar to Future's poll() method, and its role is akin to Iterator's next(). However, calling poll_next() directly is inconvenient because you have to handle the Poll state yourself, which isn't very ergonomic. That's why Rust provides StreamExt, an extension trait for Stream, which offers a next() method returning a Future implemented by the Next struct. This way, you can retrieve the next value directly with stream.next().await.

Note: StreamExt stands for Stream Extension. In Rust, it's a common practice to keep the minimal trait definition (like Stream) in one file, and put additional APIs (like StreamExt) in a separate, related file.

Note: Unlike Future, the Stream trait is not yet in Rust's standard library (core/std). It is defined in the futures-core crate (and re-exported by futures-util and futures), and StreamExt is likewise not part of the standard library. This means different libraries may provide conflicting imports: for example, Tokio ships its own StreamExt in tokio-stream, separate from the one in futures-util. If possible, prefer futures-util, as it is the most commonly used crate for async/await.

Implementation of StreamExt's next() method and the Next struct:

pub trait StreamExt: Stream {
    fn next(&mut self) -> Next<'_, Self> where Self: Unpin {
        assert_future::<Option<Self::Item>, _>(Next::new(self))
    }
}

// `next` returns the `Next` struct
pub struct Next<'a, St: ?Sized> {
    stream: &'a mut St,
}

// If Stream is Unpin, then Next is also Unpin
impl<St: ?Sized + Unpin> Unpin for Next<'_, St> {}

impl<'a, St: ?Sized + Stream + Unpin> Next<'a, St> {
    pub(super) fn new(stream: &'a mut St) -> Self {
        Self { stream }
    }
}

// Next implements Future, each poll() is essentially polling from the stream via poll_next()
impl<St: ?Sized + Stream + Unpin> Future for Next<'_, St> {
    type Output = Option<St::Item>;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        self.stream.poll_next_unpin(cx)
    }
}

Creating Streams

The futures library provides several convenient methods to create basic Streams, such as:

  • empty(): creates an empty Stream
  • once(): creates a Stream containing a single value
  • pending(): creates a Stream that never yields a value and always returns Poll::Pending
  • repeat(): creates a Stream that repeatedly yields the same value
  • repeat_with(): creates a Stream that lazily yields values via a closure
  • poll_fn(): creates a Stream from a closure that returns Poll
  • unfold(): creates a Stream from an initial state and a closure that returns a Future

For example, stream::iter turns an ordinary iterator into a Stream, which can then be combined with adapters such as filter and map:

use futures::prelude::*;

#[tokio::main]
async fn main() {
    let mut st = stream::iter(1..10)
        .filter(|x| future::ready(x % 2 == 0))
        .map(|x| x * x);

    // Iterate over the stream
    while let Some(x) = st.next().await {
        println!("Got item: {}", x);
    }
}

In the code above, stream::iter generates a Stream, which is then passed through filter and map operations. Finally, the stream is iterated, and the resulting data is printed.
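
The other constructors in the list above work the same way. Below is a minimal sketch of my own (not from the original text) showing once, unfold, and poll_fn, assuming only the futures crate:

use futures::executor::block_on;
use futures::future;
use futures::stream::{self, StreamExt};
use std::task::Poll;

fn main() {
    block_on(async {
        // once(): a stream that yields exactly one value
        let mut one = stream::once(future::ready(42));
        assert_eq!(Some(42), one.next().await);
        assert_eq!(None, one.next().await);

        // unfold(): an initial state plus a closure returning a Future of (item, next_state)
        let countdown = stream::unfold(3u32, |n| async move {
            if n == 0 { None } else { Some((n, n - 1)) }
        });
        assert_eq!(vec![3, 2, 1], countdown.collect::<Vec<_>>().await);

        // poll_fn(): build a stream directly from a closure that returns Poll
        let mut left = 2;
        let from_poll = stream::poll_fn(move |_cx| {
            if left == 0 {
                Poll::Ready(None)
            } else {
                left -= 1;
                Poll::Ready(Some(left))
            }
        });
        assert_eq!(vec![1, 0], from_poll.collect::<Vec<_>>().await);
    });
}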

When you’re not concerned with async/await and only care about the stream behavior, stream::iter is quite handy for testing. Another interesting method is repeat_with, which lets you pass a closure to lazily generate values on demand, for example:

use futures::stream::{self, StreamExt};

// From the zeroth to the third power of two:
async fn stream_repeat_with() {
    let mut curr = 1;
    let mut pow2 = stream::repeat_with(|| { let tmp = curr; curr *= 2; tmp });

    assert_eq!(Some(1), pow2.next().await);
    assert_eq!(Some(2), pow2.next().await);
    assert_eq!(Some(4), pow2.next().await);
    assert_eq!(Some(8), pow2.next().await);
}

Implementing a Stream

Creating your own Stream involves two steps:

  1. First, define a struct to hold the stream’s state
  2. Then, implement the Stream trait for that struct

Let’s create a stream called Counter that counts from 1 to 5:

use std::pin::Pin;
use std::task::{Context, Poll};
use futures::Stream;

// First, the struct:
/// A stream that counts from one to five
struct Counter {
    count: usize,
}

// We want the counter to start from one, so let’s add a `new()` method as a helper.
// This isn’t strictly necessary, but it’s convenient.
// Note that we start `count` from zero — the reason will be clear in the implementation of `poll_next()`.
impl Counter {
    fn new() -> Counter {
        Counter { count: 0 }
    }
}

// Then, we implement `Stream` for `Counter`:
impl Stream for Counter {
    // We’ll use `usize` for counting
    type Item = usize;

    // `poll_next()` is the only required method
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        // Increment the counter. That’s why we started from zero.
        self.count += 1;

        // Check if we've finished counting.
        if self.count < 6 {
            Poll::Ready(Some(self.count))
        } else {
            Poll::Ready(None)
        }
    }
}
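
To try the counter, here is a hypothetical driver of my own (not part of the original example); it assumes the Counter type and the imports above are in the same file:

use futures::executor::block_on;
use futures::StreamExt;

fn main() {
    block_on(async {
        let mut counter = Counter::new();
        let mut collected = Vec::new();
        // `next()` comes from `StreamExt`; it works here because `Counter` is `Unpin`
        while let Some(n) = counter.next().await {
            collected.push(n);
        }
        assert_eq!(vec![1, 2, 3, 4, 5], collected);
    });
}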

Stream Traits

There are several traits related to streams in Rust, such as Stream, TryStream, and FusedStream.

  • Stream is very similar to Iterator. However, when it returns None, it signifies that the stream is exhausted and should no longer be polled. Continuing to poll a stream after it has returned None may panic, block forever, or otherwise misbehave.

  • TryStream is a specialized trait for streams that yield Result<T, E> items. TryStream provides methods that make it easy to match and transform the inner Results. You can think of it as an API designed for streams that produce Result items, making error-handling cases more convenient to work with.

  • FusedStream is similar to a regular stream but additionally lets users know whether the stream is truly exhausted after returning None, or whether it can safely be polled again. For example, if you’re creating a stream backed by a circular buffer, the stream might return None after the first pass, but with FusedStream it would be safe to poll it again to start a new round of iteration over the buffer (see the sketch after this list).
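
The following sketch is mine (not from the original article) and illustrates both ideas with the futures crate: map_ok from TryStreamExt transforms only the Ok items of a TryStream, while fuse() produces a FusedStream whose is_terminated() reports whether the stream is exhausted:

use futures::executor::block_on;
use futures::stream::{self, FusedStream, StreamExt, TryStreamExt};

fn main() {
    block_on(async {
        // TryStream: a stream whose items are Result<T, E>
        let results = stream::iter(vec![Ok::<i32, String>(1), Err("boom".to_string()), Ok(3)]);
        let doubled: Vec<Result<i32, String>> = results.map_ok(|x| x * 2).collect().await;
        assert_eq!(doubled, vec![Ok(2), Err("boom".to_string()), Ok(3)]);

        // FusedStream: after `fuse()`, `is_terminated()` tells us whether polling again is safe
        let mut s = stream::iter(1..=2).fuse();
        assert!(!s.is_terminated());
        while s.next().await.is_some() {}
        assert!(s.is_terminated());
    });
}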

Iteration and Concurrency

Just like the Iterator trait, Stream also supports iteration through combinators. For example, you can use methods like map, filter, fold, for_each, and skip, as well as their error-aware counterparts from TryStreamExt: try_filter, try_fold, try_for_each, and so on.
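
As a small illustration of my own (not from the original), skip and fold chain just like their Iterator counterparts, except that fold's closure returns a Future:

use futures::executor::block_on;
use futures::stream::{self, StreamExt};

fn main() {
    // Skip the first two items, then fold the rest into a sum: 3 + 4 + 5 = 12
    let total = block_on(
        stream::iter(1..=5)
            .skip(2)
            .fold(0, |acc, x| async move { acc + x }),
    );
    assert_eq!(12, total);
}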

Unlike Iterator, however, for loops can’t be used directly to iterate over a Stream. Instead, imperative-style loops like while let or loop can be used, repeatedly calling next or try_next explicitly. For example, you can read from a stream in either of the following ways:

// Iteration pattern 1
while let Some(value) = s.next().await {}

// Iteration pattern 2
loop {
    match s.next().await {
        Some(value) => {}
        None => break,
    }
}

An example of computing the sum of values in a stream:

use futures_util::{pin_mut, Stream, StreamExt};

async fn sum(stream: impl Stream<Item=usize>) -> usize {
    // Don’t forget to pin the stream before iteration
    pin_mut!(stream);
    let mut sum: usize = 0;
    // Iterate over the stream
    while let Some(item) = stream.next().await {
        sum += item;
    }
    sum
}
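
For completeness, a hypothetical call site for sum (not in the original) could look like this, assuming the futures facade crate is also available for block_on:

use futures::executor::block_on;
use futures::stream;

fn main() {
    // Assumes the `sum` function defined above is in the same module
    assert_eq!(6, block_on(sum(stream::iter(vec![1usize, 2, 3]))));
}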

If you process one value at a time, you might miss out on the benefits of concurrency, which defeats the purpose of asynchronous programming. To process multiple values concurrently from a Stream, you can use for_each_concurrent and try_for_each_concurrent:

use std::{pin::Pin, io};
use futures_util::{Stream, TryStreamExt};

async fn jump_around(stream: Pin<&mut dyn Stream<Item = Result<i32, io::Error>>>) -> Result<(), io::Error> {
    // Use `try_for_each_concurrent`
    stream.try_for_each_concurrent(100, |num| async move {
        jump_n_times(num).await?;
        report_n_jumps(num).await?;
        Ok(())
    }).await?;

    Ok(())
}

async fn jump_n_times(num: i32) -> Result<(), io::Error> {
    println!("jump_n_times :{}", num + 1);
    Ok(())
}
async fn report_n_jumps(num: i32) -> Result<(), io::Error> {
    println!("report_n_jumps : {}", num);
    Ok(())
}
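
The non-fallible variant, for_each_concurrent, works the same way. A minimal sketch of my own, assuming the futures crate:

use futures::executor::block_on;
use futures::stream::{self, StreamExt};

fn main() {
    block_on(async {
        // Process up to 8 items at a time; the closure returns a Future for each item
        stream::iter(0..20)
            .for_each_concurrent(8, |num| async move {
                println!("processing {}", num);
            })
            .await;
    });
}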

Summary

Stream is similar to Future, but while Future represents the state change of a single item, Stream behaves more like an Iterator that can yield multiple values before completion. Or put simply: a Stream consists of a series of Futures, and we can retrieve the result of each Future from the Stream until it finishes—making it an asynchronous iterator.

The poll_next function of a Stream can return one of three possible values (illustrated by the sketch after this list):

  • Poll::Pending: indicates that the next value is not ready yet and we still need to wait.
  • Poll::Ready(Some(val)): indicates that a value is ready and has been successfully returned; you can call poll_next again to retrieve the next one.
  • Poll::Ready(None): indicates that the stream has ended and poll_next should no longer be called.
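
These cases can be observed by polling a stream by hand. Here is a hypothetical sketch (not from the original) that drives poll_next through future::poll_fn and StreamExt::poll_next_unpin:

use futures::executor::block_on;
use futures::future::poll_fn;
use futures::stream::{self, StreamExt};

fn main() {
    block_on(async {
        let mut s = stream::iter(vec![7]);
        // First poll: a value is ready
        assert_eq!(Some(7), poll_fn(|cx| s.poll_next_unpin(cx)).await);
        // Second poll: the stream has ended
        assert_eq!(None, poll_fn(|cx| s.poll_next_unpin(cx)).await);
        // Poll::Pending never shows up here because stream::iter is always ready.
    });
}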

We are Leapcell, your top choice for hosting Rust projects.

Leapcell

Leapcell is the Next-Gen Serverless Platform for Web Hosting, Async Tasks, and Redis:

Multi-Language Support

  • Develop with Node.js, Python, Go, or Rust.

Deploy unlimited projects for free

  • pay only for usage — no requests, no charges.

Unbeatable Cost Efficiency

  • Pay-as-you-go with no idle charges.
  • Example: $25 supports 6.94M requests at a 60ms average response time.

Streamlined Developer Experience

  • Intuitive UI for effortless setup.
  • Fully automated CI/CD pipelines and GitOps integration.
  • Real-time metrics and logging for actionable insights.

Effortless Scalability and High Performance

  • Auto-scaling to handle high concurrency with ease.
  • Zero operational overhead — just focus on building.

Explore more in the Documentation!

Try Leapcell

Follow us on X: @LeapcellHQ


Read on our blog
