No artigo anterior, apresentei o Trupe, uma biblioteca leve de modelo de atores para .NET. Desde então, fiz várias melhorias de design para aprimorar a clareza, o desempenho e a experiência do desenvolvedor.
Neste artigo, vou guiá-lo pelas modificações mais recentes e implementar dois padrões de mensagens essenciais: Tell e Ask.
O que mudou no Trupe?
Vou começar falando sobre as mudanças que realizei na biblioteca.
Interface do Ator
No design inicial, debati entre IActor e IActor<TMessage>. Eu queria evitar forçar uma classe base para os desenvolvedores. Após alguma reflexão, refinei a nomenclatura para refletir melhor a intenção.
-
IActor: Esta permanece como a interface marcadora (marker interface) que identifica uma classe como um Ator. -
IHandleActorMessage<TMessage>: Renomeada deIActor<TMessage>. Isso torna explícito que a interface é responsável por lidar com um tipo específico de mensagem. -
HandleAsync: Renomeado deReceiveAsync. "Handle" (lidar/tratar) descreve com mais precisão a ação de processamento.
As interfaces atualizadas ficaram assim:
public interface IActor
{
IActorContext Context { get; }
ValueTask HandleAsync(object? message, CancellationToken cancellationToken = default);
}
public interface IHandleActorMessage<TMessage>
{
ValueTask HandleAsync(TMessage message, CancellationToken cancellationToken = default);
}
Simplificação da Mailbox
Modernizei a interface IMailbox. Em vez de um método tradicional de Dequeue, a mailbox agora implementa IAsyncEnumerable<IMessage>. Isso simplifica o loop de consumo no ActorProcess, permitindo usarmos o elegante padrão await foreach.
public interface IMailbox : IAsyncEnumerable<IMessage>
{
ValueTask EnqueueAsync(IMessage message, CancellationToken cancellationToken = default);
}
Melhorias no Processo do Ator
Estratégia de Execução de Tasks
Anteriormente, eu iniciava o loop do ator usando TaskCreationOptions.LongRunning. No entanto, pesquisas destacaram que isso força a criação de uma nova thread fora do ThreadPool. Para uma biblioteca como esta, isso não é ideal; queremos aproveitar a eficiência do ThreadPool e evitar a exaustão de threads. Também quero evitar capturar o SynchronizationContext desnecessariamente.
Gerenciamento de Ciclo de Vida
Refinei a API de start/stop para que apenas o supervisor controle o ciclo de vida do ator. Isso reduz a complexidade e elimina APIs redundantes de sincronização/assincronia.
public class ActorProcess(IActor actor, IMailbox mailbox)
{
private CancellationTokenSource? _cts;
private Task? _executing;
public void Start()
{
if (_executing != null)
{
return;
}
_cts = new CancellationTokenSource();
_executing = Task.Run(() => RunAsync(_cts.Token));
}
public async Task StopAsync()
{
if (_cts == null || _executing == null)
{
return;
}
await _cts.CancelAsync();
try
{
await _executing;
}
catch (OperationCanceledException)
{
// Ignora exceções de cancelamento durante o desligamento
}
_cts.Dispose();
_cts = null;
_executing = null;
}
}
Removendo Sigil por Simplicidade e Suporte a AOT
Para simplificar a biblioteca e suportar Native AOT, removi a dependência do Sigil (que depende de geração dinâmica de IL). Em vez disso, agora estou usando as capacidades de reflexão do C# (MakeGenericMethod e CreateDelegate) para despachar mensagens para manipuladores fortemente tipados.
Aqui está o método RunAsync revisado utilizando um dicionário concorrente para fazer cache dos delegates:
public class ActorProcess(IActor actor, IMailbox mailbox)
{
private static readonly ConcurrentDictionary<
Type,
Func<IActor, IMessage, ValueTask>
> _typedCallHandle = new();
private async Task RunAsync(CancellationToken cancellationToken)
{
await foreach (var message in mailbox.WithCancellation(cancellationToken))
{
if (RuntimeFeature.IsDynamicCodeSupported)
{
var callHandle = _typedCallHandle.GetOrAdd(
message.Payload.GetType(),
CreateCallHandleDelegate
);
await callHandle(actor, message);
}
else
{
await actor.HandleAsync(message.Payload, message.CancellationToken);
}
}
}
private static async ValueTask CallHandle<TMessage>(IActor actor, IMessage message)
{
if (actor is IHandleActorMessage<TMessage> handle)
{
await handle.HandleAsync((TMessage)message.Payload, message.CancellationToken);
}
else
{
await actor.HandleAsync(message.Payload, message.CancellationToken);
}
}
private static readonly MethodInfo s_callHandleMethodInfo = typeof(ActorProcess).GetMethod(
nameof(CallHandle),
BindingFlags.Static | BindingFlags.NonPublic
)!;
private static Func<IActor, IMessage, ValueTask> CreateCallHandleDelegate(Type messageType)
{
var typed = s_callHandleMethodInfo.MakeGenericMethod(messageType);
return typed.CreateDelegate<Func<IActor, IMessage, ValueTask>>();
}
}
Essa abordagem garante um manuseio eficiente de mensagens tipadas quando código dinâmico é suportado, enquanto recorre a um manipulador genérico para cenários AOT.
Implementando Tell e Ask
Agora, vamos implementar dois primitivos de mensagens centrais em sistemas de atores:
-
Tell: Um mecanismo "Fire-and-forget" (dispare e esqueça). O remetente envia uma mensagem e continua imediatamente sem esperar por um resultado. -
Ask: Um mecanismo "Request-Response" (solicitação-resposta). O remetente envia uma mensagem e aguarda (assincronamente) por uma resposta.
A Interface IActorReference
Primeiro, definimos uma interface para referências de atores que suporta ambos os padrões:
public interface IActorReference
{
TResponse Ask<TRequest, TResponse>(TRequest request, TimeSpan? timeout = null)
where TRequest : notnull;
ValueTask<TResponse> AskAsync<TRequest, TResponse>(TRequest request, CancellationToken cancellationToken = default)
where TRequest : notnull;
void Tell<TMessage>(TMessage message, TimeSpan? timeout = null)
where TMessage : notnull;
ValueTask TellAsync<TMessage>(TMessage message, CancellationToken cancellationToken = default)
where TMessage : notnull;
}
Tell – Fire and Forget
Tell é direto — ele envia uma mensagem sem esperar por uma resposta.
public class LocalActorReference
{
public void Tell<TMessage>(TMessage message, TimeSpan? timeout = null)
where TMessage : notnull
{
var cts = new CancellationTokenSource();
if (timeout.HasValue)
{
cts.CancelAfter(timeout.Value);
}
try
{
var task = TellAsync(message, cts.Token);
if (task.IsCompletedSuccessfully)
{
return;
}
task.AsTask().GetAwaiter().GetResult();
}
catch (OperationCanceledException ex)
{
throw new TimeoutException(
$"Tell operation timed out after {timeout?.TotalMilliseconds ?? 0} ms.",
ex
);
}
}
public async ValueTask TellAsync<TMessage>(TMessage message, CancellationToken cancellationToken = default)
where TMessage : notnull
{
await mailbox.EnqueueAsync(
new LocalTellMessage(message, CancellationToken.None),
cancellationToken
);
}
}
Ask – Request-Response
Ask é mais complexo. Requer um mecanismo temporário para capturar a resposta. No Trupe, a LocalAskMessage (não mostrada aqui) atua como um TaskCompletionSource que o processo do Ator pode completar quando o processamento terminar.
public class LocalActorReference
{
public TResponse Ask<TRequest, TResponse>(TRequest request, TimeSpan? timeout = null)
where TRequest : notnull
{
var cts = new CancellationTokenSource();
if (timeout.HasValue)
{
cts.CancelAfter(timeout.Value);
}
try
{
var result = AskAsync<TRequest, TResponse>(request, cts.Token);
if (result.IsCompletedSuccessfully)
{
return result.Result;
}
return result.AsTask().GetAwaiter().GetResult();
}
catch (OperationCanceledException ex)
{
throw new TimeoutException(
$"Ask operation timed out after {timeout?.TotalMilliseconds ?? 0} ms.",
ex
);
}
}
public async ValueTask<TResponse> AskAsync<TRequest, TResponse>(TRequest request, CancellationToken cancellationToken = default)
where TRequest : notnull
{
var message = new LocalAskMessage(request, cancellationToken);
await mailbox.EnqueueAsync(message, cancellationToken);
var response = await message.AsTask();
if (response is TResponse val)
{
return val;
}
throw new InvalidCastException(
$"Cannot cast response of type {response?.GetType().FullName ?? "null"} to {typeof(TResponse).FullName}."
);
}
}
Fechando o Ciclo: Atualizando o ActorProcess
Finalmente, precisamos atualizar o loop RunAsync no ActorProcess para lidar com a resposta.
O Trupe usa uma abordagem baseada em contexto para respostas. O ator processa a mensagem e define o resultado em Context.Response. O loop do processo então pega esse resultado e completa a promise do Ask.
public class ActorProcess(IActor actor, IMailbox mailbox)
{
private async Task RunAsync(CancellationToken cancellationToken)
{
await foreach (var message in mailbox.WithCancellation(cancellationToken))
{
actor.Context.Response = null;
if (RuntimeFeature.IsDynamicCodeSupported)
{
var callHandle = _typedCallHandle.GetOrAdd(
message.Payload.GetType(),
CreateCallHandleDelegate
);
await callHandle(actor, message);
}
else
{
await actor.HandleAsync(message.Payload, message.CancellationToken);
}
if (message is IAskMessage askMessage)
{
askMessage.SetResult(actor.Context.Response);
}
actor.Context.Response = null;
}
}
}
Conclusão
Com essas mudanças, o Trupe está se tornando uma base robusta para construir sistemas de Atores em .NET.
- Clareza: Interfaces renomeadas tornam o código autoexplicativo.
- Simplicidade: Mover para IAsyncEnumerable e remover o Sigil reduz drasticamente a sobrecarga de manutenção.
- Compatibilidade: Agora estamos prontos para Native AOT.
- Funcionalidade: As implementações de
TelleAskfornecem os canais de comunicação essenciais que os atores precisam.
No próximo artigo, mergulharei em Supervisão e como lidamos com falhas no Trupe.
Top comments (0)