Rafael Andrade

Trupe: Implementing Actor Model in .NET

The Actor Model has been a powerful paradigm for concurrent and distributed systems since the 1970s, offering a solution to complex synchronization problems through message-passing and isolation. While mature frameworks like Proto.Actor and Akka.NET provide robust implementations, building a minimal version from scratch offers invaluable insights into the underlying mechanics and design decisions.

In this article, I'll walk through my journey of implementing a foundational Actor Model framework in C#, focusing on core concepts with minimal dependencies. This isn't meant to replace production-ready frameworks but to deepen our understanding of how they work under the hood.

Project Setup

We'll target .NET 9.0 & 10.0 and strive to keep our dependencies to a bare minimum. The only third-party library we'll use is Sigil for high-performance runtime code generation.

Actor

The first and most central component is the Actor. This will be the core of our framework. We'll start with an IActor interface. It will have one main method, ReceiveAsync, which is called when the actor receives a message.

I'm also adding a Context property. I won't use it just yet, but it will be crucial later for things like accessing an actor's parent, children, or replying to messages.

public interface IActor
{
    IActorContext? Context { get; set; }

    ValueTask ReceiveAsync(object? message, CancellationToken cancellationToken = default);
}

Now, we have our first big decision: should we support strongly-typed messages? If we do, actors can handle specific message types without needing to cast object. This improves type safety and developer experience.

I've decided to support this by adding a generic interface that inherits from the base IActor:

public interface IActor<in TMessage> : IActor
{
    ValueTask ReceiveAsync(TMessage message, CancellationToken cancellationToken = default);
}

An actor class can now implement IActor<MyMessage1> and IActor<MyMessage2> to handle different messages in different methods.
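
As a rough sketch of what that looks like, the block below defines an actor with two typed handlers. The `Deposit`, `Withdraw`, and `AccountActor` names are hypothetical and exist only for this illustration, and `IActorContext` is reduced to an empty placeholder because its real shape has not been defined yet. Because `IActor<TMessage>` inherits from `IActor`, the class still has to supply the untyped `ReceiveAsync` as a fallback.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Minimal copies of the article's interfaces so the sketch compiles on its own.
public interface IActorContext { } // placeholder (assumption): real members come later

public interface IActor
{
    IActorContext? Context { get; set; }
    ValueTask ReceiveAsync(object? message, CancellationToken cancellationToken = default);
}

public interface IActor<in TMessage> : IActor
{
    ValueTask ReceiveAsync(TMessage message, CancellationToken cancellationToken = default);
}

// Hypothetical message types, used only for this illustration.
public sealed record Deposit(decimal Amount);
public sealed record Withdraw(decimal Amount);

// Hypothetical actor: one typed handler per message type, plus the untyped fallback.
public sealed class AccountActor : IActor<Deposit>, IActor<Withdraw>
{
    public decimal Balance { get; private set; }

    public IActorContext? Context { get; set; }

    public ValueTask ReceiveAsync(Deposit message, CancellationToken cancellationToken = default)
    {
        Balance += message.Amount;
        return ValueTask.CompletedTask;
    }

    public ValueTask ReceiveAsync(Withdraw message, CancellationToken cancellationToken = default)
    {
        Balance -= message.Amount;
        return ValueTask.CompletedTask;
    }

    // Required by the base IActor interface; reached only for unknown message types.
    public ValueTask ReceiveAsync(object? message, CancellationToken cancellationToken = default)
        => throw new NotSupportedException($"Unhandled message: {message?.GetType().Name ?? "null"}");
}
```

Calling the handlers directly, as in `await actor.ReceiveAsync(new Deposit(100m))`, is only useful for checking the overloads; in the framework, messages reach the actor through its mailbox.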

Message

The next core component is the Message. For now, this will be an internal wrapper that passes the user's message (the Value) and a CancellationToken between the mailbox and the actor.

I'll define an interface and a simple record implementation for local (in-process) messages. In the future, we could create a RemoteMessage implementation for network communication.

public interface IMessage
{
    object? Value { get; }

    CancellationToken CancellationToken { get; }
}

// A simple record for in-process messages
public record LocalMessage(object? Value, CancellationToken CancellationToken = default) : IMessage;

Mailbox

Another essential component is the Mailbox. Each actor will have its own mailbox. When a message is sent to an actor, it's first enqueued in the mailbox. A separate process (which we'll build next) will then dequeue messages one by one and deliver them to the actor. This ensures actors process messages sequentially, which is a key guarantee of the Actor Model.

public interface IMailbox
{
    ValueTask EnqueueAsync(IMessage message, CancellationToken cancellationToken = default);

    ValueTask<IMessage> DequeueAsync(CancellationToken cancellationToken = default);
}

For our implementation, we'll use a Channel from the System.Threading.Channels namespace, which provides an optimized, high-performance producer/consumer queue.

using System.Threading.Channels;

public class ChannelMailBox : IMailbox
{
    private readonly Channel<IMessage> _channel;
    public ChannelMailBox() : this(0) // Default to unbounded
    {

    }

    public ChannelMailBox(int maxSize, BoundedChannelFullMode fullMode = BoundedChannelFullMode.Wait)
    {
        if (maxSize == 0)
        {
            _channel = Channel.CreateUnbounded<IMessage>(new UnboundedChannelOptions
            {
                SingleReader = true, // Only our ActorProcess will read
                SingleWriter = false // Multiple producers can send messages
            });
        }
        else
        {
            _channel = Channel.CreateBounded<IMessage>(new BoundedChannelOptions(maxSize)
            {
                SingleReader = true, // Only our ActorProcess will read
                SingleWriter = false, // Multiple producers can send messages
                FullMode = fullMode // What to do when the mailbox is full
            });
        }
    }

    public async ValueTask EnqueueAsync(IMessage message, CancellationToken cancellationToken = default)
    {
        await _channel.Writer.WriteAsync(message, cancellationToken);
    }

    public async ValueTask<IMessage> DequeueAsync(CancellationToken cancellationToken = default)
    {
        return await _channel.Reader.ReadAsync(cancellationToken);
    }
}
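
To make the effect of the `fullMode` parameter more concrete, here is a small sketch that works on `System.Threading.Channels` directly rather than on `ChannelMailBox`. The `MailboxCapacityDemo` class and its method are hypothetical names for this illustration. It writes three items into a channel with capacity 2 and returns what a reader would later see.

```csharp
using System.Collections.Generic;
using System.Threading.Channels;
using System.Threading.Tasks;

// Hypothetical helper, not part of the framework.
public static class MailboxCapacityDemo
{
    // Writes 1, 2, 3 into a bounded channel of capacity 2 under the given
    // full mode, then drains the channel and returns the items that survived.
    public static async Task<List<int>> WriteThreeThenDrainAsync(BoundedChannelFullMode fullMode)
    {
        var channel = Channel.CreateBounded<int>(new BoundedChannelOptions(2)
        {
            SingleReader = true,
            SingleWriter = false,
            FullMode = fullMode
        });

        for (var i = 1; i <= 3; i++)
        {
            // TryWrite never blocks. In Wait mode it returns false once the
            // channel is full (WriteAsync would wait instead); in the Drop*
            // modes it returns true and one item is discarded.
            channel.Writer.TryWrite(i);
        }

        // Completing the writer lets ReadAllAsync finish once the buffer is empty.
        channel.Writer.Complete();

        var seen = new List<int>();
        await foreach (var item in channel.Reader.ReadAllAsync())
        {
            seen.Add(item);
        }

        return seen;
    }
}
```

With `BoundedChannelFullMode.Wait` the third write is rejected and the reader sees 1, 2. `DropOldest` discards the first item and yields 2, 3, while `DropNewest` discards the most recently buffered item and yields 1, 3. Which mode suits a mailbox depends on whether losing messages is acceptable for the actor in question.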

PID or Actor Reference

The Actor Reference (also known as a Process ID or PID) is the handle that external code uses to interact with an actor. You should never hold a direct reference to the actor class itself. All communication must go through the reference. This enforces encapsulation and location transparency (the reference could point to an actor on another machine).

Let's define the interface and a local implementation. For now, it only supports "fire-and-forget" sending, through a blocking Send method and an awaitable SendAsync counterpart.

// The public-facing interface for interacting with an actor
public interface IActorReference
{
    Uri Name { get; }

    void Send<TMessage>(TMessage message);

    Task SendAsync<TMessage>(TMessage message, CancellationToken cancellationToken = default);
}

The LocalActorReference implementation's job is simple: wrap the user's message in our IMessage envelope and enqueue it into the actor's mailbox.

public class LocalActorReference(Uri name, IMailbox mailbox) : IActorReference
{
    public Uri Name { get; } = name;

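    public void Send<TMessage>(TMessage message)
    {
        // Convert to a Task before blocking: a ValueTask should not be
        // blocked on until it has completed.
        mailbox.EnqueueAsync(new LocalMessage(message))
            .AsTask()
            .GetAwaiter()
            .GetResult();
    }

    // Named SendAsync so that it implements IActorReference.SendAsync.
    public async Task SendAsync<TMessage>(TMessage message, CancellationToken cancellationToken = default)
    {
        await mailbox.EnqueueAsync(new LocalMessage(message), cancellationToken);
    }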
}

The Actor Process

This is the final piece of our core system. The ActorProcess is the "engine" or "runner" responsible for an actor's lifecycle. It owns the actor instance and its mailbox. Its job is to run a background task that continuously:

  1. Dequeues a message from the Mailbox.
  2. Delivers the message to the Actor's ReceiveAsync method.
  3. Waits for the next message.

using System.Collections.Concurrent;
using Sigil;

public class ActorProcess(IActor actor, IMailbox mailbox)
{
    private static readonly ConcurrentDictionary<Type, Dictionary<Type, Func<IActor, object, CancellationToken, ValueTask>>> Actions = new();
    private readonly Dictionary<Type, Func<IActor, object, CancellationToken, ValueTask>> _actions = Actions.GetOrAdd(actor.GetType(), CreateActions);
    private CancellationTokenSource? _cancellationTokenSource;

    private Task? _consumer;

    public void Start()
    {
        Stop();

        _cancellationTokenSource = new CancellationTokenSource();
        var token = _cancellationTokenSource.Token;

        // Start exactly one consumer loop. Task.Run returns a task that completes
        // only when ExecuteAsync itself finishes, so Stop() waits for the real loop.
        _consumer = Task.Run(() => ExecuteAsync(token), token);
    }

    public void Stop()
    {
        if (_cancellationTokenSource != null)
        {
            _cancellationTokenSource.Cancel();
            _cancellationTokenSource.Dispose();
            _cancellationTokenSource = null;
        }


        if (_consumer != null)
        {
            try
            {
                _consumer.GetAwaiter().GetResult();
            }
            catch (OperationCanceledException)
            {

            }

            _consumer = null;
        }
    }

    public async Task StartAsync()
    {
        await StopAsync();

        _cancellationTokenSource = new CancellationTokenSource();
        var token = _cancellationTokenSource.Token;

        // Start exactly one consumer loop. Task.Run returns a task that completes
        // only when ExecuteAsync itself finishes, so StopAsync() awaits the real loop.
        _consumer = Task.Run(() => ExecuteAsync(token), token);
    }

    public async Task StopAsync()
    {
        if (_cancellationTokenSource != null)
        {
            await _cancellationTokenSource.CancelAsync();
            _cancellationTokenSource.Dispose();
            _cancellationTokenSource = null;
        }


        if (_consumer != null)
        {
            try
            {
                await _consumer;
            }
            catch (OperationCanceledException)
            {

            }

            _consumer = null;
        }
    }

    private async Task ExecuteAsync(CancellationToken cancellationToken)
    {
        while (!cancellationToken.IsCancellationRequested)
        {
            try
            {
                var message = await mailbox.DequeueAsync(cancellationToken);
                if (message.Value != null && _actions.TryGetValue(message.Value.GetType(), out var action))
                {
                    await action(actor, message.Value, message.CancellationToken);
                }
                else
                {
                    await actor.ReceiveAsync(message.Value, message.CancellationToken);
                }
            }
            catch (OperationCanceledException)
            {
            }
        }
    }

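    // Builds, once per actor type, a table that maps each message type to a
    // delegate calling the matching IActor<TMessage>.ReceiveAsync overload.
    private static Dictionary<Type, Func<IActor, object, CancellationToken, ValueTask>> CreateActions(Type actorType)
    {
        var types = actorType.GetInterfaces();
        var actions = new Dictionary<Type, Func<IActor, object, CancellationToken, ValueTask>>();
        foreach (var type in types)
        {
            // Only the closed IActor<TMessage> interfaces are of interest here.
            if (!type.IsGenericType || type.GetGenericTypeDefinition() != typeof(IActor<>))
            {
                continue;
            }

            // IActor<TMessage> declares a single method: the typed ReceiveAsync.
            var method = type.GetMethods()[0];

            // Emits the equivalent of:
            //   (actor, message, ct) => ((IActor<T>)actor).ReceiveAsync((T)message, ct)
            // Note: CastClass assumes TMessage is a reference type.
            var exe = Emit<Func<IActor, object, CancellationToken, ValueTask>>
                .NewDynamicMethod($"Receive{Guid.NewGuid():N}")
                .LoadArgument(0)
                .CastClass(type)
                .LoadArgument(1)
                .CastClass(type.GenericTypeArguments[0])
                .LoadArgument(2)
                .CallVirtual(method)
                .Return();

            actions[type.GenericTypeArguments[0]] = exe.CreateDelegate();
        }

        return actions;
    }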
}
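
The dispatch table built in CreateActions can be hard to picture from the emitted IL alone. As a rough illustration only, the sketch below builds the same kind of table with `System.Linq.Expressions` instead of Sigil; this is a swapped-in technique, not the project's code. `ExpressionDispatch`, `Ping`, and `PingActor` are hypothetical names, and `IActorContext` is again an empty placeholder.

```csharp
using System;
using System.Collections.Generic;
using System.Linq.Expressions;
using System.Threading;
using System.Threading.Tasks;

// Minimal copies of the article's interfaces so the sketch compiles on its own.
public interface IActorContext { } // placeholder (assumption)

public interface IActor
{
    IActorContext? Context { get; set; }
    ValueTask ReceiveAsync(object? message, CancellationToken cancellationToken = default);
}

public interface IActor<in TMessage> : IActor
{
    ValueTask ReceiveAsync(TMessage message, CancellationToken cancellationToken = default);
}

// Hypothetical alternative to the Sigil-based CreateActions.
public static class ExpressionDispatch
{
    public static Dictionary<Type, Func<IActor, object, CancellationToken, ValueTask>> CreateActions(Type actorType)
    {
        var actions = new Dictionary<Type, Func<IActor, object, CancellationToken, ValueTask>>();
        foreach (var type in actorType.GetInterfaces())
        {
            if (!type.IsGenericType || type.GetGenericTypeDefinition() != typeof(IActor<>))
            {
                continue;
            }

            var messageType = type.GenericTypeArguments[0];
            var method = type.GetMethod("ReceiveAsync", new[] { messageType, typeof(CancellationToken) })!;

            var actorParam = Expression.Parameter(typeof(IActor), "actor");
            var messageParam = Expression.Parameter(typeof(object), "message");
            var tokenParam = Expression.Parameter(typeof(CancellationToken), "cancellationToken");

            // (actor, message, ct) => ((IActor<T>)actor).ReceiveAsync((T)message, ct)
            var call = Expression.Call(
                Expression.Convert(actorParam, type),
                method,
                Expression.Convert(messageParam, messageType),
                tokenParam);

            actions[messageType] = Expression
                .Lambda<Func<IActor, object, CancellationToken, ValueTask>>(
                    call, actorParam, messageParam, tokenParam)
                .Compile();
        }

        return actions;
    }
}

// Hypothetical message and actor used to exercise the table.
public sealed class Ping { }

public sealed class PingActor : IActor<Ping>
{
    public int Count { get; private set; }

    public IActorContext? Context { get; set; }

    public ValueTask ReceiveAsync(Ping message, CancellationToken cancellationToken = default)
    {
        Count++;
        return ValueTask.CompletedTask;
    }

    public ValueTask ReceiveAsync(object? message, CancellationToken cancellationToken = default)
        => throw new NotSupportedException();
}
```

Looking up `typeof(Ping)` in the returned dictionary and invoking the delegate with a `PingActor` and a `Ping` instance runs the typed handler without any cast in user code. Both approaches pay the reflection cost once per actor type; the choice between emitted IL and compiled expressions is mostly a matter of startup cost and readability.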

Putting It All Together: A Usage Example

So, how do we use what we've built so far? Let's create a simple actor that handles a SimpleMessage and wire everything together.

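// Assumption: a 5-second timeout stands in for the cancellation token that
// the surrounding test would normally provide.
using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(5));
var cancellationToken = cts.Token;

var mailbox = new ChannelMailBox();
var process = new ActorProcess(new SimpleTypedActor(), mailbox);
var reference = new LocalActorReference(
  new Uri("localhost:test_with_obj"),
  mailbox);

process.Start();

var message = new SimpleMessage();
reference.Send(message);

// Poll until the actor has marked the message as received or the token is cancelled.
while (!message.Received && !cancellationToken.IsCancellationRequested)
{
    Task.Delay(100, cancellationToken).Wait(cancellationToken);
}

process.Stop();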

public class SimpleTypedActor : IActor<SimpleMessage>
{
    public ValueTask ReceiveAsync(SimpleMessage message, CancellationToken cancellationToken = default)
    {
        message.Received = true;
        return ValueTask.CompletedTask;
    }

    public IActorContext? Context { get; set; }
    public ValueTask ReceiveAsync(object? message, CancellationToken cancellationToken = default)
    {
        throw new NotImplementedException();
    }
}

public class SimpleMessage
{
    public bool Received { get; set; }
}

As you can see, there is still a lot of work to do before this is ready for real use.

Conclusion and Next Steps

We've successfully built the core of an Actor Model framework! We have typed and untyped message reception, a high-performance mailbox, and a dynamic dispatch system for routing messages.

This is just the beginning. A production-ready system would need many more features, such as:

  • Supervision: Strategies for restarting actors when they fail.
  • Remoting: The ability for actors to communicate across network boundaries.
  • Actor Hierarchies: Parent-child relationships for structuring systems.
  • Advanced Scheduling: More control over the threading and execution context.

This implementation provides a solid foundation for understanding how these more advanced features can be built upon the core primitives of Actors, Mailboxes, and References.

References

For inspiration and to see how production-ready frameworks are built, I highly recommend checking out these projects:

https://github.com/asynkron/protoactor-dotnet/
https://github.com/akkadotnet/akka.net

Project GitHub: https://github.com/lillo42/trupe
