DEV Community

Isaac Ojeda
Isaac Ojeda

Posted on

Sistema de Control de Jobs en Tiempo Real con Channels y Background Services en .NET

Introducción

En el desarrollo moderno de aplicaciones, es común necesitar ejecutar procesos en segundo plano que se comuniquen con nuestra API de forma eficiente y segura. Tradicionalmente, esto se resolvía con implementaciones complejas usando locks, colas manuales o infraestructura externa como RabbitMQ. Sin embargo, .NET ofrece una solución simple pero elegante: System.Threading.Channels.

En este artículo, exploraremos cómo construir un sistema de control de jobs en tiempo real utilizando:

  • 🔧 Channels para comunicación thread-safe entre componentes
  • 🔄 Background Services para tareas recurrentes
  • 🚀 Minimal APIs para endpoints modernos y limpios
  • TaskCompletionSource para comunicación bidireccional

Al finalizar, tendrás un proyecto funcional que puedes adaptar para casos de uso reales como procesamiento de emails, análisis de imágenes, generación de reportes, y más.

Nota: El código fuente siempre lo encontrarás en mi github -> DevToPosts/ApiBackgroundChannels at main · isaacOjeda/DevToPosts

¿Qué son los Channels?

Los Channels en .NET son estructuras de datos thread-safe diseñadas para escenarios productor-consumidor. Piensa en ellos como una "tubería" donde un lado escribe datos y el otro los lee, sin preocuparte por locks o sincronización manual.

¿Por qué usarlos?

  • ✅ Thread-safe por diseño
  • ✅ Alta performance con bajo overhead
  • ✅ Backpressure integrado (control de flujo)
  • ✅ Ideal para comunicación entre hilos/tareas
  • ✅ Alternativa simple a colas externas (RabbitMQ, Redis) para escenarios internos
  • ✅ Optimizado para async/await (usa ValueTask internamente)

Arquitectura del Proyecto

Este proyecto demuestra cómo controlar un Background Job desde una API usando Channels para comunicación bidireccional:

┌──────────────┐         ┌─────────┐         ┌──────────────────┐
│  API Request │ ──────> │ Channel │ ──────> │ Background Job   │
│  (Productor) │         │ (Cola)  │         │ (Consumidor)     │
└──────────────┘         └─────────┘         └──────────────────┘
      ↑                                             │
      └────── TaskCompletionSource ─────────────────┘
                     (Respuesta)
Enter fullscreen mode Exit fullscreen mode

Paso 1: Definir el Modelo de Comunicación

Primero, necesitamos estructuras para enviar comandos y recibir respuestas:

public enum CommandType { Start, Stop, GetStatus }

public class JobCommand
{
    public CommandType Type { get; set; }
    public TaskCompletionSource<JobStatus>? ResponseTask { get; set; }
}

public class JobStatus
{
    public bool IsRunning { get; set; }
    public int ExecutionCount { get; set; }
    public DateTime? LastExecutionTime { get; set; }
    public string Message { get; set; } = string.Empty;
}
Enter fullscreen mode Exit fullscreen mode

💡 Clave: TaskCompletionSource nos permite crear una Task que completaremos manualmente cuando tengamos la respuesta, haciendo posible la comunicación bidireccional.

Paso 2: Crear el Background Service (Consumidor)

public class JobProcessor : BackgroundService
{
    private readonly Channel<JobCommand> _channel;
    private bool _isJobRunning = false;
    private int _executionCount = 0;

    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        _logger.LogInformation("JobProcessor iniciado. Esperando comandos...");

        // ✅ Patrón recomendado por Microsoft: WaitToReadAsync + TryRead
        // Más eficiente que ReadAllAsync para alta concurrencia
        while (await _channel.Reader.WaitToReadAsync(stoppingToken))
        {
            while (_channel.Reader.TryRead(out var command))
            {
                try
                {
                    await ProcessCommandAsync(command, stoppingToken);
                }
                catch (Exception ex)
                {
                    _logger.LogError(ex, "Error procesando comando");
                    // ✅ Notificar errores al productor
                    command.ResponseTask?.TrySetException(ex);
                }
            }
        }
    }

    private async Task ProcessCommandAsync(JobCommand command, CancellationToken token)
    {
        switch (command.Type)
        {
            case CommandType.Start:
                _isJobRunning = true;
                _ = Task.Run(async () => await RunRecurringJobAsync(token));

                // Enviar respuesta al productor
                command.ResponseTask?.SetResult(new JobStatus 
                { 
                    IsRunning = true, 
                    Message = "Job iniciado" 
                });
                break;

            case CommandType.Stop:
                _isJobRunning = false;
                command.ResponseTask?.SetResult(new JobStatus 
                { 
                    IsRunning = false, 
                    Message = "Job detenido" 
                });
                break;

            case CommandType.GetStatus:
                command.ResponseTask?.SetResult(new JobStatus
                {
                    IsRunning = _isJobRunning,
                    ExecutionCount = _executionCount
                });
                break;
        }
    }
}
Enter fullscreen mode Exit fullscreen mode

