DEV Community

Diogo Martins
Diogo Martins

Posted on

C# Networking Deep Dive With io_uring part 2 - Bridge the Async Model

In part 1 we built the minimal io_uring loop: setup, mmaps, SQE/CQE draining, accept/recv/send via opcode dispatch.This was the first step to understanding how the kernel interface works but the dispatch logic was basically hand coded, a state machine that would not scale.
This second part is all about introducing the asynchronous model to await data pushed by the kernel. For simplicity, in this article scope we will simply return the number of bytes received, in later parts it will be described how to adapt this to return the actual request data and parse it.

As usual, the full source code can be found at ....

Why not just use Task?

The direct option would be to just return a Task, completed by the dispatcher when a CQE arrives, to avoid the allocations this would cause for every asynchronous read we are going with the zero allocation option. ValueTask over a reusable source, the high performance path, the source is a single object that lives as long as the TCP connection.

public interface IValueTaskSource<out TResult> 
{
   TResult GetResult(short token);

   ValueTaskSourceStatus GetStatus(short token);

   void OnCompleted(Action<object?> continuation, object? state, short token, ValueTaskSourceOnCompletedFlags flags);

}  
Enter fullscreen mode Exit fullscreen mode

These three methods are all the runtime needs to drive the await "machinery". The token is just a safety net, every Reset() bumps an internal version, like a generation counter so that a stale awaiter passing an old token gets caught.

Typically we don't need to implement these three methods from scratch, the BCL provides ManualResetValueTaskSourceCore, a struct that holds/owns the value, version, captured continuation and the scheduling context.

We can delegate the interface methods to it

private ManualResetValueTaskSourceCore<RecvSnapshot> _readSignal;        

RecvSnapshot IValueTaskSource<RecvSnapshot>.GetResult(short token) => _readSignal.GetResult(token);     

ValueTaskSourceStatus IValueTaskSource<RecvSnapshot>.GetStatus(short token) => _readSignal.GetStatus(token);

void IValueTaskSource<RecvSnapshot>.OnCompleted(Action<object?> c, object? s, short t, ValueTaskSourceOnCompletedFlags f)                         
      => _readSignal.OnCompleted(c, s, t, f); 
Enter fullscreen mode Exit fullscreen mode

The generic T is RecvSnapshot, a small struct that captures what is available to read, the handler awaits this snapshot and drains the items it covers. This will be covered further on, for now just think of it as a snapshot that points to a circular buffer tail, the reason we need this and can't just drain the entire buffer is because we can receive data as we process this, the tail can change anytime and that would potentially cause a desync when extracting data.

_readSignal exposes Version, Reset(), SetResult(value) and SetException(ex), that is the whole API surface we will use. The "Manual" in ManualResetValueTaskSourceCore is the contract, Reset() must be called between completions, without it the next SetResult() would throw, this is intentional design, reuses without explicit resets could mask state management bugs.

Meeting point

Each Connection has two sides:

  • Producer, the CQE dispatcher. When a CQE arrives it must be delivered.
  • Consumer, the application handler parked on await ReadAsync(), waiting for that result.

Most of the time these two are synchronized in the simplest case: the consumer parks waiting for data, the producer wakes it. There are however gaps that need to be addressed.

  • Consumer calls ReadAsync() to park but the producer already had a result read from a previous CQE. In this case we don't want to block/await, we want to return the buffered result synchronously.
  • Producer fires a result but the consumer hasn't called ReadAsync() yet. This can typically happen between two consumer awaits, we can't drop the result or overwrite it if more than one CQE arrives back to back.

Both cases boil down to the same issue/need, a buffer between the producer and consumer so that data is preserved no matter what runs first. A solution is a bounded single producer single consumer ring, producer enqueues from the dispatcher and the consumer dequeues from ReadAsync, this structure implementation can be found at SpscRecvRing.cs.

internal sealed class SpscRecvRing
{
   public struct Item { public ushort Bid; public int Len; public bool HasBuffer; }

