The problem
public async Task<IEnumerable<User>> GetUsers()
{
var allResults = new List<User>();
var nextUrl = "https://account.zendesk.com/api/v2/users.json";
while (nextUrl != null)
{
var page = await _client.GetAsync(nextUrl)
.Content.ReadAsAsync<UsersListResponse>();
allResults.AddRange(page.Users);
nextUrl = page.NextPage;
// eg "https://account.zendesk.com/api/v2/users.json?page=2"
}
return allResults;
}
Take a look at the above code, you may have run into this familiar issue yourself. We'd like to represent the pageable results of this HTTP API in our types, while still be asynchronous.
Traditionally there would be 4 ways to approach this;
- Block on the async code with
.Result
/.GetAwaiter().GetResult()
or.Wait()
. This is not a good idea. - The above approach of awaiting each asynchronous task, but doing it all eagerly and returning the complete results in a materialised collection. This defeats the purpose of this paging API, and more generally we lose the laziness of the problem we are trying to model.
- We flip the return type to
IEnumerable<Task<User>>
. This would require that we trust any consumers of this code to await the result of each task after every enumeration. There are ways to enforce this at runtime, and throw an exception if it's not consumed correctly, however, this ultimately is a misleading type, and the shape of the type doesn't communicate its hidden constraints. - We don't try returning a single type such as
Task<IEnumerable<T>>
and we model it ourselves. This can be a good idea, but we lose the benefits of having a familiar type to work with.
Well, it's about time we adopted a new type and end this madness. That's what IAsyncEnumerable<T>
is for.
About Ix.Async
Currently IAsyncEnumerable<T>
is a concept which exists in a few places, with no singular definition. The version I will be using today lives in the Reactive Extensions repo, in a fork that is based on the latest C# 8.0 proposal.
Reactive Extensions (Rx) is the home of Observable implementation and extensions, it is also home to a sibling project named Interactive Extensions (Ix for short). Rx has lots of extensions and tools for composing pushed based sequences, and Ix is very similar but for pull-based sequences (IEnumerable<T>
). The part I am interested in for this post is the async part, which I'll be referring to as Ix.Async, this is shipped in its own nuget package, and I will generally be referring to the IAsyncEnumerable<T>
definition that lives here (although this will map trivially to other implementations).
In the near future, C# 8.0 will introduce Async Streams (I prefer the term Sequence, as Stream
is already a different .NET concept) as a language feature, and there will be a new definition of IAsyncEnumerable<T>
it will work with, but that doesn't stop us using Ix.Async today, either using the current definition which slightly differs from the C# 8.0 proposal or building the fork with the latest definition in.
Definition
public interface IAsyncEnumerable<out T>
{
IAsyncEnumerator<T> GetAsyncEnumerator();
}
public interface IAsyncEnumerator<out T> : IAsyncDisposable
{
T Current { get; }
ValueTask<bool> MoveNextAsync();
}
public interface IAsyncDisposable
{
ValueTask DisposeAsync();
}
This is the definition of IAsyncEnumerable<T>
from the C# 8.0 proposal, it should look very familiar, it is just IEnumerable<T>
with an async MoveNext
method, as you might expect.
We can now see the relationship with IObservable<T>
and IEnumerable<T>
.
Being in this familiar family means that we don't have to learn new concepts to start consuming and composing operations over this type.
IAsyncEnumerable<Bar> ConvertGoodFoosToBars(IAsyncEnumerable<Foo> items)
{
return items
.Where(foo => foo.IsGood)
.Select(foo => Bar.FromFoo(foo));
}
These extension methods are immediately understandable to us and are ubiquitous in C# already.
Producing sequences
All of this would be pretty academic if we couldn't generate sequences to be consumed. Today there are a few options.
1. Implement the IAsyncEnumerable<T>
and IAsyncEnumerator<T>
interfaces directly
You can do this, and for performance critical code, this might be the most suitable approach.
It does require a fair bit of boilerplate code, however, so here is a starting point:
// A starting point for your own IAsyncEnumerable extensions
public static class AsyncEnumerableExtensions
{
public static IAsyncEnumerable<T> MyExtensionMethod<T>(this IAsyncEnumerable<T> source)
{
return new MyAsyncEnumerable<T>(source);
}
public struct MyAsyncEnumerable<T> : IAsyncEnumerable<T>
{
readonly IAsyncEnumerable<T> enumerable;
internal MyAsyncEnumerable(IAsyncEnumerable<T> enumerable)
{
this.enumerable = enumerable;
}
public IAsyncEnumerator<T> GetAsyncEnumerator()
{
return new MyAsyncEnumerator(enumerable.GetAsyncEnumerator());
}
public struct MyAsyncEnumerator : IAsyncEnumerator<T>
{
readonly IAsyncEnumerator<T> enumerator;
internal MyAsyncEnumerator(IAsyncEnumerator<T> enumerator)
{
this.enumerator = enumerator;
}
public ValueTask DisposeAsync()
{
return enumerator.DisposeAsync();
}
public T Current => enumerator.Current;
public ValueTask<bool> MoveNextAsync()
{
return enumerator.MoveNextAsync();
}
}
}
}
2. Use the static helper methods in Ix.NET
IAsyncEnumerable<int> GenerateWithIx()
{
return AsyncEnumerable.CreateEnumerable(
() =>
{
var current = 0;
async Task<bool> f(CancellationToken ct)
{
await Task. Delay(TimeSpan.FromSeconds(0.5));
current++;
return true;
}
return AsyncEnumerable.CreateEnumerator(
moveNext: f,
current: () => current,
dispose: () => { }
);
});
}
3. Use CXuesong.AsyncEnumerableExtensions
I wanted to build something like this myself, and then I found this library, so I don't need to! Credit to Chen, this is a great library.
// using CXuesong.AsyncEnumerableExtensions
async Task Generator(IAsyncEnumerableSink<int> sink)
{
var i = 1;
while (true)
{
await Task.Delay(TimeSpan.FromSeconds(0.5));
await sink.YieldAndWait(i++);
}
}
AsyncEnumerableFactory.FromAsyncGenerator<int>(Generator)
This library offers a very nice and simple way to express sequences. You build an async function that takes an IAsyncEnumberableSink<T>
(defined by the library) and returns a Task
. Now you can do your awaits, but when you want to yield an item to the sequence, you call sink.YieldAndWait(value)
where sink
is that parameter.
4. Coming soon to a C# 8.0 near you
Today you cannot use the async
keyword and iterator methods together, so having an async iterator method would require a new language feature. Well, good news, it's in the works, take a sneak peek here.
Here is a snippet showing what it could look like.
static async IAsyncEnumerable<int> Mylterator()
{
try
{
for (int i = 0; i < 100; i++)
{
await Task.Delay(1000);
yield return i;
}
}
finally
{
await Task.Delay(200);
Console.WriteLine("finally");
}
}
Consuming sequencing
We can produce sequences, but that won't be much use to us if we cannot consume them.
1. ForEachAsync
Just like the .ForEach(...)
extension method on List<T>
, we have .ForEachAsync(...)
from Ix.Async, this lets us do work on each item, and gives us a Task
to await to drive the whole chain of pull-based work.
await seq.ForEachAsync(x => Console.WriteLine(x));
Unfortunately, dogmatism fails here, ForEachAsync
is suffixed with Async
because it returns a Task
and operates asynchronously, however, the delegate it takes is synchronous, this led me to build a method that can take an async delegate and name it ForEachAsyncButActuallyAsync
. 🤦
await seq.ForEachAsyncButActuallyAsync(x => Console.WriteLine(x));
2. C# 8.0 foreach
Again, we have language support on the way. Here's what it would look like:
var asyncSequence = GetMyAsyncSequence(cancellationToken: ct);
await foreach (var item in asyncSequence)
{
...
}
Design Decisions
One of the problems that have meant that we've had to wait so long a first class IAsyncEnumberable<T>
and language features is because there are many design decisions that need answering, for example;
- Does
IAsyncEnumerator<T>
implementIDisposable
or a new async version (IAsyncDisposable
)? UpdateIAsyncDisposable
it is! - If there is going to be an
IAsyncDisposable
, should the language support theusing
syntax for it? - Does the
CancellationToken
get passed intoMoveNext
each move orGetEnumerator
once? UpdateCancellationToken
s are not going to be handled by syntax, so you should flow it into theIAsyncEnumerable<T>
types yourself. - Should it be
MoveNext
, orMoveNextAsync
? UpdateMoveNextAsync
wins! - Should
MoveNextAsync
return aTask<bool>
or aValueTask<bool>
? UpdateValueTask<bool>
has it! - In the foreach syntax, where does the
await
modifier go? Outside the brackets? (Yes, of course, what sort of monster do you take me for?) - In the foreach syntax, how do you do the equivalent of
.ConfigureAwait(false)
? Update like this. - Will the foreach syntax look for the type, or the pattern?
await
doesn't just apply toTask
for example.
and that's just what comes immediately to mind, the more you think, the more you uncover.
Who is using it today?
There are a couple of large projects using this today:
- Entity Framework Core - Currently using an internal definition, but there is talk of plans to use whatever comes in C# 8.
- Google Cloud Platform Libraries - This one was a bit of a surprise to me. If you install any Google Cloud package, it will reference their Core package, which uses and references Ix.Async. One of the members of the team that builds this is (the) Jon Skeet, so that's quite an endorsement!
Stay tuned, there is more to come on this topic.
Top comments (0)