💡 Explicación de las mejoras:

  • WaitToReadAsync() + TryRead(): Patrón recomendado por Microsoft para mejor performance
  • TrySetException(): Propaga errores al productor de forma segura
  • Bucle anidado: Procesa múltiples comandos en batch cuando están disponibles

Paso 3: Configurar el Channel y el Servicio

En Program.cs:

// ✅ Bounded Channel con opciones optimizadas (recomendado por Microsoft)
builder.Services.AddSingleton(Channel.CreateBounded<JobCommand>(
    new BoundedChannelOptions(100)
    {
        FullMode = BoundedChannelFullMode.Wait, // Backpressure automático
        SingleWriter = false, // Múltiples endpoints pueden escribir
        SingleReader = true   // Solo un BackgroundService consume
    }));

// Registrar el Background Service
builder.Services.AddHostedService<JobProcessor>();
Enter fullscreen mode Exit fullscreen mode

💡 ¿Bounded vs Unbounded?

Característica Unbounded Bounded
Capacidad Ilimitada Limitada (configurable)
Memoria Puede crecer sin control Controlada
Backpressure No Sí (automático)
Uso recomendado Productores lentos Productores rápidos
Performance Writes síncronos Writes pueden ser async

Modos de Bounded Channel (FullMode):

  • Wait (recomendado): Espera hasta que haya espacio (backpressure)
  • DropWrite: Descarta el nuevo elemento
  • DropOldest: Descarta el elemento más antiguo
  • DropNewest: Descarta el elemento más reciente

Opciones de optimización:

  • SingleWriter: true = mejor performance si solo un productor escribe
  • SingleReader: true = mejor performance si solo un consumidor lee
  • AllowSynchronousContinuations: false (default) para evitar bloqueos

Paso 4: Crear los Endpoints (Productores)

public static IEndpointRouteBuilder MapJobEndpoints(this IEndpointRouteBuilder app)
{
    var jobGroup = app.MapGroup("api/job");

    jobGroup.MapPost("start", async (Channel<JobCommand> channel) =>
    {
        // Crear TaskCompletionSource para esperar la respuesta
        var tcs = new TaskCompletionSource<JobStatus>();

        var command = new JobCommand
        {
            Type = CommandType.Start,
            ResponseTask = tcs
        };

        // ✅ WriteAsync maneja backpressure automáticamente
        await channel.Writer.WriteAsync(command);

        // Esperar respuesta del consumidor
        var status = await tcs.Task;
        return Results.Ok(status);
    });

    // Endpoints similares para stop y status...

    return app;
}
Enter fullscreen mode Exit fullscreen mode

💡 Flujo:

  1. API recibe request → Crea TaskCompletionSource
  2. Escribe comando en el Channel con WriteAsync() (maneja backpressure)
  3. Espera que el consumidor complete la Task
  4. Retorna la respuesta al cliente

¿Por qué este patrón?

Sin Channels:

// ❌ Locks manuales, propenso a errores
private static readonly object _lock = new();
private static Queue<Command> _queue = new();

public void AddCommand(Command cmd)
{
    lock(_lock) { _queue.Enqueue(cmd); }
}
Enter fullscreen mode Exit fullscreen mode

Con Channels:

// ✅ Thread-safe automático, limpio, con backpressure
await channel.Writer.WriteAsync(command);
Enter fullscreen mode Exit fullscreen mode

Casos de Uso Reales con Channels

1. Cola de Emails/Notificaciones

// Microsoft recomienda Bounded para prevenir OutOfMemory
var emailChannel = Channel.CreateBounded<EmailMessage>(
    new BoundedChannelOptions(1000)
    {
        FullMode = BoundedChannelFullMode.Wait
    });
