DEV Community

Dinesh Dunukedeniya

Mediator Pattern in Event-Driven Architecture (EDA)

In Event-Driven Architecture (EDA), microservices communicate by publishing and subscribing to events, enabling loose coupling and asynchronous processing. However, as systems grow in complexity, managing interactions across numerous services can become challenging.

The Mediator Pattern introduces an intermediary component, the mediator, that manages and orchestrates this communication. Rather than having services consume and respond to each other’s events directly, a dedicated mediator service listens for incoming events, applies business rules, and routes or transforms messages as needed before passing them on to downstream services.

This pattern is sometimes referred to as the orchestration pattern or a central workflow engine, where a single component controls and coordinates the interactions between services.

This pattern is particularly useful in complex business workflows involving multiple microservices. By centralizing coordination logic within the mediator, you gain better control over the execution flow, simplify service responsibilities, reduce inter-service dependencies, and improve overall system maintainability.
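
To make the routing idea concrete, here is a minimal in-process sketch with no message broker involved. The `InProcessMediator` and `Route` names are illustrative assumptions, not part of any library: services register under a name, and only the mediator's rules decide which service receives which message.

```csharp
#nullable enable
using System;
using System.Collections.Generic;

// Hypothetical types for illustration only.
public record Route(string Destination, string Payload);

public sealed class InProcessMediator
{
    // Event name -> business rule that decides where the message goes next.
    private readonly Dictionary<string, Func<string, Route?>> _rules = new();
    // Service name -> handler that receives routed payloads.
    private readonly Dictionary<string, Action<string>> _services = new();

    public void AddRule(string eventName, Func<string, Route?> rule) => _rules[eventName] = rule;
    public void AddService(string name, Action<string> handler) => _services[name] = handler;

    // Returns true when the event matched a rule and was delivered to a service.
    public bool Publish(string eventName, string payload)
    {
        if (!_rules.TryGetValue(eventName, out var rule)) return false;
        var route = rule(payload);
        if (route is null || !_services.TryGetValue(route.Destination, out var service)) return false;
        service(route.Payload);
        return true;
    }
}
```

The publisher of an event never names its consumer. Changing who reacts to `OrderPlaced` means changing one rule in the mediator, not the services themselves.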


Figure: Mediator Pattern


When to Use the Mediator Pattern

✅ Complex workflows requiring sequencing and coordination. Especially useful when implementing the Saga Pattern or other multi-step business processes that span multiple services.

✅ Centralized logging, monitoring, and tracing. Gain visibility into the entire workflow execution from a single place, useful for debugging and observability.

✅ Reducing point-to-point event subscriptions between services. Avoid a tangled web of services subscribing to each other’s events, which becomes hard to manage and evolve.

✅ Business rules that span multiple services. Keep cross-cutting concerns (e.g., validation, routing decisions) in one place rather than duplicating them.

✅ Dynamic or conditional logic between steps. When the path of execution depends on business logic or the results of previous steps (e.g., retry logic, fallback strategies).

✅ Orchestrated error handling and compensations. Centralize retry strategies, failure handling, and compensating actions in distributed transactions.

✅ Auditing and compliance. A mediator can capture and persist event flows required for regulatory purposes.
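
The conditional-logic and compensation points above can be sketched as a single decision function that the mediator owns. This is a simplified illustration based on the order workflow implemented later in this article: `ReleaseInventory`, `PaymentFailurePolicy`, and the retry limit of 3 are assumptions made for the example, and `ProcessPayment` is reduced to an order ID for brevity.

```csharp
#nullable enable
using System;

public record PaymentFailed(Guid OrderId, string Reason);
public record ProcessPayment(Guid OrderId);    // simplified: payment details omitted
public record ReleaseInventory(Guid OrderId);  // hypothetical compensating command

public static class PaymentFailurePolicy
{
    public const int MaxAttempts = 3; // assumed limit for this sketch

    // attemptsSoFar is the number of payment attempts already made for the order.
    // Below the limit the mediator retries; at the limit it compensates by
    // releasing the inventory that was reserved earlier in the workflow.
    public static object Decide(PaymentFailed failure, int attemptsSoFar) =>
        attemptsSoFar < MaxAttempts
            ? (object)new ProcessPayment(failure.OrderId)
            : new ReleaseInventory(failure.OrderId);
}
```

Because the decision lives in one place, neither the Payment Service nor the Inventory Service needs to know that retries or rollbacks exist.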

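When to Avoid the Mediator Pattern

⚠️ Simple, independent events. When there is no multi-step workflow to coordinate, a mediator adds little beyond an extra hop.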

⚠️ High-throughput, low-latency systems. Mediators can introduce bottlenecks or delays if not designed with performance in mind.

⚠️ Use cases better suited to choreography. For loosely coupled systems where services can autonomously react to events, a pure event-driven approach with no central orchestrator may be more appropriate.

Implementing the Mediator Pattern in .NET Microservices with Kafka

Let’s walk through how to implement the Mediator Pattern using Kafka in a .NET-based microservices architecture. In this setup every microservice is an ASP.NET Core web app, so each one consumes Kafka events through a background service (hosted service) that listens for and processes messages alongside the web host.

Architecture Overview

Microservices Involved:

  • Client Web App
    Initiates the workflow by publishing OrderPlaced events.

  • Mediator Service
    Acts as the central orchestrator. It reacts to events, applies business rules, and triggers the next step by publishing new events.

  • Inventory Service
    Listens for ReserveInventory events and responds with either InventoryReserved or InventoryReservationFailed.

  • Payment Service
    Listens for ProcessPayment events and responds with PaymentProcessed or PaymentFailed.

  • Shipping Service
    Listens for OrderConfirmed events and proceeds with shipping logistics.
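
The article's code covers only the Mediator Service, so here is a rough sketch of what the decision inside a downstream service such as the Inventory Service might look like. The `InventoryHandler` class and its in-memory stock dictionary are assumptions for illustration. A real service would read stock from its own database and publish the result to Kafka.

```csharp
#nullable enable
using System;
using System.Collections.Generic;
using System.Linq;

public record ReserveInventory(Guid OrderId, List<string> Items);
public record InventoryReserved(Guid OrderId);
public record InventoryReservationFailed(Guid OrderId, string Reason);

// Hypothetical handler: knows nothing about payments, shipping, or the overall workflow.
public sealed class InventoryHandler
{
    private readonly Dictionary<string, int> _stock; // item name -> units available

    public InventoryHandler(Dictionary<string, int> stock) => _stock = stock;

    public object Handle(ReserveInventory command)
    {
        // Count requested units per item so repeated items in the list are respected.
        var requested = command.Items.GroupBy(i => i).ToDictionary(g => g.Key, g => g.Count());

        // Check everything first so a failed reservation leaves stock untouched.
        foreach (var pair in requested)
        {
            _stock.TryGetValue(pair.Key, out var available);
            if (available < pair.Value)
                return new InventoryReservationFailed(command.OrderId, $"Insufficient stock for '{pair.Key}'");
        }

        foreach (var pair in requested) _stock[pair.Key] -= pair.Value;
        return new InventoryReserved(command.OrderId);
    }
}
```

The handler returns one of the two result events and stops there. Deciding what happens next stays with the mediator.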

Design Principles

  • Decoupling: Services do not communicate directly; all communication flows through Kafka topics.

  • Orchestration: Only the Mediator knows the full business workflow. Each other service handles only its own concern.

  • Background Tasks: Since all services are ASP.NET Core web apps, Kafka consumers are implemented using BackgroundService.

Each Microservice Includes:

  • Kafka Consumer Background Task
    Subscribes to relevant topics and handles incoming events.

  • Kafka Producer Service
    Sends events to Kafka topics to trigger actions in other services.

  • Controller/Endpoints (if needed)
    Mostly for internal APIs or admin/debug access. Not used for orchestration logic.
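
One way to keep these pieces separable is to put the handling logic behind a small publishing interface, so it can be exercised without a running broker. The sketch below is an assumption about how you might structure this, not the article's implementation: `IEventPublisher`, `InMemoryEventPublisher`, and `OrderPlacedHandler` are hypothetical names.

```csharp
#nullable enable
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical abstraction over the Kafka producer.
public interface IEventPublisher
{
    Task PublishAsync(string topic, string key, object message);
}

// Test double that records what would have been sent to Kafka.
public sealed class InMemoryEventPublisher : IEventPublisher
{
    public List<(string Topic, string Key, object Message)> Sent { get; } = new();

    public Task PublishAsync(string topic, string key, object message)
    {
        Sent.Add((topic, key, message));
        return Task.CompletedTask;
    }
}

public record OrderPlaced(Guid OrderId, List<string> Items);
public record ReserveInventory(Guid OrderId, List<string> Items);

// The orchestration step itself, free of any consumer or hosting concerns.
public sealed class OrderPlacedHandler
{
    private readonly IEventPublisher _publisher;

    public OrderPlacedHandler(IEventPublisher publisher) => _publisher = publisher;

    public Task HandleAsync(OrderPlaced evt) =>
        _publisher.PublishAsync(
            "ReserveInventory",
            evt.OrderId.ToString(), // keyed by order ID
            new ReserveInventory(evt.OrderId, evt.Items));
}
```

With this split, the background consumer only deserializes a message and calls the handler, and a production implementation of `IEventPublisher` would wrap the Kafka producer.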

1. Domain Models

public record OrderPlaced(Guid OrderId, List<string> Items, PaymentInfo Payment);
public record ReserveInventory(Guid OrderId, List<string> Items);
public record InventoryReserved(Guid OrderId);
public record InventoryReservationFailed(Guid OrderId, string Reason);
public record ProcessPayment(Guid OrderId, PaymentInfo Payment);
public record PaymentProcessed(Guid OrderId);
public record PaymentFailed(Guid OrderId, string Reason);
public record OrderConfirmed(Guid OrderId);

public record PaymentInfo(string CardNumber, decimal Amount);

2. Kafka Producer Helper

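using Confluent.Kafka;
using Newtonsoft.Json;

// Thin wrapper around a Confluent.Kafka producer that serializes messages as JSON.
public class KafkaProducer
{
    private readonly IProducer<string, string> _producer;

    public KafkaProducer(string bootstrapServers)
    {
        var config = new ProducerConfig { BootstrapServers = bootstrapServers };
        _producer = new ProducerBuilder<string, string>(config).Build();
    }

    // Pass a stable key (for example, the order ID) so that messages for one order
    // within a topic land on the same partition and keep their relative order.
    // When no key is given, a random one is generated and messages are spread
    // across partitions.
    public async Task ProduceAsync<T>(string topic, T message, string? key = null)
    {
        var json = JsonConvert.SerializeObject(message);
        await _producer.ProduceAsync(topic, new Message<string, string>
        {
            Key = key ?? Guid.NewGuid().ToString(),
            Value = json
        });
    }
}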

3. Mediator Service Background Consumers

OrderPlacedConsumer.cs

using Confluent.Kafka;
using Newtonsoft.Json;

public class OrderPlacedConsumer : BackgroundService
{
    private readonly IConsumer<string, string> _consumer;
    private readonly KafkaProducer _producer;
    private readonly ILogger<OrderPlacedConsumer> _logger;

    public OrderPlacedConsumer(KafkaProducer producer, ILogger<OrderPlacedConsumer> logger)
    {
        _producer = producer;
        _logger = logger;

        var config = new ConsumerConfig
        {
            BootstrapServers = "localhost:9092", // hard-coded for the demo
            GroupId = "mediator-group",
            AutoOffsetReset = AutoOffsetReset.Earliest
        };
        _consumer = new ConsumerBuilder<string, string>(config).Build();
        _consumer.Subscribe("OrderPlaced");
    }

    protected override Task ExecuteAsync(CancellationToken stoppingToken)
    {
        // Consume() blocks, so the loop runs on a thread-pool thread to avoid stalling host startup.
        return Task.Run(async () =>
        {
            try
            {
                while (!stoppingToken.IsCancellationRequested)
                {
                    var cr = _consumer.Consume(stoppingToken);
                    var orderPlaced = JsonConvert.DeserializeObject<OrderPlaced>(cr.Message.Value);
                    if (orderPlaced is null)
                    {
                        _logger.LogWarning("Skipping empty OrderPlaced message at {Offset}", cr.TopicPartitionOffset);
                        continue;
                    }
                    _logger.LogInformation("OrderPlaced received: {OrderId}", orderPlaced.OrderId);

                    // Orchestration step: send ReserveInventory event
                    var reserveInventory = new ReserveInventory(orderPlaced.OrderId, orderPlaced.Items);
                    await _producer.ProduceAsync("ReserveInventory", reserveInventory);
                }
            }
            catch (OperationCanceledException)
            {
                // Expected on shutdown: Consume() throws when the stopping token is cancelled.
            }
            finally
            {
                _consumer.Close(); // leave the consumer group cleanly
            }
        }, stoppingToken);
    }
}

InventoryResultConsumer.cs

using Confluent.Kafka;
using Newtonsoft.Json;

public class InventoryResultConsumer : BackgroundService
{
    private readonly IConsumer<string, string> _consumer;
    private readonly KafkaProducer _producer;
    private readonly ILogger<InventoryResultConsumer> _logger;

    public InventoryResultConsumer(KafkaProducer producer, ILogger<InventoryResultConsumer> logger)
    {
        _producer = producer;
        _logger = logger;

        var config = new ConsumerConfig
        {
            BootstrapServers = "localhost:9092", // hard-coded for the demo
            GroupId = "mediator-group",
            AutoOffsetReset = AutoOffsetReset.Earliest
        };
        _consumer = new ConsumerBuilder<string, string>(config).Build();
        _consumer.Subscribe(new[] { "InventoryReserved", "InventoryReservationFailed" });
    }

    protected override Task ExecuteAsync(CancellationToken stoppingToken)
    {
        return Task.Run(async () =>
        {
            try
            {
                while (!stoppingToken.IsCancellationRequested)
                {
                    var cr = _consumer.Consume(stoppingToken);
                    var message = cr.Message.Value;

                    // Tell the two outcomes apart by topic. The JSON payload holds only
                    // the record's properties, not the event type name, so searching the
                    // message text for the type name would not work.
                    if (cr.Topic == "InventoryReservationFailed")
                    {
                        var fail = JsonConvert.DeserializeObject<InventoryReservationFailed>(message);
                        if (fail is null) continue;
                        _logger.LogWarning("Inventory reservation failed for Order {OrderId}: {Reason}", fail.OrderId, fail.Reason);
                        // Handle failure (notify, rollback, etc.)
                    }
                    else
                    {
                        var reserved = JsonConvert.DeserializeObject<InventoryReserved>(message);
                        if (reserved is null) continue;
                        _logger.LogInformation("Inventory reserved for Order {OrderId}", reserved.OrderId);

                        // Retrieve payment info from some store (simulate here)
                        var paymentInfo = new PaymentInfo("1234-5678-9012-3456", 100m);

                        var processPayment = new ProcessPayment(reserved.OrderId, paymentInfo);
                        await _producer.ProduceAsync("ProcessPayment", processPayment);
                    }
                }
            }
            catch (OperationCanceledException)
            {
                // Expected on shutdown: Consume() throws when the stopping token is cancelled.
            }
            finally
            {
                _consumer.Close(); // leave the consumer group cleanly
            }
        }, stoppingToken);
    }
}

PaymentResultConsumer.cs

using Confluent.Kafka;
using Newtonsoft.Json;

public class PaymentResultConsumer : BackgroundService
{
    private readonly IConsumer<string, string> _consumer;
    private readonly KafkaProducer _producer;
    private readonly ILogger<PaymentResultConsumer> _logger;

    public PaymentResultConsumer(KafkaProducer producer, ILogger<PaymentResultConsumer> logger)
    {
        _producer = producer;
        _logger = logger;

        var config = new ConsumerConfig
        {
            BootstrapServers = "localhost:9092", // hard-coded for the demo
            GroupId = "mediator-group",
            AutoOffsetReset = AutoOffsetReset.Earliest
        };
        _consumer = new ConsumerBuilder<string, string>(config).Build();
        _consumer.Subscribe(new[] { "PaymentProcessed", "PaymentFailed" });
    }

    protected override Task ExecuteAsync(CancellationToken stoppingToken)
    {
        return Task.Run(async () =>
        {
            try
            {
                while (!stoppingToken.IsCancellationRequested)
                {
                    var cr = _consumer.Consume(stoppingToken);
                    var message = cr.Message.Value;

                    // Tell the two outcomes apart by topic. The JSON payload holds only
                    // the record's properties, not the event type name.
                    if (cr.Topic == "PaymentFailed")
                    {
                        var fail = JsonConvert.DeserializeObject<PaymentFailed>(message);
                        if (fail is null) continue;
                        _logger.LogWarning("Payment failed for Order {OrderId}: {Reason}", fail.OrderId, fail.Reason);
                        // Handle failure (e.g., rollback inventory)
                    }
                    else
                    {
                        var processed = JsonConvert.DeserializeObject<PaymentProcessed>(message);
                        if (processed is null) continue;
                        _logger.LogInformation("Payment processed for Order {OrderId}", processed.OrderId);

                        var orderConfirmed = new OrderConfirmed(processed.OrderId);
                        await _producer.ProduceAsync("OrderConfirmed", orderConfirmed);
                    }
                }
            }
            catch (OperationCanceledException)
            {
                // Expected on shutdown: Consume() throws when the stopping token is cancelled.
            }
            finally
            {
                _consumer.Close(); // leave the consumer group cleanly
            }
        }, stoppingToken);
    }
}

4. Program.cs (Register services and run)

var builder = WebApplication.CreateBuilder(args);

builder.Services.AddSingleton(new KafkaProducer("localhost:9092"));
builder.Services.AddHostedService<OrderPlacedConsumer>();
builder.Services.AddHostedService<InventoryResultConsumer>();
builder.Services.AddHostedService<PaymentResultConsumer>();

var app = builder.Build();
app.Run();

Summary

  • The Mediator service has three background consumers, each processing different event types.
  • Each consumer applies the orchestration logic and triggers the next step by producing the next event.
  • This keeps your microservices loosely coupled while giving you a central place for business workflow orchestration.
  • You can extend this with state management (e.g., Redis, DB) for reliability and retries.
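
As a rough idea of what that state management could look like, the sketch below tracks each order's current step in memory. It is an assumption for illustration only: `OrderStep`, `OrderState`, and `InMemoryOrderStateStore` are hypothetical names, and a production mediator would back this with Redis or a database as suggested above. Keeping the payment details with the order state also gives the mediator somewhere to look them up once inventory has been reserved.

```csharp
#nullable enable
using System;
using System.Collections.Concurrent;

public enum OrderStep { Placed, InventoryReserved, PaymentProcessed, Confirmed, Failed }

public record PaymentInfo(string CardNumber, decimal Amount);
public record OrderState(Guid OrderId, OrderStep Step, PaymentInfo Payment);

// Hypothetical in-memory store; state is lost when the process restarts.
public sealed class InMemoryOrderStateStore
{
    private readonly ConcurrentDictionary<Guid, OrderState> _states = new();

    // Called when OrderPlaced arrives, so later steps can find the payment details.
    public void Start(Guid orderId, PaymentInfo payment) =>
        _states[orderId] = new OrderState(orderId, OrderStep.Placed, payment);

    public OrderState? Get(Guid orderId) =>
        _states.TryGetValue(orderId, out var state) ? state : null;

    // Moves the order forward only if it is currently at the expected step.
    // Returns false for unknown orders and for duplicate or out-of-order events,
    // which lets the caller skip work it has already done.
    public bool TryAdvance(Guid orderId, OrderStep expected, OrderStep next)
    {
        if (!_states.TryGetValue(orderId, out var current) || current.Step != expected)
            return false;
        return _states.TryUpdate(orderId, current with { Step = next }, current);
    }
}
```

Since Kafka consumers can receive the same message more than once, a guard like `TryAdvance` is one simple way to make each orchestration step idempotent.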
