DEV Community

Cover image for Implementando Tell e Ask no Trupe — Biblioteca de Modelo de Atores em .NET
Rafael Andrade
Rafael Andrade

Posted on

Implementando Tell e Ask no Trupe — Biblioteca de Modelo de Atores em .NET

No artigo anterior, apresentei o Trupe, uma biblioteca leve de modelo de atores para .NET. Desde então, fiz várias melhorias de design para aprimorar a clareza, o desempenho e a experiência do desenvolvedor.

Neste artigo, vou guiá-lo pelas modificações mais recentes e implementar dois padrões de mensagens essenciais: Tell e Ask.

O que mudou no Trupe?

Vou começar falando sobre as mudanças que realizei na biblioteca.

Interface do Ator

No design inicial, debati entre IActor e IActor<TMessage>. Eu queria evitar forçar uma classe base para os desenvolvedores. Após alguma reflexão, refinei a nomenclatura para refletir melhor a intenção.

  • IActor: Esta permanece como a interface marcadora (marker interface) que identifica uma classe como um Ator.
  • IHandleActorMessage<TMessage>: Renomeada de IActor<TMessage>. Isso torna explícito que a interface é responsável por lidar com um tipo específico de mensagem.
  • HandleAsync: Renomeado de ReceiveAsync. "Handle" (lidar/tratar) descreve com mais precisão a ação de processamento.

As interfaces atualizadas ficaram assim:

public interface IActor
{
    IActorContext Context { get; }

    ValueTask HandleAsync(object? message, CancellationToken cancellationToken = default);
}

public interface IHandleActorMessage<TMessage>
{
    ValueTask HandleAsync(TMessage message, CancellationToken cancellationToken = default);
}
Enter fullscreen mode Exit fullscreen mode

Simplificação da Mailbox

Modernizei a interface IMailbox. Em vez de um método tradicional de Dequeue, a mailbox agora implementa IAsyncEnumerable<IMessage>. Isso simplifica o loop de consumo no ActorProcess, permitindo usarmos o elegante padrão await foreach.

public interface IMailbox : IAsyncEnumerable<IMessage>
{
    ValueTask EnqueueAsync(IMessage message, CancellationToken cancellationToken = default);
}
Enter fullscreen mode Exit fullscreen mode

Melhorias no Processo do Ator

Estratégia de Execução de Tasks

Anteriormente, eu iniciava o loop do ator usando TaskCreationOptions.LongRunning. No entanto, pesquisas destacaram que isso força a criação de uma nova thread fora do ThreadPool. Para uma biblioteca como esta, isso não é ideal; queremos aproveitar a eficiência do ThreadPool e evitar a exaustão de threads. Também quero evitar capturar o SynchronizationContext desnecessariamente.

Gerenciamento de Ciclo de Vida

Refinei a API de start/stop para que apenas o supervisor controle o ciclo de vida do ator. Isso reduz a complexidade e elimina APIs redundantes de sincronização/assincronia.

public class ActorProcess(IActor actor, IMailbox mailbox)
{
    private CancellationTokenSource? _cts;
    private Task? _executing;

    public void Start()
    {
        if (_executing != null)
        {
            return;
        }

        _cts = new CancellationTokenSource();
        _executing = Task.Run(() => RunAsync(_cts.Token));
    }


    public async Task StopAsync()
    {
        if (_cts == null || _executing == null)
        {
            return;
        }

        await _cts.CancelAsync();
        try
        {
            await _executing;
        }
        catch (OperationCanceledException)
        {
            // Ignora exceções de cancelamento durante o desligamento
        }

        _cts.Dispose();

        _cts = null;
        _executing = null;
    }
}
Enter fullscreen mode Exit fullscreen mode

Removendo Sigil por Simplicidade e Suporte a AOT

