DEV Community

Cristiano Rodrigues for Unhacked

Posted on

Backpressure sem drama: pipelines in-process com System.Threading.Channels

Passar dados de quem produz para quem consome dentro do mesmo processo parece o problema mais simples da computação concorrente. Uma fila no meio, um lock para proteger, pronto. Mas essa simplicidade é uma armadilha: o problema real não é mover os dados, é decidir o que acontece quando o produtor é mais rápido que o consumidor. Esse fenômeno tem nome, backpressure, e é ele que derruba serviços em produção às três da manhã.

Como você segura uma API que recebe 5.000 eventos por segundo quando o consumidor só consegue gravar 2.000 por segundo no banco? Deixa a fila crescer até estourar a memória? Bloqueia a thread do request? Descarta eventos em silêncio?

A equipe do .NET enfrentou exatamente esse dilema ao construir a infraestrutura do ASP.NET Core e do SignalR, e a resposta se chama System.Threading.Channels: nasceu como pacote NuGet em 2018, na época do .NET Core 2.1, e entrou no shared framework no .NET Core 3.0. Não é uma biblioteca de terceiros: é mantida pelo time do runtime e usada internamente por componentes da própria plataforma, do SignalR ao Kestrel. Hoje, no .NET 10, ela é a peça padrão para pipelines in-process, com direito a melhorias de runtime que aceleram seu código sem você mudar uma linha.

Vamos entender por que as soluções óbvias falham, como um Channel funciona por dentro e como levar isso para produção com backpressure tratado como cidadão de primeira classe.

O problema em detalhe: por que as soluções óbvias falham

Antes de aceitar uma abstração nova, vale esgotar as alternativas que todo mundo tenta primeiro.

Queue<T> + lock: a implementação manual. Funciona no teste local com um produtor e um consumidor. Em produção, o consumidor precisa fazer polling (dormir, acordar, checar a fila, dormir de novo), o que desperdiça CPU quando a fila está vazia e adiciona latência quando ela está cheia. E não existe nenhum mecanismo para segurar o produtor: a fila cresce sem limite.

ConcurrentQueue<T>: resolve o lock, não resolve o resto. Continua sem sinalização assíncrona (o consumidor ainda faz polling com TryDequeue num loop) e continua sem limite de capacidade. Você trocou um problema de sincronização por um problema de coordenação.

BlockingCollection<T>: finalmente tem capacidade limitada e bloqueia o produtor quando enche. O defeito é a palavra "bloqueia": Add e Take seguram uma thread inteira do thread pool enquanto esperam. Num servidor ASP.NET Core com 200 requests concorrentes esperando espaço na fila, são 200 threads paradas. Threads são caras (cerca de 1 MB de stack cada) e o thread pool leva tempo para injetar novas. É a receita clássica de thread starvation.

Fila externa (RabbitMQ, Redis, Kafka): resolve backpressure e ainda dá durabilidade, mas ao custo de uma viagem de rede por mensagem, serialização, infraestrutura para operar e um novo ponto de falha. Para comunicação entre serviços, é a escolha certa. Para passar trabalho entre duas partes do mesmo processo, é adicionar rede, operação e um novo modo de falha a um problema que acontece inteiramente dentro do processo.

Repare no padrão: cada alternativa resolve um pedaço e deixa outro exposto. O que falta é uma estrutura que seja assíncrona de ponta a ponta, tenha capacidade limitada e trate a fila cheia como um evento esperado, não como uma exceção.

Anatomia de um Channel

Um Channel<T> é duas metades coladas numa estrutura só:

ChannelWriter<T>: o lado do produtor. Expõe WriteAsync (escreve, aguardando se preciso), TryWrite (tenta escrever sem esperar), WaitToWriteAsync (aguarda até haver espaço, sem escrever) e Complete (sinaliza o fim).

ChannelReader<T>: o lado do consumidor. Expõe ReadAsync, TryRead, WaitToReadAsync e o mais elegante de todos, ReadAllAsync, que devolve um IAsyncEnumerable<T>.

