DEV Community

Cover image for Trupe: Implementando o Modelo de Atores no .NET
Rafael Andrade
Rafael Andrade

Posted on

Trupe: Implementando o Modelo de Atores no .NET

O Modelo de Atores tem sido um paradigma poderoso para sistemas concorrentes e distribuídos desde a década de 1970, oferecendo uma solução para problemas complexos de sincronização através de passagem de mensagens e isolamento. Embora frameworks maduros como Proto.Actor e Akka.NET forneçam implementações robustas, construir uma versão mínima do zero oferece insights inestimáveis sobre os mecanismos subjacentes e decisões de design.

Neste artigo, vou explicar minha jornada implementando um framework básico do Modelo de Atores em C#, focando nos conceitos fundamentais com dependências mínimas. Este não é um substituto para frameworks prontos para produção, mas sim uma forma de aprofundar nosso entendimento sobre como eles funcionam internamente.

Configuração do Projeto

Vamos direcionar para .NET 9.0 e 10.0 e nos esforçar para manter nossas dependências no mínimo possível. A única biblioteca de terceiros que usaremos é o Sigil para geração de código em tempo de execução de alto desempenho.

Ator

O primeiro e mais central componente é o Ator. Este será o núcleo do nosso framework. Começaremos com uma interface IAtor. Ela terá um método principal, ReceiveAsync, que é chamado quando o ator recebe uma mensagem.

Também estou adicionando uma propriedade Context. Não vou usá-la ainda, mas será crucial mais tarde para coisas como acessar o pai de um ator, filhos ou responder a mensagens.

public interface IActor
{
    IActorContext? Context { get; set; }

    ValueTask ReceiveAsync(object? message, CancellationToken cancellationToken = default);
}
Enter fullscreen mode Exit fullscreen mode

Agora, temos nossa primeira grande decisão: devemos suportar mensagens fortemente tipadas? Se fizermos isso, os atores podem lidar com tipos específicos de mensagens sem precisar fazer casting de objetos. Isso melhora a segurança de tipos e a experiência do desenvolvedor.

Decidi suportar isso adicionando uma interface genérica que herda da IActor base:

public interface IActor<in TMessage> : IActor
{
    ValueTask ReceiveAsync(TMessage message, CancellationToken cancellationToken = default);
}
Enter fullscreen mode Exit fullscreen mode

Uma classe de ator agora pode implementar IActor<MinhaMensagem1> e IActor<MinhaMensagem2> para lidar com diferentes mensagens em métodos diferentes.

Mensagem

O próximo componente fundamental é a Mensagem. Por enquanto, será um wrapper interno que passa a mensagem do usuário (o Value) e um CancellationToken entre a caixa de correio e o ator.

Vou definir uma interface e uma implementação simples de registro para mensagens locais (dentro do processo). No futuro, poderíamos criar uma implementação RemoteMessage para comunicação de rede.

public interface IMessage
{
    object? Value { get; }

    CancellationToken CancellationToken { get; }
}

// Um registro simples para mensagens dentro do processo
public record LocalMessage(object? Value, CancellationToken CancellationToken = default) : IMessage;
Enter fullscreen mode Exit fullscreen mode

Caixa de Correio (Mailbox)

Outro componente essencial é a Caixa de Correio. Cada ator terá sua própria caixa de correio. Quando uma mensagem é enviada para um ator, ela é primeiro enfileirada na caixa de correio. Um processo separado (que construiremos em seguida) irá então remover as mensagens uma por uma e entregá-las ao ator. Isso garante que os atores processem mensagens sequencialmente, o que é uma garantia fundamental do Modelo de Atores.

public interface IMailbox
{
    ValueTask EnqueueAsync(IMessage message, CancellationToken cancellationToken = default);

    ValueTask<IMessage> DequeueAsync(CancellationToken cancellationToken = default);
}
Enter fullscreen mode Exit fullscreen mode

Para nossa implementação, usaremos um Channel do namespace System.Threading.Channels, que fornece uma fila produtor/consumidor otimizada e de alto desempenho.

public class ChannelMailBox : IMailbox
{
    private readonly Channel<IMessage> _channel;
    public ChannelMailBox() : this(0) // Padrão para ilimitado
    {

    }

