Tehila F.

Stop Writing a Kafka Consumer for Every Topic – Make It Generic in .NET

How I Built a Generic Kafka Consumer in .NET with the Strategy Pattern

Ever found yourself writing a separate Kafka consumer for every single topic? Yeah… me too. 😅

When I started working on our project, we had this pattern: one consumer per topic. Sounds simple, right? Well, it quickly became a nightmare:

  • Code duplication everywhere
  • Hard to maintain
  • Adding a new topic meant touching multiple files

So I decided to tackle this problem and make one consumer to rule them all – generic, modular, and scalable. Here’s how I did it using the Strategy Pattern in .NET.

 

The Problem
Imagine a consumer that only handles the "createEntityType" topic:

consumer.Subscribe(new[] { "createEntityType" });

// Topic-specific deserialization and service calls, hardcoded into this one consumer
var createEntityRequest = JsonSerializer.Deserialize<CreateEntityTypeRequest>(value);
var schemaId = await entityService.CreateNewEntityType(createEntityRequest);
await dynamicService.CreateDynamicTable(createEntityRequest);

This works… until you add the next topic. Or the one after that. Suddenly, every new topic means more boilerplate and more headaches.

 

The Solution: Generic Kafka Consumer

Instead of writing a consumer per topic, I refactored the code so that:

  • The consumer discovers handlers dynamically using KafkaTopicAttribute (see the discovery sketch below)
  • Each handler implements the ICustomKafkaHandler interface
  • Adding a new topic = add a handler class + attribute → nothing else changes
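
A quick note on the discovery step: the ConsumerWorker shown below relies on a _topicHandlers dictionary (topic → handler type) and on every handler being registered in DI. Here’s a minimal sketch of how that map could be built at startup – AddKafkaHandlers and KafkaHandlerRegistration are names I’m using purely for illustration, not code from the project:

using System;
using System.Collections.Generic;
using System.Linq;
using System.Reflection;
using Microsoft.Extensions.DependencyInjection;

// Sketch: scan an assembly for classes marked with [KafkaTopic], register each one
// in DI, and build the topic → handler-type map that ConsumerWorker consumes.
public static class KafkaHandlerRegistration
{
    public static Dictionary<string, Type> AddKafkaHandlers(this IServiceCollection services, Assembly assembly)
    {
        var topicHandlers = new Dictionary<string, Type>();

        var handlerTypes = assembly.GetTypes()
            .Where(t => t.IsClass && !t.IsAbstract && typeof(ICustomKafkaHandler).IsAssignableFrom(t));

        foreach (var handlerType in handlerTypes)
        {
            var attribute = handlerType.GetCustomAttribute<KafkaTopicAttribute>();
            if (attribute is null) continue;

            topicHandlers[attribute.Topic] = handlerType;
            services.AddScoped(handlerType); // resolved per message via the scope factory
        }

        return topicHandlers;
    }
}

// Hypothetical wiring in Program.cs:
// var topicHandlers = builder.Services.AddKafkaHandlers(typeof(Program).Assembly);
// builder.Services.AddSingleton(topicHandlers);        // injected into ConsumerWorker as _topicHandlers
// builder.Services.AddHostedService<ConsumerWorker>();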

 

ConsumerWorker Example

protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
    // Subscribe once to every topic that has a registered handler
    var topics = _topicHandlers.Keys.ToArray();
    using var consumer = new ConsumerBuilder<string, string>(consumerConfig).Build();
    consumer.Subscribe(topics);

    while (!stoppingToken.IsCancellationRequested)
    {
        var cr = consumer.Consume(stoppingToken);

        // New DI scope per message, so handlers get fresh scoped services
        using var scope = _scopeFactory.CreateScope();
        var provider = scope.ServiceProvider;

        // Dispatch to the handler registered for this message's topic
        if (_topicHandlers.TryGetValue(cr.Topic, out var handlerType))
        {
            var handler = (ICustomKafkaHandler)provider.GetRequiredService(handlerType);
            var response = await handler.HandleAsync(cr.Message.Value, cr.Message.Headers, provider);
        }
    }
}
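
As written, a single unhandled exception (from Consume or from a handler) will end ExecuteAsync and the worker will quietly stop consuming. Each handler manages its own errors, but a safety net around the dispatch is still worth having. Here’s a minimal sketch of that wrapper, assuming ConsumerWorker also gets an ILogger<ConsumerWorker> injected as _logger (not shown above):

// Same loop as above, wrapped so one bad message doesn't stop the worker.
while (!stoppingToken.IsCancellationRequested)
{
    try
    {
        var cr = consumer.Consume(stoppingToken);
        using var scope = _scopeFactory.CreateScope();
        var provider = scope.ServiceProvider;

        if (_topicHandlers.TryGetValue(cr.Topic, out var handlerType))
        {
            var handler = (ICustomKafkaHandler)provider.GetRequiredService(handlerType);
            await handler.HandleAsync(cr.Message.Value, cr.Message.Headers, provider);
        }
    }
    catch (OperationCanceledException)
    {
        break; // graceful shutdown requested via stoppingToken
    }
    catch (ConsumeException ex)
    {
        _logger.LogError(ex, "Kafka consume error: {Reason}", ex.Error.Reason);
    }
    catch (Exception ex)
    {
        _logger.LogError(ex, "Unhandled error while processing a Kafka message");
    }
}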

 

Handler Example: AddPropertyHandler

// Handles every message consumed from the "addProperty" topic
[KafkaTopic("addProperty")]
public class AddPropertyHandler : ICustomKafkaHandler
{
    public async Task<object> HandleAsync(string message, Headers headers, IServiceProvider provider)
    {
        var request = JsonSerializer.Deserialize<AddFieldRequest>(message);
        var service = provider.GetRequiredService<AddFieldService>();
        return await service.AddFieldToSchema(request);
    }
}

 

KafkaTopicAttribute & Interface

// Marks a handler class with the topic it consumes
[AttributeUsage(AttributeTargets.Class)]
public class KafkaTopicAttribute : Attribute
{
    public string Topic { get; }
    public KafkaTopicAttribute(string topic) => Topic = topic;
}

// The contract every topic handler implements
public interface ICustomKafkaHandler
{
    Task<object> HandleAsync(string message, Headers headers, IServiceProvider provider);
}

 

How It Works

  1. ConsumerWorker builds a map: Topic → Handler using KafkaTopicAttribute.
  2. Each handler implements ICustomKafkaHandler and handles messages for its topic.
  3. Dependency Injection (DI) injects required services directly into the handler.
  4. Want to add a new topic? Just add a new handler class with the attribute. No changes to ConsumerWorker (see the sketch below).
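
For example, handling a hypothetical "deleteProperty" topic would only take something like this (the topic, request, and service names here are made up for illustration):

// Hypothetical handler: DeleteFieldRequest, DeleteFieldService and RemoveFieldFromSchema
// are illustrative names, not real project code.
[KafkaTopic("deleteProperty")]
public class DeletePropertyHandler : ICustomKafkaHandler
{
    public async Task<object> HandleAsync(string message, Headers headers, IServiceProvider provider)
    {
        var request = JsonSerializer.Deserialize<DeleteFieldRequest>(message);
        var service = provider.GetRequiredService<DeleteFieldService>();
        return await service.RemoveFieldFromSchema(request);
    }
}

Drop the class in and the worker picks it up automatically on the next startup; nothing else changes.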

 

Why This Approach Rocks

Maintainable – Add new topics without touching the core consumer
Modular – Each handler is responsible only for its topic
Scalable – Works with dozens of topics with minimal effort
Error handling – Each handler manages its own errors
DI-friendly – Services are injected automatically

 

Lessons Learned

  • Strategy Pattern + DI is a perfect combo for generic Kafka consumers
  • Reflection + attributes = dynamic discovery without hardcoding
  • Logging and consistent error handling per handler are crucial
  • Separation of Consumer + Handlers = modularity & extensibility

 

TL;DR

Generic Kafka consumers in .NET save you from boilerplate, make your code modular, and make it easy to extend and maintain.
 

💡 Tip: This approach works best when you have multiple topics with similar patterns and want to centralize error handling, logging, and service injection.