Os pares Try/Wait merecem um comentário, porque são o caminho de alto desempenho. TryRead devolve um item de forma síncrona se houver um disponível, sem criar nenhuma operação assíncrona. O padrão clássico de consumidor eficiente combina os dois: aguarde com WaitToReadAsync e, quando houver sinal, drene tudo que estiver no buffer com TryRead num loop apertado antes de voltar a esperar. Do lado do produtor, WaitToWriteAsync seguido de TryWrite cumpre o mesmo papel. E todos os métodos assíncronos (WriteAsync, ReadAsync, WaitToReadAsync, ReadAllAsync) aceitam CancellationToken, o que vai ser essencial quando chegarmos no shutdown.

A decisão mais importante você toma na criação, escolhendo entre três fábricas:

// Sem limite: nunca segura o produtor, memória é o limite
var unbounded = Channel.CreateUnbounded<TelemetryEvent>();

// Com limite: aqui mora o backpressure
var bounded = Channel.CreateBounded<TelemetryEvent>(
    new BoundedChannelOptions(capacity: 1_000)
    {
        FullMode = BoundedChannelFullMode.Wait
    });

// Com prioridade: itens "menores" saem primeiro (API adicionada no .NET 9)
var prioritized = Channel.CreateUnboundedPrioritized<TelemetryEvent>();
Enter fullscreen mode Exit fullscreen mode

Vamos focar nas duas primeiras agora e voltar à variante priorizada mais adiante, depois de entender performance.

O FullMode do canal bounded é literalmente a sua política de backpressure escrita em código:

Wait: o produtor espera (de forma assíncrona, sem bloquear thread) até abrir espaço. É a escolha padrão quando nenhum dado pode ser perdido.

DropOldest: descarta o item mais antigo para abrir espaço. Perfeito para telemetria de posição: se o buffer encheu, a coordenada de 10 segundos atrás vale menos que a atual.

DropNewest: descarta o item mais recente já enfileirado.

DropWrite: descarta o item que está tentando entrar.

Vamos acompanhar passo a passo o que acontece num canal bounded de capacidade 3 com FullMode.Wait, um produtor rápido e um consumidor lento:

Tempo Produtor Consumidor Itens no buffer
t0 WriteAsync(A) completa síncrono ocupado A
t1 WriteAsync(B) completa síncrono ocupado A, B
t2 WriteAsync(C) completa síncrono ocupado A, B, C
t3 WriteAsync(D) retorna ValueTask pendente ocupado A, B, C
t4 ainda aguardando, sem thread presa ReadAsync() consome A B, C
t5 continuação dispara, D entra processando A B, C, D

O mesmo fluxo, desenhado:

Propagação do backpressure: o produtor a 5.000 msg/s encontra o buffer cheio, o WriteAsync devolve uma ValueTask pendente sem bloquear thread, e a velocidade do consumidor a 2.000 msg/s se propaga de volta ao produtor

Repare no que acontece em t3 e t4: o produtor não travou uma thread, ele devolveu uma ValueTask incompleta e liberou a thread para outro trabalho. Quando o consumidor abriu espaço, o runtime agendou a continuação. Isso é backpressure assíncrono: a velocidade do consumidor se propaga naturalmente para o produtor, sem polling, sem thread bloqueada, sem fila infinita.

Por que funciona

A elegância do Channel está em três decisões de design:

Async nativo, não adaptado. WriteAsync e ReadAsync devolvem ValueTask, e a escolha desse tipo não é cosmética. No caminho mais comum (há espaço no buffer, há um item disponível), a operação completa imediatamente, e a ValueTask permite devolver esse resultado sem alocar um objeto Task no heap. Só quando existe espera real a operação vira assíncrona de fato, e mesmo aí as implementações atuais reutilizam os objetos de espera. O resultado é pressão quase nula no GC no caminho feliz.

Backpressure é configuração, não código. Nas soluções manuais, a política de fila cheia fica espalhada em ifs pelo código do produtor. No Channel, ela é declarada uma vez, na criação, e vale para todos os produtores.