    public ChannelMailBox(int maxSize, BoundedChannelFullMode fullMode = BoundedChannelFullMode.Wait)
    {
        if (maxSize == 0)
        {
            _channel = Channel.CreateUnbounded<IMessage>(new UnboundedChannelOptions
            {
                SingleReader = true, // Somente nosso ActorProcess lerá
                SingleWriter = false // Múltiplos produtores podem enviar mensagens
            });
        }
        else
        {
            _channel = Channel.CreateBounded<IMessage>(new BoundedChannelOptions(maxSize)
            {
                SingleReader = true, // Somente nosso ActorProcess lerá
                SingleWriter = false, // Múltiplos produtores podem enviar mensagens
                FullMode = fullMode // O que fazer quando a caixa de correio está cheia
            });
        }
    }

    public async ValueTask EnqueueAsync(IMessage message, CancellationToken cancellationToken = default)
    {
        await _channel.Writer.WriteAsync(message, cancellationToken);
    }

    public async ValueTask<IMessage> DequeueAsync(CancellationToken cancellationToken = default)
    {
        return await _channel.Reader.ReadAsync(cancellationToken);
    }
}
Enter fullscreen mode Exit fullscreen mode

PID ou Referência do Ator

A Referência do Ator (também conhecida como ID de Processo ou PID) é o identificador que o código externo usa para interagir com um ator. Você nunca deve manter uma referência direta à própria classe do ator. Toda comunicação deve passar pela referência. Isso reforça o encapsulamento e a transparência de localização (a referência pode apontar para um ator em outra máquina).

Vamos definir a interface e uma implementação local. Por enquanto, terá apenas um método Send, que é "fire-and-forget" (envia e esquece).

// A interface pública para interagir com um ator
public interface IActorReference
{
    Uri Name { get; }

    void Send<TMessage>(TMessage message);

    Task SendAsync<TMessage>(TMessage message, CancellationToken cancellationToken = default);
}
Enter fullscreen mode Exit fullscreen mode

O trabalho da implementação LocalActorReference é simples: encapsular a mensagem do usuário no nosso envelope IMessage e enfileirá-la na caixa de correio do ator.

public class LocalActorReference(Uri name, IMailbox mailbox) : IActorReference
{
    public Uri Name { get; } = name;

    public void Send<TMessage>(TMessage message)
    {
        mailbox.EnqueueAsync(new LocalMessage(message))
            .GetAwaiter()
            .GetResult();
    }

    public async Task Send<TMessage>(TMessage message, CancellationToken cancellationToken = default)
    {
        await mailbox.EnqueueAsync(new LocalMessage(message), cancellationToken);
    }
}
Enter fullscreen mode Exit fullscreen mode

O Processo do Ator

Esta é a peça final do nosso sistema central. O ActorProcess é o "motor" ou "executor" responsável pelo ciclo de vida de um ator. Ele possui a instância do ator e sua caixa de correio. Seu trabalho é executar uma tarefa em segundo plano que continuamente:

  1. Remove uma mensagem da Caixa de Correio.
  2. Entrega a mensagem ao método ReceiveAsync do Ator.
  3. Aguarda a próxima mensagem.
public class ActorProcess(IActor actor, IMailbox mailbox)
{
    private static readonly ConcurrentDictionary<Type, Dictionary<Type, Func<IActor, object, CancellationToken, ValueTask>>> Actions = new();
    private readonly Dictionary<Type, Func<IActor, object, CancellationToken, ValueTask>> _actions = Actions.GetOrAdd(actor.GetType(), CreateActions);
    private CancellationTokenSource? _cancellationTokenSource;

    private Task? _consumer;

    public void Start()
    {
        Stop();

        _cancellationTokenSource = new CancellationTokenSource();
        _consumer = ExecuteAsync(_cancellationTokenSource.Token);

        _consumer = Task.Factory.StartNew(async ct => await ExecuteAsync((CancellationToken)ct), 
            _cancellationTokenSource.Token, 
            _cancellationTokenSource.Token, 
            TaskCreationOptions.LongRunning, 
            TaskScheduler.Default);
    }

    public void Stop()
    {
        if (_cancellationTokenSource != null)
        {
            _cancellationTokenSource.Cancel();
            _cancellationTokenSource.Dispose();
            _cancellationTokenSource = null;
        }


        if (_consumer != null)
        {
            try
            {
                _consumer.GetAwaiter().GetResult();
            }
            catch (OperationCanceledException)
            {

            }

            _consumer = null;
        }
    }

    public async Task StartAsync()
    {
        await StopAsync();

        _cancellationTokenSource = new CancellationTokenSource();
        _consumer = ExecuteAsync(_cancellationTokenSource.Token);

        _consumer = Task.Factory.StartNew(async ct => await ExecuteAsync((CancellationToken)ct), 
            _cancellationTokenSource.Token,
            _cancellationTokenSource.Token,
            TaskCreationOptions.LongRunning,
            TaskScheduler.Default);
    }

