In part 2 we introduced an asynchronous API for reading data from the wire, but only the number of received bytes was surfaced. In this part 3 let's extend it to access the actual received data.
As usual, the entire source code can be found at Minima
Before diving into the code let's understand which data structures make sense to consider.
Data is pushed to the CQ shared ring buffers by the kernel whenever something arrives from the wire; a completion can represent a partial request, a complete one, or more than one. By request I mean it could be HTTP/1.1, HTTP/2, gRPC, websocket, etc.: pretty much a request from any protocol. Whenever we call ReadAsync, we receive a snapshot whose tail points us to a snapshot of CQE metadata. Currently this metadata only includes the number of bytes from each CQE; we need to add a byte* pointing to where the data is.
public struct Item
{
    public byte* Ptr; // new
    public ushort Bid;
    public int Len;
    public bool HasBuffer;

    public ReadOnlySpan<byte> AsSpan() => new(Ptr, Len); // new
}
Then, in the reactor's dispatch receive branch:
else if (kind == KindRecv)
{
    bool hasBuf = (cqe.flags & IORING_CQE_F_BUFFER) != 0;
    ushort bid = hasBuf ? (ushort)(cqe.flags >> IORING_CQE_BUFFER_SHIFT) : (ushort)0;
    (...)
    byte* ptr = hasBuf ? _bufSlab + (nuint)bid * (nuint)BufferSize : null;
    conn.Complete(cqe.res, bid, hasBuf, ptr);
    (...)
}
- hasBuf: true when the kernel attached a provided buffer to this completion.
- bid: the index of the buffer slot the kernel picked from the provided-buffer ring for this recv.
- _bufSlab: the contiguous unmanaged memory block backing every recv buffer the kernel can DMA into for this reactor.
So ptr points to the "slot", which can be calculated from the buffer id and the size of each buffer. These slots are contiguous memory allocated during initialization.
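The slot arithmetic can be demonstrated in isolation. This is a minimal sketch, not the reactor's actual code: BufferSize, BufferCount, and the 4096-byte alignment are illustrative values, and the slab here stands in for the reactor's _bufSlab.

```csharp
using System;
using System.Runtime.InteropServices;

unsafe class SlabDemo
{
    const int BufferSize = 4096; // size of each provided buffer (illustrative)
    const int BufferCount = 8;   // number of slots in the slab (illustrative)

    static void Main()
    {
        // One contiguous block backs every slot, like the reactor's _bufSlab.
        byte* slab = (byte*)NativeMemory.AlignedAlloc(
            (nuint)(BufferSize * BufferCount), 4096);

        // A buffer id selects a slot by plain pointer arithmetic,
        // exactly as in the dispatch branch above.
        ushort bid = 3;
        byte* slot = slab + (nuint)bid * (nuint)BufferSize;

        Console.WriteLine(slot - slab == bid * BufferSize); // True

        NativeMemory.AlignedFree(slab);
    }
}
```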
Now, for each received CQE we store the byte* where the kernel wrote the received data. Each ReadAsync returns a snapshot containing one or more Items, and each Item carries the metadata for one CQE. On the handler side we must consume this data. We don't want to be dealing with pointers though; that would force us to use unsafe everywhere we touch the received data.
We already have a ReadOnlySpan view of the data via AsSpan(), but spans are ref structs and can't be freely used anywhere: they can't be stored on the heap or live across an await. Why? Because a span may be a view over stack-allocated data, unlike its heap-friendly counterparts Memory/ReadOnlyMemory. We can't directly use ReadOnlyMemory either, because it can't be created from a byte*, even though this byte* points at the unmanaged memory of the _bufSlab we initialize for each reactor.
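The ref-struct restriction is easy to see in a minimal sketch: a ReadOnlyMemory<byte> parameter can live across an await, while the equivalent span version would be rejected by the compiler. The names below are illustrative, not part of the server:

```csharp
using System;
using System.Threading.Tasks;

class SpanVsMemory
{
    // Compiles: Memory/ReadOnlyMemory are ordinary structs,
    // so they may be captured by the async state machine.
    static async Task<int> SumAsync(ReadOnlyMemory<byte> data)
    {
        await Task.Yield();
        int sum = 0;
        // A span may be materialized between awaits, just never across one.
        foreach (byte b in data.Span) sum += b;
        return sum;
    }

    // A `ReadOnlySpan<byte>` parameter here would not compile:
    // async methods cannot take span parameters or keep span locals across an await.

    static async Task Main()
    {
        byte[] payload = { 1, 2, 3 };
        Console.WriteLine(await SumAsync(payload)); // 6
    }
}
```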
I present to you UnmanagedMemoryManager:
public sealed unsafe class UnmanagedMemoryManager : MemoryManager<byte>
{
    private readonly byte* _ptr;
    private readonly int _length;

    public ushort BufferId { get; }
    public byte* Ptr => _ptr;
    public int Length => _length;

    public UnmanagedMemoryManager(byte* ptr, int length)
    {
        _ptr = ptr;
        _length = length;
    }

    public UnmanagedMemoryManager(byte* ptr, int length, ushort bufferId)
    {
        _ptr = ptr;
        _length = length;
        BufferId = bufferId;
    }

    public override Span<byte> GetSpan() => new Span<byte>(_ptr, _length);

    // The memory is unmanaged, so pinning is a no-op: just hand out the address.
    public override MemoryHandle Pin(int elementIndex = 0) => new MemoryHandle(_ptr + elementIndex);

    public override void Unpin() { }

    public void Free()
    {
        if (_ptr != null)
        {
            NativeMemory.AlignedFree(_ptr);
        }
    }

    protected override void Dispose(bool disposing) { }
}
UnmanagedMemoryManager is the bridge between safe and unsafe code. By inheriting from MemoryManager<byte> it can be exposed as Memory<byte> and plugged into the entire BCL ecosystem for free:
- PipeReader / PipeWriter
- Stream.ReadAsync(Memory<byte>) / WriteAsync(ReadOnlyMemory<byte>)
- ReadOnlySequence<byte> (built from ReadOnlyMemory<byte> segments)
- IBufferWriter<byte>
- Any async API that takes Memory<byte>
This is especially useful for ReadOnlySequence which is very handy when dealing with TCP fragmentation.
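As a sketch of that payoff, here is a Memory<byte> produced by the class above flowing into a plain BCL async API. It needs AllowUnsafeBlocks, the class is condensed (no BufferId/Free) so the snippet is self-contained, and Wrap/FreeRaw are illustrative helpers, not part of the server:

```csharp
using System;
using System.Buffers;
using System.IO;
using System.Runtime.InteropServices;
using System.Text;
using System.Threading.Tasks;

// Condensed copy of the article's UnmanagedMemoryManager, enough to get a Memory<byte>.
sealed unsafe class UnmanagedMemoryManager : MemoryManager<byte>
{
    private readonly byte* _ptr;
    private readonly int _length;

    public UnmanagedMemoryManager(byte* ptr, int length) { _ptr = ptr; _length = length; }

    public override Span<byte> GetSpan() => new(_ptr, _length);
    public override MemoryHandle Pin(int elementIndex = 0) => new(_ptr + elementIndex);
    public override void Unpin() { }
    protected override void Dispose(bool disposing) { }
}

class Demo
{
    // Stands in for a recv slot: unmanaged bytes wrapped by the memory manager.
    static unsafe UnmanagedMemoryManager Wrap(byte[] payload, out IntPtr raw)
    {
        byte* p = (byte*)NativeMemory.Alloc((nuint)payload.Length);
        payload.AsSpan().CopyTo(new Span<byte>(p, payload.Length));
        raw = (IntPtr)p;
        return new UnmanagedMemoryManager(p, payload.Length);
    }

    static unsafe void FreeRaw(IntPtr p) => NativeMemory.Free((void*)p);

    static async Task Main()
    {
        UnmanagedMemoryManager mgr = Wrap("ping"u8.ToArray(), out IntPtr raw);

        // The unmanaged bytes are now usable by any Memory-based async API.
        using var ms = new MemoryStream();
        await ms.WriteAsync(mgr.Memory);

        Console.WriteLine(Encoding.ASCII.GetString(ms.ToArray())); // ping
        FreeRaw(raw);
    }
}
```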
This is how zero allocation is achieved on the receiving branch: the data is written into pre-allocated slots and we read directly from them. The data can be parsed by slicing over it and is valid until we return the CQE's buffer id, as seen in the snippet below through reactor.ReturnBuffer. After a buffer is returned, the kernel may reuse that "slot" for new incoming data, so its contents may be overwritten.
So, how does the handler look now?
public static async Task HandleAsync(Reactor reactor, int fd, Connection conn)
{
    try
    {
        while (true)
        {
            RecvSnapshot snap = await conn.ReadAsync();
            while (conn.TryGetItem(snap, out SpscRecvRing.Item item))
            {
                if (item.HasBuffer)
                {
                    UnmanagedMemoryManager mem = item.AsMemoryManager();
                    ReadOnlyMemory<byte> data = mem.Memory;
                    // data is now usable with any BCL Memory<byte>/async API
                    _ = data.Length;
                    reactor.ReturnBuffer(mem.BufferId);
                }
                conn.QueueResponse(fd);
            }
            if (snap.IsClosed)
            {
                conn.Close(fd);
                return;
            }
            conn.ResetRead();
        }
    }
    catch (Exception ex)
    {
        Console.Error.WriteLine($"[r{reactor.Id}] handler crash on fd={fd}: {ex}");
        conn.Close(fd);
    }
}
The possibilities are now endless: we can build a ReadOnlySequence<byte> from all the data to facilitate slicing across multiple segments. And in the case of an incomplete request, we can create a ReadOnlySequence<byte>, call ReadAsync again, and append the newly received segments to the existing sequence.
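Chaining fragments works by subclassing ReadOnlySequenceSegment<byte>. A minimal sketch, with an illustrative Chunk type and arrays standing in for the Memory<byte> of two CQE fragments:

```csharp
using System;
using System.Buffers;
using System.Text;

// Minimal segment type to chain Memory<byte> fragments into one ReadOnlySequence<byte>.
sealed class Chunk : ReadOnlySequenceSegment<byte>
{
    public Chunk(ReadOnlyMemory<byte> memory) => Memory = memory;

    public Chunk Append(ReadOnlyMemory<byte> memory)
    {
        // RunningIndex lets the sequence compute absolute positions across segments.
        var next = new Chunk(memory) { RunningIndex = RunningIndex + Memory.Length };
        Next = next;
        return next;
    }
}

class SequenceDemo
{
    static void Main()
    {
        // Two fragments, as if they had arrived in separate CQEs.
        Chunk first = new Chunk(Encoding.ASCII.GetBytes("GET / HT"));
        Chunk last = first.Append(Encoding.ASCII.GetBytes("TP/1.1\r\n"));

        var seq = new ReadOnlySequence<byte>(first, 0, last, last.Memory.Length);

        // Slicing works transparently across the fragment boundary.
        Console.WriteLine(seq.Length);                                // 16
        Console.WriteLine(Encoding.ASCII.GetString(seq.Slice(6, 8))); // HTTP/1.1
    }
}
```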