Separação de leitura e escrita no sistema de tipos. Você injeta ChannelWriter<T> no produtor e ChannelReader<T> no consumidor. O compilador garante que o controller não consegue consumir a fila e o worker não consegue escrever nela. Arquitetura imposta por tipo, não por convenção.

O que acontece por baixo dos panos

Vale abrir o capô, porque é aí que mora a explicação da performance. Um aviso honesto antes: os detalhes a seguir descrevem as implementações atuais do runtime, que não fazem parte do contrato público da API e podem mudar entre versões. Os princípios, porém, são estáveis.

O buffer do canal unbounded é uma fila concorrente segmentada. Nas implementações atuais, é a mesma estrutura da ConcurrentQueue<T>: uma lista de segmentos (blocos contíguos de memória) com enqueue e dequeue lock-free na maioria dos caminhos. Itens vizinhos ficam próximos em memória, o que melhora a localidade de cache em relação a estruturas concorrentes baseadas em nós encadeados.

O buffer do canal bounded é um deque sobre array circular, protegido por um lock de curtíssima duração. Parece um retrocesso em relação ao lock-free, mas não é: o lock cobre apenas algumas instruções (inserir num array, remover, checar capacidade) e nunca é mantido durante uma espera. É uma contenção de outra natureza, e de outro custo, comparada a alternativas que bloqueiam threads inteiras enquanto aguardam espaço.

A espera é uma lista de awaiters reutilizáveis. Quando um consumidor chama ReadAsync num canal vazio, o Channel não cria uma Task: ele registra um objeto interno (AsyncOperation, que implementa IValueTaskSource) numa lista de leitores pendentes. Quando um produtor escreve, ele completa diretamente o awaiter do primeiro leitor da fila, muitas vezes entregando o item sem sequer passar pelo buffer. Esses objetos de espera são reaproveitados entre operações, e é por isso que o benchmark mais adiante mostra alocação em kilobytes, não megabytes.

As flags escolhem implementações especializadas. SingleReader = true num canal unbounded troca a implementação inteira por uma classe dedicada ao cenário de consumidor único, que mantém um único awaiter cacheado em vez de uma fila deles. Combinada com SingleWriter, você chega no clássico SPSC (Single Producer Single Consumer), o cenário em que quase toda sincronização pode ser eliminada.

Em resumo: o Channel é rápido porque quase nunca aloca, quase nunca bloqueia e escolhe o algoritmo certo para as promessas que você faz na criação.

Implementação: do básico ao pipeline

O padrão mínimo, completo e funcional:

using System.Threading.Channels;

var channel = Channel.CreateBounded<int>(new BoundedChannelOptions(100)
{
    FullMode = BoundedChannelFullMode.Wait
});

// Produtor
var producer = Task.Run(async () =>
{
    try
    {
        for (var i = 1; i <= 1_000; i++)
        {
            // Se o buffer estiver cheio, aguarda sem bloquear thread
            await channel.Writer.WriteAsync(i);
        }

        // Sinaliza que não vem mais nada: sem isso, o consumidor espera para sempre
        channel.Writer.Complete();
    }
    catch (Exception ex)
    {
        // Propaga a falha: o await foreach do consumidor vai relançá-la
        channel.Writer.Complete(ex);
    }
});

// Consumidor
var consumer = Task.Run(async () =>
{
    // ReadAllAsync encerra o loop quando o writer chama Complete()
    // e relança a exceção se o writer chamou Complete(ex)
    await foreach (var item in channel.Reader.ReadAllAsync())
    {
        await ProcessAsync(item);
    }
});

await Task.WhenAll(producer, consumer);
Enter fullscreen mode Exit fullscreen mode

O Complete() merece atenção: ele é o encerramento gracioso do pipeline. Quando o produtor termina, o ReadAllAsync do consumidor drena o que restou no buffer e o await foreach termina sozinho. E a sobrecarga Complete(exception) é uma funcionalidade pouco conhecida e muito útil: a exceção atravessa o canal e ressurge no consumidor, no ponto onde ele estava lendo. O erro do produtor não morre num log esquecido, ele chega em quem precisa reagir.

