DEV Community

CN Group CZ a.s.

Asynchronous Streams in C#

Josef Matějka
Updated on ・5 min read

C# Asynchronous streams

I think everyone will agree that collections and data enumeration are the bread and butter of C# programming. As programmers we have great tools for synchronous enumeration at our disposal: IEnumerable and its generic variant, which can be combined with the foreach statement and/or with LINQ. Until C# 8.0, however, the options for asynchronous enumeration were limited. Iterators were strictly synchronous, so programmers had to come up with their own solutions. The usual approach was to create a method returning Task<IEnumerable<T>>: a task that asynchronously fetches part of the collection, which is then enumerated synchronously once the task has finished successfully. In this article we will explore IAsyncEnumerable<T>, which provides a fast and elegant solution to asynchronous enumeration.

Motivation

As I said, in some cases it is necessary to load data asynchronously because

  • they are behind a blocking call (files, data sent over the network),
  • or we have to wait until the data are generated (as in the case of Channel<T>),

and we don't want to block a thread while waiting for the result. Obtaining part of the data asynchronously and then enumerating the result works well from a performance perspective, but syntactically it can be awkward and over-complicated, considering how elegant enumeration looks in synchronous code.
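To make the awkwardness concrete, here is a minimal sketch of the Task<IEnumerable<T>> paging pattern. The names PagedLoading, LoadPageAsync and ReadAllAsync, the page size, and the total of 7 items are all assumptions made up for illustration; Task.Delay stands in for real I/O.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

public static class PagedLoading
{
    // Hypothetical data source: returns one page of numbers,
    // or an empty page once the (made-up) 7 items are exhausted.
    public static async Task<IEnumerable<int>> LoadPageAsync(int pageIndex, int pageSize)
    {
        await Task.Delay(10).ConfigureAwait(false); // stands in for real I/O
        const int total = 7;
        var start = pageIndex * pageSize;
        var count = Math.Max(0, Math.Min(pageSize, total - start));
        return Enumerable.Range(start, count).ToList();
    }

    // The caller has to write the paging loop by hand:
    // one asynchronous fetch per page, then a synchronous inner loop.
    public static async Task<List<int>> ReadAllAsync(int pageSize)
    {
        var result = new List<int>();
        for (var page = 0; ; page++)
        {
            // Asynchronous step: fetch a whole page.
            var items = (await LoadPageAsync(page, pageSize).ConfigureAwait(false)).ToList();
            if (items.Count == 0)
            {
                break;
            }
            // Synchronous step: enumerate the page that was just fetched.
            foreach (var item in items)
            {
                result.Add(item);
            }
        }
        return result;
    }
}
```

The two nested loops and the explicit "empty page means done" convention are exactly the kind of boilerplate that every consumer of such an API has to repeat.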

IAsyncEnumerable and IAsyncEnumerator

I assume the reader is already familiar with IEnumerable<T> and IEnumerator<T>. The asynchronous variants closely mirror the synchronous ones, but there are a few differences:

  • GetAsyncEnumerator (counterpart of GetEnumerator) accepts an optional CancellationToken,
  • MoveNextAsync (counterpart of MoveNext) returns ValueTask<bool> instead of bool,
  • disposing is done via DisposeAsync instead of Dispose,
  • and the Reset method is missing (in synchronous enumerators it is usually not implemented anyway).
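The differences listed above can be seen in a hand-written implementation. The following is only a sketch: AsyncRange and AsyncRangeDemo are hypothetical names invented for this example, and Task.Yield stands in for real asynchronous work. The interface members themselves (GetAsyncEnumerator, MoveNextAsync, Current, DisposeAsync) are the ones defined by IAsyncEnumerable<T> and IAsyncEnumerator<T>.

```csharp
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical hand-written stream yielding start, start + 1, ... (count items).
public sealed class AsyncRange : IAsyncEnumerable<int>
{
    private readonly int _start;
    private readonly int _count;

    public AsyncRange(int start, int count)
    {
        _start = start;
        _count = count;
    }

    // Unlike GetEnumerator, this accepts an optional CancellationToken.
    public IAsyncEnumerator<int> GetAsyncEnumerator(CancellationToken cancellationToken = default)
        => new Enumerator(_start, _count, cancellationToken);

    private sealed class Enumerator : IAsyncEnumerator<int>
    {
        private readonly int _end;
        private readonly CancellationToken _token;
        private int _next;

        public Enumerator(int start, int count, CancellationToken token)
        {
            _next = start;
            _end = start + count;
            _token = token;
        }

        public int Current { get; private set; }

        // Returns ValueTask<bool> where MoveNext returns bool.
        public async ValueTask<bool> MoveNextAsync()
        {
            _token.ThrowIfCancellationRequested();
            if (_next >= _end)
            {
                return false;
            }
            await Task.Yield(); // stands in for real asynchronous work
            Current = _next++;
            return true;
        }

        // Asynchronous counterpart of Dispose; nothing to release here.
        public ValueTask DisposeAsync() => default;
    }
}

public static class AsyncRangeDemo
{
    // Drives the enumerator by hand, calling the interface members directly.
    public static async Task<int> SumAsync(IAsyncEnumerable<int> source, CancellationToken token = default)
    {
        var sum = 0;
        var enumerator = source.GetAsyncEnumerator(token);
        try
        {
            while (await enumerator.MoveNextAsync().ConfigureAwait(false))
            {
                sum += enumerator.Current;
            }
        }
        finally
        {
            await enumerator.DisposeAsync().ConfigureAwait(false);
        }
        return sum;
    }
}
```

For example, awaiting AsyncRangeDemo.SumAsync(new AsyncRange(1, 4)) adds up 1, 2, 3 and 4.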

As with IEnumerable<T> and IEnumerator<T>, programmers usually do not have to implement these interfaces explicitly; they can write an iterator and let the compiler generate the implementation. Since C# 8.0 the compiler can also generate IAsyncEnumerable<T> and IAsyncEnumerator<T> from asynchronous iterators, and the language provides direct support that lets you consume asynchronous streams in much the same way as their synchronous counterparts. They should therefore feel familiar and be easy to adopt.

Creating an asynchronous iterator block

The rules for creating an asynchronous iterator with yield return or yield break are again analogous to the synchronous version. Your function must have the return type IAsyncEnumerable<T> or IAsyncEnumerator<T>, and it must also be an async function. That means no parameter can be ref or out, and the body should contain an await expression. For example, consider this function:

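// The using directives belong at the top of the file; the method goes inside a class.
using System;
using System.Collections.Generic;
using System.Runtime.CompilerServices; // EnumeratorCancellation
using System.Threading;
using System.Threading.Tasks;

private static async IAsyncEnumerable<int> FibonacciAsync([EnumeratorCancellation] CancellationToken token = default)
{
    var f0 = 0;
    var f1 = 1;
    yield return f0;
    yield return f1;
    while (true)
    {
        // In the default unchecked context an int overflow wraps around to a negative value.
        var fn = f0 + f1;
        if (fn < 1)
        {
            // Overflow: no further Fibonacci number fits into an int.
            yield break;
        }
        yield return fn;
        // Simulates asynchronous work; throws if the token is cancelled.
        await Task.Delay(TimeSpan.FromSeconds(1), token).ConfigureAwait(false);
        f0 = f1;
        f1 = fn;
    }
}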

This function returns the Fibonacci sequence of all numbers that fit into an int. Overflow is detected by the condition if (fn < 1): in the default unchecked context the sum wraps around to a negative number. We use Task.Delay to simulate asynchronous execution; in reality the asynchronous part would be, for example, a database fetch or a similar blocking operation.

Since we are talking about asynchronous execution, we have to think about cancellation. GetAsyncEnumerator accepts an optional CancellationToken, but in our case the IAsyncEnumerable<T> implementation is generated by the compiler, so how can we work with a token passed to GetAsyncEnumerator? If you put the EnumeratorCancellation attribute in front of a CancellationToken parameter, the compiler knows that this parameter should receive the token passed to GetAsyncEnumerator.

Enumerating through an asynchronous stream

Let's continue by enumerating the first 5 Fibonacci numbers. A seasoned C# programmer would immediately reach for Take from System.Linq. Sadly, for asynchronous enumerables it is first necessary to install the NuGet package System.Linq.Async, which contains counterparts of the classic extension methods for IAsyncEnumerable<T>. We expect that in a future version you will not have to install this package: the methods will be incorporated into the base framework and no additional packages will be required.

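// Requires using System.Linq; Take for IAsyncEnumerable<T> comes from the System.Linq.Async package.
public static async Task Main()
{
    using var source = new CancellationTokenSource();
    // Take(5) limits the stream; WithCancellation hands the token to the enumerator.
    await foreach (var number in FibonacciAsync().Take(5).WithCancellation(source.Token).ConfigureAwait(false))
    {
        Console.WriteLine(number);
    }
}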

As you can see, C# supports an asynchronous version of foreach, written with the await keyword in front of it. The difference is that the underlying MoveNextAsync calls are awaited and, after the loop finishes, the enumerator is disposed asynchronously. If you need to pass a CancellationToken, you can do so via the WithCancellation extension method, which passes the token to the underlying GetAsyncEnumerator call. This is the token we receive in FibonacciAsync, because we decorated its token parameter with the EnumeratorCancellation attribute.
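To see the token actually stop an enumeration, here is a small sketch. CancellationDemo, TicksAsync and CountUntilCancelledAsync are hypothetical names, and the 20 ms delay is an arbitrary choice; the stream is endless on purpose, so the only way out of the loop is cancellation.

```csharp
using System;
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;

public static class CancellationDemo
{
    // Hypothetical endless stream: yields 0, 1, 2, ... with a short delay before each item.
    public static async IAsyncEnumerable<int> TicksAsync(
        [EnumeratorCancellation] CancellationToken token = default)
    {
        for (var i = 0; ; i++)
        {
            // Task.Delay observes the token and throws once it is cancelled.
            await Task.Delay(TimeSpan.FromMilliseconds(20), token).ConfigureAwait(false);
            yield return i;
        }
    }

    // Enumerates until the timeout cancels the token; returns how many items were seen.
    public static async Task<int> CountUntilCancelledAsync(TimeSpan timeout)
    {
        // The source cancels itself after the given timeout.
        using var source = new CancellationTokenSource(timeout);
        var seen = 0;
        try
        {
            await foreach (var tick in TicksAsync().WithCancellation(source.Token).ConfigureAwait(false))
            {
                seen++;
            }
        }
        catch (OperationCanceledException)
        {
            // Expected: the token handed over by WithCancellation reached Task.Delay in the iterator.
        }
        return seen;
    }
}
```

The number of items seen depends on timing, so the sketch only demonstrates that the loop ends with an OperationCanceledException instead of running forever.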

Performance concerns

As in the synchronous case, using an async iterator means that a state machine is generated. It is represented by a hidden class implementing these interfaces: IAsyncEnumerable<T>, IAsyncEnumerator<T>, IAsyncStateMachine, IValueTaskSource<bool>, and IValueTaskSource. One class implements both IAsyncEnumerable<T> and IAsyncEnumerator<T> to avoid unnecessary allocations. If one class implemented IAsyncEnumerable<T> and another IAsyncEnumerator<T>, a new enumerator instance would have to be created for each enumeration. That can be avoided when a single class implements both interfaces: if no one else is using the object, it can be reused as the enumerator (the same principle applies to synchronous iterators).

The main trick for keeping allocations low is to use ValueTask<T> instead of Task<T> and to implement IValueTaskSource<bool> and IValueTaskSource. As the name suggests, ValueTask<T> is a value type (whereas the classic Task<T> is a reference type). This matters when a MoveNextAsync call completes synchronously, because in that case the ValueTask<T> can be kept on the stack and we avoid the heap allocation and later garbage collection of a task object, which helps performance. Asynchronous enumeration is therefore expected to be used with some kind of buffering: an asynchronous call loads a batch of data, and the items are then returned synchronously. If you want to learn more about ValueTask<T>, I recommend the blog post Understanding the Whys, Whats, and Whens of ValueTask, which explains this type and its usage in depth.
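The buffering idea can be sketched as an async iterator that awaits once per page and then yields the buffered items without any further awaits, so most MoveNextAsync calls complete synchronously. BufferedStream, LoadPageAsync, ReadAllAsync and CollectAsync are hypothetical names, and the made-up source holds 7 items.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;

public static class BufferedStream
{
    // Hypothetical page loader; Task.Delay stands in for a database or network call.
    private static async Task<IReadOnlyList<int>> LoadPageAsync(
        int pageIndex, int pageSize, CancellationToken token)
    {
        await Task.Delay(10, token).ConfigureAwait(false);
        const int total = 7; // made-up number of items in the source
        var start = pageIndex * pageSize;
        var count = Math.Max(0, Math.Min(pageSize, total - start));
        return Enumerable.Range(start, count).ToArray();
    }

    public static async IAsyncEnumerable<int> ReadAllAsync(
        int pageSize, [EnumeratorCancellation] CancellationToken token = default)
    {
        for (var page = 0; ; page++)
        {
            // Only this await can suspend: once per page, not once per item.
            var items = await LoadPageAsync(page, pageSize, token).ConfigureAwait(false);
            if (items.Count == 0)
            {
                yield break;
            }
            // Items from the buffered page are handed out without awaiting anything.
            foreach (var item in items)
            {
                yield return item;
            }
        }
    }

    // Small helper for trying the stream out.
    public static async Task<List<int>> CollectAsync(int pageSize)
    {
        var result = new List<int>();
        await foreach (var item in ReadAllAsync(pageSize).ConfigureAwait(false))
        {
            result.Add(item);
        }
        return result;
    }
}
```

The consumer sees a flat stream of items; the paging loop lives in one place inside the iterator.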

The last interface, IAsyncStateMachine, is not meant to be used directly. It exists for compiler-generated code, and the runtime and internal APIs use it to perform awaits.

The number of unnecessary allocations is low. Microsoft itself states: "The compiler implementation has been designed to keep allocations incredibly low; in fact, no matter how many times an async iterator yields, the most common case is that it incurs at most two allocations of overhead."

Thread safety warning

This may be surprising to some, but in general MoveNextAsync is not thread-safe. On a given enumerator, each call to MoveNextAsync must happen after the previous MoveNextAsync has finished, so you cannot advance the enumeration concurrently. It is not a problem for MoveNextAsync to be called from a different thread each time, but you must not call it from multiple threads at the same time. The main problem is that anything can happen: in some cases the code may appear to run correctly, in others an exception may be thrown. So if you want to save yourself some debugging time, stay away from enumerating an IAsyncEnumerable<T> concurrently.
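If several workers need to process the items, one possible approach (an assumption of this sketch, not something the interfaces prescribe) is to let a single loop read the stream and hand the items to workers through a Channel<T>, whose reader does support concurrent consumers by default. FanOut, NumbersAsync and ProcessConcurrentlyAsync are hypothetical names, and squaring stands in for real work.

```csharp
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Channels;
using System.Threading.Tasks;

public static class FanOut
{
    // Hypothetical sample stream: 1, 2, ..., n.
    public static async IAsyncEnumerable<int> NumbersAsync(int n)
    {
        for (var i = 1; i <= n; i++)
        {
            await Task.Yield(); // stands in for real asynchronous work
            yield return i;
        }
    }

    public static async Task<int[]> ProcessConcurrentlyAsync(IAsyncEnumerable<int> source, int workerCount)
    {
        var channel = Channel.CreateBounded<int>(16);
        var results = new ConcurrentBag<int>();

        // Workers read from the channel, never from the source enumerator.
        var workers = Enumerable.Range(0, workerCount).Select(_ => Task.Run(async () =>
        {
            await foreach (var item in channel.Reader.ReadAllAsync().ConfigureAwait(false))
            {
                results.Add(item * item);
            }
        })).ToArray();

        try
        {
            // The only place that advances the source: one MoveNextAsync at a time.
            await foreach (var item in source.ConfigureAwait(false))
            {
                await channel.Writer.WriteAsync(item).ConfigureAwait(false);
            }
        }
        finally
        {
            // Lets the workers' loops finish once the buffered items are drained.
            channel.Writer.Complete();
        }

        await Task.WhenAll(workers).ConfigureAwait(false);
        return results.OrderBy(x => x).ToArray();
    }
}
```

The processing order across workers is not deterministic, which is why the sketch sorts the results before returning them.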

Conclusion

The newly added asynchronous iterators are a great tool for reading data collections asynchronously. They are useful when loading data from a database, files or the internet, and they can be useful for in-memory collections too, as in the case of the Channel data structure. On the plus side, the performance overhead is very small and they closely mirror their synchronous counterparts, which makes them easy to use, because most programmers have already dealt with enumerables.