   public bool TryEnqueue(in Item item);
   public bool TryDequeue(out Item item);
   public long SnapshotTail();
   public bool TryDequeueUntil(long tailSnapshot, out Item item);
   public bool IsEmpty();
}
Enter fullscreen mode Exit fullscreen mode

One important characteristic is that this is a ring or circular buffer, which size must be a power of 2, this removes the need to ever need to clear or reset the buffer.
In this part 2 scope we only care about Len, the number of bytes received, Bid(Buffer Id) and HasBuffer are for returning the buffer to the kernel ring and will be properly covered in part 3.

The two interesting methods are SnapshotTail and TryDequeueUntil, together they let the consumer take a "picture" of what was available at that moment and drain everything up to that point in a single batch, without chasing a moving tail, as was explained above.

internal readonly struct RecvSnapshot
{
   public readonly long Tail;
   public readonly bool IsClosed;
   public RecvSnapshot(long tail, bool isClosed) { Tail = tail; IsClosed = isClosed; }
   public static RecvSnapshot Closed() => new(0, isClosed: true);
}
Enter fullscreen mode Exit fullscreen mode

Before moving on I'd like to leave a "small" paragraph here, I've been confronted many times with "but why do you say run synchronously? Don't we want everything asynchronously to be more efficient?", so.. typically when we hear about asynchronous execution we think about parallelism, thread pool and multi threading, this is generally not wrong but not precise either. The typical async workflow is to tell the CPU to execute a piece of code/logic in the "future", a callback, promise or however we many name it, in C# this callback will execute in a thread from the thread pool (as typically .ConfigureAwait is set with false) and you could say that is one of the core mechanisms of c# await/async model which makes it so good. But in the end of the day if we can avoid this future callback and immediately execute the logic which can happen if the CQE was received before we call ReadAsync, this would be the most efficient scenario as we have if there is data already available when we call ReadAsync, short circuiting the whole callback and immediately execute it.

Delegating the Task execution to the thread pool sounds great but it comes with a price which becomes noticeable when we want to deal with millions of requests per second and more important, more threads does not always mean faster. Software runs on a CPU which has a limited number of CPU threads typically the same number of physical cores or 2x that value. While we can create thousands or tens of thousands of software threads in C#, those will run in a limited number of CPU threads. Here is where IValueTaskSource can shine, by default it is set to run the OnCompleted callback synchronously but do not get confused here, this synchronously means the callback will run on the same thread of the caller, not that will block the await.

Before moving into how IValueTaskSource is bridged between the handler and the reactor loop let's first understand the four flags/states owned by the Connection class.

private ManualResetValueTaskSourceCore<RecvSnapshot> _readSignal;     // wake signal carrying the snapshot
private int _armed;                                                    // 1 when handler is parked
private int _closed;                                                   // sticky once recv returned <=0 or send failed
private readonly SpscRecvRing _recv = new(capacityPow2: 16);           // buffered recv items                                                                                                                                
Enter fullscreen mode Exit fullscreen mode

_armed means the consumer is parked. After enqueueing the data into the ring, the producer checks _armed, if set, it fires SetResult with a fresh snapshot to wake the consumer. If not armed, the data just sits in the ring waiting for the next ReadAsync to pick it up via the synchronous fast path. There is no pending data mechanism, the SPSC ring is the bridge from edge triggered wakes (one CQE at a time) to level-triggered consumption (the handler reads when it can).
_closed is sticky, once set all future ReadAsync will return immediately.

On the producer side

public void Complete(int res, ushort bid, bool hasBuffer) {
   if (res <= 0) {
       _closed = 1;
       if (hasBuffer) _reactor.ReturnBuffer(bid);   // defensive: kernel rarely attaches a buffer here
   }
   else if (!_recv.TryEnqueue(new SpscRecvRing.Item { Bid = bid, Len = res, HasBuffer = hasBuffer })) {
       _closed = 1;
       if (hasBuffer) _reactor.ReturnBuffer(bid);   // overflow → handler can't keep up; return the buffer, close
   }

   if (_armed == 1) {
       _armed = 0;
       _readSignal.SetResult(new RecvSnapshot(_recv.SnapshotTail(), _closed != 0));
   }
}
Enter fullscreen mode Exit fullscreen mode