Dois parentes do Complete valem registro. TryComplete() faz o mesmo sem lançar exceção se o canal já foi completado, o que é essencial quando vários produtores podem disputar o encerramento. E do outro lado, Reader.Completion é uma Task que completa quando o canal terminou de verdade (writer completou e o buffer foi drenado), falhando com a exceção original se o encerramento veio de Complete(ex). Um await channel.Reader.Completion é a forma limpa de esperar o fim de um pipeline sem ser o consumidor dele.

E quando o throughput importa de verdade, o consumidor eficiente usa o padrão Wait + drenagem:

// Aguarda haver dados, depois drena tudo de forma síncrona
while (await channel.Reader.WaitToReadAsync(ct))
{
    while (channel.Reader.TryRead(out var item))
    {
        await ProcessAsync(item);
    }
}
Enter fullscreen mode Exit fullscreen mode

A diferença para o await foreach é sutil mas real: dentro do loop interno, TryRead consome itens já disponíveis sem criar nenhuma operação assíncrona. Só quando o buffer esvazia o código volta a esperar.

Integração real: fila de trabalho em ASP.NET Core

O caso de uso mais comum em produção: um endpoint precisa disparar trabalho pesado (enviar e-mail, processar imagem, gravar auditoria) sem segurar o request. Vamos montar a versão completa, com o Channel como fila e um BackgroundService como consumidor. A arquitetura inteira cabe num desenho:

Arquitetura da fila de trabalho em ASP.NET Core: o request passa pelo endpoint e entra no Bounded Channel via TryWrite, o BackgroundService consome e grava no destino. O consumo lento enche o buffer e a fila cheia vira HTTP 503 no endpoint, fechando o ciclo de backpressure

Primeiro, os tipos que circulam pelo pipeline:

public sealed record SignupRequest(string Email);

public sealed record EmailJob(string Email)
{
    public Guid Id { get; } = Guid.NewGuid();
}
Enter fullscreen mode Exit fullscreen mode

Em seguida, a fila como um serviço com contrato explícito:

public sealed class WorkQueue<T>
{
    private readonly Channel<T> _channel;

    public WorkQueue(int capacity)
    {
        if (capacity <= 0)
            throw new ArgumentOutOfRangeException(nameof(capacity));

        _channel = Channel.CreateBounded<T>(new BoundedChannelOptions(capacity)
        {
            FullMode = BoundedChannelFullMode.Wait,
            SingleReader = true,   // um único BackgroundService consome
            SingleWriter = false   // vários requests produzem
        });
    }

    public ValueTask EnqueueAsync(T item, CancellationToken ct = default)
        => _channel.Writer.WriteAsync(item, ct);

    public bool TryEnqueue(T item)
        => _channel.Writer.TryWrite(item);

    public IAsyncEnumerable<T> DequeueAllAsync(CancellationToken ct = default)
        => _channel.Reader.ReadAllAsync(ct);
}
Enter fullscreen mode Exit fullscreen mode

O consumidor, como BackgroundService:

public sealed class EmailWorker : BackgroundService
{
    private readonly WorkQueue<EmailJob> _queue;
    private readonly ILogger<EmailWorker> _logger;

    public EmailWorker(WorkQueue<EmailJob> queue, ILogger<EmailWorker> logger)
    {
        _queue = queue;
        _logger = logger;
    }

    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        await foreach (var job in _queue.DequeueAllAsync(stoppingToken))
        {
            try
            {
                await SendAsync(job, stoppingToken);
            }
            catch (Exception ex) when (ex is not OperationCanceledException)
            {
                // Uma falha de item não pode derrubar o pipeline inteiro
                _logger.LogError(ex, "Falha ao processar job {JobId}", job.Id);
            }
        }
    }

    private Task SendAsync(EmailJob job, CancellationToken ct)
        => Task.Delay(50, ct); // aqui entra o envio real
}
Enter fullscreen mode Exit fullscreen mode

E o registro no Program.cs:

