DEV Community

Federico Alterio


AsyncR3 (Async reactive extensions for .NET)

In modern .NET development, asynchronous programming has become essential for building responsive, scalable applications. While the traditional Reactive Extensions library (Rx.NET) has served developers well for years, it was designed in an era before async/await became ubiquitous. This creates friction with modern asynchronous code.

What is R3?

R3 is a modern reimplementation of Reactive Extensions for .NET, created by Cysharp. Among its many improvements, the most important is that an exception no longer stops the observable pipeline.

R3Async: Bringing R3's Philosophy to Async observables

R3Async is the fully asynchronous counterpart to R3. While R3 focuses on synchronous reactive streams, R3Async reimagines the same philosophy with deeper async/await integration. It's a complete reimplementation built on top of ValueTask and IAsyncDisposable.

This implementation is better suited to common backend scenarios where a subscription wraps an external service (for example, a remote queue) rather than a local, in-process event source. In such cases, subscribing and unsubscribing must be genuinely asynchronous.

Background

The Async Gap

The fundamental problem with traditional Rx.NET is that Subscribe and Dispose are synchronous: when you call Subscribe() or dispose a subscription, the call returns immediately, whether or not the underlying work has actually finished.

This creates a critical limitation: operators can't await the completion of cancellation. Consider the Switch operator, which should cancel the previous subscription when a new one arrives:

// Traditional Rx.NET - Switch can't await cancellation
observable
    .Switch()  // Initiates cancellation but doesn't wait for it!
    .Subscribe(x => Console.WriteLine(x));

// This can lead to overlapping operations because:
// 1. New observable starts immediately
// 2. Previous observable is only BEGINNING to cancel
// 3. Both may be running simultaneously for a brief period

Because IDisposable.Dispose() is synchronous, Rx.NET can only initiate cancellation—it can't wait for cleanup to complete. This can lead to race conditions, resource conflicts, and overlapping async operations.
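The gap can be reproduced with the base class library alone. The sketch below is neither Rx.NET nor R3Async code: `SlowResource` and its 50 ms cleanup delay are hypothetical stand-ins for a subscription that wraps a remote service. It only illustrates that a synchronous Dispose can start cleanup but cannot wait for it, while DisposeAsync can.

```csharp
using System;
using System.Threading.Tasks;

// With IDisposable the caller gets control back before cleanup has finished.
var syncResource = new SlowResource();
syncResource.Dispose();
Console.WriteLine($"After Dispose: cleaned up = {syncResource.CleanedUp}");
// prints "After Dispose: cleaned up = False"

// With IAsyncDisposable the caller can await the end of cleanup.
var asyncResource = new SlowResource();
await asyncResource.DisposeAsync();
Console.WriteLine($"After DisposeAsync: cleaned up = {asyncResource.CleanedUp}");
// prints "After DisposeAsync: cleaned up = True"

// Hypothetical resource whose cleanup needs an asynchronous round trip.
public sealed class SlowResource : IDisposable, IAsyncDisposable
{
    public bool CleanedUp { get; private set; }

    // Simulates asynchronous cleanup, e.g. closing a connection to a remote queue.
    private async Task CleanUpAsync()
    {
        await Task.Delay(50);
        CleanedUp = true;
    }

    // Synchronous disposal can only start the cleanup (fire and forget).
    public void Dispose() => _ = CleanUpAsync();

    // Asynchronous disposal completes only after cleanup has completed.
    public async ValueTask DisposeAsync() => await CleanUpAsync();
}
```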

OnNext is synchronous as well, which leaves a slow consumer with no good way to apply backpressure: the only way it could hold the producer back is by blocking the producer's thread.
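A minimal sketch of the alternative, using only the base class library: when the callback returns a ValueTask, the producer can await it and will not emit the next item until the consumer is done, without blocking a thread. `AsyncOnNextDemo` and the 20 ms delay are hypothetical and are not part of R3Async.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

var log = await AsyncOnNextDemo.RunAsync(itemCount: 3);
Console.WriteLine(string.Join(", ", log));
// prints "emit 0, done 0, emit 1, done 1, emit 2, done 2"

public static class AsyncOnNextDemo
{
    // Hypothetical producer that awaits the consumer before emitting the next item.
    public static async Task<List<string>> RunAsync(int itemCount)
    {
        var log = new List<string>();

        // Slow consumer: an asynchronous handler that needs 20 ms per item.
        async ValueTask OnNextAsync(int value)
        {
            await Task.Delay(20);
            log.Add($"done {value}");
        }

        for (var i = 0; i < itemCount; i++)
        {
            log.Add($"emit {i}");
            await OnNextAsync(i); // the producer waits here without blocking a thread
        }

        return log;
    }
}
```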

Enter R3Async

R3Async solves this by making subscription, disposal, and notifications inherently asynchronous. SubscribeAsync returns ValueTask<IAsyncDisposable>, so operators can await cleanup before moving on.
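To make the idea concrete, here is a rough sketch of what awaiting cleanup buys a Switch-like operator. It is not R3Async's actual Switch implementation: `SwitchSlot` and `LoggingSubscription` are hypothetical types built only on IAsyncDisposable, and the 20 ms delay simulates asynchronous cleanup.

```csharp
#nullable enable
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

var log = new List<string>();
var slot = new SwitchSlot();
await slot.ReplaceAsync(new LoggingSubscription("first", log));
await slot.ReplaceAsync(new LoggingSubscription("second", log));
Console.WriteLine(string.Join(", ", log));
// prints "start first, stop first, start second"

// Hypothetical inner subscription that records when it starts and when its cleanup ends.
public sealed class LoggingSubscription : IAsyncDisposable
{
    private readonly string _name;
    private readonly List<string> _log;

    public LoggingSubscription(string name, List<string> log)
    {
        _name = name;
        _log = log;
    }

    public void Start() => _log.Add($"start {_name}");

    public async ValueTask DisposeAsync()
    {
        await Task.Delay(20); // simulated asynchronous cleanup
        _log.Add($"stop {_name}");
    }
}

// Holds the current inner subscription, the way a Switch-like operator might.
public sealed class SwitchSlot
{
    private LoggingSubscription? _current;

    public async ValueTask ReplaceAsync(LoggingSubscription next)
    {
        if (_current is not null)
        {
            // Wait until the previous subscription has fully stopped.
            await _current.DisposeAsync();
        }

        _current = next;
        next.Start();
    }
}
```

Because the previous subscription is awaited before the next one starts, the two never overlap.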

AsyncObservable

The heart of R3Async is the AsyncObservable<T> and AsyncObserver<T> abstract classes:

// Simplified shape: only the public signature is shown, the body is omitted
public abstract class AsyncObservable<T>
{
    public ValueTask<IAsyncDisposable> SubscribeAsync(
        AsyncObserver<T> observer,
        CancellationToken cancellationToken);
}

Key design decisions:

  • ValueTask Returns: Subscription is now async
  • IAsyncDisposable: Subscriptions return IAsyncDisposable for proper async cleanup
  • CancellationToken Support: First-class cancellation support throughout the API

AsyncObserver

Observers handle the stream events with three async methods:

// Simplified shape: only the public signatures are shown, the bodies are omitted
public abstract class AsyncObserver<T> : IAsyncDisposable
{
    public ValueTask OnNextAsync(T value, CancellationToken cancellationToken);
    public ValueTask OnErrorResumeAsync(Exception error, CancellationToken cancellationToken);
    public ValueTask OnCompletedAsync(Result result);
}

Installation

dotnet add package R3Async

Basic Example

using R3Async;

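// cancellationToken is assumed to be supplied by the caller (for example, the host's shutdown token)
// Tick once per second, keep the even ticks, and multiply them by 10
var subscription = await AsyncObservable
    .Interval(TimeSpan.FromSeconds(1))
    .Where(x => x % 2 == 0)
    .Select(x => x * 10)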
    .SubscribeAsync(
        onNextAsync: async (value, ct) => 
        {
            Console.WriteLine($"Received: {value}");
        },
        cancellationToken: cancellationToken
    );

// Dispose when done
await subscription.DisposeAsync();

Common Operators

R3Async provides a rich set of operators for transforming and composing streams:

Transformation Operators

Select: Transform each value

var transformed = await AsyncObservable
    .Range(1, 5)
    .Select(async (x, ct) => 
    {
        await Task.Delay(10, ct);
        return x * 2;
    })
    .ToListAsync(cancellationToken);
// Result: [2, 4, 6, 8, 10]

Where: Filter values

var filtered = await AsyncObservable
    .Range(1, 10)
    .Where(x => x % 2 == 0)
    .ToListAsync(cancellationToken);
// Result: [2, 4, 6, 8, 10]

Scan: Accumulate values

var accumulated = await AsyncObservable
    .Range(1, 5)
    .Scan(0, (acc, x) => acc + x)
    .ToListAsync(cancellationToken);
// Result: [1, 3, 6, 10, 15]

Filtering Operators

Take: Take the first N items

var first3 = await AsyncObservable
    .Interval(TimeSpan.FromMilliseconds(100))
    .Take(3)
    .ToListAsync(cancellationToken);
// Result: [0, 1, 2]

Skip: Skip the first N items

var skipped = await AsyncObservable
    .Range(1, 10)
    .Skip(5)
    .ToListAsync(cancellationToken);
// Result: [6, 7, 8, 9, 10]

Distinct: Remove duplicates

var unique = await AsyncObservable
    .FromAsync([1, 2, 2, 3, 3, 3, 4])
    .Distinct()
    .ToListAsync(cancellationToken);
// Result: [1, 2, 3, 4]

Aggregation Operators

CountAsync: Count items

var count = await AsyncObservable
    .Range(1, 100)
    .Where(x => x % 2 == 0)
    .CountAsync(cancellationToken);
// Result: 50

FirstAsync / LastAsync: Get first or last item

var first = await AsyncObservable
    .Range(1, 10)
    .FirstAsync(cancellationToken);
// Result: 1

var last = await AsyncObservable
    .Range(1, 10)
    .LastAsync(cancellationToken);
// Result: 10

ToListAsync / ToDictionaryAsync: Materialize to collections

var list = await observable.ToListAsync(cancellationToken);
var dict = await observable.ToDictionaryAsync(
    x => x.Id, 
    x => x.Name, 
    cancellationToken);

Combination Operators

Merge: Merge multiple observables

var merged = AsyncObservable.Merge(
    AsyncObservable.Return(1),
    AsyncObservable.Return(2),
    AsyncObservable.Return(3)
);
var result = await merged.ToListAsync(cancellationToken);
// Result: [1, 2, 3] (order may vary)

Concat: Concatenate observables sequentially

var concatenated = AsyncObservable.Concat(
    AsyncObservable.Range(1, 3),
    AsyncObservable.Range(4, 3)
);
var result = await concatenated.ToListAsync(cancellationToken);
// Result: [1, 2, 3, 4, 5, 6]

Creating Custom Observables

R3Async provides several factory methods for creating observables:

FromAsync

// From Task
var taskObservable = AsyncObservable.FromAsync(async ct => 
{
    await Task.Delay(100, ct);
    return 42;
});

Factory Methods

// Single value
var single = AsyncObservable.Return(42);

// Empty stream
var empty = AsyncObservable.Empty<int>();

// Never completes
var never = AsyncObservable.Never<int>();

// Throw error
var error = AsyncObservable.Throw<int>(new Exception("Error"));

// Range of values
var range = AsyncObservable.Range(1, 100);

// Time-based
var interval = AsyncObservable.Interval(TimeSpan.FromSeconds(1));

Converting to IAsyncEnumerable and Backpressure

R3Async provides ToAsyncEnumerable to convert observables into IAsyncEnumerable<T>, enabling integration with async pull-based streams. However, this conversion introduces important backpressure considerations. R3Async requires you to explicitly provide a channel factory to define buffering semantics:

using System.Threading.Channels;

// Bounded channel - limits buffer size, applies backpressure
var boundedEnum = observable.ToAsyncEnumerable(
    () => Channel.CreateBounded<int>(new BoundedChannelOptions(10)
    {
        FullMode = BoundedChannelFullMode.Wait  // Producer waits asynchronously when full
    }));

// Unbounded channel - unlimited buffering (memory risk!)
var unboundedEnum = observable.ToAsyncEnumerable(
    () => Channel.CreateUnbounded<int>());

// Rendezvous channel - no buffering, direct handoff
// (needs a runtime whose bounded channels accept capacity 0; versions that
// require capacity >= 1 throw ArgumentOutOfRangeException here)
var rendezvousEnum = observable.ToAsyncEnumerable(
    () => Channel.CreateBounded<int>(0));

This explicit design forces you to think about backpressure strategies:

  • Bounded channels in Wait mode apply backpressure by making the producer wait asynchronously while the buffer is full, preventing memory exhaustion but potentially slowing down fast sources
  • Unbounded channels never make the producer wait but can consume unlimited memory if the consumer is slower than the producer
  • Bounded channels with a DropOldest or DropNewest full mode discard items when full, useful for scenarios where only the latest data matters
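The difference between these full modes can be seen with System.Threading.Channels alone, without R3Async. In the sketch below, `FullModeDemo` is a hypothetical helper: it overfills a small bounded channel and then inspects what survived, and it checks that a write in Wait mode stays pending until a reader frees a slot.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Channels;
using System.Threading.Tasks;

Console.WriteLine(string.Join(", ", await FullModeDemo.FillAndDrainAsync(BoundedChannelFullMode.DropOldest)));
// prints "3, 4, 5"
Console.WriteLine(string.Join(", ", await FullModeDemo.FillAndDrainAsync(BoundedChannelFullMode.DropNewest)));
// prints "1, 2, 5"
Console.WriteLine(await FullModeDemo.WriteIsPendingWhenFullAsync());
// prints "True"

public static class FullModeDemo
{
    // Writes 1..5 into a channel of capacity 3 with no reader running, then reads what is left.
    public static async Task<List<int>> FillAndDrainAsync(BoundedChannelFullMode mode)
    {
        var channel = Channel.CreateBounded<int>(new BoundedChannelOptions(3) { FullMode = mode });
        for (var i = 1; i <= 5; i++)
        {
            channel.Writer.TryWrite(i); // in the drop modes a full channel evicts an item
        }
        channel.Writer.Complete();

        var items = new List<int>();
        await foreach (var item in channel.Reader.ReadAllAsync())
        {
            items.Add(item);
        }
        return items;
    }

    // In Wait mode a write into a full channel stays pending until a reader frees a slot.
    public static async Task<bool> WriteIsPendingWhenFullAsync()
    {
        var channel = Channel.CreateBounded<int>(
            new BoundedChannelOptions(1) { FullMode = BoundedChannelFullMode.Wait });

        await channel.Writer.WriteAsync(1);               // fills the only slot
        ValueTask pending = channel.Writer.WriteAsync(2); // no room: the producer has to wait
        var wasPending = !pending.IsCompleted;

        await channel.Reader.ReadAsync();                 // the consumer frees a slot
        await pending;                                    // now the write can complete
        return wasPending;
    }
}
```

DropOldest keeps the most recent items, while DropNewest replaces only the last buffered item, so the choice depends on whether older or newer buffered data is more valuable to the consumer.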

Example with backpressure handling:

// Fast producer, slow consumer scenario
var fastProducer = AsyncObservable.Interval(TimeSpan.FromMilliseconds(10));

// Convert with bounded buffer and backpressure
var asyncEnum = fastProducer.ToAsyncEnumerable(
    () => Channel.CreateBounded<long>(new BoundedChannelOptions(100)
    {
        FullMode = BoundedChannelFullMode.Wait  // Apply backpressure
    }));

// Slow consumer - the producer will automatically slow down
await foreach (var item in asyncEnum.WithCancellation(cancellationToken))
{
    await Task.Delay(100, cancellationToken);  // Slow processing
    Console.WriteLine(item);
}
// Producer is throttled by backpressure - no memory overflow!

This design makes resource management decisions explicit rather than hiding them behind default behaviors.
