
Rust futures: an uneducated, short and hopefully not boring tutorial - Part 5 - Streams

Francesco Cogno ・6 min read

Intro

In our last post we saw how to build a complete Future. Although the example was contrived, it showed what is expected from a future: avoid blocking as far as possible and unpark the task only when necessary. In this post we will expand that future so that it returns a stream of values.

Streams are akin to iterators: they produce multiple outputs of the same type over time. The only difference here is how we consume them. Instead of using Rust's for statement we will ask our reactor to do it for us. How?

ForEach combinator

Instead of iterating manually over a stream's items we use a specific combinator called for_each. The ForEach type it returns implements Future, so we can pass it to a reactor or chain it, join it and so on with other futures! It's really cool.

Let's build one, shall we?

impl Stream

The Stream trait is very similar to the Future trait:

pub trait Future {
    type Item;
    type Error;
    fn poll(&mut self) -> Poll<Self::Item, Self::Error>;

    // <-- CUT -->
}

pub trait Stream {
    type Item;
    type Error;
    fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error>;

    // <-- CUT -->
}

Both traits have many more functions; since those functions have default implementations, you only need to override them when you want different behavior. Let's focus on the poll function:

    // Future
    fn poll(&mut self) -> Poll<Self::Item, Self::Error>;

    // Stream
    fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error>;

A Stream can optionally return an Item, whereas a Future must return one. The convention is: if your stream has more data to produce and the data is readily available, return Ok(Async::Ready(Some(t))). If the next item is not yet ready, return Ok(Async::NotReady). If you are done, return Ok(Async::Ready(None)). In case of errors you can return Err(e) as usual. So, to summarize:

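| Situation                | Future              | Stream                    |
|--------------------------|---------------------|---------------------------|
| Item to return ready     | Ok(Async::Ready(t)) | Ok(Async::Ready(Some(t))) |
| Item to return not ready | Ok(Async::NotReady) | Ok(Async::NotReady)       |
| No more items to return  | N.A.                | Ok(Async::Ready(None))    |
| Error                    | Err(e)              | Err(e)                    |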
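
The Stream column of the summary can be read as a four-way match. Here is a minimal sketch of that idea. It uses a local stand-in for the futures 0.1 Async enum so that it compiles with the standard library only; the describe helper, the StreamPoll alias and the String error type are illustrative assumptions, not part of the crate.

```rust
// Local stand-in for futures 0.1 `Async`, so the sketch needs no external crate.
#[derive(Debug, PartialEq)]
pub enum Async<T> {
    Ready(T),
    NotReady,
}

// Same shape as the return value of `Stream::poll`, with a String error for brevity.
pub type StreamPoll<T> = Result<Async<Option<T>>, String>;

// Hypothetical helper: names the situation each poll result stands for.
pub fn describe(poll: &StreamPoll<u32>) -> &'static str {
    match poll {
        Ok(Async::Ready(Some(_))) => "item ready",
        Ok(Async::NotReady) => "item not ready yet",
        Ok(Async::Ready(None)) => "stream finished",
        Err(_) => "error",
    }
}

fn main() {
    let polls: Vec<StreamPoll<u32>> = vec![
        Ok(Async::Ready(Some(1))),
        Ok(Async::NotReady),
        Ok(Async::Ready(None)),
        Err("boom".to_owned()),
    ];
    for p in &polls {
        println!("{:?} -> {}", p, describe(p));
    }
}
```

A consumer of a real stream has to handle the same four cases; combinators such as for_each do this for you.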

Simple stream

Let's build a very simple stream: one that returns the integers from 1 up to X (inclusive). Let's see the code first:

use futures::{Async, Poll, Stream};
use std::error::Error;

struct MyStream {
    current: u32,
    max: u32,
}

impl MyStream {
    pub fn new(max: u32) -> MyStream {
        MyStream { current: 0, max }
    }
}

impl Stream for MyStream {
    type Item = u32;
    type Error = Box<dyn Error>;

    fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
        match self.current {
            // more numbers to produce: increment, then return the new value
            ref mut x if *x < self.max => {
                *x += 1;
                Ok(Async::Ready(Some(*x)))
            }
            // max reached: signal the end of the stream
            _ => Ok(Async::Ready(None)),
        }
    }
}

The important part here is the poll function. poll takes a mutable reference to self so we can mutate the inner fields. What we do is check whether we have already returned the last number (max): if so we terminate the stream, otherwise we increment the counter and return its new value.

Note that while here we have an upper bound you are not forced to specify it. So we could return all the numbers from here to infinity (or overflow, whichever comes first :)).
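
To see exactly which values poll produces, we can call it by hand until the stream ends. The sketch below re-declares small stand-ins for Async, Poll and Stream so that it runs with the standard library only. The drain helper and the String error type are assumptions made for illustration; a real consumer would hand the stream to a reactor rather than loop.

```rust
// Stand-ins for the futures 0.1 types, so this compiles with std only.
#[derive(Debug, PartialEq)]
pub enum Async<T> {
    Ready(T),
    NotReady,
}
pub type Poll<T, E> = Result<Async<T>, E>;

pub trait Stream {
    type Item;
    type Error;
    fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error>;
}

pub struct MyStream {
    current: u32,
    max: u32,
}

impl MyStream {
    pub fn new(max: u32) -> MyStream {
        MyStream { current: 0, max }
    }
}

impl Stream for MyStream {
    type Item = u32;
    type Error = String;

    fn poll(&mut self) -> Poll<Option<u32>, String> {
        if self.current < self.max {
            // increment first, then return the new value
            self.current += 1;
            Ok(Async::Ready(Some(self.current)))
        } else {
            Ok(Async::Ready(None))
        }
    }
}

// Hypothetical helper: polls until the stream ends and collects the items.
pub fn drain<S: Stream>(stream: &mut S) -> Result<Vec<S::Item>, S::Error> {
    let mut items = Vec::new();
    loop {
        match stream.poll()? {
            Async::Ready(Some(item)) => items.push(item),
            Async::Ready(None) => return Ok(items),
            // This stream is never NotReady; yield and retry to stay correct.
            Async::NotReady => std::thread::yield_now(),
        }
    }
}

fn main() {
    let mut stream = MyStream::new(5);
    println!("{:?}", drain(&mut stream)); // prints Ok([1, 2, 3, 4, 5])
}
```

Because the counter is incremented before it is returned, the first item is 1 and the last one is max itself.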

Consume a stream

To consume a stream we can use the for_each combinator we saw earlier. Let's print those numbers:

let mut reactor = Core::new().unwrap();
let my_stream = MyStream::new(5);

let fut = my_stream.for_each(|num| {
    println!("num === {}", num);
    ok(())
});

// nothing is printed until the reactor drives the future
reactor.run(fut).unwrap();

I will not spoil the surprise by posting the results :). Anyway, note that the closure returns a future. You can tell because we are calling the lowercase ok(()) function, which builds a future that is immediately ready.
We could call other futures there, chain them and so on as usual. In our example we are simply returning ok.
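
Conceptually, for_each keeps polling the stream, feeds every item to the closure and stops at the first error. The following is a synchronous imitation of that behavior, not the crate's implementation: for_each_sync and count_to are hypothetical helpers, the stream is modeled as a plain closure, and the closure's result is a Result standing in for an already completed future.

```rust
// Local stand-in for futures 0.1 `Async`.
#[derive(Debug, PartialEq)]
pub enum Async<T> {
    Ready(T),
    NotReady,
}

// Hypothetical synchronous imitation of `for_each`: `poll` plays the stream,
// `f` plays the closure whose future has already resolved to a Result.
pub fn for_each_sync<T, E, P, F>(mut poll: P, mut f: F) -> Result<(), E>
where
    P: FnMut() -> Result<Async<Option<T>>, E>,
    F: FnMut(T) -> Result<(), E>,
{
    loop {
        match poll()? {
            // an error from the closure halts the iteration
            Async::Ready(Some(item)) => f(item)?,
            Async::Ready(None) => return Ok(()),
            // a reactor would park the task here; the sketch just retries
            Async::NotReady => std::thread::yield_now(),
        }
    }
}

// Hypothetical stream-as-closure producing 1..=max, then None.
pub fn count_to(max: u32) -> impl FnMut() -> Result<Async<Option<u32>>, String> {
    let mut current = 0;
    move || {
        if current < max {
            current += 1;
            Ok(Async::Ready(Some(current)))
        } else {
            Ok(Async::Ready(None))
        }
    }
}

fn main() {
    let mut seen = Vec::new();
    let result = for_each_sync(count_to(5), |num| {
        seen.push(num);
        Ok(())
    });
    println!("result == {:?}, items seen == {}", result, seen.len());
}
```

The real combinator differs in one important way: instead of looping, it returns NotReady to the reactor and is polled again later.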

Spawn futures during the event loop

Sometimes when producing items in streams we might want to spawn other futures. There are various reasons to do so (mainly: do not block if you can help it!). Rust's futures allow you to add futures to an existing event loop using the execute function of the reactor's handle. There is a gotcha, however: the spawned future must not return anything. The Executor trait that provides execute only accepts futures whose Item and Error types are both ().

As an example we use a slightly modified version of the wait-in-another-thread future written in the previous posts. We revise our stream like this:

impl Stream for MyStream {
    type Item = u32;
    type Error = Box<dyn Error>;

    fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
        use futures::future::Executor;

        match self.current {
            ref mut x if *x < self.max => {
                *x += 1;

                // spawn a background future into the same event loop;
                // the Result returned by execute is ignored here
                let _ = self.core_handle.execute(WaitInAnotherThread::new(
                    Duration::seconds(2),
                    format!("WAIT {:?}", x),
                ));
                Ok(Async::Ready(Some(*x)))
            }
            _ => Ok(Async::Ready(None)),
        }
    }
}

The important part here is the execute call. Here we are spawning another future in the event loop (a future that will just wait for two seconds and then print WAIT <number>). Remember, this is supposed to be a daemon-like future, so it must not return anything: both its Item and its Error type are ().

To test this we'll use this code:

fn main() {
    let mut reactor = Core::new().unwrap();

    // create a Stream returning 5 items
    // Each item will spawn an "inner" future
    // into the same reactor loop
    let my_stream = MyStream::new(5, reactor.handle());

    // we use for_each to consume
    // the stream
    let fut = my_stream.for_each(|num| {
        println!("num === {:?}", num);
        ok(())
    });

    // this is a manual future. it's the same as the
    // future spawned into our stream
    let wait = WaitInAnotherThread::new(Duration::seconds(3), "Manual3".to_owned());

    // we join the futures to let them run concurrently
    let future_joined = fut.map_err(|err| {}).join(wait);

    // let's run the future
    let ret = reactor.run(future_joined).unwrap();
    println!("ret == {:?}", ret);
}

Here we also show how to join the future that consumes a stream with another future. The map_err dance is needed, as before, to make the error types compatible (see Rust futures: an uneducated, short and hopefully not boring tutorial - Part 2).
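
The reason for the map_err call can be shown with plain Result values. In this sketch, join is a hypothetical stand-in that, like the join combinator, requires both sides to share one error type; stream_side and wait_side are made-up names for the two halves.

```rust
use std::error::Error;

// Hypothetical stand-in for `join`: both inputs must share the error type `E`.
pub fn join<A, B, E>(a: Result<A, E>, b: Result<B, E>) -> Result<(A, B), E> {
    Ok((a?, b?))
}

// Plays the for_each future, whose error type is Box<dyn Error>.
pub fn stream_side() -> Result<(), Box<dyn Error>> {
    Ok(())
}

// Plays the waiting future, whose error type is ().
pub fn wait_side() -> Result<(), ()> {
    Ok(())
}

fn main() {
    // join(stream_side(), wait_side()) would not compile:
    // Box<dyn Error> and () are different error types.
    let ret = join(stream_side().map_err(|_err| ()), wait_side());
    println!("ret == {:?}", ret); // prints ret == Ok(((), ()))
}
```

Mapping the error to () throws the error details away, which is acceptable here only because we do nothing with them.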

If we run this code we can expect an output like this:

num === 1
num === 2
num === 3
num === 4
num === 5
"Manual3" starting the secondary thread!
"Manual3" not ready yet! parking the task.
"WAIT 1" starting the secondary thread!
"WAIT 1" not ready yet! parking the task.
"WAIT 2" starting the secondary thread!
"WAIT 2" not ready yet! parking the task.
"WAIT 3" starting the secondary thread!
"WAIT 3" not ready yet! parking the task.
"WAIT 4" starting the secondary thread!
"WAIT 4" not ready yet! parking the task.
"WAIT 5" starting the secondary thread!
"WAIT 5" not ready yet! parking the task.
"WAIT 1" the time has come == 2017-12-06T10:23:30.853796527Z!
"WAIT 1" ready! the task will complete.
"WAIT 2" the time has come == 2017-12-06T10:23:30.853831227Z!
"WAIT 2" ready! the task will complete.
"WAIT 3" the time has come == 2017-12-06T10:23:30.853842927Z!
"WAIT 3" ready! the task will complete.
"WAIT 5" the time has come == 2017-12-06T10:23:30.853856927Z!
"WAIT 5" ready! the task will complete.
"WAIT 4" the time has come == 2017-12-06T10:23:30.853850427Z!
"WAIT 4" ready! the task will complete.
"Manual3" the time has come == 2017-12-06T10:23:31.853775627Z!
"Manual3" ready! the task will complete.
ret == ((), ())

Notice that even though we spawned task 5 after task 4, the completion order is scrambled. Your output will likely differ as well.

But what if we did not join the "Wait for 3 seconds" future?

The revised code is like this:

fn main() {
    let mut reactor = Core::new().unwrap();

    // create a Stream returning 5 items
    // Each item will spawn an "inner" future
    // into the same reactor loop
    let my_stream = MyStream::new(5, reactor.handle());

    // we use for_each to consume
    // the stream
    let fut = my_stream.for_each(|num| {
        println!("num === {:?}", num);
        ok(())
    });

    // let's run the future
    let ret = reactor.run(fut).unwrap();
    println!("ret == {:?}", ret);
}

We will notice that the code will return almost immediately with this output:

num === 1
num === 2
num === 3
num === 4
num === 5
ret == ()

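The background futures did not get a chance to run: reactor.run returns as soon as the future passed to it completes, and nothing else keeps the event loop going for the spawned futures.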
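
Why the spawned work is left behind can be illustrated with a toy event loop. This is a deliberately simplified model under stated assumptions, not how tokio-core is implemented: MiniReactor, Task, task_needing and scenario are all hypothetical names, and a task is just a closure that returns true once it is done.

```rust
use std::cell::RefCell;
use std::rc::Rc;

// A task is polled repeatedly and returns true once it has finished.
pub type Task = Box<dyn FnMut() -> bool>;

pub struct MiniReactor {
    spawned: Vec<Task>,
}

impl MiniReactor {
    pub fn new() -> Self {
        MiniReactor { spawned: Vec::new() }
    }

    pub fn spawn(&mut self, task: Task) {
        self.spawned.push(task);
    }

    // Runs until `main` finishes. Spawned tasks are polled alongside it,
    // but the loop does not wait for them.
    pub fn run(&mut self, mut main: Task) {
        loop {
            if main() {
                return;
            }
            // poll every spawned task and drop the ones that finished
            self.spawned.retain_mut(|task| !task());
        }
    }

    pub fn pending(&self) -> usize {
        self.spawned.len()
    }
}

// Builds a task that finishes on its `polls`-th poll and logs its name then.
pub fn task_needing(polls: u32, name: &'static str, log: Rc<RefCell<Vec<&'static str>>>) -> Task {
    let mut remaining = polls;
    Box::new(move || {
        if remaining > 1 {
            remaining -= 1;
            false
        } else {
            log.borrow_mut().push(name);
            true
        }
    })
}

// Returns the completion log and the number of spawned tasks left unfinished.
pub fn scenario(main_polls: u32, background_polls: u32) -> (Vec<&'static str>, usize) {
    let log = Rc::new(RefCell::new(Vec::new()));
    let mut reactor = MiniReactor::new();
    reactor.spawn(task_needing(background_polls, "background", log.clone()));
    reactor.run(task_needing(main_polls, "main", log.clone()));
    let finished = log.borrow().clone();
    (finished, reactor.pending())
}

fn main() {
    // main finishes at once: the background task is never completed
    println!("{:?}", scenario(1, 3)); // prints (["main"], 1)
    // main takes longer (like the joined future): the background task completes
    println!("{:?}", scenario(5, 3)); // prints (["background", "main"], 0)
}
```

In the joined version of our program the three-second wait plays the role of the long-running main task, which is what gave the spawned futures time to finish.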

Next steps

In the following posts we will, at least, cover the await! macro to streamline our futures.


Happy Coding

Francesco Cogno

Posted on Dec 6 '17 by Francesco Cogno (@mindflavor)

Started coding when numbered BASIC was cool. Now I work mostly with databases but I still count in increments of ten, just to be sure :).

Discussion


Hi Francesco, thanks for this series. However, in this Part 5 I found that you didn't describe the changes required for the last two run() examples to work (maybe intentionally, as an exercise?)

Anyway, if anyone is struggling, here's a cheat sheet.

Add core_handle to MyStream

impl MyStream {
    pub fn new(max: u32, core_handle: tokio_core::reactor::Handle) -> MyStream {
        MyStream {
            current: 0,
            max,
            core_handle
        }
    }
}

Add thread_name to WaitInAnotherThread Future

pub struct WaitInAnotherThread {
    end_time: DateTime<Utc>,
    running: bool,
    thread_name: String
}

and extend its constructor

impl WaitInAnotherThread {
    pub fn new(delay_seconds: i64, thread_name: String) -> WaitInAnotherThread {
        WaitInAnotherThread {
            end_time: Utc::now() + chrono::Duration::seconds(delay_seconds),
            running: false,
            thread_name,
        }
    }
}

The WaitInAnotherThread Future must have Error type (). Also add thread_name to the printouts.

impl Future for WaitInAnotherThread {
    type Item = ();
    type Error = ();

    fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
        if Utc::now() < self.end_time {
            println!("{} :: not ready yet! parking the task.", self.thread_name);

            if !self.running {
                println!("{} :: side thread not running! starting now!", self.thread_name);
                self.run(task::current());
                self.running = true;
            }

            Ok(Async::NotReady)
        } else {
            println!("{} :: ready! the task will complete.", self.thread_name);
            Ok(Async::Ready(()))
        }
    }
}
 
 

Hi Francesco,

Let's build a very simple stream: one that returns the integers from 0 up to X

It seems that the poll implementation returns 1..max (both ends inclusively).