Para simplificar a biblioteca e suportar Native AOT, removi a dependência do Sigil (que depende de geração dinâmica de IL). Em vez disso, agora estou usando as capacidades de reflexão do C# (MakeGenericMethod e CreateDelegate) para despachar mensagens para manipuladores fortemente tipados.

Aqui está o método RunAsync revisado utilizando um dicionário concorrente para fazer cache dos delegates:

public class ActorProcess(IActor actor, IMailbox mailbox)
{
    private static readonly ConcurrentDictionary<
        Type,
        Func<IActor, IMessage, ValueTask>
    > _typedCallHandle = new();

    private async Task RunAsync(CancellationToken cancellationToken)
    {
        await foreach (var message in mailbox.WithCancellation(cancellationToken))
        {
            if (RuntimeFeature.IsDynamicCodeSupported)
            {
                var callHandle = _typedCallHandle.GetOrAdd(
                    message.Payload.GetType(),
                    CreateCallHandleDelegate
                );

                await callHandle(actor, message);
            }
            else
            {
                await actor.HandleAsync(message.Payload, message.CancellationToken);
            }
        }
    }

    private static async ValueTask CallHandle<TMessage>(IActor actor, IMessage message)
    {
        if (actor is IHandleActorMessage<TMessage> handle)
        {
            await handle.HandleAsync((TMessage)message.Payload, message.CancellationToken);
        }
        else
        {
            await actor.HandleAsync(message.Payload, message.CancellationToken);
        }
    }

    private static readonly MethodInfo s_callHandleMethodInfo = typeof(ActorProcess).GetMethod(
        nameof(CallHandle),
        BindingFlags.Static | BindingFlags.NonPublic
    )!;

    private static Func<IActor, IMessage, ValueTask> CreateCallHandleDelegate(Type messageType)
    {
        var typed = s_callHandleMethodInfo.MakeGenericMethod(messageType);
        return typed.CreateDelegate<Func<IActor, IMessage, ValueTask>>();
    }
}
Enter fullscreen mode Exit fullscreen mode

Essa abordagem garante um manuseio eficiente de mensagens tipadas quando código dinâmico é suportado, enquanto recorre a um manipulador genérico para cenários AOT.

Implementando Tell e Ask

Agora, vamos implementar dois primitivos de mensagens centrais em sistemas de atores:

  • Tell: Um mecanismo "Fire-and-forget" (dispare e esqueça). O remetente envia uma mensagem e continua imediatamente sem esperar por um resultado.
  • Ask: Um mecanismo "Request-Response" (solicitação-resposta). O remetente envia uma mensagem e aguarda (assincronamente) por uma resposta.

A Interface IActorReference

Primeiro, definimos uma interface para referências de atores que suporta ambos os padrões:

public interface IActorReference
{
    TResponse Ask<TRequest, TResponse>(TRequest request, TimeSpan? timeout = null)
        where TRequest : notnull;

    ValueTask<TResponse> AskAsync<TRequest, TResponse>(TRequest request, CancellationToken cancellationToken = default)
        where TRequest : notnull;

    void Tell<TMessage>(TMessage message, TimeSpan? timeout = null)
        where TMessage : notnull;

    ValueTask TellAsync<TMessage>(TMessage message, CancellationToken cancellationToken = default)
        where TMessage : notnull;
}
Enter fullscreen mode Exit fullscreen mode

Tell – Fire and Forget

Tell é direto — ele envia uma mensagem sem esperar por uma resposta.

public class LocalActorReference
{
    public void Tell<TMessage>(TMessage message, TimeSpan? timeout = null)
        where TMessage : notnull
    {
        var cts = new CancellationTokenSource();
        if (timeout.HasValue)
        {
            cts.CancelAfter(timeout.Value);
        }

        try
        {
            var task = TellAsync(message, cts.Token);
            if (task.IsCompletedSuccessfully)
            {
                return;
            }

            task.AsTask().GetAwaiter().GetResult();
        }
        catch (OperationCanceledException ex)
        {
            throw new TimeoutException(
                $"Tell operation timed out after {timeout?.TotalMilliseconds ?? 0} ms.",
                ex
            );
        }
    }

