How I Built a Generic Kafka Consumer in .NET with Strategy Pattern
Ever found yourself writing a separate Kafka consumer for every single topic? Yeah… me too. 😅
When I started working on our project, we had this pattern: one consumer per topic. Sounds simple? Well, it quickly became a nightmare:
- Code duplication everywhere
- Hard to maintain
- Adding a new topic meant touching multiple files
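So I decided to tackle this problem and make one consumer to rule them all – generic, modular, and scalable. Here’s how I did it using the Strategy Pattern in .NET: each topic gets its own handler (the strategy), and the consumer picks the handler by topic name at runtime.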
The Problem
Imagine having a consumer that only handles the "createEntityType" topic:
consumer.Subscribe(new[] { "createEntityType" });
var createEntityRequest = JsonSerializer.Deserialize<CreateEntityTypeRequest>(value);
var schemaId = await entityService.createNewEntityType(createEntityRequest);
await dynamicService.CreateDynamicTable(createEntityRequest);
This works… until you add the next topic. Or the one after that. Suddenly, every new topic is more boilerplate, more headaches.
The Solution: Generic Kafka Consumer
Instead of writing a consumer per topic, I refactored the code so that:
- The consumer discovers handlers dynamically using KafkaTopicAttribute
- Each handler implements the ICustomKafkaHandler interface
- Adding a new topic = add a handler class + attribute → nothing else changes
ConsumerWorker Example
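protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
    // Consume() blocks, so yield first to avoid holding up host startup.
    await Task.Yield();

    var topics = _topicHandlers.Keys.ToArray();
    using var consumer = new ConsumerBuilder<string, string>(consumerConfig).Build();
    consumer.Subscribe(topics);

    try
    {
        while (!stoppingToken.IsCancellationRequested)
        {
            var cr = consumer.Consume(stoppingToken);
            if (!_topicHandlers.TryGetValue(cr.Topic, out var handlerType))
                continue; // no handler registered for this topic

            // One DI scope per message, so scoped services are not shared between messages.
            using var scope = _scopeFactory.CreateScope();
            var provider = scope.ServiceProvider;
            var handler = (ICustomKafkaHandler)provider.GetRequiredService(handlerType);
            var response = await handler.HandleAsync(cr.Message.Value, cr.Message.Headers, provider);
        }
    }
    catch (OperationCanceledException)
    {
        // Expected on shutdown: Consume(stoppingToken) throws once the token is cancelled.
    }
    finally
    {
        consumer.Close(); // leave the consumer group cleanly
    }
}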
Handler Example: AddPropertyHandler
[KafkaTopic("addProperty")]
public class AddPropertyHandler : ICustomKafkaHandler
{
    public async Task<object> HandleAsync(string message, Headers headers, IServiceProvider provider)
    {
        // Deserialize returns null for a JSON "null" payload, so fail fast with a clear error.
        var request = JsonSerializer.Deserialize<AddFieldRequest>(message)
            ?? throw new InvalidOperationException("addProperty payload deserialized to null.");
        var service = provider.GetRequiredService<AddFieldService>();
        return await service.AddFieldToSchema(request);
    }
}
KafkaTopicAttribute & Interface
[AttributeUsage(AttributeTargets.Class)]
public class KafkaTopicAttribute : Attribute
{
    public string Topic { get; }
    public KafkaTopicAttribute(string topic) => Topic = topic;
}

public interface ICustomKafkaHandler
{
    Task<object> HandleAsync(string message, Headers headers, IServiceProvider provider);
}
How It Works
- ConsumerWorker builds a map: Topic → Handler using KafkaTopicAttribute.
- Each handler implements ICustomKafkaHandler and handles messages for its topic.
- Each handler is resolved from a per-message Dependency Injection (DI) scope, and it pulls the services it needs from the IServiceProvider passed to HandleAsync.
- Want to add a new topic? Just add a new handler class with the attribute. No changes to ConsumerWorker.
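The post never shows how the Topic → Handler map gets built, so here is a minimal sketch of one way to do it with reflection. The `TopicHandlerMap` class and `DemoAddPropertyHandler` are hypothetical names, not part of the original project, and the attribute and interface at the bottom are trimmed stand-ins (no `HandleAsync`, so the snippet compiles without Confluent.Kafka). In the real project you would use the actual types instead.

```csharp
using System;
using System.Collections.Generic;
using System.Reflection;

public static class TopicHandlerMap
{
    // Scans one assembly for concrete ICustomKafkaHandler types tagged with
    // [KafkaTopic] and returns a topic -> handler type dictionary.
    public static Dictionary<string, Type> Build(Assembly assembly)
    {
        var map = new Dictionary<string, Type>(StringComparer.Ordinal);
        foreach (var type in assembly.GetTypes())
        {
            if (type.IsAbstract || type.IsInterface) continue;
            if (!typeof(ICustomKafkaHandler).IsAssignableFrom(type)) continue;

            var attribute = type.GetCustomAttribute<KafkaTopicAttribute>();
            if (attribute is null) continue; // handler without a topic: skip it

            // Two handlers claiming the same topic is almost certainly a mistake.
            if (!map.TryAdd(attribute.Topic, type))
            {
                throw new InvalidOperationException(
                    $"Topic '{attribute.Topic}' is claimed by both " +
                    $"{map[attribute.Topic].Name} and {type.Name}.");
            }
        }
        return map;
    }
}

// Stand-ins so the sketch compiles in isolation (assumption: trimmed copies
// of the real types; drop these when pasting into the actual project).
[AttributeUsage(AttributeTargets.Class)]
public sealed class KafkaTopicAttribute : Attribute
{
    public string Topic { get; }
    public KafkaTopicAttribute(string topic) => Topic = topic;
}

public interface ICustomKafkaHandler { }

// Hypothetical handler used only to show what the scan picks up.
[KafkaTopic("addProperty")]
public sealed class DemoAddPropertyHandler : ICustomKafkaHandler { }
```

ConsumerWorker could call something like `TopicHandlerMap.Build(typeof(AddPropertyHandler).Assembly)` once at startup and keep the result in `_topicHandlers`. Failing on duplicate topics is a design choice in this sketch; silently letting the last handler win would be harder to debug.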
Why This Approach Rocks
✅ Maintainable – Add new topics without touching the core consumer
✅ Modular – Each handler is responsible only for its topic
✅ Scalable – Works with dozens of topics with minimal effort
✅ Error handling – Each handler manages its own errors
✅ DI-friendly – Handlers and their services are resolved from a per-message DI scope
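One catch with the DI point: `provider.GetRequiredService(handlerType)` only works if every handler type is registered in the container. Below is a rough sketch of how that registration might look, assuming you already have a topic-to-handler dictionary like `_topicHandlers`. The `AddKafkaHandlers` extension method is a hypothetical helper, not something from the original project or from a library. It relies on the Microsoft.Extensions.DependencyInjection package.

```csharp
using System;
using System.Collections.Generic;
using Microsoft.Extensions.DependencyInjection;

public static class KafkaHandlerRegistration
{
    // Hypothetical helper: registers every handler type from the map so the
    // worker can resolve it from a per-message scope.
    public static IServiceCollection AddKafkaHandlers(
        this IServiceCollection services,
        IReadOnlyDictionary<string, Type> topicHandlers)
    {
        foreach (var handlerType in topicHandlers.Values)
        {
            // Scoped: one handler instance per message scope.
            services.AddScoped(handlerType);
        }

        // Expose the map itself so the worker could take it as a constructor
        // parameter (assumption: the real worker may build it differently).
        services.AddSingleton<IReadOnlyDictionary<string, Type>>(topicHandlers);
        return services;
    }
}
```

In `Program.cs` this would sit next to the worker registration, for example `services.AddKafkaHandlers(map)` followed by `services.AddHostedService<ConsumerWorker>()`. Scoped lifetime matches the one-scope-per-message loop; transient would behave the same here, since each handler is resolved once per scope.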
Lessons Learned
- Strategy Pattern + DI is a perfect combo for generic Kafka consumers
- Reflection + attributes = dynamic discovery without hardcoding
- Logging and consistent error handling per handler is crucial
- Separation of Consumer + Handlers = modularity & extensibility
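On the error handling lesson: in the dispatch loop, an exception thrown by a handler would bubble out of `ExecuteAsync` and stop the worker. Here is a small sketch of a wrapper that keeps error handling consistent across handlers. `SafeDispatch` and its parameters are hypothetical names I am using for illustration, and whether to skip, retry, or dead-letter a failed message is a decision this sketch does not make for you.

```csharp
using System;
using System.Threading.Tasks;

public static class SafeDispatch
{
    // Runs one handler invocation. Returns true on success and false if the
    // handler threw; the failure is reported through onError, not rethrown.
    public static async Task<bool> TryHandleAsync(
        string topic,
        Func<Task> handle,
        Action<string, Exception> onError)
    {
        try
        {
            await handle();
            return true;
        }
        catch (OperationCanceledException)
        {
            throw; // shutdown is not a handler failure, let it propagate
        }
        catch (Exception ex)
        {
            onError(topic, ex);
            return false;
        }
    }
}
```

Inside the loop, the handler call could be wrapped as `await SafeDispatch.TryHandleAsync(cr.Topic, () => handler.HandleAsync(cr.Message.Value, cr.Message.Headers, provider), (topic, ex) => _logger.LogError(ex, "Handler for {Topic} failed", topic));`, where `_logger` is an assumed `ILogger` field on the worker. That way a single bad message gets logged and the consumer keeps running.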
TL;DR
Generic Kafka consumers in .NET save you from boilerplate, make your code modular, and make it easy to extend and maintain.
💡 Tip: This approach works best when you have multiple topics with similar patterns and want to centralize error handling, logging, and service injection.