// API recibe requests → Encola en Channel → Background envía emails en lotes
Enter fullscreen mode Exit fullscreen mode

2. Procesamiento de Imágenes

Channel<ImageProcessingJob> imageChannel;
// Upload de imágenes → Channel → Worker redimensiona/optimiza en background
Enter fullscreen mode Exit fullscreen mode

3. Logs Centralizados

// DropOldest para logs: si está lleno, descarta los más antiguos
var logChannel = Channel.CreateBounded<LogEntry>(
    new BoundedChannelOptions(5000)
    {
        FullMode = BoundedChannelFullMode.DropOldest
    });
Enter fullscreen mode Exit fullscreen mode

4. Rate Limiting / Throttling

var boundedChannel = Channel.CreateBounded<Request>(100);
// Limita a 100 requests concurrentes, el resto espera (backpressure)
Enter fullscreen mode Exit fullscreen mode

5. Event Sourcing Interno

Channel<DomainEvent> eventChannel;
// Eventos de dominio → Channel → Múltiples handlers procesan en paralelo
Enter fullscreen mode Exit fullscreen mode

6. Pipeline de Datos (ejemplo oficial de Microsoft)

// Patrón de processing pipeline con múltiples stages
Channel<RawData> inputChannel;
Channel<ProcessedData> outputChannel;
// Stage 1: Raw → Validated → Stage 2: Validated → Enriched
Enter fullscreen mode Exit fullscreen mode

Usa Channels cuando:

  • ✅ Comunicación dentro de la misma aplicación
  • ✅ Necesitas alta performance y bajo latency
  • ✅ Quieres simplicidad sin infraestructura externa
  • ✅ Trabajas con async/await
  • ✅ Necesitas backpressure automático

Usa Queue externo (RabbitMQ/Azure Service Bus) cuando:

  • ❌ Necesitas comunicación entre múltiples aplicaciones/servicios
  • ❌ Requieres persistencia de mensajes
  • ❌ Necesitas escalabilidad horizontal
  • ❌ Requieres garantías de entrega (at-least-once, exactly-once)

¿Por qué usar Channels?

A lo largo de este tutorial, hemos visto cómo Channels ofrece ventajas significativas:

  1. Simplicidad: No necesitas infraestructura externa para empezar
  2. Performance: Diseñados desde cero para async/await con ValueTask
  3. Seguridad: Thread-safe por diseño, sin preocupaciones por race conditions
  4. Control: Backpressure automático previene sobrecarga del sistema
  5. Flexibilidad: Configuración granular según tus necesidades específicas

Impacto en tu arquitectura

Este patrón es especialmente valioso cuando:

  • Estás construyendo aplicaciones monolíticas modernas que necesitan procesamiento asíncrono
  • Quieres reducir costos de infraestructura eliminando dependencias de message brokers
  • Necesitas optimizar performance con procesamiento en memoria
  • Buscas simplicidad operacional sin sacrificar escalabilidad vertical

Los Channels de .NET demuestran que no siempre necesitas herramientas complejas para resolver problemas complejos. A veces, la solución más elegante es la que viene incorporada en tu framework.

Próximos Pasos

¿Listo para llevar este conocimiento al siguiente nivel? Aquí tienes algunas ideas para expandir este proyecto:

1. Implementar Múltiples Consumidores

// Escalar procesamiento con múltiples workers
builder.Services.AddHostedService<JobProcessor>(); // Worker 1
builder.Services.AddHostedService<JobProcessor>(); // Worker 2
builder.Services.AddHostedService<JobProcessor>(); // Worker 3
Enter fullscreen mode Exit fullscreen mode

Aprenderás: Paralelización, distribución de carga, sincronización entre workers

2. Integrar Observabilidad

// Métricas con System.Diagnostics.Metrics
var meter = new Meter("BackgroundJobs");
var jobsProcessed = meter.CreateCounter<long>("jobs_processed");
var queueDepth = meter.CreateObservableGauge("queue_depth", 
    () => channel.Reader.Count);
Enter fullscreen mode Exit fullscreen mode

Aprenderás: OpenTelemetry, métricas personalizadas, dashboards con Grafana/Prometheus

3. Implementar Persistencia

// Guardar estado en caso de restart
public class PersistentJobProcessor : BackgroundService
{
    private readonly IJobStateRepository _repository;

