In the previous article I introduced Trupe, a lightweight actor model library for .NET. Since then, I’ve made several design improvements to enhance clarity, performance, and developer experience.
In this article, I’ll walk you through the latest modifications and implement two essential messaging patterns: Tell and Ask.
What's Changed in Trupe?
I'll start with the changes I've made to the library.
Actor Interface
In the initial design, I debated between IActor and IActor<TMessage>. I wanted to avoid forcing a base class on developers. After some reflection, I’ve refined the naming to better reflect intent.
- IActor: This remains the base interface that identifies a class as an Actor.
- IHandleActorMessage<TMessage>: Renamed from IActor<TMessage>. This makes it explicit that the interface is responsible for handling a specific message type.
- HandleAsync: Renamed from ReceiveAsync. "Handle" more accurately describes the processing action.
The updated interfaces look like this:
public interface IActor
{
    IActorContext Context { get; }

    // Untyped fallback, used when no typed handler matches the payload
    ValueTask HandleAsync(object? message, CancellationToken cancellationToken = default);
}

public interface IHandleActorMessage<TMessage>
{
    ValueTask HandleAsync(TMessage message, CancellationToken cancellationToken = default);
}
Mailbox Simplification
I have modernized the IMailbox interface. Instead of a traditional Dequeue method, the mailbox now implements IAsyncEnumerable<IMessage>. This simplifies the consumption loop in ActorProcess by allowing us to use the elegant await foreach pattern.
public interface IMailbox : IAsyncEnumerable<IMessage>
{
    ValueTask EnqueueAsync(IMessage message, CancellationToken cancellationToken = default);
}
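One way to back this interface is System.Threading.Channels. The following is a minimal sketch under stated assumptions, not Trupe's actual implementation: ChannelMailbox, DemoMessage, and the Complete method are hypothetical names, and the IMessage shape (Payload plus CancellationToken) is inferred from how messages are read later in this article.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

// Usage: enqueue two messages, then drain them with await foreach.
var mailbox = new ChannelMailbox();
await mailbox.EnqueueAsync(new DemoMessage("hello"));
await mailbox.EnqueueAsync(new DemoMessage("world"));
mailbox.Complete(); // lets the loop below finish once the queue is empty

await foreach (var message in mailbox)
{
    Console.WriteLine(message.Payload); // prints "hello", then "world"
}

// Assumed message shape, inferred from how ActorProcess reads messages.
public interface IMessage
{
    object Payload { get; }
    CancellationToken CancellationToken { get; }
}

public interface IMailbox : IAsyncEnumerable<IMessage>
{
    ValueTask EnqueueAsync(IMessage message, CancellationToken cancellationToken = default);
}

// Hypothetical message type used only for this demo.
public sealed record DemoMessage(object Payload, CancellationToken CancellationToken = default) : IMessage;

// Hypothetical mailbox backed by an unbounded channel with a single consumer.
public sealed class ChannelMailbox : IMailbox
{
    private readonly Channel<IMessage> _channel = Channel.CreateUnbounded<IMessage>(
        new UnboundedChannelOptions { SingleReader = true });

    public ValueTask EnqueueAsync(IMessage message, CancellationToken cancellationToken = default)
        => _channel.Writer.WriteAsync(message, cancellationToken);

    // Marks the mailbox as closed; messages already queued can still be read.
    public void Complete() => _channel.Writer.Complete();

    public IAsyncEnumerator<IMessage> GetAsyncEnumerator(CancellationToken cancellationToken = default)
        => _channel.Reader.ReadAllAsync(cancellationToken).GetAsyncEnumerator(cancellationToken);
}
```

An unbounded channel keeps EnqueueAsync from ever waiting; a bounded channel would add back-pressure instead, which is a design decision this sketch leaves open.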
Actor Process Improvements
Task Execution Strategy
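Previously, I started the actor loop using TaskCreationOptions.LongRunning. However, that option makes the default scheduler create a dedicated thread outside the ThreadPool, and because the loop is asynchronous, that thread is only used until the first await; every continuation runs on the ThreadPool anyway. For a library like this, that is not ideal: a dedicated thread per actor does not scale, and we want to leverage the ThreadPool's efficiency instead. The loop now starts with Task.Run, which also means it does not capture the caller's SynchronizationContext.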
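The difference between the two start strategies can be observed with a small experiment. This is an illustrative sketch, not code from Trupe; LoopAsync is a hypothetical stand-in for the actor loop.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// LongRunning: a dedicated thread starts the delegate, but only until its first await.
Task<Task> outer = Task.Factory.StartNew(
    () => LoopAsync("LongRunning"),
    CancellationToken.None,
    TaskCreationOptions.LongRunning,
    TaskScheduler.Default);
await await outer; // StartNew returns Task<Task> for an async delegate; unwrap it by hand

// Task.Run: the delegate starts on the ThreadPool and the inner Task is unwrapped for us.
await Task.Run(() => LoopAsync("Task.Run"));

// Hypothetical stand-in for the actor loop: reports which kind of thread it runs on.
static async Task LoopAsync(string label)
{
    Console.WriteLine($"{label} before await: pool thread = {Thread.CurrentThread.IsThreadPoolThread}");
    await Task.Delay(10);
    Console.WriteLine($"{label} after await: pool thread = {Thread.CurrentThread.IsThreadPoolThread}");
}
```

In a console application you would typically see the LongRunning variant start on a non-pool thread and resume on a pool thread after the await, while the Task.Run variant stays on the pool throughout.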
Lifecycle Management
I’ve refined the start/stop API so only the supervisor controls the actor’s lifecycle. This reduces complexity and eliminates redundant sync/async APIs.
public class ActorProcess(IActor actor, IMailbox mailbox)
{
    private CancellationTokenSource? _cts;
    private Task? _executing;

    public void Start()
    {
        if (_executing != null)
        {
            return; // Already running
        }

        _cts = new CancellationTokenSource();
        // Capture the token now so the lambda never reads _cts after StopAsync has cleared it
        var token = _cts.Token;
        _executing = Task.Run(() => RunAsync(token));
    }

    public async Task StopAsync()
    {
        if (_cts == null || _executing == null)
        {
            return; // Not running
        }

        await _cts.CancelAsync();
        try
        {
            await _executing;
        }
        catch (OperationCanceledException)
        {
            // Ignore cancellation exceptions during shutdown
        }

        _cts.Dispose();
        _cts = null;
        _executing = null;
    }
}
Dropping Sigil for Simplicity and AOT Support
To simplify the library and support Native AOT, I removed the Sigil dependency (which relies on dynamic IL generation). Instead, I am now using C#'s reflection capabilities (MakeGenericMethod and CreateDelegate) to dispatch messages to strongly-typed handlers.
Here is the revised RunAsync method utilizing a concurrent dictionary to cache delegates:
public class ActorProcess(IActor actor, IMailbox mailbox)
{
    // Cache: one dispatch delegate per payload type
    private static readonly ConcurrentDictionary<
        Type,
        Func<IActor, IMessage, ValueTask>
    > _typedCallHandle = new();

    private async Task RunAsync(CancellationToken cancellationToken)
    {
        await foreach (var message in mailbox.WithCancellation(cancellationToken))
        {
            if (RuntimeFeature.IsDynamicCodeSupported)
            {
                var callHandle = _typedCallHandle.GetOrAdd(
                    message.Payload.GetType(),
                    CreateCallHandleDelegate
                );
                await callHandle(actor, message);
            }
            else
            {
                // AOT: no MakeGenericMethod, so use the untyped handler
                await actor.HandleAsync(message.Payload, message.CancellationToken);
            }
        }
    }

    // Calls the typed handler if the actor implements it, otherwise the untyped one
    private static async ValueTask CallHandle<TMessage>(IActor actor, IMessage message)
    {
        if (actor is IHandleActorMessage<TMessage> handle)
        {
            await handle.HandleAsync((TMessage)message.Payload, message.CancellationToken);
        }
        else
        {
            await actor.HandleAsync(message.Payload, message.CancellationToken);
        }
    }

    private static readonly MethodInfo s_callHandleMethodInfo = typeof(ActorProcess).GetMethod(
        nameof(CallHandle),
        BindingFlags.Static | BindingFlags.NonPublic
    )!;

    // Closes CallHandle<TMessage> over the runtime payload type and wraps it in a delegate
    private static Func<IActor, IMessage, ValueTask> CreateCallHandleDelegate(Type messageType)
    {
        var typed = s_callHandleMethodInfo.MakeGenericMethod(messageType);
        return typed.CreateDelegate<Func<IActor, IMessage, ValueTask>>();
    }
}
This approach gives efficient typed dispatch when dynamic code is supported, and falls back to the untyped HandleAsync(object?) overload on IActor in AOT scenarios, where the actor has to inspect the payload itself.
Implementing Tell and Ask
Now, let’s implement two core messaging primitives in actor systems:
- Tell: A "Fire-and-forget" mechanism. The sender pushes a message and continues immediately without waiting for a result.
- Ask: A "Request-Response" mechanism. The sender sends a message and waits (asynchronously) for a reply.
The IActorReference Interface
First, we define an interface for actor references that supports both patterns:
public interface IActorReference
{
    TResponse Ask<TRequest, TResponse>(TRequest request, TimeSpan? timeout = null)
        where TRequest : notnull;

    ValueTask<TResponse> AskAsync<TRequest, TResponse>(TRequest request, CancellationToken cancellationToken = default)
        where TRequest : notnull;

    void Tell<TMessage>(TMessage message, TimeSpan? timeout = null)
        where TMessage : notnull;

    ValueTask TellAsync<TMessage>(TMessage message, CancellationToken cancellationToken = default)
        where TMessage : notnull;
}
Tell – Fire and Forget
Tell is straightforward—it sends a message without waiting for a response.
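public class LocalActorReference
{
    // 'mailbox' is the target actor's IMailbox; how it is wired in is not shown here

    public void Tell<TMessage>(TMessage message, TimeSpan? timeout = null)
        where TMessage : notnull
    {
        // Dispose the source so the CancelAfter timer is released
        using var cts = new CancellationTokenSource();
        if (timeout.HasValue)
        {
            cts.CancelAfter(timeout.Value);
        }

        try
        {
            var task = TellAsync(message, cts.Token);
            if (task.IsCompletedSuccessfully)
            {
                return; // Enqueued synchronously, nothing to wait for
            }
            task.AsTask().GetAwaiter().GetResult();
        }
        // Only a cancellation caused by our own timeout is reported as a timeout
        catch (OperationCanceledException ex) when (cts.IsCancellationRequested)
        {
            throw new TimeoutException(
                $"Tell operation timed out after {timeout?.TotalMilliseconds ?? 0} ms.",
                ex
            );
        }
    }

    public async ValueTask TellAsync<TMessage>(TMessage message, CancellationToken cancellationToken = default)
        where TMessage : notnull
    {
        // The caller's token only bounds the enqueue; the message itself carries CancellationToken.None
        await mailbox.EnqueueAsync(
            new LocalTellMessage(message, CancellationToken.None),
            cancellationToken
        );
    }
}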
Ask – Request-Response
Ask is more complex. It requires a temporary mechanism to capture the response. In Trupe, the LocalAskMessage (not shown here) acts as a TaskCompletionSource that the Actor process can complete when processing finishes.
public class LocalActorReference
{
    // 'mailbox' is the target actor's IMailbox; how it is wired in is not shown here

    public TResponse Ask<TRequest, TResponse>(TRequest request, TimeSpan? timeout = null)
        where TRequest : notnull
    {
        // Dispose the source so the CancelAfter timer is released
        using var cts = new CancellationTokenSource();
        if (timeout.HasValue)
        {
            cts.CancelAfter(timeout.Value);
        }

        try
        {
            var result = AskAsync<TRequest, TResponse>(request, cts.Token);
            if (result.IsCompletedSuccessfully)
            {
                return result.Result;
            }
            return result.AsTask().GetAwaiter().GetResult();
        }
        // Only a cancellation caused by our own timeout is reported as a timeout
        catch (OperationCanceledException ex) when (cts.IsCancellationRequested)
        {
            throw new TimeoutException(
                $"Ask operation timed out after {timeout?.TotalMilliseconds ?? 0} ms.",
                ex
            );
        }
    }

    public async ValueTask<TResponse> AskAsync<TRequest, TResponse>(TRequest request, CancellationToken cancellationToken = default)
        where TRequest : notnull
    {
        var message = new LocalAskMessage(request, cancellationToken);
        await mailbox.EnqueueAsync(message, cancellationToken);

        // Completed by the actor process once the handler has run
        var response = await message.AsTask();
        if (response is TResponse val)
        {
            return val;
        }

        throw new InvalidCastException(
            $"Cannot cast response of type {response?.GetType().FullName ?? "null"} to {typeof(TResponse).FullName}."
        );
    }
}
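Since LocalAskMessage is not shown, here is one way such a message could be built on top of TaskCompletionSource. Treat it as a sketch under assumptions rather than Trupe's actual type: the constructor, AsTask, and SetResult signatures are inferred from how they are called in this article, and the IMessage and IAskMessage shapes are guesses made only so the example compiles on its own.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Usage: the process side completes the message, the caller side awaits it.
var ask = new LocalAskMessage("ping", CancellationToken.None);
ask.SetResult("pong");
Console.WriteLine(await ask.AsTask()); // prints "pong"

// If the caller's token is cancelled first, the pending task is cancelled too.
using var cts = new CancellationTokenSource();
var abandoned = new LocalAskMessage("ping", cts.Token);
cts.Cancel();
Console.WriteLine(abandoned.AsTask().IsCanceled); // prints "True"

// Assumed interface shapes (not taken from Trupe's source).
public interface IMessage
{
    object Payload { get; }
    CancellationToken CancellationToken { get; }
}

public interface IAskMessage : IMessage
{
    void SetResult(object? response);
}

// Hypothetical implementation: a message that doubles as a promise for the reply.
public sealed class LocalAskMessage : IAskMessage
{
    // Run continuations asynchronously so the caller's code never runs inside the actor loop.
    private readonly TaskCompletionSource<object?> _tcs =
        new(TaskCreationOptions.RunContinuationsAsynchronously);
    private readonly CancellationTokenRegistration _registration;

    public LocalAskMessage(object payload, CancellationToken cancellationToken)
    {
        Payload = payload;
        CancellationToken = cancellationToken;
        // If the caller gives up, cancel the pending task as well.
        _registration = cancellationToken.Register(() => _tcs.TrySetCanceled(cancellationToken));
    }

    public object Payload { get; }
    public CancellationToken CancellationToken { get; }

    public void SetResult(object? response)
    {
        _registration.Dispose();
        _tcs.TrySetResult(response); // no-op if the task was already cancelled
    }

    public Task<object?> AsTask() => _tcs.Task;
}
```

Using TrySetResult and TrySetCanceled rather than the throwing variants means a reply that arrives after a timeout is dropped quietly instead of raising inside the process loop.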
Closing the Loop: Updating ActorProcess
Finally, we need to update the RunAsync loop in ActorProcess to handle the response.
Trupe uses a context-based approach for responses. The actor handles the message and sets the result on Context.Response. The process loop then grabs this result and completes the Ask promise.
public class ActorProcess(IActor actor, IMailbox mailbox)
{
    private async Task RunAsync(CancellationToken cancellationToken)
    {
        await foreach (var message in mailbox.WithCancellation(cancellationToken))
        {
            // Start every message with a clean response slot
            actor.Context.Response = null;

            if (RuntimeFeature.IsDynamicCodeSupported)
            {
                var callHandle = _typedCallHandle.GetOrAdd(
                    message.Payload.GetType(),
                    CreateCallHandleDelegate
                );
                await callHandle(actor, message);
            }
            else
            {
                await actor.HandleAsync(message.Payload, message.CancellationToken);
            }

            // For Ask messages, hand whatever the actor stored back to the caller
            if (message is IAskMessage askMessage)
            {
                askMessage.SetResult(actor.Context.Response);
            }

            actor.Context.Response = null;
        }
    }
}
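From the actor's side, the contract described above comes down to writing the reply into Context.Response inside the handler. The sketch below simulates a single loop iteration by hand, without a mailbox or ActorProcess. CounterActor, Increment, and DemoContext are hypothetical, and IActorContext is assumed to expose a settable Response property, as the loop above implies.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Simulate what the process loop does for one Ask message.
var actor = new CounterActor();
actor.Context.Response = null;
await actor.HandleAsync(new Increment(5));
Console.WriteLine(actor.Context.Response); // prints "5", the value an Ask caller would receive

// Assumed context shape: only the Response slot used in this article.
public interface IActorContext
{
    object? Response { get; set; }
}

public interface IActor
{
    IActorContext Context { get; }
    ValueTask HandleAsync(object? message, CancellationToken cancellationToken = default);
}

public interface IHandleActorMessage<TMessage>
{
    ValueTask HandleAsync(TMessage message, CancellationToken cancellationToken = default);
}

// Hypothetical context and message types for the demo.
public sealed class DemoContext : IActorContext
{
    public object? Response { get; set; }
}

public sealed record Increment(int Amount);

// Hypothetical actor: keeps a running total and replies with it.
public sealed class CounterActor : IActor, IHandleActorMessage<Increment>
{
    private int _count;

    public IActorContext Context { get; } = new DemoContext();

    // Typed handler: update state, then publish the reply through the context.
    public ValueTask HandleAsync(Increment message, CancellationToken cancellationToken = default)
    {
        _count += message.Amount;
        Context.Response = _count;
        return ValueTask.CompletedTask;
    }

    // Untyped fallback: route known payloads to the typed handler, ignore the rest.
    public ValueTask HandleAsync(object? message, CancellationToken cancellationToken = default)
        => message is Increment increment
            ? HandleAsync(increment, cancellationToken)
            : ValueTask.CompletedTask;
}
```

Because the actor processes one message at a time, a single Response slot on the context is enough; no per-message bookkeeping is needed inside the handler.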
Conclusion
With these changes, Trupe is becoming a robust foundation for building Actor systems in .NET.
- Clarity: Renamed interfaces make the code self-documenting.
- Simplicity: Moving to IAsyncEnumerable and removing Sigil drastically reduces maintenance overhead.
- Compatibility: We are now ready for Native AOT.
- Functionality: The Tell and Ask implementations provide the essential communication channels actors need.
In the next article, I will dive into Supervision and how we handle faults in Trupe.