builder.Services.AddSingleton(new WorkQueue<EmailJob>(capacity: 5_000));
builder.Services.AddHostedService<EmailWorker>();
Enter fullscreen mode Exit fullscreen mode

No endpoint, você decide a política de sobrecarga de forma explícita:

app.MapPost("/signup", (SignupRequest req, WorkQueue<EmailJob> queue) =>
{
    // TryEnqueue: se a fila está cheia, o sistema está sobrecarregado.
    // Melhor responder 503 agora do que travar o request esperando.
    if (!queue.TryEnqueue(new EmailJob(req.Email)))
        return Results.StatusCode(StatusCodes.Status503ServiceUnavailable);

    return Results.Accepted();
});
Enter fullscreen mode Exit fullscreen mode

Repare na decisão: em vez de await EnqueueAsync (que seguraria o request sob sobrecarga), o endpoint usa TryWrite e devolve 503. A fila cheia deixou de ser um acidente e virou um sinal de load shedding. Isso é backpressure chegando até o cliente HTTP, que é exatamente onde ele deve chegar.

O CancellationToken que atravessa todo esse código não é decoração. É ele que conecta o pipeline ao ciclo de vida do host: quando o ASP.NET Core inicia o shutdown, o stoppingToken dispara, o ReadAllAsync é cancelado e o worker encerra de forma cooperativa em vez de ser morto no meio de um job.

Vale ainda situar o Channel no ecossistema: ele aparece com frequência ao lado de System.IO.Pipelines. Enquanto o PipeReader resolve o fluxo eficiente de bytes entre um socket e um parser, o Channel<T> resolve o fluxo de objetos entre produtores e consumidores dentro da aplicação. Um pipeline de rede típico lê bytes com PipeReader, converte em mensagens tipadas e as publica num Channel<T> para processamento assíncrono. As duas peças resolvem o mesmo problema conceitual (backpressure) em camadas diferentes: bytes numa, objetos na outra.

Nota: essa fila vive na memória do processo. Se o pod reiniciar num deploy, tudo que estava no buffer se perde. Para e-mail de boas-vindas, tolerável. Para cobrança, inaceitável: aí a fila certa é durável (RabbitMQ, Azure Service Bus, Kafka). Channel não substitui message broker, ele resolve outro problema.

Alta performance: as opções que a maioria ignora

Depois do pipeline funcionando, vale extrair o máximo. UnboundedChannelOptions e BoundedChannelOptions têm três flags que mudam o código que o runtime escolhe por baixo:

SingleReader = true: promete que só um consumidor lê por vez. Como vimos nos internals, no canal unbounded o runtime troca a implementação por uma versão especializada com um único awaiter cacheado. No canal bounded as flags não trocam a implementação, e o benchmark logo abaixo mostra essa diferença na prática.

SingleWriter = true: a promessa equivalente do lado do produtor. As duas juntas configuram o cenário SPSC (Single Producer Single Consumer), que dispensa boa parte da sincronização exigida pelas implementações gerais.

AllowSynchronousContinuations = true: permite que a continuação do consumidor rode na thread do produtor, eliminando um hop de agendamento. Ganha latência, mas o produtor passa a executar código do consumidor. Use apenas quando você controla os dois lados e o trabalho do consumidor é curto.

E é com dados que comprovamos o ganho. Vamos medir em duas etapas: primeiro o pipeline completo, como ele roda em produção, depois a estrutura de dados isolada.

Benchmark 1: o pipeline completo

BenchmarkDotNet, movendo 100.000 inteiros de um produtor para um consumidor em tasks separadas:

[MemoryDiagnoser]
public class QueueBenchmarks
{
    private const int N = 100_000;

    [Benchmark(Baseline = true)]
    public async Task Channel_Bounded_Default()
        => await RunChannelAsync(Channel.CreateBounded<int>(
            new BoundedChannelOptions(1_000)));

    [Benchmark]
    public async Task Channel_Bounded_SingleReaderWriter()
        => await RunChannelAsync(Channel.CreateBounded<int>(
            new BoundedChannelOptions(1_000)
            {
                SingleReader = true,
                SingleWriter = true
            }));