    public async Task StopAsync()
    {
        if (_cancellationTokenSource != null)
        {
            await _cancellationTokenSource.CancelAsync();
            _cancellationTokenSource.Dispose();
            _cancellationTokenSource = null;
        }


        if (_consumer != null)
        {
            try
            {
                await _consumer;
            }
            catch (OperationCanceledException)
            {

            }

            _consumer = null;
        }
    }

    private async Task ExecuteAsync(CancellationToken cancellationToken)
    {
        while (!cancellationToken.IsCancellationRequested)
        {
            try
            {
                var message = await mailbox.DequeueAsync(cancellationToken);
                if (message.Value != null && _actions.TryGetValue(message.Value.GetType(), out var action))
                {
                    await action(actor, message.Value, message.CancellationToken);
                }
                else
                {
                    await actor.ReceiveAsync(message.Value, message.CancellationToken);
                }
            }
            catch (OperationCanceledException)
            {
            }
        }
    }

    private static Dictionary<Type, Func<IActor, object, CancellationToken, ValueTask>> CreateActions(Type actorType)
    {
        var types = actorType.GetInterfaces();
        var actions = new Dictionary<Type, Func<IActor, object, CancellationToken, ValueTask>>();
        foreach (var type in types)
        {
            if (!type.IsGenericType || type.GetGenericTypeDefinition() != typeof(IActor<>))
            {
                continue;
            }

            var method = type.GetMethods()[0];
            var exe = Emit<Func<IActor, object, CancellationToken, ValueTask>>
                .NewDynamicMethod($"Receive{Guid.NewGuid():N}")
                .LoadArgument(0)
                .CastClass(type)
                .LoadArgument(1)
                .CastClass(type.GenericTypeArguments[0])
                .LoadArgument(2)
                .CallVirtual(method)
                .Return();

            actions[type.GenericTypeArguments[0]] = exe.CreateDelegate();
        }

        return actions;
    }   
}
Enter fullscreen mode Exit fullscreen mode

Juntando Tudo: Um Exemplo de Uso

Então, como usamos o que construímos até agora? Vamos criar um ator simples que lida com uma SimpleMessage e conectar tudo.

var mailbox = new ChannelMailBox();
var process = new ActorProcess(new SimpleActor(), mailbox);
var reference = new LocalActorReference(
  new Uri("localhost:test_with_obj"), 
  mailbox);

process.Start();

var message = new SimpleMessage();
reference.Send(message);

while (!message.Received && !cancellationToken.IsCancellationRequested)
{
    Task.Delay(100, cancellationToken).Wait(cancellationToken);
}

process.Stop();

public class SimpleTypedActor : IActor<SimpleMessage>
{
    public ValueTask ReceiveAsync(SimpleMessage message, CancellationToken cancellationToken = default)
    {
        message.Received = true;
        return ValueTask.CompletedTask;
    }

    public IActorContext? Context { get; set; }
    public ValueTask ReceiveAsync(object? message, CancellationToken cancellationToken = default)
    {
        throw new NotImplementedException();
    }
}

public class SimpleMessage
{
    public bool Received { get; set; }
}
Enter fullscreen mode Exit fullscreen mode

Como podem ver, tenho muito trabalho pela frente antes que esteja pronto para uso.

Conclusão e Próximos Passos

Construímos com sucesso o núcleo de um framework do Modelo de Atores! Temos recepção de mensagens tipadas e não tipadas, uma caixa de correio de alto desempenho e um sistema de despacho dinâmico para rotear mensagens.

Este é apenas o começo. Um sistema pronto para produção precisaria de muitos mais recursos, como:

  • Supervisão: Estratégias para reiniciar atores quando falham.
  • Remoting: A capacidade de atores se comunicarem através de limites de rede.
  • Hierarquias de Atores: Relacionamentos pai-filho para estruturar sistemas.
  • Agendamento Avançado: Mais controle sobre o threading e contexto de execução.

Esta implementação fornece uma base sólida para entender como esses recursos mais avançados podem ser construídos sobre os primitivos fundamentais de Atores, Caixas de Correio e Referências.

Referências

Para inspiração e para ver como frameworks prontos para produção são construídos, recomendo verificar estes projetos:

https://github.com/asynkron/protoactor-dotnet/

https://github.com/akkadotnet/akka.net

Github do projeto: https://github.com/lillo42/trupe

Top comments (0)