DEV Community

Christopher Durham

On "Async Streams" in Rust

Context: For await loops discussion started by @withoutboats on Internals.

Step 1: What is a Stream?

The clearest, most concise explanation I've found comes from @withoutboats again, on a different Internals thread:

            | Evaluated immediately     | Evaluated asynchronously
------------------------------------------------------------------------
Return once | `fn() -> T`    (Function) | `async fn() -> T`    (Future)
Yield many  | `fn() yield T` (Iterator) | `async fn() yield T` (Stream)
  • When you use a Function, you evaluate some routine to retrieve a value.
  • When you use an Iterator, you evaluate some routine to retrieve the next value, multiple times.
  • When you use a Future, you await some routine to retrieve a value.
  • When you use a Stream, you await some routine to retrieve the next value, multiple times.

Step 2: How do we use a Stream?

To determine this, let's look at the other points in the matrix.

Function

Trivial: function().

Iterator

A for loop: for item in iterator().

This effectively desugars to code using the Iterator trait (simplified):

let mut __iter = iterator(); // `next` takes `&mut self`, so the binding must be mutable
while let Some(item) = __iter.next() {
    // your block
}
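As a runnable illustration of that desugaring, here is a minimal sketch that sums the same items twice: once with the `for` sugar and once with the hand-written `while let` loop. The function names `sum_with_for` and `sum_desugared` are made up for this example. The second version also spells out the `IntoIterator::into_iter` call that the simplified expansion above leaves out.

```rust
/// Sums the items using the `for` loop sugar.
fn sum_with_for(items: Vec<u32>) -> u32 {
    let mut total = 0;
    for item in items {
        total += item;
    }
    total
}

/// Sums the items using (roughly) what the `for` loop expands to.
fn sum_desugared(items: Vec<u32>) -> u32 {
    let mut total = 0;
    // `for` first converts its argument with `IntoIterator::into_iter`.
    let mut iter = IntoIterator::into_iter(items);
    // `next` takes `&mut self`, so the iterator binding has to be mutable.
    while let Some(item) = iter.next() {
        total += item;
    }
    total
}
```

Both functions visit the items in the same order, so they should always agree.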

Future

The exact syntax for awaiting a Future is still being decided by the lang team. About the only thing settled is that it will involve the reserved keyword await. I'll use await!(future) in this document, as it's the current unstable syntax and is fully unambiguous.

This is extremely simplified, but awaiting a future expands to roughly the following in concept:

let __future = __stack_pin!(future);
loop {
    match __future.poll(__async_context_waker!()) {
        Poll::Ready(value) => break value,
        Poll::Pending => __async_yield_execution!(),
    }
}
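The same poll loop can be written as real code if we move it to the very top of the program, where there is no executor to yield to. The sketch below is only that: a toy `block_on` under the assumption that spinning with `thread::yield_now` is acceptable, which a real executor would not do. `NoopWake` and `ReadyOnSecondPoll` are hypothetical helpers invented for this example, and the latter exists only so that the `Pending` branch is actually taken once.

```rust
use std::future::Future;
use std::pin::{pin, Pin};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// A waker that does nothing; enough for a loop that re-polls unconditionally.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Drives `future` to completion on the current thread by polling in a loop.
fn block_on<F: Future>(future: F) -> F::Output {
    // Pin the future to the stack, like `__stack_pin!` in the pseudocode.
    let mut future = pin!(future);
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    loop {
        match future.as_mut().poll(&mut cx) {
            Poll::Ready(value) => break value,
            // A real `await` hands `Pending` back to its caller here; this
            // toy top-level loop lets other threads run and polls again.
            Poll::Pending => std::thread::yield_now(),
        }
    }
}

/// Hypothetical future: `Pending` on the first poll, `Ready(42)` afterwards.
struct ReadyOnSecondPoll {
    polled: bool,
}

impl Future for ReadyOnSecondPoll {
    type Output = u32;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<u32> {
        if self.polled {
            Poll::Ready(42)
        } else {
            self.polled = true;
            // Ask to be polled again, as a well-behaved future should.
            cx.waker().wake_by_ref();
            Poll::Pending
        }
    }
}
```

Calling `block_on(ReadyOnSecondPoll { polled: false })` goes around the loop twice: the first poll reports `Pending`, the second one breaks out with the value.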

Stream

So, how are Streams used before any syntax sugar? Answering this matters, because these are the semantics the sugar has to represent.

A Stream is like an Iterator, but instead of the plain function Iterator::next, we have the poll function Stream::poll_next, which we wrap in a future and await:

let __stream = __stack_pin!(stream);
while let Some(item) = await!(__future_from_poll!(__stream::poll_next)) {
    // your block
}

This still hides some details in the await!(__future_from_poll!()), so let's inline that:

let __stream = __stack_pin!(stream);
while let Some(item) = loop {
    match __stream.poll_next(__async_context_waker!()) {
        Poll::Ready(value) => break value,
        Poll::Pending => __async_yield_execution!(),
    }
} {
    // your block
}
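To make the inlined expansion concrete, here is a sketch that compiles on stable Rust without external crates. Everything in it is an assumption made for illustration: the `Stream` trait is a local stand-in with the same `poll_next` shape, `Countdown` is a made-up stream, and `collect_blocking` plays the role of a top-level executor, so it spins with `thread::yield_now` where real async code would hand control back to its executor.

```rust
use std::pin::{pin, Pin};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Local stand-in for the `Stream` trait discussed in this post.
trait Stream {
    type Item;
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>;
}

/// Hypothetical stream yielding `remaining, remaining - 1, ..., 1`.
/// It reports `Pending` once before every answer to exercise the poll loop.
struct Countdown {
    remaining: u32,
    ready: bool,
}

impl Stream for Countdown {
    type Item = u32;

    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<u32>> {
        if !self.ready {
            self.ready = true;
            cx.waker().wake_by_ref();
            return Poll::Pending;
        }
        self.ready = false;
        if self.remaining == 0 {
            Poll::Ready(None)
        } else {
            self.remaining -= 1;
            Poll::Ready(Some(self.remaining + 1))
        }
    }
}

/// A waker that does nothing; the loop below re-polls unconditionally.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Runs the inlined "await the next item" loop and collects every item.
fn collect_blocking<S: Stream>(stream: S) -> Vec<S::Item> {
    let mut stream = pin!(stream);
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut items = Vec::new();
    // Outer loop: one iteration per item. Inner loop: poll until ready.
    while let Some(item) = loop {
        match stream.as_mut().poll_next(&mut cx) {
            Poll::Ready(value) => break value,
            Poll::Pending => std::thread::yield_now(),
        }
    } {
        items.push(item);
    }
    items
}
```

The two nested loops are the point: the inner one is the "await", the outer one is the "for".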

Wow, everything makes some amount of sense! But how do we actually use this? That is a lot of boilerplate for what seems like a useful data type.

Step 3: What Not

The logical operation happening here is awaiting the next value from the stream, for every element in the stream: a combination of iterators (for item in iterator) and futures (await!(future)).

The "obvious" solution would be async for item in stream. But this points in the wrong direction: async is the enabler (it creates a context in which awaiting is possible), while await is how you actually poll a future to completion.

Another enticing solution is to make for just work with Streams and do the equivalent of impl<I: Iterator> Stream for I. But we made (are making) the await operation explicit on futures for good reason, and shouldn't lose that for streams, if for no other reason than it would be inconsistent.

I've seen in multiple places the suggestion to use for item in await!(stream). Hopefully, the above expansions illustrate why that doesn't work semantically: it awaits once and then iterates synchronously. This would instead be awaiting a Future<Output=Iterator>.
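Here is a small sketch of what awaiting first and iterating second really means. It is written with the postfix `.await` form that stable Rust accepts today, standing in for `await!(...)`. The names `load_all` and `sum_all` are invented for the example, and `block_on` is a toy busy-polling executor included only so the snippet runs without external crates.

```rust
use std::future::Future;
use std::pin::pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical async function: one future whose output is a whole collection.
async fn load_all() -> Vec<u32> {
    vec![1, 2, 3]
}

/// Awaits once, then iterates synchronously.
async fn sum_all() -> u32 {
    let mut total = 0;
    // The only suspension point is `load_all().await`. Once it resolves,
    // the loop walks an ordinary iterator and never awaits between items.
    for item in load_all().await {
        total += item;
    }
    total
}

/// A waker that does nothing; the executor below re-polls unconditionally.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Toy executor: polls in a loop until the future is ready.
fn block_on<F: Future>(future: F) -> F::Output {
    let mut future = pin!(future);
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    loop {
        match future.as_mut().poll(&mut cx) {
            Poll::Ready(value) => break value,
            Poll::Pending => std::thread::yield_now(),
        }
    }
}
```

This is a perfectly good program, but it is a future of an iterator, not a stream: every item has to exist before the first one is handed to the loop body.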

Step 4: Possibilities

I personally think that await for item in stream reads perfectly for what the semantics are. Those semantics are that we await "something" to get each item out of the stream, or in other words, we "(a)wait for each item in the stream". There's also no possibility of being confused with await!(for item in stream {}), as for loops don't (and can't) return a value.

@withoutboats also suggests that we could make await a pattern, giving us for await item in stream. I don't hate it, but there's a problem with this.

I would expect await as a pattern (here I'll still write await!(pattern) for clarity) to behave the same as awaiting the right-hand side of the binding. In code, the following two statements would be equivalent:

let value = await!(future);
let await!(value) = future;

This was actually proposed previously in the venerable async/await bikeshedding thread.

The problem is that this now suggests that Stream<Item=T> is Iterator<Item=Future<Output=T>>. Maybe some streams could fit this type. But the current definition of Stream is closer to a StreamingIterator<Item=Future<Output=T>>, as poll_next requires a pinned mutable reference (Pin<&mut Self>) to the stream, so the future for each item borrows the stream itself. This borrowing is required for many real streams. (I expect Stream would be abused as a StreamingIterator if that worked and we still didn't have a real StreamingIterator.)

This could be made to work fairly simply: make for work with StreamingIterator with a fallback for Iterator behavior, and let streams be used by the above composition of StreamingIterator and Future.

But this still lacks the "await?" behavior that many real futures will want (most async code is async because of I/O, and I/O is fallible). And even if we add ? to patterns as well, this still means we have two ways of doing the same await operation: in expression position (for chaining, which was a focus of the bikeshedding thread) and in pattern position.

Step 5: Conclusions

I think that await for is the best of the currently proposed syntaxes, at least while awaiting on futures includes the keyword await.

In any case, this is a difficult choice with many tradeoffs, and one that's even harder to make without knowing what the final await syntax is going to be. But it's one that needs to be at least considered when stabilizing await.
