Before reading, check this to know what R3Async is.
In that post, I wrote:
This implementation is better suited to common backend scenarios where a subscription wraps an external service (for example, a remote queue) rather than a local, in-process event source. In such cases, subscribing and unsubscribing must be genuinely asynchronous.
Makes sense, but without a concrete example, it’s just abstract philosophy. Let’s see how this works in practice with RabbitMQ.
Step 1: Add the RabbitMQ.Client Package and R3Async
First, we need to reference the official RabbitMQ .NET client package and
R3Async.
dotnet add package RabbitMQ.Client
dotnet add package R3Async
Step 2: Create a connection to RabbitMQ Server
Connections are expensive, so we want a single connection for the app. It should be established only when needed and disposed when unused.
R3Async helps here:
extension(IConnection)
{
public static AsyncObservable<IConnection> CreateSharedConnectionObservable(ConnectionFactory connectionFactory)
{
return AsyncObservable.Using(
async token => await connectionFactory.CreateConnectionAsync(token),
static connection => AsyncObservable.CreateAsBackgroundJob<IConnection>(
(observer, ct) => observer.OnNextAsync(connection, ct),
true))
.OnErrorResumeAsFailure()
.StatelessReplayLatestPublish()
.RefCount();
}
}
We use:
-
OnErrorResumeAsFailure()because an error here is fatal. -
StatelessReplayLatestPublish()to share the connection (stateless so we don’t remember past errors). -
RefCount()to create the connection when the first subscriber appears and dispose it when the last one leaves.
Step 3: Create Channels
Channels in RabbitMQ are more lightweight than connections, but still we can't abuse them. Also they are not thread-safe. Sharing them across threads without protection is risky.
To handle this safely, we can wrap a channel in a small synchronized wrapper that uses a semaphore to control access:
public class SynchronizedChannel(IChannel channel) : IAsyncDisposable
{
readonly SemaphoreSlim _gate = new(1, 1);
readonly IChannel _channel = channel;
public async ValueTask<ChannelReleaser> GetChannelLockAsync(CancellationToken cancellationToken)
{
await _gate.WaitAsync(cancellationToken);
return new ChannelReleaser(this);
}
public async ValueTask DisposeAsync()
{
await _channel.DisposeAsync();
}
public readonly struct ChannelReleaser(SynchronizedChannel parent) : IDisposable
{
public void Dispose()
{ parent._gate.Release();
}
public IChannel Channel => parent._channel;
}
}
Step 4: Create Shared Channels
Now we are ready to create shared channels.
We can have multiple channels, but the same rules as connections apply:
- If no one is using a channel, it should be disposed.
- If someone subscribes, it should be created on demand.
This allows to not bloat with channels while keeping access thread-safe with the synchronized wrapper. I'ts a trade-off. We can still create multiple SynchronizedChannel instances if we want to.
public static AsyncObservable<SynchronizedChannel> SharedSynchronizedChannel(
this IConnection connection,
Func<IConnection, CancellationToken, ValueTask<IChannel>> channelFactory)
{
return AsyncObservable.Using(
async ct =>
{
var channel = await channelFactory(connection, ct);
return channel.ToSynchronizedChannel();
},
static channel => AsyncObservable.CreateAsBackgroundJob<SynchronizedChannel>(
(observer, ct) => observer.OnNextAsync(channel, ct),
true))
.OnErrorResumeAsFailure()
.StatelessReplayLatestPublish()
.RefCount();
}
Small note: We use CreateAsBackgroundJob instead of Create so that SubscribeAsync can return even if the channel instance is not completely notified to the observer (SubscribeAsync will still return only after the Channel is created, that would be impossible with sync R3 or Rx.NET).
Step 5: Convert Basic Consume to an AsyncObservable
Now that we have shared, synchronized channels, we can wrap RabbitMQ’s basic consume in an AsyncObservable.
public record BasicConsumerArgs<T>(SynchronizedChannel SyncChannel, T Value, BasicDeliverEventArgs OriginalArgs);
extension(SynchronizedChannel syncChannel)
{
public AsyncObservable<BasicConsumerArgs<T>> CreateBasicConsumeObservable(
Func<IChannel, IAsyncBasicConsumer, CancellationToken, ValueTask<string>> basicConsume,
bool unsubscriptionNoWait = false)
{
var observable = AsyncObservable.Create<BasicConsumerArgs<Unit>>(async (observer, subscribeToken) =>
{
using var channelLock = await syncChannel.GetChannelLockAsync(subscribeToken);
var channel = channelLock.Channel;
var consumer = new AsyncEventingBasicConsumer(channel);
var cts = new CancellationTokenSource();
var streamToken = cts.Token;
consumer.ReceivedAsync += OnReceivedAsync;
var consumerTag = await basicConsume(channel, consumer, subscribeToken);
async Task OnReceivedAsync(object sender, BasicDeliverEventArgs args)
{
try
{
await observer.OnNextAsync(new(syncChannel, Unit.Default, args), streamToken)
.AsTask()
.WaitAsync(Timeout.InfiniteTimeSpan, streamToken);
}
catch (OperationCanceledException)
{
}
}
return AsyncDisposable.Create(async () =>
{
using (cts)
{
consumer.ReceivedAsync -= OnReceivedAsync;
await Task.Run(cts.Cancel, CancellationToken.None);
using var innerChannelLock = await syncChannel.GetChannelLockAsync(CancellationToken.None);
await innerChannelLock.Channel.BasicCancelAsync(consumerTag, unsubscriptionNoWait, CancellationToken.None);
}
});
});
return observable;
}
}
Here, we map the SubscribeAsync of the AsyncObservable to RabbitMQ’s += ReceivedAsync + BasicConsume, the DisposeAsync of the subscription to -= and BasicCancelAsync, and ReceivedAsync to OnNextAsync.
A small detail for clever readers: we handle reentrancy because ReceivedAsync might trigger DisposeAsync while processing a message.
The RabbitMQ client I think does not support disposing a channel from within ReceivedAsync, so which could otherwise lead to deadlocks.
Full Example
We register the shared connection in the DI
builder.Services.AddSingleton<AsyncObservable<IConnection>>(s =>
{
var uri = s.GetRequiredService<IConfiguration>()["MESSAGING_URI"]!;
return IConnection.CreateSharedConnectionObservable(new ConnectionFactory
{
Uri = new(uri)
});
});
We create a background service to consume messages (30 maximum messages concurrently):
public class ConsumerHostedService(AsyncObservable<IConnection> sharedConnection) : BackgroundService
{
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
await sharedConnection.Select(connection => connection.SharedSynchronizedChannel(async (connection, token) => await connection.CreateChannelAsync(cancellationToken: token)))
.Switch()
.Select(syncChannel => syncChannel.CreateBasicConsumeObservable(async (channel, consumer, token) =>
{
await channel.QueueDeclareAsync(queue: "hello", durable: false, exclusive: false, autoDelete: false,
arguments: null, cancellationToken: token);
return await channel.BasicConsumeAsync("hello", autoAck: false, consumer: consumer, cancellationToken: token);
}))
.Switch()
.Select(message => AsyncObservable.FromAsync(async token =>
{
await message.HandleAndAck(async t =>
{
var body = Encoding.UTF8.GetString(message.OriginalArgs.Body.Span);
Console.WriteLine($" [x] Processing {body}");
await Task.Delay(2000, t);
Console.WriteLine($" [x] Processed {body}");
}, cancellationToken: token);
}))
.Merge(30)
.OnDispose(() => Console.WriteLine("Consumer disposed"))
.WaitCompletionAsync(stoppingToken);
}
}
HandleAsync is just an helper extension methods that acks if function executed succesfully, or nacks otherwise:
Other approaches
Some operators of AsyncObservable are a bit dangerous to use in this context, since we should ack (or nack) messages. We can easly fix this with a thin wrapper
Step 6: Wrap AsyncObservable into a dedicated Akc-aware observable
We can define a small wrapper, BasicConsumerObservable, that wraps AsyncObservable<BasicConsumerArgs<T>> and provides LINQ-style operations safely:
public record struct BasicConsumerObservable<T>(AsyncObservable<BasicConsumerArgs<T>> RawObservable);
// LINQ-like extensions for BasicConsumerObservable
public static class BasicConsumerObservable
{
public static BasicConsumerObservable<T> ToBasicConsumerObservable<T>(
this AsyncObservable<BasicConsumerArgs<T>> @this) => new(@this);
extension<T>(BasicConsumerObservable<T> @this)
{
public BasicConsumerObservable<TDest> Select<TDest>(
Func<BasicConsumerArgs<T>, CancellationToken, ValueTask<TDest>> selector)
{
var mappedObservable = @this.RawObservable.Select(async (args, token) =>
{
var result = await selector(args, token);
return new BasicConsumerArgs<TDest>(args.SyncChannel, result, args.OriginalArgs);
});
return new BasicConsumerObservable<TDest>(mappedObservable);
}
public BasicConsumerObservable<TDest> Select<TDest>(
Func<BasicConsumerArgs<T>, TDest> selector)
{
var mappedObservable = @this.RawObservable.Select(args =>
{
var result = selector(args);
return new BasicConsumerArgs<TDest>(args.SyncChannel, result, args.OriginalArgs);
});
return new BasicConsumerObservable<TDest>(mappedObservable);
}
public BasicConsumerObservable<T> Where(
Func<BasicConsumerArgs<T>, CancellationToken, ValueTask<bool>> predicate,
bool multiple = false)
{
var filteredObservable = @this.RawObservable.Where(async (args, token) =>
{
var ret = await predicate(args, token);
if (!ret)
{
using var channelLock = await args.SyncChannel.GetChannelLockAsync(token);
await channelLock.Channel.BasicAckAsync(args.OriginalArgs.DeliveryTag, multiple, token);
}
return ret;
});
return @this with { RawObservable = filteredObservable };
}
public BasicConsumerObservable<T> Where(
Func<BasicConsumerArgs<T>, bool> predicate,
bool multiple = false)
{
var filteredObservable = @this.RawObservable.Where(async (args, token) =>
{
var ret = predicate(args);
if (!ret)
{
using var channelLock = await args.SyncChannel.GetChannelLockAsync(token);
await channelLock.Channel.BasicAckAsync(args.OriginalArgs.DeliveryTag, multiple, token);
}
return ret;
});
return @this with { RawObservable = filteredObservable };
}
public BasicConsumerObservable<T> Merge(BasicConsumerObservable<T> other)
{
var mergedObservable = @this.RawObservable.Merge(other.RawObservable);
return @this with { RawObservable = mergedObservable };
}
}
}
This is just an idea, but you get the point. The heavy lifting is done by R3Async:
-
AsyncObservables handle subscriptions, async resource lifetimes, and backpressure. - You can integrate DI and scoped handlers by creating scopes inside your
HandleAsyncfunction. - Be creative — the core idea is that AsyncObservables map naturally to RabbitMQ streams, making it easy to build safe, composable, and fully asynchronous pipelines.
- You can also convert them to
IAsyncEnumerables and “return” them in SignalR hubs, Server-Sent Events (SSE), or other streaming endpoints.
Top comments (0)