    protected override async Task ExecuteAsync(CancellationToken token)
    {
        // Recuperar jobs pendientes al iniciar
        await _repository.RestorePendingJobsAsync(token);
        // ... continuar procesamiento normal
    }
}
Enter fullscreen mode Exit fullscreen mode

Aprenderás: State management, recovery strategies, durabilidad

4. Agregar Pipeline de Procesamiento

// Pipeline multi-etapa
var rawChannel = Channel.CreateBounded<RawData>(100);
var validatedChannel = Channel.CreateBounded<ValidatedData>(100);
var enrichedChannel = Channel.CreateBounded<EnrichedData>(100);

// Stage 1: Validación
builder.Services.AddHostedService<ValidationProcessor>();
// Stage 2: Enriquecimiento
builder.Services.AddHostedService<EnrichmentProcessor>();
// Stage 3: Persistencia
builder.Services.AddHostedService<PersistenceProcessor>();
Enter fullscreen mode Exit fullscreen mode

Aprenderás: Pipeline pattern, ETL processes, data transformation

5. Implementar Rate Limiting Avanzado

// Rate limiter con ventanas deslizantes
public class RateLimitedJobProcessor : BackgroundService
{
    private readonly RateLimiter _rateLimiter;

    protected override async Task ExecuteAsync(CancellationToken token)
    {
        while (await _channel.Reader.WaitToReadAsync(token))
        {
            using var lease = await _rateLimiter.AcquireAsync(1, token);
            if (lease.IsAcquired)
            {
                // Procesar job
            }
        }
    }
}
Enter fullscreen mode Exit fullscreen mode

Aprenderás: Rate limiting patterns, token bucket, leaky bucket

6. Crear Dashboard de Monitoreo

// SignalR para updates en tiempo real
builder.Services.AddSignalR();

// Notificar estado a clientes conectados
await _hubContext.Clients.All.SendAsync("JobStatusUpdate", new {
    QueueDepth = channel.Reader.Count,
    ProcessingRate = jobsPerSecond,
    ActiveJobs = activeJobCount
});
Enter fullscreen mode Exit fullscreen mode

Aprenderás: Real-time updates, SignalR, live dashboards

7. Añadir Resiliencia

// Polly para retry policies
var retryPolicy = Policy
    .Handle<Exception>()
    .WaitAndRetryAsync(3, retryAttempt => 
        TimeSpan.FromSeconds(Math.Pow(2, retryAttempt)));

await retryPolicy.ExecuteAsync(async () => 
{
    await ProcessJobAsync(command);
});
Enter fullscreen mode Exit fullscreen mode

Aprenderás: Retry patterns, circuit breakers, fallback strategies

8. Implementar Health Checks

// Health check para el channel
builder.Services.AddHealthChecks()
    .AddCheck<ChannelHealthCheck>("channel_health")
    .AddCheck<JobProcessorHealthCheck>("job_processor_health");

public class ChannelHealthCheck : IHealthCheck
{
    public Task<HealthCheckResult> CheckHealthAsync()
    {
        var queueDepth = _channel.Reader.Count;
        return queueDepth < 1000 
            ? HealthCheckResult.Healthy() 
            : HealthCheckResult.Degraded("Queue depth high");
    }
}
Enter fullscreen mode Exit fullscreen mode

Aprenderás: Health monitoring, readiness/liveness probes, Kubernetes integration

9. Migrar a Arquitectura Distribuida

// Cuando crezcas más allá de un solo servidor
// Considera migrar a:
// - Azure Service Bus para messaging distribuido
// - Azure Queue Storage para simplicidad y bajo costo
// - RabbitMQ para control total
// - Redis Streams para alta performance

// Mantén la misma interfaz, cambia la implementación
public interface IJobQueue
{
    Task EnqueueAsync(JobCommand command);
    Task<JobCommand> DequeueAsync(CancellationToken token);
}

// Implementación con Channels (actual)
public class InMemoryJobQueue : IJobQueue { }

// Implementación con Azure Service Bus (futuro)
public class ServiceBusJobQueue : IJobQueue { }
Enter fullscreen mode Exit fullscreen mode

Aprenderás: Estrategias de migración, abstracciones, arquitectura evolutiva

Recursos para Continuar Aprendiendo

Documentación Oficial:

Artículos Avanzados:

Top comments (0)