DEV Community

ZèD
ZèD

Posted on

Implementing Event-Driven Order Processing with Kafka using KafkaFlow in .NET

In larger applications, producers (publishing events from command handlers / services) and consumers (processing events in background workers) belong to different deployment units or have different scaling needs. KafkaFlow supports this cleanly by allowing separate cluster registrations or isolated configuration slices.

This article shows a minimal, production-grade pattern: producer-only in API / command-side services and consumer-only in background / worker projects — both reading the same configuration section but registering only what they need.

Configuration Model (shared)

public class KafkaFlowConfiguration
{
    public string ServerUrl { get; set; } = string.Empty;

    public OrderConfig Orders { get; set; } = new();
}

public class OrderConfig
{
    public string TopicName { get; set; } = string.Empty;
    public string ConsumerGroup { get; set; } = string.Empty;
    public string ConsumerName { get; set; } = string.Empty;
}
Enter fullscreen mode Exit fullscreen mode

Producer-Only Extension (use in API / command services)

public static class KafkaFlowProducerExtensions
{
    public static IServiceCollection AddKafkaFlowProducer(
        this IServiceCollection services,
        IConfiguration configuration)
    {
        services.Configure<KafkaFlowConfiguration>(
            configuration.GetSection("KafkaConfiguration"));

        var config = configuration
            .GetSection("KafkaConfiguration")
            .Get<KafkaFlowConfiguration>();

        services.AddKafka(kafka => kafka
            .AddCluster(cluster => cluster
                .WithBrokers([config.ServerUrl])
                .CreateTopicIfNotExists(config.Orders.TopicName, 10, 1)
                .AddProducer("order-producer", p => 
                    p.DefaultTopic(config.Orders.TopicName)))
        );

        return services;
    }
}
Enter fullscreen mode Exit fullscreen mode

Consumer-Only Extension (use in worker / processor projects)

public static class KafkaFlowConsumerExtensions
{
    public static IServiceCollection AddKafkaFlowConsumer(
        this IServiceCollection services,
        IConfiguration configuration)
    {
        services.Configure<KafkaFlowConfiguration>(
            configuration.GetSection("KafkaConfiguration"));

        var config = configuration
            .GetSection("KafkaConfiguration")
            .Get<KafkaFlowConfiguration>();

        services.AddKafka(kafka => kafka
            .AddCluster(cluster => cluster
                .WithBrokers([config.ServerUrl])
                .CreateTopicIfNotExists(config.Orders.TopicName, 10, 1)
                .AddConsumer(consumer => consumer
                    .Topic(config.Orders.TopicName)
                    .WithGroupId(config.Orders.ConsumerGroup)
                    .WithName(config.Orders.ConsumerName)
                    .WithBufferSize(10)
                    .WithConsumerLagWorkerBalancer(maxWorkers: 100, minWorkers: 10, lagThreshold: 25)
                    .WithWorkerDistributionStrategy<FreeWorkerDistributionStrategy>()
                    .AddMiddlewares(middlewares => middlewares
                        .AddTypedHandlers(handlers => handlers
                            .AddHandler<OrderCreatedHandler>()
                            .AddHandler<OrderPaidHandler>()
                            .AddHandler<OrderShippedHandler>()))))
        );

        return services;
    }
}
Enter fullscreen mode Exit fullscreen mode

Shared appsettings.json (or separate overrides per project)

{
  "KafkaConfiguration": {
    "ServerUrl": "kafka-1:9092,kafka-2:9092",
    "Orders": {
      "TopicName": "orders",
      "ConsumerGroup": "order-processing",
      "ConsumerName": "order-events-handler"
    }
  }
}
Enter fullscreen mode Exit fullscreen mode

Domain Events

public record OrderCreated(
    Guid OrderId,
    Guid CustomerId,
    decimal TotalAmount,
    DateTime CreatedAt);

public record OrderPaid(
    Guid OrderId,
    string PaymentId,
    DateTime PaidAt);

public record OrderShipped(
    Guid OrderId,
    string TrackingNumber,
    DateTime ShippedAt);
Enter fullscreen mode Exit fullscreen mode

Typed Handlers (consumer side)

public class OrderCreatedHandler : IMessageHandler<OrderCreated>
{
    private readonly IInventoryService _inventory;

    public OrderCreatedHandler(IInventoryService inventory)
    {
        _inventory = inventory;
    }

    public Task Handle(OrderCreated message, CancellationToken cancellationToken)
    {
        return _inventory.ReserveStockAsync(message.OrderId);
    }
}

public class OrderPaidHandler : IMessageHandler<OrderPaid>
{
    private readonly IShippingService _shipping;

    public OrderPaidHandler(IShippingService shipping)
    {
        _shipping = shipping;
    }

    public Task Handle(OrderPaid message, CancellationToken cancellationToken)
    {
        return _shipping.InitiateAsync(message.OrderId);
    }
}

public class OrderShippedHandler : IMessageHandler<OrderShipped>
{
    private readonly INotificationService _notification;

    public OrderShippedHandler(INotificationService notification)
    {
        _notification = notification;
    }

    public Task Handle(OrderShipped message, CancellationToken cancellationToken)
    {
        return _notification.SendOrderUpdateAsync(message.OrderId, "shipped");
    }
}
Enter fullscreen mode Exit fullscreen mode

Producer Usage (command / API side)

public class OrderEventPublisher
{
    private readonly IMessageProducer _producer;

    public OrderEventPublisher(IMessageProducer producer)
    {
        _producer = producer;
    }

    public Task PublishAsync<T>(T evt) where T : class
    {
        var message = new Message<string, T>
        {
            Key = evt switch
            {
                { OrderId: var id } => id.ToString(),
                _ => Guid.NewGuid().ToString()
            },
            Value = evt
        };

        return _producer.ProduceAsync(message);
    }
}
Enter fullscreen mode Exit fullscreen mode

Registration Examples

API / Command project → Program.cs

builder.Services.AddKafkaFlowProducer(builder.Configuration);
Enter fullscreen mode Exit fullscreen mode

Worker / Processor project → Program.cs

builder.Services.AddKafkaFlowConsumer(builder.Configuration);
Enter fullscreen mode Exit fullscreen mode

Key Benefits of Separation

  • Independent scaling — scale producers horizontally with API instances, consumers vertically or separately
  • Different resource profiles — producers are lightweight, consumers can be memory/CPU heavy
  • Clear boundaries — no accidental consumer registration in web apps
  • Shared config — single source of truth for brokers/topics
  • Topic auto-creation — safe from both sides (idempotent)

Extend with serializers, retry policies, or schema registry independently on each side.

Top comments (0)