    [Benchmark]
    public async Task BlockingCollection_Bounded()
    {
        // async e await aqui são obrigatórios: o using só pode
        // descartar a collection depois que as tasks terminarem
        using var collection = new BlockingCollection<int>(1_000);

        var producer = Task.Run(() =>
        {
            for (var i = 0; i < N; i++)
                collection.Add(i);
            collection.CompleteAdding();
        });

        var consumer = Task.Run(() =>
        {
            foreach (var _ in collection.GetConsumingEnumerable()) { }
        });

        await Task.WhenAll(producer, consumer);
    }

    private static async Task RunChannelAsync(Channel<int> channel)
    {
        var producer = Task.Run(async () =>
        {
            for (var i = 0; i < N; i++)
                await channel.Writer.WriteAsync(i);
            channel.Writer.Complete();
        });

        var consumer = Task.Run(async () =>
        {
            await foreach (var _ in channel.Reader.ReadAllAsync()) { }
        });

        await Task.WhenAll(producer, consumer);
    }
}
Enter fullscreen mode Exit fullscreen mode

Resultados na minha máquina (os números absolutos variam com o hardware, a proporção é o que importa). Ambiente: Apple M4 Pro (14 núcleos), macOS Tahoe 26.5.1, .NET 10.0.9 (Arm64 RyuJIT), BenchmarkDotNet v0.15.8:

Método Média Ratio Alocação
Channel_Bounded_Default 3,43 ms 1,00 10,31 KB
Channel_Bounded_SingleReaderWriter 3,47 ms 1,01 10,3 KB
BlockingCollection_Bounded 20,02 ms 5,85 22,98 KB

A leitura tem duas partes, e uma delas contraria a intuição. A primeira é a esperada: o Channel foi quase 6 vezes mais rápido que BlockingCollection, alocou menos da metade e não bloqueou nenhuma thread no caminho. Os 10 KB para mover 100.000 itens confirmam o que os internals prometeram: no caminho feliz, ValueTask completa síncrono, os awaiters são reutilizados e quase nada vai para o heap.

A segunda parte é a surpresa: SingleReader/SingleWriter não mudou nada, e isso não é erro de medição. Como vimos nos internals, a implementação especializada existe para o canal unbounded. No canal bounded, as flags não trocam a implementação, e o custo deste pipeline é dominado pelo agendamento entre threads, não pela estrutura de dados. Benchmark serve exatamente para isso: corrigir a intuição antes que ela vire decisão de arquitetura.

Nota: sobre metodologia. Esse benchmark mede o pipeline completo, incluindo Task.Run, o agendamento no thread pool e o custo do scheduler, porque é isso que você paga em produção. É uma medida honesta do cenário real, mas não isola o custo do Channel em si. Para isso, o segundo benchmark.

Benchmark 2: isolando a estrutura de dados

Aqui, uma única thread alterna escrita e leitura. Como sempre há espaço no buffer e sempre há um item disponível, toda operação completa de forma síncrona: o que sobra é o custo puro da estrutura, sem scheduler no caminho.

[Benchmark]
public async Task Channel_SingleThread_Default()
{
    var channel = Channel.CreateBounded<int>(new BoundedChannelOptions(8));

    for (var i = 0; i < N; i++)
    {
        // Ambas completam síncrono: não há espera real
        await channel.Writer.WriteAsync(i);
        _ = await channel.Reader.ReadAsync();
    }
}

[Benchmark]
public async Task Channel_SingleThread_SingleReaderWriter()
{
    var channel = Channel.CreateBounded<int>(new BoundedChannelOptions(8)
    {
        SingleReader = true,
        SingleWriter = true
    });

    for (var i = 0; i < N; i++)
    {
        await channel.Writer.WriteAsync(i);
        _ = await channel.Reader.ReadAsync();
    }
}
Enter fullscreen mode Exit fullscreen mode
Método Média Alocação
Channel_SingleThread_Default 1,29 ms 760 B
Channel_SingleThread_SingleReaderWriter 1,30 ms 760 B

