
Christian Dennig

Originally published at partlycloudy.blog

Using the Transactional Outbox Pattern with Azure Cosmos DB for Guaranteed Delivery of Domain Events

This article is about how to build a resilient architecture for distributed applications leveraging basic Domain-Driven Design and the Transactional Outbox Pattern with Azure Cosmos DB and Azure Service Bus.

(STOP! This article has been migrated to (and will be kept up-to-date in) the Azure Architecture Center: https://docs.microsoft.com/en-us/azure/architecture/best-practices/transactional-outbox-cosmos)

Microservice architectures are becoming increasingly popular and promise to solve problems like scalability, maintainability and agility by splitting your once monolithic application into smaller (micro-)services that can operate and scale on their own. While you want these services to be independent of each other (no tight coupling!), this also means that each service manages the data it needs in its own dedicated datastore. In such a distributed, microservice-oriented application, you end up running multiple datastores, and data is often replicated between the services of your application.

How does such a service get hold of all the data it needs to run properly? Typically, you use a messaging solution like RabbitMQ, Kafka or Azure Service Bus that distributes events from your microservices via a message bus after a business object has been created, modified, deleted etc. By following such an approach, you avoid having services call each other directly and therefore avoid coupling them tightly.


Let’s have a look at the “Hello, World!” example in that space: in an Ordering service, when a user wants to create a new order, the service receives the payload from a client application via a REST endpoint, validates the data and maps it to the internal representation of an Order object. After a successful commit to the database, it publishes an OrderCreated event to a message bus. Any other service interested in newly created orders (e.g. an Inventory or Invoicing service) subscribes to these OrderCreated messages and processes them accordingly. The following pseudo code shows what this typically looks like in the Ordering service:


CreateNewOrder(CreateOrderDto order){
  // validate the incoming data
  ...
  // do some other stuff
  ...
  // Save the object to the database
  var result = _orderRepository.Create(order);
  // finally, publish the respective event
  _messagingService.Publish(new OrderCreatedEvent(result));
  return Ok();
}


That all sounds wonderful until something unexpected happens between saving the object and publishing the event. Imagine you successfully saved the Order object to the database and are in the middle of publishing the event to the message bus when your service crashes, the message bus is not available (for whatever reason) or the network between your service and the messaging system has a problem.

In a nutshell: you cannot publish the OrderCreated event to “the outside world” although the actual object has already been saved. Now what? You end up having a severe problem and start saving the events to a secondary service like an Azure Storage Account to process them later or – even worse – writing code that keeps track in memory of all the events that haven’t been published yet. In the worst case, you end up with data inconsistencies in your other services because events are lost. Debugging these kinds of problems is a pure nightmare, and you want to avoid these situations at all costs.

Solution

So, what is the solution for the problem described above? There is a well-known pattern called the Transactional Outbox Pattern that will help us here. The pattern ensures that events are saved in a datastore (typically in an Outbox table of your database) before they are ultimately pushed to the message broker. If the business object and the corresponding events are saved within the same database transaction, it is also guaranteed that no data will be lost: either everything is committed or everything is rolled back. To eventually publish the event, a different service or worker process can query the Outbox table for new entries, publish the events and mark them as “processed”. Using this pattern, you ensure that events will never be lost after creating or modifying a business object.

In a traditional database, the implementation in your service is fairly easy. If you e.g. use Entity Framework Core, you will use your EF context to create a database transaction, save the business object and the event and commit the transaction – or do a rollback in case of an error. Even the worker service that is processing the events data is straightforward: periodically query the Outbox table for new entries, publish the event payload to the message bus and mark the entry as “processed”.
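As a rough, database-agnostic illustration of the two halves described above, here is a minimal in-memory sketch. All type names (`InMemoryStore`, `OutboxEntry`, `OutboxRelay`) are made up for this example and are not part of the sample application; a simple lock stands in for a real database transaction.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical types for illustration only.
public record Order(Guid Id, string Customer);

public class OutboxEntry
{
    public Guid Id { get; } = Guid.NewGuid();
    public string EventType { get; init; } = "";
    public string Payload { get; init; } = "";
    public bool Processed { get; set; }
}

public class InMemoryStore
{
    private readonly object _gate = new();
    public List<Order> Orders { get; } = new();
    public List<OutboxEntry> Outbox { get; } = new();

    // In a real database this would be one transaction that stores
    // the business object and its event together.
    public void SaveOrderWithEvent(Order order, OutboxEntry entry)
    {
        lock (_gate)
        {
            Orders.Add(order);
            Outbox.Add(entry);
        }
    }
}

public class OutboxRelay
{
    private readonly InMemoryStore _store;
    public OutboxRelay(InMemoryStore store) => _store = store;

    // Publishes unprocessed entries in insertion order. An entry is only marked
    // as processed after publish returned without throwing, so a failed publish
    // is retried on the next run (at-least-once delivery).
    public int PublishPending(Action<OutboxEntry> publish)
    {
        var published = 0;
        foreach (var entry in _store.Outbox.Where(e => !e.Processed).ToList())
        {
            publish(entry);
            entry.Processed = true;
            published++;
        }
        return published;
    }
}
```

If the publish callback throws (for example because the message bus is down), the entry simply stays in the outbox and is picked up again on the next run, which is the property the pattern is after.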

Of course, in real life things are not as easy as they might look at first. Most importantly, you need to make sure that the order of the events, as they happened in the application, is preserved so that an OrderUpdated event doesn’t get published before an OrderCreated event.

How to implement the pattern with Azure Cosmos DB?

So far, we discussed the Transactional Outbox Pattern in theory and how it can help to implement reliable messaging in distributed applications. But how can we use the pattern in combination with Azure Cosmos DB and leverage some of the unique features like Cosmos DB Change Feed to make our developer life easier? Let’s have a detailed look at it by implementing a sample service that is responsible for managing Contact objects (Firstname, Lastname, Email, Company Information etc.). The sample service uses the CQRS pattern and follows basic Domain-Driven Design concepts.

Azure Cosmos DB


First, Azure Cosmos DB – in case you haven’t heard of the service so far – is a globally distributed NoSQL database service that lets you use different APIs like Gremlin, MongoDB and the Core SQL API (which we will focus on in this article) to manage your application data – with 99.999% SLAs on availability. Cosmos DB can automatically horizontally scale your database, promising unlimited storage and throughput. The service keeps its promise by partitioning your data based on a user-defined key provided at container (the place where your documents will be stored) creation, the so-called “partition key”. That means your data is divided into logical subsets within a container based on that partition key – you may have also heard of the term “sharding” in case you are familiar with MongoDB. Under the hood, Cosmos DB will then distribute these logical subsets of your data across (multiple) physical partitions.

Cosmos DB Transactional Batches

Due to the mechanism described above, transactions in Cosmos DB work slightly differently than you might be used to from relational database systems. Cosmos DB transactions – called “transactional batches” – operate on a single logical partition, and it is within that partition that they guarantee ACID properties. That means you can’t save two documents in a transactional batch operation in different containers or even in different logical partitions (read: with different “partition keys”). Because of that, the implementation of the Transactional Outbox Pattern with Cosmos DB is slightly different compared to the approach in a relational database where you would simply have two tables: one for the business object and one Outbox table for the events. In Cosmos DB, we need to save the business object and the events data in the same logical partition (in the same container). The following sections describe how this can be achieved.
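Because a transactional batch is bound to exactly one partition key, it can help to check that precondition before a batch is built. The following is a minimal sketch of such a check; `TrackedItem` and `BatchGuard` are hypothetical names used only here and exist neither in the sample nor in the Cosmos DB SDK.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical stand-in for a tracked document; not an SDK type.
public record TrackedItem(string Id, string PartitionKey, string Type);

public static class BatchGuard
{
    // Returns the single partition key shared by all items, or throws if the
    // items could not be written in one transactional batch.
    public static string GetSinglePartitionKey(IReadOnlyCollection<TrackedItem> items)
    {
        if (items.Count == 0)
            throw new ArgumentException("Nothing to save.", nameof(items));

        var keys = items.Select(i => i.PartitionKey).Distinct().ToList();
        if (keys.Count != 1)
            throw new InvalidOperationException(
                $"Items span {keys.Count} logical partitions; a transactional batch supports exactly one.");

        return keys[0];
    }
}
```

A contact document and its event documents pass this check as long as the events use the contact's partition key, which is exactly the layout this article builds towards.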

Custom Container Context, Repositories and UnitOfWork

The most important part of the implementation is a “custom container context” that is responsible for keeping track of objects that need to be saved in the same transactional batch. Think of a very lightweight “Entity Framework Context” maintaining a list of created/modified objects that also knows in which Cosmos DB container (and logical partition) these objects need to be saved. Here’s the interface for it:


public interface IContainerContext
{
    public Container Container { get; }
    public List<IDataObject<Entity>> DataObjects { get; }
    public void Add(IDataObject<Entity> entity);
    public Task<List<IDataObject<Entity>>> 
      SaveChangesAsync(CancellationToken cancellationToken = default);
    public void Reset();
}


The list within the “container context” component will hold Contact as well as DomainEvent objects and both will be put in the same container – yes, we are mixing multiple types of objects in the same Cosmos DB container and use a Type property to distinguish between an “entity” and an “event”.

For each type there exists a dedicated repository that defines/implements the data access. The Contact repository interface offers the following methods:


public interface IContactsRepository
{
    public void Create(Contact contact);
    public Task<(Contact, string)> ReadAsync(Guid id, string etag);
    public Task DeleteAsync(Guid id, string etag);
    public Task<(List<(Contact, string)>, bool, string)> 
      ReadAllAsync(int pageSize, string continuationToken);
    public void Update(Contact contact, string etag);
}


The Event repository looks similar, except that there is only one method to create new events in the store:


public interface IEventRepository
{
    public void Create(ContactDomainEvent e);
}


The implementations of both repository interfaces get a reference via dependency injection to an IContainerContext instance to make sure that both operate on the same context.

The final component is a UnitOfWork that is responsible for committing the changes held in the IContainerContext instance to the database:


public class UnitOfWork : IUnitOfWork
{
    private readonly IContainerContext _context;
    public IContactsRepository ContactsRepo { get; }

    public UnitOfWork(IContainerContext ctx, IContactsRepository cRepo)
    {
        _context = ctx;
        ContactsRepo = cRepo;
    }

    public Task<List<IDataObject<Entity>>> 
        CommitAsync(CancellationToken cancellationToken = default)
    {
        return _context.SaveChangesAsync(cancellationToken);
    }
}


We now have the components in place for data access. Let’s see how events are created and published.

Event Handling – Creation and Publication

Every time a Contact object is created, modified or (soft-) deleted, we want the system to raise a corresponding event, so that we can notify other services interested in these changes immediately. The core of the solution provided in this article is a combination of Domain-Driven Design and the mediator pattern as proposed by Jimmy Bogard [1]. He suggests maintaining a list of domain events that happened due to modifications of the domain object and “publishing” these events right before saving the actual object to the database. The list of changes is kept in the domain object itself, so that no other component can modify the chain of events. The behavior of maintaining such events (IEvent instances) in a domain object is defined via an interface IEventEmitter and implemented in an abstract DomainEntity class:


public abstract class DomainEntity : Entity, IEventEmitter<IEvent>
{
    // ...
    private readonly List<IEvent> _events = new();

    [JsonIgnore] public IReadOnlyList<IEvent> DomainEvents => 
        _events.AsReadOnly();

    // Keeps at most one event per Action: a newer event replaces an
    // existing one with the same Action at the same position.
    public virtual void AddEvent(IEvent domainEvent)
    {
        var i = _events.FindIndex(0, e => e.Action == domainEvent.Action);
        if (i < 0)
        {
            _events.Add(domainEvent);
        }
        else
        {
            _events[i] = domainEvent;
        }
    }
    // ...
}


When it comes to raising/adding domain events, it’s the responsibility of the Contact object. As mentioned before, the Contact entity follows basic DDD concepts. In this case, it means that you can’t modify any domain properties “from outside”: you don’t have any public setters in the class. Instead, it offers dedicated methods to manipulate the state and is therefore able to raise the appropriate events for a certain modification (e.g. “NameUpdated”, “EmailUpdated” etc.).

Here’s an example of updating the name of a contact – the event is raised by the AddEvent call near the end of the method:


public void SetName(string firstName, string lastName)
{
    if (string.IsNullOrWhiteSpace(firstName) || 
        string.IsNullOrWhiteSpace(lastName))
    {
        throw 
            new ArgumentException("FirstName or LastName may not be empty");
    }

    Name = new Name(firstName, lastName);

    if (IsNew) return;

    AddEvent(new ContactNameUpdatedEvent(Id, Name));
    ModifiedAt = DateTimeOffset.Now;
}


The corresponding ContactNameUpdatedEvent that simply keeps track of the individual changes to the domain object looks as follows:


public class ContactNameUpdatedEvent : ContactDomainEvent
{
    public Name Name { get; }

    public ContactNameUpdatedEvent(Guid contactId, Name contactName) : 
        base(Guid.NewGuid(), contactId, nameof(ContactNameUpdatedEvent))
    {
        Name = contactName;
    }
}


So far, we only “log” the events to the domain object and nothing gets published or even saved to the database. How can we persist the events in combination with the business object? Well, it’s done right before saving the domain object to Cosmos DB.
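To make this “log only” stage a bit more tangible, here is a simplified, self-contained re-creation of the AddEvent rule shown earlier. `SimpleEvent` and `EventList` are hypothetical stand-ins for the sample's IEvent and DomainEntity types, reduced to what is needed to show the behavior: an event with an Action that is already in the list replaces the earlier one.

```csharp
using System;
using System.Collections.Generic;

// Simplified stand-ins for the IEvent/DomainEntity types of the article.
public record SimpleEvent(string Action, string Payload);

public class EventList
{
    private readonly List<SimpleEvent> _events = new();
    public IReadOnlyList<SimpleEvent> Events => _events.AsReadOnly();

    // Same rule as AddEvent: at most one event per Action, the latest one wins,
    // and it keeps the position of the event it replaces.
    public void Add(SimpleEvent e)
    {
        var i = _events.FindIndex(x => x.Action == e.Action);
        if (i < 0) _events.Add(e);
        else _events[i] = e;
    }
}
```

One consequence of this design is that changing a contact's name twice before a commit results in a single name event carrying the latest value, not two events.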

Within the SaveChangesAsync method of the IContainerContext implementation, we simply loop over all objects tracked in the container context and publish the events maintained in these objects. It’s all done in a private RaiseDomainEvents method (dObjs is the list of tracked entities of the container context):


private void RaiseDomainEvents(List<IDataObject<Entity>> dObjs)
{
    var eventEmitters = new List<IEventEmitter<IEvent>>();

    // Get all EventEmitters
    foreach (var o in dObjs)
        if (o.Data is IEventEmitter<IEvent> ee)
            eventEmitters.Add(ee);

    // Raise Events
    if (eventEmitters.Count <= 0) return;
    foreach (var evt in eventEmitters
        .SelectMany(eventEmitter => eventEmitter.DomainEvents))
        _mediator.Publish(evt);
}


In the last statement of the method, we use the MediatR library to publish an event within our application – this is possible because all events – like ContactNameUpdatedEvent – also implement the INotification interface of the MediatR package.

Of course, we also need to handle these events – and this is where the IEventRepository implementation comes into play. Let’s have a look at such an event handler:


public class ContactNameUpdatedHandler :
    INotificationHandler<ContactNameUpdatedEvent>
{
    private IEventRepository EventRepository { get; }

    public ContactNameUpdatedHandler(IEventRepository eventRepo)
    {
        EventRepository = eventRepo;
    }

    public Task Handle(ContactNameUpdatedEvent notification, 
        CancellationToken cancellationToken)
    {
        EventRepository.Create(notification);
        return Task.CompletedTask;
    }
}


You can see that an IEventRepository instance gets injected into the handler class via the constructor. So, as soon as a ContactNameUpdatedEvent gets published in the application, the Handle method gets invoked and uses the event repository instance to create a notification object – that ultimately lands in the list of tracked objects in the IContainerContext object and therefore becomes part of the objects that will be saved in the same transactional batch to Cosmos DB.

Here are the important parts of the implementation of IContainerContext:


private async Task<List<IDataObject<Entity>>> 
    SaveInTransactionalBatchAsync(List<IDataObject<Entity>> dObjs,
        CancellationToken cancellationToken)
{
    if (dObjs.Count > 0)
    {
        var pk = new PartitionKey(dObjs[0].PartitionKey);
        var tb = Container.CreateTransactionalBatch(pk);
        dObjs.ForEach(o =>
        {
            TransactionalBatchItemRequestOptions tro = null;

            if (!string.IsNullOrWhiteSpace(o.Etag))
                tro = new TransactionalBatchItemRequestOptions 
                { 
                    IfMatchEtag = o.Etag 
                };

            switch (o.State)
            {
                case EntityState.Created:
                    tb.CreateItem(o);
                    break;
                case EntityState.Updated or EntityState.Deleted:
                    tb.ReplaceItem(o.Id, o, tro);
                    break;
            }
        });

        var tbResult = await tb.ExecuteAsync(cancellationToken);
        // ... [check for return codes etc.] ...
    }

    var result = new List<IDataObject<Entity>>(dObjs);
    // reset internal list
    DataObjects.Clear();
    return result;
}


Let Cosmos DB Shine

We now have everything in place: any time a domain object gets created or modified, it adds a corresponding domain event, which will be put in the same container context (via a separate repository) for object tracking and finally saved – both the domain object and the events – in the same transactional batch to Cosmos DB.

This is how the process works so far (e.g. for updating the name on a contact object):

  1. SetName is invoked on the contact object
  2. Event ContactNameUpdated is added to the list of events in the domain object
  3. ContactRepository Update method is invoked which adds the domain object to the container context. Object is now “tracked”.
  4. CommitAsync is invoked on the UnitOfWork object which in turn calls SaveChangesAsync on the container context
  5. Within SaveChangesAsync, all events in the list of the domain object get published by a MediatR instance and are added via the IEventRepository implementation to the same container context
  6. After that, in SaveChangesAsync, a TransactionalBatch is created which will hold both the contact object and the event
  7. The TransactionalBatch is executed and the data is committed to Cosmos DB
  8. SaveChangesAsync and CommitAsync successfully return
  9. End of the update process
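The steps above can be sketched in a few lines of in-memory code. This is only an approximation under simplifying assumptions: `SketchContact`, `SketchContext` and `Doc` are hypothetical types, events are plain strings, the MediatR hop is folded into SaveChanges, and a list of lists stands in for the Cosmos DB container.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// All types below are simplified, hypothetical stand-ins for the sample's classes.
public record Doc(string Id, string PartitionKey, string Type, string Data);

public class SketchContact
{
    private readonly List<string> _events = new();
    public string Id { get; } = Guid.NewGuid().ToString();
    public string Name { get; private set; } = "";
    public IReadOnlyList<string> DomainEvents => _events;

    // Steps 1 and 2: changing state records a domain event on the object itself.
    public void SetName(string name)
    {
        Name = name;
        _events.Add($"ContactNameUpdated:{name}");
    }
}

public class SketchContext
{
    private readonly List<SketchContact> _tracked = new();

    // Stands in for the Cosmos DB container; each inner list is one committed batch.
    public List<List<Doc>> CommittedBatches { get; } = new();

    // Step 3: the repository's Update method would call this.
    public void Track(SketchContact contact) => _tracked.Add(contact);

    // Steps 4 to 8: turn recorded events into documents and commit them together
    // with the entity, all under the entity's partition key.
    public int SaveChanges()
    {
        var saved = 0;
        foreach (var c in _tracked)
        {
            var batch = new List<Doc> { new Doc(c.Id, c.Id, "contact", c.Name) };
            batch.AddRange(c.DomainEvents.Select(e =>
                new Doc(Guid.NewGuid().ToString(), c.Id, "domainEvent", e)));
            CommittedBatches.Add(batch); // in Cosmos DB: one transactional batch
            saved += batch.Count;
        }
        _tracked.Clear();
        return saved;
    }
}
```

The point to take away is that the entity document and its event documents always end up in the same batch and share one partition key.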

Let’s have a closer look now at how each type gets persisted to a container. In the code snippets above, you saw that objects saved to Cosmos DB will be wrapped in a DataObject instance. Such an object provides common properties like Id, PartitionKey, Type, State (e.g. Created, Updated etc. – won’t be persisted in Cosmos DB), Etag (for optimistic locking [2]), TTL (Time-To-Live property for automatic cleanup of old documents [3]) and – of course – the Data itself. All of this is defined in a generic interface called IDataObject and used by the repositories and the container context:


public interface IDataObject<out T> where T : Entity
{
    string Id { get; }
    string PartitionKey { get; }
    string Type { get; }
    T Data { get; }
    string Etag { get; set; }
    int Ttl { get; }
    EntityState State { get; set; }
}


Objects wrapped in a DataObject instance and saved to the database will then look like this (Contact and ContactNameUpdatedEvent):


// Contact document/object - after creation

{
    "id": "b5e2e7aa-4982-4735-9422-c39a7c4af5c2",
    "partitionKey": "b5e2e7aa-4982-4735-9422-c39a7c4af5c2",
    "type": "contact",
    "data": {
        "name": {
            "firstName": "Bertram",
            "lastName": "Gilfoyle"
        },
        "description": "This is a contact",
        "email": "bg@piedpiper.com",
        "company": {
            "companyName": "Pied Piper",
            "street": "Street",
            "houseNumber": "1a",
            "postalCode": "092821",
            "city": "Palo Alto",
            "country": "US"
        },
        "createdAt": "2021-09-22T11:07:37.3022907+02:00",
        "deleted": false,
        "id": "b5e2e7aa-4982-4735-9422-c39a7c4af5c2"
    },
    "ttl": -1,
    "_etag": "\"180014cc-0000-1500-0000-614455330000\"",
    "_ts": 1631868211
}

// after setting a new name - this is what an event document looks like

{
    "id": "d6a5f4b2-84c3-4ac7-ae22-6f4025ba9ca0",
    "partitionKey": "b5e2e7aa-4982-4735-9422-c39a7c4af5c2",
    "type": "domainEvent",
    "data": {
        "name": {
            "firstName": "Dinesh",
            "lastName": "Chugtai"
        },
        "contactId": "b5e2e7aa-4982-4735-9422-c39a7c4af5c2",
        "action": "ContactNameUpdatedEvent",
        "id": "d6a5f4b2-84c3-4ac7-ae22-6f4025ba9ca0",
        "createdAt": "2021-09-17T10:50:01.1692448+02:00"
    },
    "ttl": 120,
    "_etag": "\"18005bce-0000-1500-0000-614456b80000\"",
    "_ts": 1631868600
}


You see that the Contact and ContactNameUpdatedEvent (type: domainEvent) documents have the same partition key – hence both documents will be persisted in the same logical partition. And, if you want to keep a log of changes of a contact object, you can also keep all the events in the container for good. Frankly, this is not what we want to have here in this sample. The purpose of persisting both object types is to make sure that a) events never get lost and b) they eventually get published to “the outside world” (e.g. to an Azure Service Bus topic).


So, to finally read the stream of events and send them to a message broker, let’s use one of the “unsung hero features” [4] of Cosmos DB: Change Feed.

The Change Feed is a persistent log of changes in your container that is operating in the background keeping track of modifications in the order they occurred – per logical partition. So, when you read the Change Feed, it is guaranteed that – for a certain partition key – you read the changes of those documents always in the correct order. This is mandatory for our scenario. Frankly, this is the reason why we put both the contact and corresponding event documents in the same partition: when we read the Change Feed, we can be sure that we never get an “updated” before a “created” event.

So, how do we read the Change Feed then? You have multiple options. The most convenient way is to use an Azure Function with a Cosmos DB trigger. Here, you have everything in place and if you want to host that part of the application on the Azure Functions service, you are good to go. Another option is to use the Change Feed Processor library [5]. It lets you integrate Change Feed processing in your Web API e.g. as a background service (via IHostedService interface). In this sample, we simply create a console application that uses the abstract class BackgroundService for implementing long running background tasks in .NET Core.

Now, to receive the changes from the Cosmos DB Change Feed, you need to instantiate a ChangeFeedProcessor object, register a handler method for message processing and start listening for changes:


private async Task<ChangeFeedProcessor> 
    StartChangeFeedProcessorAsync()
{
    var changeFeedProcessor = _container
        .GetChangeFeedProcessorBuilder<ExpandoObject>(
            _configuration.GetSection("Cosmos")["ProcessorName"],
            HandleChangesAsync)
        .WithInstanceName(Environment.MachineName)
        .WithLeaseContainer(_leaseContainer)
        .WithMaxItems(25)
        .WithStartTime(new DateTime(2000, 1, 1, 0, 0, 0, DateTimeKind.Utc))
        .WithPollInterval(TimeSpan.FromSeconds(3))
        .Build();

    _logger.LogInformation("Starting Change Feed Processor...");
    await changeFeedProcessor.StartAsync();
    _logger.LogInformation(
        "Change Feed Processor started. Waiting for new messages to arrive.");
    return changeFeedProcessor;
}


The handler method is then responsible for processing the messages. In this sample, we keep things simple and publish the events to an Azure Service Bus topic which is partitioned for scalability and has the “deduplication” feature enabled [6] – more on that in a second. From there on, any service interested in changes to Contact objects can subscribe to that topic and receive and process those changes for its own context.

You’ll also see that the Service Bus messages have a SessionId property. By using sessions in Azure Service Bus, we guarantee that the order of the messages is preserved (FIFO) [7] – which is necessary for our use case. Here is the snippet that handles messages from the Change Feed:


private async Task HandleChangesAsync(IReadOnlyCollection<ExpandoObject> changes, CancellationToken cancellationToken)
{
    _logger.LogInformation($"Received {changes.Count} document(s).");
    var eventsCount = 0;

    Dictionary<string, List<ServiceBusMessage>> partitionedMessages = new();

    foreach (var document in changes as dynamic)
    {
        if (!((IDictionary<string, object>)document).ContainsKey("type") ||
            !((IDictionary<string, object>)document).ContainsKey("data")) continue; // unknown doc type

        if (document.type == EVENT_TYPE) // domainEvent
        {
            string json = JsonConvert.SerializeObject(document.data);
            var sbMessage = new ServiceBusMessage(json)
            {
                ContentType = "application/json",
                Subject = document.data.action,
                MessageId = document.id,
                PartitionKey = document.partitionKey,
                SessionId = document.partitionKey
            };

            // Create message batch per partitionKey
            if (partitionedMessages.ContainsKey(document.partitionKey))
            {
                partitionedMessages[sbMessage.PartitionKey].Add(sbMessage);
            }
            else
            {
                partitionedMessages[sbMessage.PartitionKey] = new List<ServiceBusMessage> { sbMessage };
            }

            eventsCount++;
        }
    }

    if (partitionedMessages.Count > 0)
    {
        _logger.LogInformation($"Processing {eventsCount} event(s) in {partitionedMessages.Count} partition(s).");

        // Loop over each partition
        foreach (var partition in partitionedMessages)
        {
            // Create batch for partition
            using var messageBatch =
                await _topicSender.CreateMessageBatchAsync(cancellationToken);
            foreach (var msg in partition.Value)
                if (!messageBatch.TryAddMessage(msg))
                    throw new InvalidOperationException(
                        $"Message {msg.MessageId} does not fit into the batch.");

            _logger.LogInformation(
                $"Sending {messageBatch.Count} event(s) to Service Bus. PartitionId: {partition.Key}");

            try
            {
                await _topicSender.SendMessagesAsync(messageBatch, cancellationToken);
            }
            catch (Exception e)
            {
                _logger.LogError(e, "Sending events to Service Bus failed.");
                throw;
            }
        }
    }
    else
    {
        _logger.LogInformation("No event documents in change feed batch. Waiting for new messages to arrive.");
    }
}


What happens when errors occur?

In case of an error while processing the changes, the Change Feed library will restart reading messages at the position where it successfully processed the last batch. That means, if we have already processed 10,000 messages, are now in the middle of working on messages 10,001 to 10,025 (we read documents in batches of 25 – see the initialization of the ChangeFeedProcessor object) and an error happens, we can simply restart our process and it will pick up its work at position 10,001. The library keeps track of what has already been processed via a Leases container.
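The restart behavior can be pictured with a small in-memory model. `CheckpointedReader` is a hypothetical class written for this article; it only mimics the idea of a checkpoint that advances after a batch was handled successfully and says nothing about how the Change Feed Processor is implemented internally.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical in-memory model of lease-based checkpointing; not the SDK's implementation.
public class CheckpointedReader
{
    private readonly IReadOnlyList<int> _changes;
    private readonly int _batchSize;

    // Index of the first change that has not been acknowledged yet
    // (the role the Leases container plays for the real processor).
    public int Checkpoint { get; private set; }

    public CheckpointedReader(IReadOnlyList<int> changes, int batchSize)
    {
        _changes = changes;
        _batchSize = batchSize;
    }

    // Hands the next batch to the handler. The checkpoint only advances when the
    // handler returns normally; if it throws, the same batch is delivered again
    // on the next call.
    public bool ProcessNextBatch(Action<IReadOnlyList<int>> handler)
    {
        var batch = _changes.Skip(Checkpoint).Take(_batchSize).ToList();
        if (batch.Count == 0) return false;

        handler(batch);
        Checkpoint += batch.Count;
        return true;
    }
}
```

Note that a handler which fails halfway through a batch has already done part of its work, so the redelivered batch contains items that were handled before.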

Of course, it can happen that – out of the 25 messages that are then reprocessed – our application already sent 10 to Azure Service Bus. This is where the deduplication feature will save our life. Azure Service Bus will check if a message has already been added to a topic during a specified time window based on the application-controlled MessageId property of a message. That property is set to the Id of the event document, meaning Azure Service Bus will ignore and drop a message, if a certain event has already been successfully added to our topic.
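Conceptually, duplicate detection boils down to remembering message IDs for a limited time. The sketch below models that idea with a hypothetical `DeduplicatingTopic` class; it is an illustration only and does not reflect how Azure Service Bus implements the feature.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical in-memory model of MessageId-based duplicate detection.
public class DeduplicatingTopic
{
    private readonly TimeSpan _window;
    private readonly Dictionary<string, DateTimeOffset> _seen = new();
    public List<string> Accepted { get; } = new();

    public DeduplicatingTopic(TimeSpan window) => _window = window;

    // Returns true if the message was added, false if it was dropped as a duplicate
    // because the same MessageId was accepted within the detection window.
    public bool Send(string messageId, DateTimeOffset now)
    {
        if (_seen.TryGetValue(messageId, out var firstSeen) && now - firstSeen < _window)
            return false;

        _seen[messageId] = now;
        Accepted.Add(messageId);
        return true;
    }
}
```

In this model a resend is only dropped while the window is still open, so the window should comfortably cover the time a retry can take.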

In a typical “Transactional Outbox Pattern” implementation, we would now update the processed events and set a Processed property to true, indicating that we successfully published it – and every now and then, the events would then be deleted to keep only the most recent records/documents. Of course, this could be addressed manually in the handler method.

But we are using Cosmos DB! We are already keeping track of events that were processed by using the Change Feed (in combination with the Leases container). And to periodically clean-up the events, we can leverage another useful feature of Cosmos DB: Time-To-Live (TTL) on documents [3]. Cosmos DB provides the ability to automatically delete documents based on a TTL property that can be added to a document – a timespan in seconds. The service will constantly check your container for documents with such a TTL property and as soon as it has expired, Cosmos DB will remove it from the container (btw, using the remaining Request Units in a background task – it will never “eat up” RUs that you need to handle user requests).


So, when all the services work as expected, events will be processed and published fast – meaning within seconds. If we have an error in Cosmos DB, we do not even publish any events, because nothing (neither the business object nor the corresponding events) can be saved at all – users will receive an error.

The only case we need to consider when choosing an appropriate TTL value for our DomainEvent documents is the one where the background worker application (Change Feed Processor) or Azure Service Bus isn’t available.

How much time do we grant both components to eventually publish the events? Let’s have a look at the SLA for Azure Service Bus: Microsoft guarantees 99.9% uptime. That means a downtime of ~9 hours per year or ~44 minutes per month. Of course, we should give our service more time than the duration of a “possible outage” to process changes. Furthermore, we should also consider that our Change Feed Processor worker service can be down as well. To be on the safe side, I’d pick 10 days (TTL expects seconds, so a value of 864,000) for a production environment. All components involved will then have more than one week to process / publish changes within our application. That should be enough even in the case of a “disaster” like an Azure region outage. And after 10 days, the container holding the events will be cleaned up automatically by Cosmos DB.
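The arithmetic behind these numbers is easy to reproduce. The helper below is a small sketch with a made-up `TtlMath` class; it only converts an availability percentage into allowed downtime and a retention period into the seconds value a TTL expects.

```csharp
using System;

// Hypothetical helper for illustration; not part of the sample.
public static class TtlMath
{
    // Maximum downtime that an availability level (e.g. 0.999) allows within the given period.
    public static TimeSpan AllowedDowntime(double availability, TimeSpan period) =>
        TimeSpan.FromTicks((long)(period.Ticks * (1.0 - availability)));

    // Converts a retention period into whole seconds, the unit used for the TTL.
    public static int TtlSeconds(TimeSpan retention) => (int)retention.TotalSeconds;
}
```

For example, `TtlMath.TtlSeconds(TimeSpan.FromDays(10))` evaluates to 864000, and `TtlMath.AllowedDowntime(0.999, TimeSpan.FromDays(365))` comes out at roughly 8.76 hours, which matches the “~9 hours per year” mentioned above.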

Sidenote: As mentioned before, that’s not necessary. If you want to keep a log of all the changes for a Contact object, you can also set the TTL to -1 on event documents and Cosmos DB won’t purge them at all.

Advantages

As a result, we now have a service that can handle Contact business objects and every time such a contact gets added or modified, it will raise a corresponding event which will be published within the application right before saving the actual object. These events will be picked up by an event handler in our service and added to the same “database context” which will save both the business object and events in the same transaction to Cosmos DB. Hence, we can guarantee that no event will ever be lost. Furthermore, we leverage the Cosmos DB Change Feed then to publish the tracked events in the background to an Azure Service Bus Topic via a background worker service that makes use of the ChangeFeedProcessor library.

So, in the end this is not a “traditional” implementation of the “Transactional Outbox Pattern”, because we leverage some features of Cosmos DB that make things easier for a developer.

What are the advantages of this solution then? We have…

  • guaranteed delivery of events
  • guaranteed ordering of events and deduplication via Azure Service Bus
  • no need to maintain an extra Processed property that indicates a successful processing of an event document – the solution makes use of Time-To-Live and Change Feed features in Cosmos DB
  • error proof processing of messages via ChangeFeedProcessor (or Azure Function)
  • optional: you can add multiple Change Feed processors – each one maintaining its own “pointer” enabling additional scenarios
  • if you use a multi-master/multi-region Cosmos DB account, you’ll even get “five nines” (99.999%) availability – which is outstanding!

Summary

This sample demonstrated how to implement the Transactional Outbox Pattern with Azure Cosmos DB. We made use of some of the hero features of Cosmos DB to simplify the implementation of the pattern.

You can find the source code for the sample application including the change feed processor on the Microsoft Patterns & Practices repo. There, you’ll also find a bicep deployment script to create all the necessary Azure resources for you automatically.

Keep in mind that this is not a “production ready” implementation, but it hopefully can act as an inspiration for your next adventure.

Happy hacking! 😊



Source Code

The sample source code for this article can be found on GitHub: https://github.com/mspnp/transactional-outbox-pattern.


Top comments (1)

Radek Maziarka

Hello Christian, thanks for the great example! Very instructive :)

I went through the repo and found interesting thing in querying all contacts:
github.com/mspnp/transactional-out...
Am I right that you are reading all the contacts throughout the partitions? I am afraid that it could be very costly, when it comes to the RU spent. What would be a better solution here?