On failure path (res<=0 or can't enqueue the Item) we hand the buffer back to the kernel ring so that it doesn't slowly run out of recv buffers. If the consumer is already parked, wake it with SetResult and pass a snapshot of the ring's tail at this moment. The snapshot is what makes this meaningful, it basically tells the handler "everything from your current head up to this tail can be drained in s single batch", if three CQEs arrived back to back while the handler was busy, all three are visible in the next snapshot.

The consumer side

public ValueTask<RecvSnapshot> ReadAsync() {
   if (!_recv.IsEmpty())  // synchronous fast path
       return new ValueTask<RecvSnapshot>(new RecvSnapshot(_recv.SnapshotTail(), _closed != 0));

   if (_closed != 0)
       return new ValueTask<RecvSnapshot>(RecvSnapshot.Closed());

   if (_armed == 1)
       throw new InvalidOperationException("ReadAsync already armed.");
   _armed = 1;

   return new ValueTask<RecvSnapshot>(this, _readSignal.Version);
}
Enter fullscreen mode Exit fullscreen mode

If the ring has data, take a fresh snapshot and return it as a complete ValueTask, no state machine yeild, allocation or wake required, the synchronous fast path.
If the ring is empty and the connection is closed, return a closed snapshot.
The check on _armed==1 is a guardrail, two simultaneous ReadAsync calls on the same connection would "clobber" each other's continuation.
Then the typical case, set _armed and handle the awaiter a ValueTask(this, _readSignal.Version), the handler suspends, the runtime stores the continuation on _readSignal and the thread is freed.

Draining the snapshot

So, once the handler receives a snapshot it pulls items out one by one with TryGetItem.

public bool TryGetItem(in RecvSnapshot snap, out SpscRecvRing.Item item)
   => _recv.TryDequeueUntil(snap.Tail, out item);
Enter fullscreen mode Exit fullscreen mode

TryDequeueUntil advances the consumer's head as far as snapshot's tail even if the producer has since moved ahead. The handler always sees a stable batch.

RecvSnapshot snap = await conn.ReadAsync();

while (conn.TryGetItem(snap, out SpscRecvRing.Item item)) {
  // process item.Len bytes; in part 3 we will also use item.Bid to read the data
   if (item.HasBuffer) reactor.ReturnBuffer(item.Bid);
   conn.QueueResponse(fd);
}

if (snap.IsClosed) { conn.Close(fd); return; }
conn.ResetRead();
Enter fullscreen mode Exit fullscreen mode

IValueTaskSource "magic"

In simple terms, the "trick" here is that the continuation callback will always run on the same thread, avoiding the thread pool cost. The reactor/dispatcher loop always runs on the same software AND CPU thread even though it is asynchronous and await boundaries are hit.

When a recv CQE arrives, the reactor calls Complete which enqueues into the ring and calls SetResult with a fresh snapshot. With RunContinuationsAsynchronously left at its default false and no captured SynchronizationContext, the stored continuation is invoked inline, on the reactor thread:

Reactor.Run
--Dispatch(recv_cqe)
----Connection.Complete(res, bid, hasBuffer)
------_recv.TryEnqueue(...)
------_readSignal.SetResult(new RecvSnapshot(tail, isClosed))
--------continuation(state) ← function pointer call
----------HandleAsync.MoveNext ← state machine resumes after the await
------------RecvSnapshot snap = result; ← GetResult returns the snapshot
------------while (TryGetItem(...)) { ... process, send response, return buffer ... }
------------ResetRead, await ReadAsync
------------// ...the loop starts over, still on the reactor thread

The handler runs on top of the reactor's call stack. No thread-pool hop, no scheduler. SetResult is, mechanically, just a function call into the handler's continuation that happens to carry a snapshot as its payload. When the handler finishes draining, hits the next await and parks, the stack unwinds back to Dispatch which moves on to the next CQE.

The reactor/dispatcher loop always runs on the same software AND CPU thread, and so does the handler — even though the code is fully asynchronous and await boundaries are hit on every iteration.

Top comments (0)