O empate entre as duas variantes se repete, pelo mesmo motivo do primeiro benchmark: canal bounded, mesma implementação, flags sem efeito. E a comparação entre os dois benchmarks conta a história completa: dos 3,4 ms do pipeline concorrente, pouco mais de um terço (1,3 ms) é o Channel em si. O resto é agendamento, thread pool e a concorrência real entre produtor e consumidor. E os 760 bytes para 200.000 operações (100.000 pares de write e read) confirmam na prática o que os internals prometeram: no caminho síncrono, praticamente nada vai para o heap.

E aqui entra o contexto de versão: no .NET 10, melhorias no ThreadPool (como o escape de work items da fila local quando uma thread bloqueia) e otimizações em Task beneficiam pipelines com Channels sem nenhuma mudança no seu código. É o tipo de ganho que você colhe só por atualizar o TargetFramework.

Nota: só ative SingleReader/SingleWriter se a promessa for verdadeira. As flags não são validadas em todos os caminhos: dois consumidores num canal SingleReader produzem comportamento indefinido, do tipo que passa no teste e corrompe estado em produção.

Outras variantes: o canal com prioridade

Com o modelo de performance em mãos, vale voltar à terceira fábrica. Nem toda fila é justa de propósito: numa fila de notificações, o alerta de segurança precisa furar a fila da newsletter. O Channel.CreateUnboundedPrioritized<T>, adicionado no .NET 9 e presente no .NET 10, resolve isso trocando o buffer FIFO por uma PriorityQueue<T> interna: o ReadAsync sempre entrega o menor item segundo o comparador.

public sealed record Notification(int Priority, string Message);

var channel = Channel.CreateUnboundedPrioritized<Notification>(
    new UnboundedPrioritizedChannelOptions<Notification>
    {
        // Menor prioridade numérica sai primeiro
        Comparer = Comparer<Notification>.Create(
            static (a, b) => a.Priority.CompareTo(b.Priority))
    });

await channel.Writer.WriteAsync(new(3, "Newsletter semanal"));
await channel.Writer.WriteAsync(new(1, "Alerta de segurança"));
await channel.Writer.WriteAsync(new(2, "Fatura disponível"));

// Sai: Alerta de segurança, Fatura disponível, Newsletter semanal
Enter fullscreen mode Exit fullscreen mode

Os casos de uso naturais: filas de notificação com níveis de severidade, jobs de clientes premium à frente dos demais, retries reagendados atrás do trabalho novo.

A prioridade tem preço, e ele é conhecido: enfileirar e desenfileirar num heap custa O(log n), contra O(1) do buffer FIFO. Para filas de milhares de itens é irrelevante, mas num hot path de milhões de operações por segundo a diferença aparece, e o benchmark decide.

Nota: a priorização vale apenas entre os itens que estão no buffer naquele momento, e itens de prioridade igual não têm ordem FIFO garantida (a PriorityQueue do .NET não é estável). Se a ordem de chegada importa dentro da mesma prioridade, inclua um número de sequência no comparador.

Quando escolher cada um

Aspecto Channel BlockingCollection TPL Dataflow Fila externa Pipelines
Unidade de trabalho Objetos Objetos Objetos Mensagens Bytes
Backpressure assíncrono Nativo Não (bloqueia thread) Nativo Nativo Nativo
Alocação no caminho feliz Quase zero Alta Média Alta (rede + serialização) Quase zero
Sobrevive a restart Não Não Não Sim Não
Pipelines multi-estágio Manual Manual Nativo Nativo Manual
Complexidade operacional Nenhuma Nenhuma Baixa Alta Nenhuma

Escolha Channel quando:

  • Produtor e consumidor vivem no mesmo processo
  • Você precisa de backpressure sem bloquear threads
  • Perder o buffer num restart é aceitável ou mitigável
  • Throughput e alocação importam

Escolha TPL Dataflow quando:

  • O pipeline tem muitos estágios com transformação, broadcast e agregação, e você quer isso declarativo