    public async ValueTask TellAsync<TMessage>(TMessage message, CancellationToken cancellationToken = default)
        where TMessage : notnull
    {
        await mailbox.EnqueueAsync(
            new LocalTellMessage(message, CancellationToken.None),
            cancellationToken
        );
    }
}
Enter fullscreen mode Exit fullscreen mode

Ask – Request-Response

Ask é mais complexo. Requer um mecanismo temporário para capturar a resposta. No Trupe, a LocalAskMessage (não mostrada aqui) atua como um TaskCompletionSource que o processo do Ator pode completar quando o processamento terminar.

public class LocalActorReference
{
    public TResponse Ask<TRequest, TResponse>(TRequest request, TimeSpan? timeout = null)
        where TRequest : notnull
    {
        var cts = new CancellationTokenSource();
        if (timeout.HasValue)
        {
            cts.CancelAfter(timeout.Value);
        }

        try
        {
            var result = AskAsync<TRequest, TResponse>(request, cts.Token);
            if (result.IsCompletedSuccessfully)
            {
                return result.Result;
            }

            return result.AsTask().GetAwaiter().GetResult();
        }
        catch (OperationCanceledException ex)
        {
            throw new TimeoutException(
                $"Ask operation timed out after {timeout?.TotalMilliseconds ?? 0} ms.",
                ex
            );
        }
    }

    public async ValueTask<TResponse> AskAsync<TRequest, TResponse>(TRequest request, CancellationToken cancellationToken = default)
        where TRequest : notnull
    {
        var message = new LocalAskMessage(request, cancellationToken);

        await mailbox.EnqueueAsync(message, cancellationToken);

        var response = await message.AsTask();

        if (response is TResponse val)
        {
            return val;
        }

        throw new InvalidCastException(
            $"Cannot cast response of type {response?.GetType().FullName ?? "null"} to {typeof(TResponse).FullName}."
        );
    }
}
Enter fullscreen mode Exit fullscreen mode

Fechando o Ciclo: Atualizando o ActorProcess

Finalmente, precisamos atualizar o loop RunAsync no ActorProcess para lidar com a resposta.

O Trupe usa uma abordagem baseada em contexto para respostas. O ator processa a mensagem e define o resultado em Context.Response. O loop do processo então pega esse resultado e completa a promise do Ask.

public class ActorProcess(IActor actor, IMailbox mailbox)
{
    private async Task RunAsync(CancellationToken cancellationToken)
    {
        await foreach (var message in mailbox.WithCancellation(cancellationToken))
        {
            actor.Context.Response = null;
            if (RuntimeFeature.IsDynamicCodeSupported)
            {
                var callHandle = _typedCallHandle.GetOrAdd(
                    message.Payload.GetType(),
                    CreateCallHandleDelegate
                );

                await callHandle(actor, message);
            }
            else
            {
                await actor.HandleAsync(message.Payload, message.CancellationToken);
            }

            if (message is IAskMessage askMessage)
            {
                askMessage.SetResult(actor.Context.Response);
            }

            actor.Context.Response = null;
        }
    }
}
Enter fullscreen mode Exit fullscreen mode

Conclusão

Com essas mudanças, o Trupe está se tornando uma base robusta para construir sistemas de Atores em .NET.

  • Clareza: Interfaces renomeadas tornam o código autoexplicativo.
  • Simplicidade: Mover para IAsyncEnumerable e remover o Sigil reduz drasticamente a sobrecarga de manutenção.
  • Compatibilidade: Agora estamos prontos para Native AOT.
  • Funcionalidade: As implementações de Tell e Ask fornecem os canais de comunicação essenciais que os atores precisam.

No próximo artigo, mergulharei em Supervisão e como lidamos com falhas no Trupe.

Top comments (0)