O Modelo de Atores tem sido um paradigma poderoso para sistemas concorrentes e distribuídos desde a década de 1970, oferecendo uma solução para problemas complexos de sincronização através de passagem de mensagens e isolamento. Embora frameworks maduros como Proto.Actor e Akka.NET forneçam implementações robustas, construir uma versão mínima do zero oferece insights inestimáveis sobre os mecanismos subjacentes e decisões de design.
Neste artigo, vou explicar minha jornada implementando um framework básico do Modelo de Atores em C#, focando nos conceitos fundamentais com dependências mínimas. Este não é um substituto para frameworks prontos para produção, mas sim uma forma de aprofundar nosso entendimento sobre como eles funcionam internamente.
Configuração do Projeto
Vamos direcionar para .NET 9.0 e 10.0 e nos esforçar para manter nossas dependências no mínimo possível. A única biblioteca de terceiros que usaremos é o Sigil para geração de código em tempo de execução de alto desempenho.
Ator
O primeiro e mais central componente é o Ator. Este será o núcleo do nosso framework. Começaremos com uma interface IAtor. Ela terá um método principal, ReceiveAsync, que é chamado quando o ator recebe uma mensagem.
Também estou adicionando uma propriedade Context. Não vou usá-la ainda, mas será crucial mais tarde para coisas como acessar o pai de um ator, filhos ou responder a mensagens.
public interface IActor
{
IActorContext? Context { get; set; }
ValueTask ReceiveAsync(object? message, CancellationToken cancellationToken = default);
}
Agora, temos nossa primeira grande decisão: devemos suportar mensagens fortemente tipadas? Se fizermos isso, os atores podem lidar com tipos específicos de mensagens sem precisar fazer casting de objetos. Isso melhora a segurança de tipos e a experiência do desenvolvedor.
Decidi suportar isso adicionando uma interface genérica que herda da IActor base:
public interface IActor<in TMessage> : IActor
{
ValueTask ReceiveAsync(TMessage message, CancellationToken cancellationToken = default);
}
Uma classe de ator agora pode implementar IActor<MinhaMensagem1> e IActor<MinhaMensagem2> para lidar com diferentes mensagens em métodos diferentes.
Mensagem
O próximo componente fundamental é a Mensagem. Por enquanto, será um wrapper interno que passa a mensagem do usuário (o Value) e um CancellationToken entre a caixa de correio e o ator.
Vou definir uma interface e uma implementação simples de registro para mensagens locais (dentro do processo). No futuro, poderíamos criar uma implementação RemoteMessage para comunicação de rede.
public interface IMessage
{
object? Value { get; }
CancellationToken CancellationToken { get; }
}
// Um registro simples para mensagens dentro do processo
public record LocalMessage(object? Value, CancellationToken CancellationToken = default) : IMessage;
Caixa de Correio (Mailbox)
Outro componente essencial é a Caixa de Correio. Cada ator terá sua própria caixa de correio. Quando uma mensagem é enviada para um ator, ela é primeiro enfileirada na caixa de correio. Um processo separado (que construiremos em seguida) irá então remover as mensagens uma por uma e entregá-las ao ator. Isso garante que os atores processem mensagens sequencialmente, o que é uma garantia fundamental do Modelo de Atores.
public interface IMailbox
{
ValueTask EnqueueAsync(IMessage message, CancellationToken cancellationToken = default);
ValueTask<IMessage> DequeueAsync(CancellationToken cancellationToken = default);
}
Para nossa implementação, usaremos um Channel do namespace System.Threading.Channels, que fornece uma fila produtor/consumidor otimizada e de alto desempenho.
public class ChannelMailBox : IMailbox
{
private readonly Channel<IMessage> _channel;
public ChannelMailBox() : this(0) // Padrão para ilimitado
{
}
public ChannelMailBox(int maxSize, BoundedChannelFullMode fullMode = BoundedChannelFullMode.Wait)
{
if (maxSize == 0)
{
_channel = Channel.CreateUnbounded<IMessage>(new UnboundedChannelOptions
{
SingleReader = true, // Somente nosso ActorProcess lerá
SingleWriter = false // Múltiplos produtores podem enviar mensagens
});
}
else
{
_channel = Channel.CreateBounded<IMessage>(new BoundedChannelOptions(maxSize)
{
SingleReader = true, // Somente nosso ActorProcess lerá
SingleWriter = false, // Múltiplos produtores podem enviar mensagens
FullMode = fullMode // O que fazer quando a caixa de correio está cheia
});
}
}
public async ValueTask EnqueueAsync(IMessage message, CancellationToken cancellationToken = default)
{
await _channel.Writer.WriteAsync(message, cancellationToken);
}
public async ValueTask<IMessage> DequeueAsync(CancellationToken cancellationToken = default)
{
return await _channel.Reader.ReadAsync(cancellationToken);
}
}
PID ou Referência do Ator
A Referência do Ator (também conhecida como ID de Processo ou PID) é o identificador que o código externo usa para interagir com um ator. Você nunca deve manter uma referência direta à própria classe do ator. Toda comunicação deve passar pela referência. Isso reforça o encapsulamento e a transparência de localização (a referência pode apontar para um ator em outra máquina).
Vamos definir a interface e uma implementação local. Por enquanto, terá apenas um método Send, que é "fire-and-forget" (envia e esquece).
// A interface pública para interagir com um ator
public interface IActorReference
{
Uri Name { get; }
void Send<TMessage>(TMessage message);
Task SendAsync<TMessage>(TMessage message, CancellationToken cancellationToken = default);
}
O trabalho da implementação LocalActorReference é simples: encapsular a mensagem do usuário no nosso envelope IMessage e enfileirá-la na caixa de correio do ator.
public class LocalActorReference(Uri name, IMailbox mailbox) : IActorReference
{
public Uri Name { get; } = name;
public void Send<TMessage>(TMessage message)
{
mailbox.EnqueueAsync(new LocalMessage(message))
.GetAwaiter()
.GetResult();
}
public async Task Send<TMessage>(TMessage message, CancellationToken cancellationToken = default)
{
await mailbox.EnqueueAsync(new LocalMessage(message), cancellationToken);
}
}
O Processo do Ator
Esta é a peça final do nosso sistema central. O ActorProcess é o "motor" ou "executor" responsável pelo ciclo de vida de um ator. Ele possui a instância do ator e sua caixa de correio. Seu trabalho é executar uma tarefa em segundo plano que continuamente:
- Remove uma mensagem da Caixa de Correio.
- Entrega a mensagem ao método
ReceiveAsyncdo Ator. - Aguarda a próxima mensagem.
public class ActorProcess(IActor actor, IMailbox mailbox)
{
private static readonly ConcurrentDictionary<Type, Dictionary<Type, Func<IActor, object, CancellationToken, ValueTask>>> Actions = new();
private readonly Dictionary<Type, Func<IActor, object, CancellationToken, ValueTask>> _actions = Actions.GetOrAdd(actor.GetType(), CreateActions);
private CancellationTokenSource? _cancellationTokenSource;
private Task? _consumer;
public void Start()
{
Stop();
_cancellationTokenSource = new CancellationTokenSource();
_consumer = ExecuteAsync(_cancellationTokenSource.Token);
_consumer = Task.Factory.StartNew(async ct => await ExecuteAsync((CancellationToken)ct),
_cancellationTokenSource.Token,
_cancellationTokenSource.Token,
TaskCreationOptions.LongRunning,
TaskScheduler.Default);
}
public void Stop()
{
if (_cancellationTokenSource != null)
{
_cancellationTokenSource.Cancel();
_cancellationTokenSource.Dispose();
_cancellationTokenSource = null;
}
if (_consumer != null)
{
try
{
_consumer.GetAwaiter().GetResult();
}
catch (OperationCanceledException)
{
}
_consumer = null;
}
}
public async Task StartAsync()
{
await StopAsync();
_cancellationTokenSource = new CancellationTokenSource();
_consumer = ExecuteAsync(_cancellationTokenSource.Token);
_consumer = Task.Factory.StartNew(async ct => await ExecuteAsync((CancellationToken)ct),
_cancellationTokenSource.Token,
_cancellationTokenSource.Token,
TaskCreationOptions.LongRunning,
TaskScheduler.Default);
}
public async Task StopAsync()
{
if (_cancellationTokenSource != null)
{
await _cancellationTokenSource.CancelAsync();
_cancellationTokenSource.Dispose();
_cancellationTokenSource = null;
}
if (_consumer != null)
{
try
{
await _consumer;
}
catch (OperationCanceledException)
{
}
_consumer = null;
}
}
private async Task ExecuteAsync(CancellationToken cancellationToken)
{
while (!cancellationToken.IsCancellationRequested)
{
try
{
var message = await mailbox.DequeueAsync(cancellationToken);
if (message.Value != null && _actions.TryGetValue(message.Value.GetType(), out var action))
{
await action(actor, message.Value, message.CancellationToken);
}
else
{
await actor.ReceiveAsync(message.Value, message.CancellationToken);
}
}
catch (OperationCanceledException)
{
}
}
}
private static Dictionary<Type, Func<IActor, object, CancellationToken, ValueTask>> CreateActions(Type actorType)
{
var types = actorType.GetInterfaces();
var actions = new Dictionary<Type, Func<IActor, object, CancellationToken, ValueTask>>();
foreach (var type in types)
{
if (!type.IsGenericType || type.GetGenericTypeDefinition() != typeof(IActor<>))
{
continue;
}
var method = type.GetMethods()[0];
var exe = Emit<Func<IActor, object, CancellationToken, ValueTask>>
.NewDynamicMethod($"Receive{Guid.NewGuid():N}")
.LoadArgument(0)
.CastClass(type)
.LoadArgument(1)
.CastClass(type.GenericTypeArguments[0])
.LoadArgument(2)
.CallVirtual(method)
.Return();
actions[type.GenericTypeArguments[0]] = exe.CreateDelegate();
}
return actions;
}
}
Juntando Tudo: Um Exemplo de Uso
Então, como usamos o que construímos até agora? Vamos criar um ator simples que lida com uma SimpleMessage e conectar tudo.
var mailbox = new ChannelMailBox();
var process = new ActorProcess(new SimpleActor(), mailbox);
var reference = new LocalActorReference(
new Uri("localhost:test_with_obj"),
mailbox);
process.Start();
var message = new SimpleMessage();
reference.Send(message);
while (!message.Received && !cancellationToken.IsCancellationRequested)
{
Task.Delay(100, cancellationToken).Wait(cancellationToken);
}
process.Stop();
public class SimpleTypedActor : IActor<SimpleMessage>
{
public ValueTask ReceiveAsync(SimpleMessage message, CancellationToken cancellationToken = default)
{
message.Received = true;
return ValueTask.CompletedTask;
}
public IActorContext? Context { get; set; }
public ValueTask ReceiveAsync(object? message, CancellationToken cancellationToken = default)
{
throw new NotImplementedException();
}
}
public class SimpleMessage
{
public bool Received { get; set; }
}
Como podem ver, tenho muito trabalho pela frente antes que esteja pronto para uso.
Conclusão e Próximos Passos
Construímos com sucesso o núcleo de um framework do Modelo de Atores! Temos recepção de mensagens tipadas e não tipadas, uma caixa de correio de alto desempenho e um sistema de despacho dinâmico para rotear mensagens.
Este é apenas o começo. Um sistema pronto para produção precisaria de muitos mais recursos, como:
- Supervisão: Estratégias para reiniciar atores quando falham.
- Remoting: A capacidade de atores se comunicarem através de limites de rede.
- Hierarquias de Atores: Relacionamentos pai-filho para estruturar sistemas.
- Agendamento Avançado: Mais controle sobre o threading e contexto de execução.
Esta implementação fornece uma base sólida para entender como esses recursos mais avançados podem ser construídos sobre os primitivos fundamentais de Atores, Caixas de Correio e Referências.
Referências
Para inspiração e para ver como frameworks prontos para produção são construídos, recomendo verificar estes projetos:
https://github.com/asynkron/protoactor-dotnet/
https://github.com/akkadotnet/akka.net
Github do projeto: https://github.com/lillo42/trupe
Top comments (0)