Escolha uma fila externa quando:

  • As mensagens precisam sobreviver a deploy e crash
  • Produtor e consumidor são serviços diferentes
  • Você precisa de reprocessamento, DLQ e auditoria

Escolha System.IO.Pipelines quando:

  • O fluxo é de bytes, não de objetos: parsing de protocolo, leitura de socket, streaming de arquivo. Pipelines e Channels não competem, se complementam.

Fique longe de BlockingCollection quando:

  • O código é assíncrono. Num servidor ASP.NET Core, cada Add ou Take bloqueado é uma thread do pool fora de combate.

Erros comuns

A lista de verificação antes de mandar um Channel para produção:

Usar CreateUnbounded sem monitoramento: todo canal ilimitado merece uma métrica de profundidade e um alerta.

Esquecer Complete(): o consumidor fica preso no await foreach para sempre.

Ignorar CancellationToken: sem ele, o shutdown do host vira um kill disfarçado.

Mentir nas flags: SingleReader com dois consumidores é comportamento indefinido.

Bloquear dentro do consumidor: chamar .Result ou .Wait() no worker reintroduz exatamente o problema de thread que o Channel eliminou.

Trabalho pesado e serial no consumidor único: se o processamento é lento, um SingleReader vira o gargalo. Escale com múltiplos consumidores (e SingleReader = false) ou paralelize dentro do worker.

Usar Channel entre processos: ele é uma estrutura de memória. Entre processos ou serviços, a resposta é IPC ou message broker.

Considerações e limitações

Honestidade sobre o que dói em produção, agora em detalhe:

Nota: Channel.CreateUnbounded é o vazamento de memória mais educado do .NET. Ele nunca falha, nunca avisa, só cresce. Todo unbounded em produção merece uma métrica de profundidade da fila (um contador que você incrementa no write e decrementa no read) e um alerta. Se você não consegue justificar por que o canal precisa ser ilimitado, ele deve ser bounded.

Nota: esquecer Complete() é o deadlock mais comum com Channels: o consumidor fica preso no await foreach para sempre e o BackgroundService nunca encerra, segurando o shutdown do host até o timeout. Trate Complete() como você trata Dispose(): parte do contrato, não cortesia.

Nota: o graceful shutdown merece desenho. Quando o stoppingToken dispara, ReadAllAsync lança OperationCanceledException e o que estava no buffer morre com o processo. Se esses itens importam, o padrão é: parar de aceitar escritas, chamar Complete(), e drenar o restante com um CancellationToken separado e um prazo (o ASP.NET Core dá 30 segundos por padrão, configuráveis via HostOptions.ShutdownTimeout). Reader.Completion é o sinal certo para saber que a drenagem terminou.

Nota: Channel distribui itens, não faz broadcast. Com dois consumidores no mesmo reader, cada item vai para um deles, nunca para os dois. Se você precisa que todo consumidor veja toda mensagem, o padrão é um canal por assinante, com um loop de fan-out no meio.

Conclusão

O Channel<T> é uma solução elegante para um problema que quase todo sistema tem e quase ninguém enxerga até doer: a diferença de velocidade entre quem produz e quem consome. A implementação não é difícil, mas os detalhes importam: escolher bounded com o FullMode certo, sinalizar Complete() sempre (e Complete(ex) quando algo falha), propagar CancellationToken de ponta a ponta, ativar SingleReader/SingleWriter só quando a promessa é verdadeira e desenhar o shutdown antes do primeiro deploy.

O trade-off central é claro: Channels compram throughput altíssimo e backpressure de graça ao custo de viverem na memória do processo. Quando a durabilidade importa mais que a latência, a fila externa vence. Quando o problema é in-process, o Channel é imbatível, e no .NET 10 ele fica mais rápido sem você fazer nada.

Na próxima vez que você se pegar escrevendo Queue<T> com lock e um Task.Delay de polling, pare e pense em Channels. Backpressure não é um bug para tratar, é um contrato para desenhar: sistemas resilientes não são os que nunca enchem, são os que sabem exatamente o que fazer quando enchem.

Referências

Top comments (0)