DEV Community

Aleksei Popov
Implementing Horizontally Scalable Transactional Outbox with .NET 8 and Kafka: A Practical Guide

Introduction

In distributed systems, reliable and performant messaging is key to scalability.

One of the most intricate challenges is the dual write problem.

In this article, I'm going to provide a practical guide for implementing a transactional outbox solution to achieve 'at least once' guarantees, which can be horizontally scaled across multiple instances of the application.

Before we dive into the implementation, let's quickly review what the dual write problem is and the challenges it poses.

Additionally, if you're interested in learning more about achieving reliable messaging in a distributed system, see my article on HackerNoon: https://hackernoon.com/reliable-messaging-in-distributed-systems

Dual write problem

To better explain the dual write problem, let's take a look at the following diagrams.

Imagine we perform an operation by receiving an API request, storing data in the database, and sending events to Kafka to notify other systems about certain events.

If we first commit the transaction and then send events to Kafka, we could find ourselves in a situation where messages can be lost due to potential network issues, Kafka cluster outages, or even a bug in the code.

Image description

In the opposite scenario, where we first send messages and then commit the transaction, we can find ourselves in a situation of data inconsistency. This can occur if, for some reason, the database transaction fails and we have already notified other systems about events that did not actually happen. This may lead to system failures and serious consequences.

Image description

So, in critical business scenarios, losing messages could harm the business. This is particularly true in payment and transaction processing, which can be very sensitive.

Transactional Outbox

To tackle this problem, the transactional outbox approach can be leveraged. It requires performing the business operation and storing the resulting events atomically. To achieve this, a special outbox table is created to persist the messages produced during the transaction, within that same transaction. A separate Relay component then reads the table and transmits the messages to Kafka. This approach achieves 'at least once' guarantees without losing messages. Why 'at least once'? Because receiving the response from Kafka or updating the status of sent messages could fail, which leads to a retry and therefore to a possible duplicate.
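To make the flow concrete, here is a minimal in-memory sketch of the pattern. Everything in it is hypothetical and for illustration only: `FakeDatabase` stands in for a real transactional store (a lock plays the role of the transaction), `SketchRelay` stands in for the Relay, and a plain list stands in for Kafka. It shows why a failure between sending and marking leads to a duplicate rather than a lost message.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Stand-in for Kafka: records every message the relay sends.
var sent = new List<string>();
var db = new FakeDatabase();

// The business row and the outbox row are written in one atomic step.
db.SaveInvoiceWithEvent("invoice-1", "InvoiceCreated:invoice-1");

// First run: the send succeeds, but the relay "crashes" before marking the record.
SketchRelay.Publish(db, sent.Add, failBeforeMark: true);
// Second run: the record is still pending, so it is sent again (a duplicate).
SketchRelay.Publish(db, sent.Add, failBeforeMark: false);

Console.WriteLine($"Sent {sent.Count} time(s), pending: {db.Pending().Count}");
// prints "Sent 2 time(s), pending: 0"

public sealed class OutboxRow
{
    public string Payload { get; init; } = "";
    public bool IsProcessed { get; set; }
}

public sealed class FakeDatabase
{
    private readonly object _gate = new();
    private readonly List<string> _invoices = new();
    private readonly List<OutboxRow> _outbox = new();

    // Simulated transaction: both rows are written under one lock.
    public void SaveInvoiceWithEvent(string invoiceId, string eventPayload)
    {
        lock (_gate)
        {
            _invoices.Add(invoiceId);
            _outbox.Add(new OutboxRow { Payload = eventPayload });
        }
    }

    public List<OutboxRow> Pending()
    {
        lock (_gate) return _outbox.Where(r => !r.IsProcessed).ToList();
    }

    public void MarkProcessed(OutboxRow row)
    {
        lock (_gate) row.IsProcessed = true;
    }
}

public static class SketchRelay
{
    // Send first, mark second: a failure in between causes a resend, never a loss.
    public static void Publish(FakeDatabase db, Action<string> send, bool failBeforeMark)
    {
        foreach (var row in db.Pending())
        {
            send(row.Payload);
            if (failBeforeMark) continue; // simulated crash before the status update
            db.MarkProcessed(row);
        }
    }
}
```

The duplicate in the second run is the price of never losing a message, which is why the consumer side has to tolerate repeated deliveries.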

Image description

Let us quickly take a look at Kafka concepts before proceeding to implementation.

Kafka

Kafka is a distributed streaming platform that was originally developed by LinkedIn and later open-sourced as a part of the Apache Software Foundation. It's designed to handle high volumes of data in real-time, making it a powerful tool for building data pipelines, integrating systems, and managing streaming data.

Kafka operates on the concept of topics, which are essentially categories or feeds to which records are published. Producers publish data to topics, and consumers subscribe to topics to read the data. This architecture allows for high throughput and scalability, enabling Kafka to process and store streams of data from multiple sources efficiently.

Here are the key concepts you need to know.

Image description

Topics
A topic is like a category or a box where you store messages. You can have different topics for different types of messages. For example, one topic for customer orders and another for shipping notifications.

Partitions
A partition is a subdivision of a topic, like dividing a big box (topic) into smaller sections (partitions) to organise messages better. This helps in processing messages more efficiently by allowing multiple messages to be read and written in parallel.

Consumers
A consumer is like someone who takes messages from the box (topic). Consumers read messages from topics to process them. For example, a service that reads shipping notifications and then updates a tracking system.

Producers
A producer is like someone who puts messages into the box (topic). Producers send messages to topics. For example, an online store system that sends a message every time an order is placed.

Consumer Groups
A consumer group is a team working together to read messages from a topic. Within a group, each partition is assigned to exactly one consumer, so members of the same group don't process the same message twice under normal operation. It's like dividing the work of checking a big box's sections among several people, where each person is responsible for their own sections.
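As a rough illustration of how partitions can be divided among the members of a group, the sketch below assigns partitions round-robin. `GroupAssignment` and the consumer names are made up for this example; Kafka's real assignment strategies are configurable and more involved, so treat this only as a mental model.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

var assignment = GroupAssignment.RoundRobin(new[] { "consumer-a", "consumer-b" }, partitionCount: 5);

foreach (var (consumer, partitions) in assignment)
    Console.WriteLine($"{consumer}: {string.Join(", ", partitions)}");
// consumer-a: 0, 2, 4
// consumer-b: 1, 3

public static class GroupAssignment
{
    // Hypothetical helper: every partition goes to exactly one consumer of the group.
    public static Dictionary<string, List<int>> RoundRobin(IReadOnlyList<string> consumers, int partitionCount)
    {
        if (consumers.Count == 0)
            throw new ArgumentException("At least one consumer is required.", nameof(consumers));

        var result = consumers.ToDictionary(c => c, _ => new List<int>());

        for (var partition = 0; partition < partitionCount; partition++)
            result[consumers[partition % consumers.Count]].Add(partition);

        return result;
    }
}
```

With more consumers than partitions, some consumers would receive an empty list, which mirrors the fact that extra group members beyond the partition count stay idle.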

Image description

Broker
A broker is a Kafka server. It stores the messages of the topic partitions it hosts and serves them to producers and consumers. If Kafka is a library, then each broker is a bookshelf in that library.

Cluster
A cluster in Kafka is a group of brokers working together. Imagine a library with multiple bookshelves; the library is the cluster, and each bookshelf is a broker. This setup allows Kafka to handle more messages and serve more consumers by spreading the load across multiple servers (brokers). A cluster ensures that if one server (broker) has an issue, the others can take over, keeping the system running smoothly.

Implementation

You can find the whole implementation by this link

First, as mentioned above, we need to create an Outbox table to persist messages.

Define Outbox table

CREATE TABLE Outbox(
Id                      uniqueidentifier not null default newsequentialid() PRIMARY KEY,
DateTimestamp           datetimeoffset(7) not null default SYSDATETIMEOFFSET(),
RawData                 NVARCHAR(MAX) not null,
MessageType             NVARCHAR(255) not null,
Topic                   NVARCHAR(255) not null,
PartitionBy             NVARCHAR(255) null,
IsProcessed             INT DEFAULT 0,
IsSequential            INT DEFAULT 0,
Metadata                NVARCHAR(MAX) null,
ReservedAt              datetimeoffset(7) null,
ExpiredAt               datetimeoffset(7) null,
IsProcessing            INT DEFAULT 0
);

RawData: JSON message representation.
MessageType: Name of the message type.
Topic: Kafka topic name.
PartitionBy: Partition key for the Kafka topic.
IsSequential: flag indicating that only a single processing relay node can pick up these messages.
Metadata: Custom metadata, such as operation ID, trace ID, etc.
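The PartitionBy value matters because records with the same key end up in the same partition, which is what preserves their relative order. The sketch below shows the idea with a hypothetical `SketchPartitioner`. It uses the FNV-1a hash purely because it is short and deterministic; this is an assumption for illustration and not the hash function Kafka clients actually use.

```csharp
using System;
using System.Text;

var first = SketchPartitioner.PartitionFor("invoice-42", partitionCount: 6);
var second = SketchPartitioner.PartitionFor("invoice-42", partitionCount: 6);

// The same key always maps to the same partition.
Console.WriteLine(first == second); // prints "True"

public static class SketchPartitioner
{
    // Illustrative only: real Kafka clients use their own hash functions.
    public static int PartitionFor(string key, int partitionCount)
    {
        if (partitionCount <= 0)
            throw new ArgumentOutOfRangeException(nameof(partitionCount));

        unchecked
        {
            uint hash = 2166136261; // FNV-1a offset basis
            foreach (var b in Encoding.UTF8.GetBytes(key))
            {
                hash ^= b;
                hash *= 16777619; // FNV-1a prime, wraps around on overflow
            }
            return (int)(hash % (uint)partitionCount);
        }
    }
}
```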

Create IOutbox and IRelay abstractions

IOutbox performs the key operations: storing messages, reserving them for processing, marking them as processed, and deleting the processed ones.

In the horizontal scalability section, I'm going to delve more deeply into the reserving API.

public interface IOutbox
{
    Task AddAsync<T>(T data, string topic, Func<T, string>? partitionBy, bool isSequential, Dictionary<string, string>? metadata, CancellationToken cancellationToken)
        where T : class;
    Task<ImmutableArray<OutboxRecord>> ReserveAsync(int top, TimeSpan reservationTimeout, CancellationToken cancellationToken);
    Task MarkAsProcessedAsync(ImmutableArray<OutboxRecord> data, CancellationToken cancellationToken);
    Task DeleteProcessedAsync(CancellationToken cancellationToken);
}

And IRelay has two simple operations, Publish and Cleanup.

public interface IRelay
{      
    Task PublishAsync(CancellationToken cancellationToken);
    Task CleanupAsync(CancellationToken cancellationToken);
}

Okay, now we have an abstraction for persisting and publishing new messages.

Serialization

It is better to encapsulate serialization details within an abstraction that will be used in both the Outbox and Kafka components.

public interface ISerializer
{
    string Serialize<T>(T data) where T : class;
    T Deserialize<T>(string data) where T : class;
}
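One possible implementation of this abstraction, sketched here on top of System.Text.Json, is shown below. The class name `SystemTextJsonSerializer`, the choice of `JsonSerializerDefaults.Web` (camelCase names, case-insensitive reads) and the `SampleEvent` type are my assumptions for the example; the article's repository may do this differently.

```csharp
using System;
using System.Text.Json;

var serializer = new SystemTextJsonSerializer();
var original = new SampleEvent { InvoiceId = Guid.NewGuid() };

var json = serializer.Serialize(original);
var roundTripped = serializer.Deserialize<SampleEvent>(json);

Console.WriteLine(roundTripped.InvoiceId == original.InvoiceId); // prints "True"

public interface ISerializer
{
    string Serialize<T>(T data) where T : class;
    T Deserialize<T>(string data) where T : class;
}

public sealed class SystemTextJsonSerializer : ISerializer
{
    // One shared options instance, so serializer metadata is cached across calls.
    private static readonly JsonSerializerOptions Options = new(JsonSerializerDefaults.Web);

    public string Serialize<T>(T data) where T : class =>
        JsonSerializer.Serialize(data, Options);

    // A JSON "null" payload is treated as an error rather than returned as null.
    public T Deserialize<T>(string data) where T : class =>
        JsonSerializer.Deserialize<T>(data, Options)
        ?? throw new JsonException($"Payload deserialized to null for type {typeof(T).Name}.");
}

// Hypothetical event type used only for this example.
public sealed class SampleEvent
{
    public Guid InvoiceId { get; init; }
}
```

Keeping the serializer behind an interface means the Outbox and the Kafka components always agree on the wire format, and the format can be swapped in one place.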

Transaction control

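To address concurrency, I'm going to use optimistic concurrency control, which MS SQL Server provides through the snapshot isolation level. Readers work on a row-versioned snapshot without blocking writers (so reads within the transaction are repeatable), and a transaction that tries to update a row changed by another transaction since the snapshot was taken fails with an update conflict.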

I enabled it in the initial migration using the following lines.

migrationBuilder.Sql("ALTER DATABASE CURRENT SET ALLOW_SNAPSHOT_ISOLATION ON;", true);
migrationBuilder.Sql("ALTER DATABASE CURRENT SET READ_COMMITTED_SNAPSHOT ON;", true);

Since we need to store new data and messages in an atomic way, we should open a transaction at the beginning of the operation and roll it back in case of any failure.

Let's define the IUnitOfWork interface for this purpose.

public interface IUnitOfWork
{
    IQueryable<T> Query<T>() where T : class;
    Task AddAsync<T>(T entity, CancellationToken cancellationToken) where T : class;
    Task AddRangeAsync<T>(IEnumerable<T> entities, CancellationToken cancellationToken) where T : class;
    void Remove<T>(T entity) where T : class;
    void RemoveRange<T>(IEnumerable<T> entities) where T : class;
    Task CommitAsync(CancellationToken cancellationToken);
    Task<IDbContextTransaction> BeginSnapshotTransactionAsync(CancellationToken cancellationToken);
    Task<IDbContextTransaction> BeginTransactionAsync(CancellationToken cancellationToken);
}

To minimise the overhead introduced by ORMs, I'm going to utilise the lightweight ORM Dapper for core Outbox queries.

For instance, this is the AddAsync implementation, which primarily retrieves a transaction, serializes a new message, and stores new messages by using the same transaction.

public async Task AddAsync<T>(T data, string topic, Func<T, string>? partitionBy, bool isSequential, Dictionary<string, string>? metadata, CancellationToken cancellationToken) 
    where T : class
{
    var transaction = GetTransaction();
    var connection = transaction.Connection;
    var query = SqlQueriesReader.ReadWithCache(InsertInOutboxQueryName);
    var json = _serializer.Serialize(data);

    var commandDefinition = new CommandDefinition(query, new
    {
        RawData = json,
        MessageType = GetMessageTypeName(data),
        Topic = topic,
        PartitionBy = partitionBy?.Invoke(data),
        IsSequential = isSequential,
        Metadata = metadata != null ? _serializer.Serialize(metadata) : null
    }, cancellationToken: cancellationToken, transaction: transaction);

    await connection.ExecuteAsync(commandDefinition);
}

Likewise, in the PublishAsync method, a transaction is opened before making any changes.

Here, messages are first sent to Kafka, and only then are these messages marked as processed, which guarantees at least once delivery.

public async Task PublishAsync(CancellationToken cancellationToken)
{
    await using var transaction = await _unitOfWork.BeginSnapshotTransactionAsync(cancellationToken);

    try
    {
        var records = await _outbox.ReserveAsync(BatchSize, ReservationTimeout, cancellationToken);

        var builder = ImmutableArray.CreateBuilder<MessageEnvelope>();

        foreach (var record in records)
        {
            var message = new MessageEnvelope
            {
                PayloadType = record.MessageType,
                Payload = record.JsonRawData,
                Topic = record.Topic,
                Key = record.PartitionBy,
                Created = record.Timestamp,
                Metadata = record.Metadata
            };
            builder.Add(message);
        }

        await _kafkaMessageSender.SendAsync(builder.ToImmutable(), cancellationToken);

        await _outbox.MarkAsProcessedAsync(records, cancellationToken);
        await transaction.CommitAsync(cancellationToken);
    }
    catch
    {
        await transaction.RollbackAsync(cancellationToken);
        throw;
    }
}

Kafka Producer

The Kafka producer implementation is simple. To transfer data to consumers, the following message structure is used.

public sealed class MessageEnvelope
{
    public string Topic { get; init; } = default!;
    public string? Key { get; init; }
    public string Payload { get; init; } = default!;
    public string PayloadType { get; init; } = default!;
    public Dictionary<string, string>? Metadata { get; init; }
    public DateTimeOffset Created { get; init; }
}

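Messages are sent with the callback-based Produce method instead of awaiting ProduceAsync for every message. Produce only enqueues the message and reports the delivery result through a callback, which yields better throughput under high load; Flush then waits until all queued messages have been delivered.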

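    public Task SendAsync(ImmutableArray<MessageEnvelope> messages, CancellationToken cancellationToken)
    {
        // Delivery reports are raised on the producer's background thread,
        // so failures are collected in a thread-safe bag (System.Collections.Concurrent).
        var failedKeys = new ConcurrentBag<string>();

        foreach (var message in messages)
        {
            try
            {
                var kafkaPayload = _serializer.Serialize(message);

                // Produce only enqueues the message; the handler runs when the delivery report arrives.
                _producer.Produce(
                    message.Topic,
                    new Message<string, string> { Key = message.Key ?? _defaultKey, Value = kafkaPayload, Headers = PrepareHeaders(message.Metadata, message.Created, message.PayloadType) },
                    report =>
                    {
                        if (report.Status != PersistenceStatus.Persisted)
                        {
                            _logger.LogError("Failed kafka message producing with Key {Key}, Error: {Error}", report.Message.Key, report.Error.Code);
                            failedKeys.Add(report.Message.Key);
                        }
                    });

                _logger.LogInformation("Message queued for Kafka, Key: {Key}, Topic: {Topic}", message.Key, message.Topic);
            }
            catch (ProduceException<string, string> ex)
            {
                throw new Exception($"Failed to send message to Kafka, Key: {message.Key}, Topic: {message.Topic}", ex);
            }
        }

        // Flush blocks until every queued message has received its delivery report.
        _producer.Flush(cancellationToken);

        // Surface delivery failures so the relay rolls back and retries
        // instead of marking undelivered records as processed.
        if (!failedKeys.IsEmpty)
            throw new Exception($"Failed to deliver {failedKeys.Count} message(s) to Kafka");

        return Task.CompletedTask;
    }
}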

Kafka Consumer

We need to subscribe to a specific topic on the consumer side first.

using var consumer = new ConsumerBuilder<string, string>(_consumerConfig).Build();
consumer.Subscribe(_topic);

Performance can also be improved by consuming messages in batches, using the following trick: block on the first message, then drain whatever is already buffered without waiting.

public static IReadOnlyCollection<ConsumeResult<TKey, TValue>> ConsumeBatch<TKey, TValue>(
    this IConsumer<TKey, TValue> consumer, TimeSpan consumeTimeout, int maxBatchSize, CancellationToken stoppingToken)
{
    var message = consumer.Consume(consumeTimeout);

    if (message?.Message is null)
        return Array.Empty<ConsumeResult<TKey, TValue>>();

    var messageBatch = new List<ConsumeResult<TKey, TValue>> { message };

    while (messageBatch.Count < maxBatchSize)
    {
        message = consumer.Consume(TimeSpan.Zero);
        if (message?.Message is null)
            break;

        messageBatch.Add(message);
    }

    return messageBatch;
}

The main body of the consuming logic handles payloads and moves the offset. It is essential to commit the offset only after the messages have been handled, in order to achieve an 'at least once' guarantee.

Consumer logic must be idempotent to handle duplicate messages.

I also wrote about why network failures are unavoidable and why consumers must be idempotent here:

https://dev.to/fairday/techniques-for-building-predictable-and-reliable-api-part-1-45bf

while (!stoppingToken.IsCancellationRequested)
{
    try
    {
        var payloads = consumer.ConsumeBatch(TimeSpan.FromMinutes(1), _maxConsumeBatchSize, stoppingToken);

        if (payloads.Count == 0)
            continue;

        foreach (var payload in payloads)
        {
            var message = serializer.Deserialize<MessageEnvelope>(payload.Message.Value);

            foreach (var handler in messageHandlers)
            {
                await handler.HandleAsync(message, stoppingToken);
            }
        }

        consumer.Commit();
    }
    catch (OperationCanceledException)
    {
        break;
    }
    catch (ConsumeException e)
    {
        if (e.Error.IsFatal)
        {
            // https://github.com/edenhill/librdkafka/blob/master/INTRODUCTION.md#fatal-consumer-errors
            logger.LogCritical(e, "Fatal error consuming message");
            break;
        }
        else
        {
            logger.LogError(e, "Error consuming message");
        }
    }
    catch (Exception e)
    {
        logger.LogError(e, "Error consuming message");
    }
}

And do whatever we need in the message handler.

internal sealed class RootMessageHandler : IMessageHandler
{
    private readonly ILogger<RootMessageHandler> _logger;
    private readonly ISerializer _serializer;

    public RootMessageHandler(ILogger<RootMessageHandler> logger, ISerializer serializer)
    {
        _logger = logger;
        _serializer = serializer;
    }

    public Task HandleAsync(MessageEnvelope message, CancellationToken cancellationToken)
    {
        var type = Type.GetType(message.PayloadType);

        if (type == typeof(InvoiceCreatedEvent))
        {
            var invoiceCreatedEvent = _serializer.Deserialize<InvoiceCreatedEvent>(message.Payload);
            _logger.LogInformation("Received invoice created event with ID {InvoiceId}", invoiceCreatedEvent.InvoiceId);
        }

        return Task.CompletedTask;
    }
}
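Because delivery is 'at least once', a handler can see the same message more than once. A minimal sketch of an idempotent handler is shown below, assuming the OperationId from the message metadata is used as the deduplication key. `IdempotentHandler` is a hypothetical class, and the in-memory dictionary is a stand-in: in a real service the processed IDs would typically live in a table with a unique constraint, written in the same transaction as the side effect.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;

var handler = new IdempotentHandler();

handler.Handle("op-1", "invoice created");
handler.Handle("op-1", "invoice created"); // duplicate delivery, skipped
handler.Handle("op-2", "invoice created");

Console.WriteLine(handler.AppliedCount); // prints "2"

public sealed class IdempotentHandler
{
    // In-memory stand-in for a durable "processed operations" store.
    private readonly ConcurrentDictionary<string, bool> _seen = new();
    private int _applied;

    public int AppliedCount => Volatile.Read(ref _applied);

    // Returns true when the side effect was applied, false for a duplicate.
    public bool Handle(string operationId, string payload)
    {
        // TryAdd is atomic, so only the first delivery of an operation wins.
        if (!_seen.TryAdd(operationId, true))
            return false;

        // The real side effect (for example, updating a read model) would go here.
        Interlocked.Increment(ref _applied);
        return true;
    }
}
```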

Horizontal Scalability

There are two standard ways to achieve higher throughput and handle more requests: scaling vertically, by adding CPU and memory to a single server, and scaling horizontally, by adding more instances of the service. Vertical scaling hits hardware limits sooner, whereas horizontal scaling spreads the increased load across cloned servers.

To make the Outbox architecture horizontally scalable, several relay nodes must be able to read from the Outbox table concurrently, with each record reserved exclusively by one node at a time.

What is SELECT FOR UPDATE?

"SELECT FOR UPDATE" is a SQL command used within a transaction to lock selected rows against concurrent access by other transactions. Essentially, it's a way to say, "I'm working with this data, and I don't want anyone else to change it until I'm done." This command ensures that any selected data remains consistent and unchanged from the moment it's read to the moment a transaction involving it is completed.

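MS SQL Server does not offer this clause for ordinary queries, but the following statement achieves similar behaviour by combining the READCOMMITTEDLOCK and READPAST table hints. READCOMMITTEDLOCK makes the read take locks, and READPAST skips rows that are already locked by another transaction instead of waiting for them.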

IF EXISTS(SELECT 1 FROM Outbox WHERE IsProcessed = 0 AND IsSequential = 0 AND (IsProcessing = 0 OR (IsProcessing = 1 AND SYSDATETIMEOFFSET() >= ExpiredAt)))
BEGIN
WITH forReservation AS (
    SELECT TOP (@MaxLimit) * FROM Outbox
    WITH (READCOMMITTEDLOCK, READPAST)
    WHERE IsProcessed = 0 AND IsSequential = 0 AND (IsProcessing = 0 OR (IsProcessing = 1 AND SYSDATETIMEOFFSET() >= ExpiredAt))
)
UPDATE forReservation
SET IsProcessing = 1, ReservedAt = SYSDATETIMEOFFSET(), ExpiredAt = DATEADD(SECOND, (@ReservationSeconds), SYSDATETIMEOFFSET())
OUTPUT INSERTED.*
END

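As you can see, it reserves records exclusively by setting the IsProcessing flag together with an ExpiredAt date. Once that date has passed, the reservation lapses and another node can pick the records up again.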
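The reservation semantics can be sketched in memory as follows. `ReservationStore` and `ReservableRow` are hypothetical names, the lock plays the role of the row locks taken by the UPDATE statement, and the current time is passed in explicitly so that the behaviour is easy to follow. It is a model of the idea, not of SQL Server's locking.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

var store = new ReservationStore(recordCount: 5);
var now = DateTimeOffset.UnixEpoch;
var timeout = TimeSpan.FromSeconds(30);

var nodeA = store.Reserve(3, timeout, now);                 // ids 1, 2, 3
var nodeB = store.Reserve(3, timeout, now);                 // ids 4, 5 (the remaining ones)
var later = store.Reserve(3, timeout, now.AddSeconds(31));  // expired reservations are free again

Console.WriteLine($"{nodeA.Count} {nodeB.Count} {later.Count}"); // prints "3 2 3"

public sealed class ReservableRow
{
    public int Id { get; init; }
    public bool IsProcessed { get; set; }
    public bool IsProcessing { get; set; }
    public DateTimeOffset? ExpiredAt { get; set; }
}

public sealed class ReservationStore
{
    private readonly object _gate = new();
    private readonly List<ReservableRow> _rows;

    public ReservationStore(int recordCount) =>
        _rows = Enumerable.Range(1, recordCount).Select(i => new ReservableRow { Id = i }).ToList();

    // Mirrors the WHERE clause of the reservation query: unprocessed rows that
    // are either free or whose reservation has expired.
    public IReadOnlyList<int> Reserve(int top, TimeSpan timeout, DateTimeOffset now)
    {
        lock (_gate)
        {
            var batch = _rows
                .Where(r => !r.IsProcessed && (!r.IsProcessing || now >= r.ExpiredAt))
                .Take(top)
                .ToList();

            foreach (var row in batch)
            {
                row.IsProcessing = true;
                row.ExpiredAt = now + timeout;
            }

            return batch.Select(r => r.Id).ToList();
        }
    }
}
```

The expiry is what keeps the system self-healing: if a node dies after reserving a batch, its records become available to other nodes once the timeout has elapsed.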

How to deal with cases when preserving ordering is essential?

Although 'SELECT FOR UPDATE' offers many advantages in terms of scalability, it has a major drawback in situations where ordering must be preserved. This is because many workers will process data in parallel, and it's very likely that the chronological order will be affected.

In such scenarios, distributed locks might be helpful for transferring data in a single-threaded manner by a specific attribute. For instance, 'IsSequential' is a flag indicating that this message must be processed by a specific node, which pulls only records with such a flag.

Example

Let us look at a straightforward example where we need to publish an InvoiceCreatedEvent event after creating a new invoice.

[ApiController]
[Route("[controller]")]
public class InvoicesController : ControllerBase
{
    private readonly IInvoiceService _invoiceService;
    private readonly IUnitOfWork _unitOfWork;

    public InvoicesController(IInvoiceService invoiceService, IUnitOfWork unitOfWork)
    {
        _invoiceService = invoiceService;
        _unitOfWork = unitOfWork;
    }

    [HttpPost]
    public async Task<InvoicePayload> CreateInvoice([FromBody] InvoiceModel model, CancellationToken cancellationToken)
    {
        await using var transaction = await _unitOfWork.BeginSnapshotTransactionAsync(cancellationToken);

        try
        {
            var invoiceId = await _invoiceService.CreateInvoice(model, cancellationToken);
            await transaction.CommitAsync(cancellationToken);
            return new InvoicePayload
            {
                Id = invoiceId
            };
        }
        catch
        {
            await transaction.RollbackAsync(cancellationToken);
            throw;
        }
    }
}

In this service, a new message is persisted in Outbox together with creating a new invoice.

internal sealed class InvoiceService : IInvoiceService
{
    private readonly IInvoiceRepository _invoiceRepository;
    private readonly IOutbox _outbox;

    public InvoiceService(IInvoiceRepository invoiceRepository, IOutbox outbox)
    {
        _invoiceRepository = invoiceRepository;
        _outbox = outbox;
    }

    public async Task<Guid> CreateInvoice(InvoiceModel model, CancellationToken cancellationToken)
    {
        var invoice = Invoice.Create(model.Amount, model.DueDate);

        await _invoiceRepository.AddEntityAsync(invoice, cancellationToken);
        await _invoiceRepository.SaveChangesAsync(cancellationToken);

        var operationId = Guid.NewGuid();

        await _outbox.AddAsync(
            data: new InvoiceCreatedEvent
            {
                InvoiceId = invoice.Id
            }, 
            topic: InvoiceCreatedEvent.Topic,
            partitionBy: i => i.InvoiceId.ToString(),
            isSequential: false,
            metadata: new Dictionary<string, string>
            {
                { "OperationId", operationId.ToString() }
            }, 
            cancellationToken);

        return invoice.Id;
    }
}

If we open the database, we will see a new message added to Outbox after committing a transaction.

Image description

Here we see the same message after it was sent to Kafka by the Relay and consumed.

Image description

I also added debug logs to make the message transfer visible.

Image description

What is missing?

Although this solution is almost ready for production, there are many enhancements that can be added, such as:

  • Splitting between internal and public data contracts
  • Integrating OpenTelemetry with Jaeger and Prometheus
  • Schema Registry integration to guard against breaking changes in data contracts
  • Dead Letter Queue
  • Batch data transferring
  • Housekeeping to store all messages for future investigation and auditing purposes
  • Admin Dashboard for managing the outbox state
  • Performance tests

And many other potential improvements.

Conclusion

In this article, I demonstrated how to implement scalable, production-ready, reliable messaging with a transactional outbox approach and Kafka.

See you next time!
