<?xml version="1.0" encoding="UTF-8"?>
<rss version="2.0" xmlns:atom="http://www.w3.org/2005/Atom" xmlns:dc="http://purl.org/dc/elements/1.1/">
  <channel>
    <title>DEV Community: Jimmy Bogard</title>
    <description>The latest articles on DEV Community by Jimmy Bogard (@jbogard).</description>
    <link>https://dev.to/jbogard</link>
    <image>
      <url>https://media2.dev.to/dynamic/image/width=90,height=90,fit=cover,gravity=auto,format=auto/https:%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Fuser%2Fprofile_image%2F67050%2F6d22aedb-a7f3-49e6-8738-641dd9e6f70f.jpeg</url>
      <title>DEV Community: Jimmy Bogard</title>
      <link>https://dev.to/jbogard</link>
    </image>
    <atom:link rel="self" type="application/rss+xml" href="https://dev.to/feed/jbogard"/>
    <language>en</language>
    <item>
      <title>Life Beyond Distributed Transactions: An Apostate's Implementation - Relational Resources</title>
      <dc:creator>Jimmy Bogard</dc:creator>
      <pubDate>Tue, 05 Feb 2019 20:46:04 +0000</pubDate>
      <link>https://dev.to/jbogard/life-beyond-distributed-transactions-an-apostates-implementation---relational-resources-522p</link>
      <guid>https://dev.to/jbogard/life-beyond-distributed-transactions-an-apostates-implementation---relational-resources-522p</guid>
      <description>&lt;p&gt;Posts in this series:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;&lt;a href="https://dev.to/jbogard/life-beyond-distributed-transactions-an-apostates-implementation---a-primer-2po6-temp-slug-8261948"&gt;A Primer&lt;/a&gt;&lt;/li&gt;
&lt;li&gt;&lt;a href="https://dev.to/jbogard/life-beyond-distributed-transactions-an-apostates-implementation---document-coordination-leb-temp-slug-1009285"&gt;Document Coordination&lt;/a&gt;&lt;/li&gt;
&lt;li&gt;&lt;a href="https://dev.to/jbogard/life-beyond-distributed-transactions-an-apostates-implementation---document-example-4fpk-temp-slug-5917155"&gt;Document Example&lt;/a&gt;&lt;/li&gt;
&lt;li&gt;&lt;a href="https://dev.to/jbogard/life-beyond-distributed-transactions-an-apostates-implementation---dispatching-example-31i6-temp-slug-684211"&gt;Dispatching Example&lt;/a&gt;&lt;/li&gt;
&lt;li&gt;&lt;a href="https://dev.to/jbogard/life-beyond-distributed-transactions-an-apostates-implementation---failures-and-retries-m6-temp-slug-2730961"&gt;Failures and Retries&lt;/a&gt;&lt;/li&gt;
&lt;li&gt;&lt;a href="https://dev.to/jbogard/life-beyond-distributed-transactions-an-apostates-implementation---dispatcher-failure-recovery-3m38-temp-slug-5793122"&gt;Failure Recovery&lt;/a&gt;&lt;/li&gt;
&lt;li&gt;&lt;a href="https://dev.to/jbogard/life-beyond-distributed-transactions-an-apostates-implementation---sagas-2dj4-temp-slug-1869324"&gt;Sagas&lt;/a&gt;&lt;/li&gt;
&lt;li&gt;&lt;a href="https://jimmybogard.com/life-beyond-distributed-transactions-an-apostates-implementation-relational-resources/" rel="noopener noreferrer"&gt;Relational Resources&lt;/a&gt;&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;&lt;a href="https://github.com/jbogard/adventureworkscosmos" rel="noopener noreferrer"&gt;Sample code from this series&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;So far in this series we've mainly concerned ourselves with a single resource that can't support distributed (or multi-entity) transactions. That limitation is becoming less common among NoSQL options: Azure Cosmos DB supports such transactions, and as of the 4.0 release, MongoDB supports multi-document transactions.&lt;/p&gt;

&lt;p&gt;What does this mean for us then?&lt;/p&gt;

&lt;p&gt;Firstly, there are many cases where, even though transactions between disparate entities are possible, they may not be desirable. We may have to make design or performance compromises to make them work, as the technology to perform multi-entity transactions will always add some overhead. Even in SQL-land, we still often need to worry about consistency, locks, concurrency, etc.&lt;/p&gt;

&lt;p&gt;The scenario I've found most often overlooked is bridging disparate resources, not multiple entities in the same resource (such as with many NoSQL databases). I've blogged about &lt;a href="https://jimmybogard.com/refactoring-towards-resilience-a-primer/" rel="noopener noreferrer"&gt;resilience patterns&lt;/a&gt;, especially when incorporating calls to external APIs. Most typically though, I see people trying to write to a database and send a message to a queue:&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fjimmybogardsblog.blob.core.windows.net%2Fjimmybogardsblog%2F1%2F2019%2FPicture0058.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fjimmybogardsblog.blob.core.windows.net%2Fjimmybogardsblog%2F1%2F2019%2FPicture0058.png"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;The code is rather innocuous - we're trying to do some stuff in a database, which IS transactional across multiple writes, while at the same time trying to write to a queue/broker/exchange like RabbitMQ or Azure Service Bus.&lt;/p&gt;

&lt;p&gt;While it may be easy/obvious to spot places where we're trying to incorporate external services into our business transactions, like APIs, it's not so obvious when it's infrastructure we &lt;em&gt;do&lt;/em&gt; own and &lt;em&gt;is&lt;/em&gt; transactional.&lt;/p&gt;

&lt;p&gt;Ultimately, our solution will be the same, which is to apply the outbox pattern.&lt;/p&gt;

&lt;h3&gt;
  
  
  Outbox pattern generalized
&lt;/h3&gt;

&lt;p&gt;With the Cosmos DB approach, we placed the outbox inside each individual document. This is because our transactional boundary is a single item in a single collection. In order to provide atomicity for "publishing" messages, we need to design our outbox to the same transactional scope as our business data. If the transactional scope is a single record, the outbox goes in that record. If the scope is the database, the outbox goes in the database.&lt;/p&gt;

&lt;p&gt;With a SQL database, our transaction scope widens considerably - to the entire database! This also widens the possibilities of how we can model our "outbox". We don't have to store an outbox per record in SQL Server - we can instead create a single outbox for the entire database.&lt;/p&gt;

&lt;p&gt;What should be in this outbox table? Something similar to our original outbox in the Cosmos DB example:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Message ID (which message is this)&lt;/li&gt;
&lt;li&gt;Type (what kind of message is this)&lt;/li&gt;
&lt;li&gt;Body (what's in the message)&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;We also need to keep track of what has and hasn't been dispatched. In the Cosmos DB example, we kept sent/received messages inside each record.&lt;/p&gt;

&lt;p&gt;With SQL though, having a single table that grows without bound is...problematic, so we need some more information about whether or not a message has been dispatched, as well as a dispatch date to be able to clean up after ourselves.&lt;/p&gt;

&lt;p&gt;We can combine both into a single value however - "DispatchedAt", as a timestamp:&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fjimmybogardsblog.blob.core.windows.net%2Fjimmybogardsblog%2F1%2F2019%2FPicture0059.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fjimmybogardsblog.blob.core.windows.net%2Fjimmybogardsblog%2F1%2F2019%2FPicture0059.png"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;Processing the outbox will be a bit different than our original Cosmos DB example, since we now have a single table to deal with.&lt;/p&gt;

&lt;h3&gt;
  
  
  Sending via our SQL outbox
&lt;/h3&gt;

&lt;p&gt;In the Cosmos DB example, we used domain events to communicate. We can do something similar in our SQL example, but we'll need to draw some boundaries. Are our domain events just to coordinate activities in our domain model, or are they there to interact with the outside world as service-level integration events? Previously, we only used them for domain events, and used special domain event handlers to "republish" as service events.&lt;/p&gt;

&lt;p&gt;There are lots of tradeoffs between each approach, but in general, it's best not to combine the idea of domain events and service events.&lt;/p&gt;

&lt;p&gt;To keep things flexible, we can simply model our messages as plainly as possible, to the point of just including them on the &lt;code&gt;DbContext&lt;/code&gt;:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;public Task SaveMessageAsync&amp;lt;T&amp;gt;(T message)  
{
    var outboxMessage = new OutboxMessage
    {
        Id = Guid.NewGuid(),
        Type = typeof(T).FullName,
        Body = JsonConvert.SerializeObject(message)
    };
    return Outbox.AddAsync(outboxMessage);
}
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;When we're doing something that needs any kind of messaging, instead of sending directly to a broker, we save a new message as part of a transaction:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;using (var context = new AdventureWorks2016Context())  
{
    using (var transaction = context.Database.BeginTransaction())
    {
        var product = await context.Products.FindAsync(productId);

        product.Price = newPrice;
        await context.SaveMessageAsync(new ProductPriceChanged 
        {
            Id = productId, 
            Price = newPrice
        });
        await context.SaveChangesAsync();

        transaction.Commit();
    }
}
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Each request that needs to send a message does so by only writing to the business data and outbox in a single transaction:&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fjimmybogardsblog.blob.core.windows.net%2Fjimmybogardsblog%2F1%2F2019%2FPicture0060.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fjimmybogardsblog.blob.core.windows.net%2Fjimmybogardsblog%2F1%2F2019%2FPicture0060.png"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;Then, as with our Cosmos DB example, a dispatcher reads from the outbox and sends these messages along to our broker:&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fjimmybogardsblog.blob.core.windows.net%2Fjimmybogardsblog%2F1%2F2019%2FPicture0061.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fjimmybogardsblog.blob.core.windows.net%2Fjimmybogardsblog%2F1%2F2019%2FPicture0061.png"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;And also like our Cosmos DB example, our dispatcher can run right after the transaction completes, as well as a background process to "clean up" after any missed messages. After each dispatch, we'd set the date of dispatch to ensure we skip already dispatched messages:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;using (var context = new AdventureWorks2016Context())  
{
    using (var transaction = context.Database.BeginTransaction())
    {
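        var message = await context.Outbox
             .Where(m =&amp;gt; m.DispatchedAt == null)
             .FirstOrDefaultAsync();

        // FirstOrDefaultAsync returns null once every message has been dispatched
        if (message != null)
        {
            bus.Send(message);
            // Stamp the dispatch date so this message is skipped next time
            message.DispatchedAt = DateTime.Now;
            await context.SaveChangesAsync();
        }

        transaction.Commit();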
    }
}
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;With this in place, we can safely "send" messages in a transaction. However, we still have to deal with receiving these messages twice!&lt;/p&gt;

&lt;h3&gt;
  
  
  De-duplicating messages
&lt;/h3&gt;

&lt;p&gt;In order to make sure we achieve at-least-once delivery, but exactly-once processing, we'll have to keep track of messages we've processed. To do so, we can just add a little extra information to our outbox - not just when we've &lt;em&gt;sent&lt;/em&gt; messages, but when we've &lt;em&gt;processed&lt;/em&gt; messages:&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fjimmybogardsblog.blob.core.windows.net%2Fjimmybogardsblog%2F1%2F2019%2FPicture0062.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fjimmybogardsblog.blob.core.windows.net%2Fjimmybogardsblog%2F1%2F2019%2FPicture0062.png"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;Similar to our inbox, we'll include the processed date with each message. As we process a message, we'll double-check our outbox to see if it has already been processed. If not, we can perform the work. If so, we just skip the message - nothing to be done!&lt;/p&gt;
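
&lt;p&gt;Here's a minimal sketch of that check. It uses an in-memory dictionary as a stand-in for the table, and the &lt;code&gt;ProcessedMessageLog&lt;/code&gt; and &lt;code&gt;ProcessOnce&lt;/code&gt; names are hypothetical. In a real system the lookup, the work itself, and recording the processed date would all need to happen in the same database transaction:&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;using System;
using System.Collections.Generic;

// In-memory stand-in for the table that records processed messages (hypothetical).
public class ProcessedMessageLog
{
    private readonly Dictionary&amp;lt;Guid, DateTime&amp;gt; _processedAt =
        new Dictionary&amp;lt;Guid, DateTime&amp;gt;();

    // Runs the handler only if this message ID has not been recorded yet.
    // Returns true when the work was performed, false when the message was skipped.
    public bool ProcessOnce(Guid messageId, Action handler)
    {
        if (_processedAt.ContainsKey(messageId))
            return false; // duplicate delivery - nothing to be done

        handler();
        _processedAt[messageId] = DateTime.UtcNow; // remember when we processed it
        return true;
    }
}
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;

&lt;p&gt;Calling &lt;code&gt;ProcessOnce&lt;/code&gt; twice with the same message ID runs the handler the first time and skips it the second.&lt;/p&gt;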

&lt;p&gt;With these measures in place, the last piece is to decide how long we should keep track of messages in our outbox. What's the longest time after a message is marked as processed that we might still receive it again? An hour? A day? A week? Probably not a year, but something that makes sense. I'd start large, say a week, and move it lower as we understand the characteristics of our system.&lt;/p&gt;
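
&lt;p&gt;A cleanup job then only needs to find rows older than that window. The sketch below shows just the selection logic over an in-memory sequence. The &lt;code&gt;OutboxCleanup&lt;/code&gt; class, the &lt;code&gt;FindExpired&lt;/code&gt; method, and the &lt;code&gt;ProcessedAt&lt;/code&gt; name are assumptions for illustration, and a real job would run the equivalent query and delete against the database:&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;using System;
using System.Collections.Generic;
using System.Linq;

public static class OutboxCleanup
{
    // Returns the IDs of messages whose processed date is older than the retention window.
    // Messages that have not been processed yet (null) are never selected.
    public static List&amp;lt;Guid&amp;gt; FindExpired(
        IEnumerable&amp;lt;(Guid Id, DateTime? ProcessedAt)&amp;gt; messages,
        DateTime utcNow,
        TimeSpan retention)
    {
        var cutoff = utcNow - retention;

        return messages
            .Where(m =&amp;gt; m.ProcessedAt != null &amp;amp;&amp;amp; m.ProcessedAt &amp;lt; cutoff)
            .Select(m =&amp;gt; m.Id)
            .ToList();
    }
}
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;

&lt;p&gt;Passing &lt;code&gt;TimeSpan.FromDays(7)&lt;/code&gt; as the retention gives the "start with a week" policy, and lowering it later is a one-argument change.&lt;/p&gt;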

&lt;p&gt;With the outbox pattern, we can still coordinate activities between transactional resources by keeping track of what we need to communicate, when we've communicated, and when we've processed. It's like a little to-do list that our system uses to check things off as it goes, never losing track of what it needs to do.&lt;/p&gt;

&lt;p&gt;In our last post, I'll wrap up and cover some scenarios where we should avoid such a level of coordination.&lt;/p&gt;

</description>
      <category>microservices</category>
      <category>architecture</category>
      <category>distributedsystems</category>
    </item>
    <item>
      <title>Life Beyond Distributed Transactions: An Apostate's Implementation - Sagas</title>
      <dc:creator>Jimmy Bogard</dc:creator>
      <pubDate>Wed, 12 Sep 2018 15:25:04 +0000</pubDate>
      <link>https://dev.to/jbogard/life-beyond-distributed-transactions-an-apostates-implementation---sagas-lom</link>
      <guid>https://dev.to/jbogard/life-beyond-distributed-transactions-an-apostates-implementation---sagas-lom</guid>
      <description>&lt;p&gt;Posts in this series:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;&lt;a href="https://dev.to/jbogard/life-beyond-distributed-transactions-an-apostates-implementation---a-primer-2po6-temp-slug-8261948"&gt;A Primer&lt;/a&gt;&lt;/li&gt;
&lt;li&gt;&lt;a href="https://dev.to/jbogard/life-beyond-distributed-transactions-an-apostates-implementation---document-coordination-leb-temp-slug-1009285"&gt;Document Coordination&lt;/a&gt;&lt;/li&gt;
&lt;li&gt;&lt;a href="https://dev.to/jbogard/life-beyond-distributed-transactions-an-apostates-implementation---document-example-4fpk-temp-slug-5917155"&gt;Document Example&lt;/a&gt;&lt;/li&gt;
&lt;li&gt;&lt;a href="https://dev.to/jbogard/life-beyond-distributed-transactions-an-apostates-implementation---dispatching-example-31i6-temp-slug-684211"&gt;Dispatching Example&lt;/a&gt;&lt;/li&gt;
&lt;li&gt;&lt;a href="https://dev.to/jbogard/life-beyond-distributed-transactions-an-apostates-implementation---failures-and-retries-m6-temp-slug-2730961"&gt;Failures and Retries&lt;/a&gt;&lt;/li&gt;
&lt;li&gt;&lt;a href="https://dev.to/jbogard/life-beyond-distributed-transactions-an-apostates-implementation---dispatcher-failure-recovery-3m38-temp-slug-5793122"&gt;Failure Recovery&lt;/a&gt;&lt;/li&gt;
&lt;li&gt;&lt;a href="https://jimmybogard.com/life-beyond-distributed-transactions-sagas/" rel="noopener noreferrer"&gt;Sagas&lt;/a&gt;&lt;/li&gt;
&lt;li&gt;&lt;a href="https://jimmybogard.com/life-beyond-distributed-transactions-an-apostates-implementation-relational-resources/" rel="noopener noreferrer"&gt;Relational Resources&lt;/a&gt;&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;&lt;a href="https://github.com/jbogard/adventureworkscosmos" rel="noopener noreferrer"&gt;Sample code from this series&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;So far in this series, we've looked at the ins and outs of moving beyond distributed transactions using persisted messages as a means of coordination between different documents (or resources). One common question in the example I give is "how do I actually make sure either both operations happen or neither?" To answer this question, we need to recognize that this "all-or-nothing" approach is a kind of transaction. But we've already said we're trying to avoid distributed transactions!&lt;/p&gt;

&lt;p&gt;We won't be building a new kind of distributed transaction, but instead a transaction that lives longer than any single request: a long-lived transaction. To implement a long-lived transaction, we need to look at the Saga pattern, first described in &lt;a href="https://www.cs.cornell.edu/andru/cs711/2002fa/reading/sagas.pdf" rel="noopener noreferrer"&gt;the original Sagas paper (Garcia-Molina, Salem)&lt;/a&gt;. The most common example of a saga I've seen described is booking a vacation. When you book a vacation, you need to:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Book a flight&lt;/li&gt;
&lt;li&gt;Book a hotel&lt;/li&gt;
&lt;li&gt;Reserve a car&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;You can't do all three at once - that's like getting a conference call together with all companies and getting consensus all together. Not going to happen! Instead, we build this overall business transaction as a series of requests and compensating actions in case something goes wrong:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Cancel flight&lt;/li&gt;
&lt;li&gt;Cancel hotel&lt;/li&gt;
&lt;li&gt;Cancel car reservation&lt;/li&gt;
&lt;/ul&gt;
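
&lt;p&gt;To make the pairing of requests and compensating actions concrete, here's a minimal in-process sketch. The &lt;code&gt;SagaStep&lt;/code&gt; and &lt;code&gt;VacationSaga&lt;/code&gt; names are hypothetical, and this only illustrates the linear form of the idea. A real saga persists its state and talks to each participant through messages, rather than running everything inside one method call:&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;using System;
using System.Collections.Generic;

// A step pairs a request with the compensating action that undoes it.
public class SagaStep
{
    public string Name { get; set; }
    public Action Execute { get; set; }
    public Action Compensate { get; set; }
}

public static class VacationSaga
{
    // Runs the steps in order. If one throws, the steps that already
    // completed are compensated in reverse order.
    // Returns true when every step succeeded.
    public static bool Run(IEnumerable&amp;lt;SagaStep&amp;gt; steps)
    {
        var completed = new Stack&amp;lt;SagaStep&amp;gt;();

        foreach (var step in steps)
        {
            try
            {
                step.Execute();
                completed.Push(step);
            }
            catch (Exception)
            {
                // Undo the most recent work first: car, then hotel, then flight
                while (completed.Count &amp;gt; 0)
                    completed.Pop().Compensate();

                return false;
            }
        }

        return true;
    }
}
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;

&lt;p&gt;If the flight and hotel are booked but the car reservation throws, the hotel is cancelled first and then the flight. The failed step itself is not compensated, since it never completed.&lt;/p&gt;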

&lt;p&gt;Our saga operations can be linear (&lt;a href="https://lostechies.com/jimmybogard/2013/03/14/saga-implementation-patterns-controller/" rel="noopener noreferrer"&gt;controller pattern&lt;/a&gt;) or parallel (&lt;a href="https://lostechies.com/jimmybogard/2013/03/11/saga-implementation-patterns-observer/" rel="noopener noreferrer"&gt;observer pattern&lt;/a&gt;) or in microservices terms, &lt;a href="https://microservices.io/patterns/data/saga.html" rel="noopener noreferrer"&gt;orchestration/choreography&lt;/a&gt;.&lt;/p&gt;

&lt;p&gt;In order to satisfy our saga constraints, our requests must:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Be idempotent&lt;/li&gt;
&lt;li&gt;Be able to abort&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;And the compensating requests must:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Be idempotent&lt;/li&gt;
&lt;li&gt;Not be able to abort&lt;/li&gt;
&lt;li&gt;Be commutative&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;In our example, we have a model that basically assumes success. We:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Approve an order&lt;/li&gt;
&lt;li&gt;Deduct stock&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;Let's modify this a bit to create an order fulfillment saga. For this saga, we can fulfill an order if and only if:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Our order is approved&lt;/li&gt;
&lt;li&gt;We have enough stock&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;If our order is rejected, we need to release the stock. If we don't have enough stock, we need to un-approve (reject) our order. And keeping with our example, we need something to coordinate this activity - our saga. But rather than just call it some generic "saga", let's give it a meaningful name - &lt;code&gt;OrderFulfillmentSaga&lt;/code&gt;:&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fjimmybogardsblog.blob.core.windows.net%2Fjimmybogardsblog%2F8%2F2018%2FPicture0055.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fjimmybogardsblog.blob.core.windows.net%2Fjimmybogardsblog%2F8%2F2018%2FPicture0055.png"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;This saga will coordinate the activities of the order request and stock. And because we need this saga to have the same communication properties as the other two documents, we can simply model this saga as just another document with our inbox/outbox!&lt;/p&gt;

&lt;p&gt;The overall flow will be:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Once an order is created, kick off a new order fulfillment saga&lt;/li&gt;
&lt;li&gt;This saga will coordinate actions between the stock and order request&lt;/li&gt;
&lt;li&gt;If the order is rejected, the saga needs to initiate a stock return&lt;/li&gt;
&lt;li&gt;If there is not enough stock, the saga needs to cancel the order&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;Let's start with kicking off the saga!&lt;/p&gt;

&lt;h3&gt;
  
  
  Kicking things off
&lt;/h3&gt;

&lt;p&gt;When should we kick the saga off? It's tempting to do this in the initial request that creates a new order request, but remember - we can't put more than one document in a transaction unless we're very sure fulfillment and order requests will live close together in our Cosmos DB instance. That means we need to use our document messages to communicate with a saga - even if it doesn't exist!&lt;/p&gt;

&lt;p&gt;We don't want to fulfill an order request twice, and for our simple scenario let's just assume an order request can't be "retried". Originally, our &lt;code&gt;OrderRequest&lt;/code&gt; created an &lt;code&gt;ItemPurchased&lt;/code&gt; document message for each item - we'll remove that in favor of a single &lt;code&gt;OrderCreated&lt;/code&gt; document message:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;public class OrderCreated : IDocumentMessage  
{
    public Guid Id { get; set; }
    public Guid OrderId { get; set; }

    public List&amp;lt;LineItem&amp;gt; LineItems { get; set; }

    public class LineItem
    {
        public int ProductId { get; set; }
        public int Quantity { get; set; }
    }
}
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;We &lt;em&gt;could&lt;/em&gt; just have the &lt;code&gt;OrderId&lt;/code&gt; and have the receiver then load up the &lt;code&gt;OrderRequest&lt;/code&gt;, but for simplicity's sake (and assuming you can't change the order after it's created), we'll treat this information as immutable and keep it in the message. Now when we create an &lt;code&gt;OrderRequest&lt;/code&gt;, we'll also send this message:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;public class OrderRequest : DocumentBase  
{
    public OrderRequest(ShoppingCart cart)
    {
        Id = Guid.NewGuid();
        Customer = new Customer
        {
            FirstName = "Jane",
            MiddleName = "Mary",
            LastName = "Doe"
        };
        Items = cart.Items.Select(li =&amp;gt; new LineItem
        {
            ProductId = li.Key,
            Quantity = li.Value.Quantity,
            ListPrice = li.Value.ListPrice,
            ProductName = li.Value.ProductName
        }).ToList();
        Status = Status.New;

        Send(new OrderCreated
        {
            Id = Guid.NewGuid(),
            OrderId = Id,
            LineItems = Items
                .Select(item =&amp;gt; new OrderCreated.LineItem
                {
                    ProductId = item.ProductId,
                    Quantity = item.Quantity
                })
                .ToList()
        });
    }

    // Remaining members omitted
}
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;It's not much different than our original order creation - we're just now including the document message to initiate the order fulfillment saga.&lt;/p&gt;

&lt;p&gt;Our handler for this document message needs to find the right &lt;code&gt;OrderFulfillment&lt;/code&gt; saga document and let the saga handle the message:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;public class OrderCreatedHandler : IDocumentMessageHandler&amp;lt;OrderCreated&amp;gt;  
{
    private readonly IDocumentDBRepository&amp;lt;OrderFulfillment&amp;gt; _repository;

    public OrderCreatedHandler(IDocumentDBRepository&amp;lt;OrderFulfillment&amp;gt; repository)
        =&amp;gt; _repository = repository;

    public async Task Handle(OrderCreated message)
    {
        var orderFulfillment = (await _repository
                .GetItemsAsync(s =&amp;gt; s.OrderId == message.OrderId))
            .FirstOrDefault();

        if (orderFulfillment == null)
        {
            orderFulfillment = new OrderFulfillment
            {
                Id = Guid.NewGuid(),
                OrderId = message.OrderId
            };

            await _repository.CreateItemAsync(orderFulfillment);
        }

        orderFulfillment.Handle(message);

        await _repository.UpdateItemAsync(orderFulfillment);
    }
}
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Not shown here - but we do need to make sure we only have a single fulfillment saga per order, so we can configure &lt;code&gt;OrderId&lt;/code&gt; as a unique index inside Cosmos DB.&lt;/p&gt;

&lt;p&gt;The &lt;code&gt;orderFulfillment.Handle&lt;/code&gt; method needs to start, and request stock:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;public void Handle(OrderCreated message)  
{
    Process(message, m =&amp;gt;
    {
        if (IsCancelled)
            return;

        LineItems = m.LineItems
            .Select(li =&amp;gt; new LineItem
            {
                ProductId = li.ProductId,
                AmountRequested = li.Quantity
            })
            .ToList();

        foreach (var lineItem in LineItems)
        {
            Send(new StockRequest
            {
                Id = Guid.NewGuid(),
                ProductId = lineItem.ProductId,
                AmountRequested = lineItem.AmountRequested,
                OrderFulfillmentId = Id
            });
        }
    });
}
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;In my example, I've made the &lt;code&gt;OrderFulfillment&lt;/code&gt; saga coordinate with &lt;code&gt;Stock&lt;/code&gt; through our &lt;code&gt;StockRequest&lt;/code&gt; message. This is instead of &lt;code&gt;Stock&lt;/code&gt; listening for &lt;code&gt;OrderCreated&lt;/code&gt; itself. My general thought here is that fulfillment manages the requests/returns for stock, and any business logic around that.&lt;/p&gt;

&lt;p&gt;I also have a little check at the beginning - if an order is cancelled, we don't want to send out stock requests. This is the piece that's enforcing commutative requests - we might receive an order rejected notice &lt;em&gt;before&lt;/em&gt; receiving the order created notice! When it comes to messaging, I always assume messages are received out of order, which means our business logic needs to be able to handle these situations.&lt;/p&gt;

&lt;h3&gt;
  
  
  Handling stock requests
&lt;/h3&gt;

&lt;p&gt;Our original &lt;code&gt;Stock&lt;/code&gt; implementation was quite naive, but this time we want to more intelligently handle orders. In our stock handler, we'll still have a document per product, but now it can make a decision based on the quantity available:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;public void Handle(StockRequest message)  
{
    Process(message, e =&amp;gt;
    {
        if (QuantityAvailable &amp;gt;= e.AmountRequested)
        {
            QuantityAvailable -= e.AmountRequested;
            Send(new StockRequestConfirmed
            {
                Id = Guid.NewGuid(),
                OrderFulfillmentId = e.OrderFulfillmentId,
                ProductId = ProductId
            });
        }
        else
        {
            Send(new StockRequestDenied
            {
                Id = Guid.NewGuid(),
                OrderFulfillmentId = e.OrderFulfillmentId,
                ProductId = ProductId
            });
        }
    });
}
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Because we're using document messages with our inbox de-duping messages, we don't need to worry about processing the stock request twice. Our simple logic just checks the stock, and if it's successful we can deduct the stock and return a &lt;code&gt;StockRequestConfirmed&lt;/code&gt; message. If not, we can return a &lt;code&gt;StockRequestDenied&lt;/code&gt; message.&lt;/p&gt;

&lt;h3&gt;
  
  
  A successful order fulfillment
&lt;/h3&gt;

&lt;p&gt;Our original logic said that "an order can be fulfilled if the order is approved and we have enough stock". Approving an order is a human decision, so we have a basic form for doing so:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;@if (Model.Order.Status == Status.New)
{
    &amp;lt;form asp-controller="Order" asp-action="Reject" asp-route-id="@Model.Order.Id" method="post"&amp;gt;
        &amp;lt;input type="submit" value="Reject"/&amp;gt;
    &amp;lt;/form&amp;gt;
    &amp;lt;form asp-controller="Order" asp-action="Approve" asp-route-id="@Model.Order.Id" method="post"&amp;gt;
        &amp;lt;input type="submit" value="Approve"/&amp;gt;
    &amp;lt;/form&amp;gt;
}
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;And when the order is approved, we just delegate to MediatR to handle this request:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;public class ApproveOrder  
{
    public class Request : IRequest
    {
        public Guid Id { get; set; }
    }

    public class Handler : IRequestHandler&amp;lt;Request&amp;gt;
    {
        private readonly IDocumentDBRepository&amp;lt;OrderRequest&amp;gt; _orderRepository;

        public Handler(IDocumentDBRepository&amp;lt;OrderRequest&amp;gt; orderRepository)
        {
            _orderRepository = orderRepository;
        }

        public async Task&amp;lt;Unit&amp;gt; Handle(Request request, CancellationToken cancellationToken)
        {
            var orderRequest = await _orderRepository.GetItemAsync(request.Id);

            orderRequest.Approve();

            await _orderRepository.UpdateItemAsync(orderRequest);

            return Unit.Value;
        }
    }
}
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Which then delegates to our document to approve the order request:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;public void Approve()  
{
    if (Status == Status.Approved)
        return;

    if (Status == Status.Rejected)
        throw new InvalidOperationException("Cannot approve a rejected order.");

    Status = Status.Approved;
    Send(new OrderApproved
    {
        Id = Guid.NewGuid(),
        OrderId = Id
    });
}
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;We only want to send out the &lt;code&gt;OrderApproved&lt;/code&gt; message once, so just some basic status checking handles that.&lt;/p&gt;

&lt;p&gt;On the order fulfillment side:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;public void Handle(OrderApproved message)  
{
    Process(message, m =&amp;gt;
    {
        OrderApproved = true;

        if (IsCancelled)
        {
            ProcessCancellation();
        }
        else
        {
            CheckForSuccess();
        }
    });
}
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Each time we receive some external notification, we need to process the success/failure path, which I'll come back to in a bit. Our handler for &lt;code&gt;StockRequestConfirmed&lt;/code&gt; will be similar, except we're tracking stock on a line item by line item basis:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;public void Handle(StockRequestConfirmed message)  
{
    Process(message, m =&amp;gt;
    {
        var lineItem = LineItems.Single(li =&amp;gt; li.ProductId == m.ProductId);
        lineItem.StockConfirmed = true;

        if (IsCancelled)
        {
            ReturnStock(lineItem);
        }
        else
        {
            CheckForSuccess();
        }
    });
}
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;The &lt;code&gt;CheckForSuccess&lt;/code&gt; method will look to see if all the order fulfillment requirements are met:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;private void CheckForSuccess()  
{
    if (IsCancelled)
        return;

    if (LineItems.All(li =&amp;gt; li.StockConfirmed) &amp;amp;&amp;amp; OrderApproved)
    {
        Send(new OrderFulfillmentSuccessful
        {
            Id = Guid.NewGuid(),
            OrderId = OrderId
        });
    }
}
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Only if all of our stock has been confirmed and our order has been approved will we send a message back to the &lt;code&gt;Order&lt;/code&gt; document to then finally complete the order:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;public void Handle(OrderFulfillmentSuccessful message)  
{
    Process(message, m =&amp;gt;
    {
        if (Status == Status.Rejected || Status == Status.Cancelled)
            return;

        Status = Status.Completed;
    });
}
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;The overall message flow looks something like this:&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fjimmybogardsblog.blob.core.windows.net%2Fjimmybogardsblog%2F8%2F2018%2FPicture0056.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fjimmybogardsblog.blob.core.windows.net%2Fjimmybogardsblog%2F8%2F2018%2FPicture0056.png"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;For each step along the way, we've got idempotency handled for us by the inbox/outbox structures. However, we still need to handle out-of-order messages, which is why you'll see success/fail checks every time we receive a notification.&lt;/p&gt;
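
&lt;p&gt;As a reminder of what that idempotency guard might look like, here is a minimal, self-contained sketch. The &lt;code&gt;SketchDocument&lt;/code&gt;, &lt;code&gt;Increment&lt;/code&gt;, and &lt;code&gt;Counter&lt;/code&gt; types and the in-memory inbox are assumptions for illustration, not the actual base class from the sample code:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;using System;
using System.Collections.Generic;

public interface IDocumentMessage
{
    Guid Id { get; }
}

public class Increment : IDocumentMessage
{
    public Guid Id { get; set; }
}

// Hypothetical stand-in for a document base class that owns an inbox.
public abstract class SketchDocument
{
    // IDs of the messages this document has already handled.
    private readonly HashSet&amp;lt;Guid&amp;gt; _inbox = new HashSet&amp;lt;Guid&amp;gt;();

    protected void Process&amp;lt;TMessage&amp;gt;(TMessage message, Action&amp;lt;TMessage&amp;gt; action)
        where TMessage : IDocumentMessage
    {
        // A redelivered message is skipped, so the action runs at most once per ID.
        if (_inbox.Contains(message.Id))
            return;

        action(message);

        _inbox.Add(message.Id);
    }
}

public class Counter : SketchDocument
{
    public int Value { get; private set; }

    public void Handle(Increment message) =&amp;gt; Process(message, m =&amp;gt; Value++);
}
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;

&lt;p&gt;Handling the same &lt;code&gt;Increment&lt;/code&gt; instance twice leaves &lt;code&gt;Value&lt;/code&gt; at 1, because the second delivery is recognized by its ID and ignored. What the inbox cannot do is decide the outcome when two &lt;em&gt;different&lt;/em&gt; messages arrive in an unexpected order, which is why the handlers still carry explicit state checks.&lt;/p&gt;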

&lt;p&gt;Now that we've got the success path taken care of, let's look at the failure paths.&lt;/p&gt;

&lt;h3&gt;
  
  
  Cancelling the order fulfillment
&lt;/h3&gt;

&lt;p&gt;The first way our order fulfillment can be cancelled is if an order is rejected. From the web app, our &lt;code&gt;Order&lt;/code&gt; document handles a rejection:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;public void Reject()  
{
    if (Status == Status.Rejected)
        return;

    if (Status == Status.Approved)
        throw new InvalidOperationException("Cannot reject an approved order.");

    if (Status == Status.Completed)
        throw new InvalidOperationException("Cannot reject a completed order.");

    Status = Status.Rejected;
    Send(new OrderRejected
    {
        Id = Guid.NewGuid(),
        OrderId = Id
    });
}
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Our order sends an &lt;code&gt;OrderRejected&lt;/code&gt; document message that our order fulfillment document receives:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;public void Handle(OrderRejected message)  
{
    Process(message, m =&amp;gt;
    {
        OrderRejected = true;

        Cancel();
    });
}
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;The &lt;code&gt;Cancel&lt;/code&gt; method marks the order fulfillment as cancelled and then processes the cancellation:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;private void Cancel()  
{
    IsCancelled = true;

    ProcessCancellation();
}
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Similarly, a notification of &lt;code&gt;StockRequestDenied&lt;/code&gt; will cancel the order fulfillment:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;public void Handle(StockRequestDenied message)  
{
    Process(message, m =&amp;gt;
    {
        Cancel();
    });
}
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;In order to process our order fulfillment cancellation, we need to do a couple of things. First, we need to notify our &lt;code&gt;Order&lt;/code&gt; document that it needs to be cancelled. And for any &lt;code&gt;Stock&lt;/code&gt; items that were fulfilled, we need to return that stock:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;private void ProcessCancellation()  
{
    if (!CancelOrderRequested &amp;amp;&amp;amp; !OrderRejected)
    {
        CancelOrderRequested = true;
        Send(new CancelOrderRequest
        {
            Id = Guid.NewGuid(),
            OrderId = OrderId
        });
    }

    foreach (var lineItem in LineItems.Where(li =&amp;gt; li.StockConfirmed))
    {
        ReturnStock(lineItem);
    }
}
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Each step along the way, we keep track of what messages we've sent out so that we don't send notifications twice. To return stock:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;private void ReturnStock(LineItem lineItem)  
{
    if (lineItem.StockReturnRequested)
        return;

    lineItem.StockReturnRequested = true;
    Send(new StockReturnRequested
    {
        Id = Guid.NewGuid(),
        ProductId = lineItem.ProductId,
        AmountToReturn = lineItem.AmountRequested
    });
}
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;If a stock item has already had a return requested, we just skip it. Finally, the order can receive the cancel order request:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;public void Handle(CancelOrderRequest message)  
{
    Process(message, m =&amp;gt;
    {
        if (Status == Status.Rejected)
            return;

        Status = Status.Cancelled;
    });
}
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;With our failure flow in place, the message flow looks something like:&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fjimmybogardsblog.blob.core.windows.net%2Fjimmybogardsblog%2F8%2F2018%2FPicture0057.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fjimmybogardsblog.blob.core.windows.net%2Fjimmybogardsblog%2F8%2F2018%2FPicture0057.png"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;Our order fulfillment saga can now handle the complex process of managing stock and order approvals, keeping track of each step along the way and dealing with success/failure when it receives the notifications. It handles idempotency, retries, and commutative/out-of-order messages.&lt;/p&gt;
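
&lt;p&gt;To make the out-of-order claim concrete, here is a stripped-down sketch of the failure path for a single line item. &lt;code&gt;FulfillmentSketch&lt;/code&gt; and its &lt;code&gt;Sent&lt;/code&gt; list are hypothetical simplifications of the fulfillment document, not code from the sample repository:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;using System.Collections.Generic;

// Hypothetical, simplified model of the fulfillment state for one line item.
public class FulfillmentSketch
{
    private bool _stockConfirmed;
    private bool _isCancelled;
    private bool _stockReturnRequested;

    // Stand-in for the outbox: records the names of messages we would send.
    public List&amp;lt;string&amp;gt; Sent { get; } = new List&amp;lt;string&amp;gt;();

    public void OnStockConfirmed()
    {
        _stockConfirmed = true;

        // The confirmation arrived after the cancellation: give the stock back.
        if (_isCancelled)
            ReturnStock();
    }

    public void OnOrderRejected()
    {
        _isCancelled = true;

        // The cancellation arrived after the confirmation: give the stock back.
        if (_stockConfirmed)
            ReturnStock();
    }

    private void ReturnStock()
    {
        // Track that the request went out so it is never sent twice.
        if (_stockReturnRequested)
            return;

        _stockReturnRequested = true;
        Sent.Add("StockReturnRequested");
    }
}
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;

&lt;p&gt;Whether &lt;code&gt;OnStockConfirmed&lt;/code&gt; or &lt;code&gt;OnOrderRejected&lt;/code&gt; is called first, &lt;code&gt;Sent&lt;/code&gt; ends up with exactly one &lt;code&gt;StockReturnRequested&lt;/code&gt; entry. Each handler records what it learned and then re-evaluates the whole state, so the result does not depend on arrival order.&lt;/p&gt;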

&lt;p&gt;In the next post, we'll look at how we can implement the inbox/outbox pattern for other resources, allowing us to bridge to other kinds of databases where a distributed transaction is just plain impossible.&lt;/p&gt;

</description>
      <category>microservices</category>
      <category>architecture</category>
      <category>distributedsystems</category>
    </item>
    <item>
      <title>Life Beyond Distributed Transactions: An Apostate's Implementation - Dispatcher Failure Recovery</title>
      <dc:creator>Jimmy Bogard</dc:creator>
      <pubDate>Thu, 30 Aug 2018 13:15:14 +0000</pubDate>
      <link>https://dev.to/jbogard/life-beyond-distributed-transactions-an-apostates-implementation---dispatcher-failure-recovery-2p00</link>
      <guid>https://dev.to/jbogard/life-beyond-distributed-transactions-an-apostates-implementation---dispatcher-failure-recovery-2p00</guid>
      <description>

&lt;p&gt;Posts in this series:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;&lt;a href="https://dev.to/jbogard/life-beyond-distributed-transactions-an-apostates-implementation---a-primer-2po6-temp-slug-8261948"&gt;A Primer&lt;/a&gt;&lt;/li&gt;
&lt;li&gt;&lt;a href="https://dev.to/jbogard/life-beyond-distributed-transactions-an-apostates-implementation---document-coordination-leb-temp-slug-1009285"&gt;Document Coordination&lt;/a&gt;&lt;/li&gt;
&lt;li&gt;&lt;a href="https://dev.to/jbogard/life-beyond-distributed-transactions-an-apostates-implementation---document-example-4fpk-temp-slug-5917155"&gt;Document Example&lt;/a&gt;&lt;/li&gt;
&lt;li&gt;&lt;a href="https://dev.to/jbogard/life-beyond-distributed-transactions-an-apostates-implementation---dispatching-example-31i6-temp-slug-684211"&gt;Dispatching Example&lt;/a&gt;&lt;/li&gt;
&lt;li&gt;&lt;a href="https://dev.to/jbogard/life-beyond-distributed-transactions-an-apostates-implementation---failures-and-retries-m6-temp-slug-2730961"&gt;Failures and Retries&lt;/a&gt;&lt;/li&gt;
&lt;li&gt;&lt;a href="https://jimmybogard.com/life-beyond-distributed-transactions-an-apostates-implementation-dispatcher-failure-recovery/"&gt;Failure Recovery&lt;/a&gt;&lt;/li&gt;
&lt;li&gt;&lt;a href="https://jimmybogard.com/life-beyond-distributed-transactions-sagas/"&gt;Sagas&lt;/a&gt;&lt;/li&gt;
&lt;li&gt;&lt;a href="https://jimmybogard.com/life-beyond-distributed-transactions-an-apostates-implementation-relational-resources/"&gt;Relational Resources&lt;/a&gt;&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;&lt;a href="https://github.com/jbogard/adventureworkscosmos"&gt;Sample code from this series&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;In the last post, we looked at how we can recover from exceptions from &lt;em&gt;inside&lt;/em&gt; our code handling messages. We perform some action in our document, and something goes wrong. But what happens when something goes wrong &lt;em&gt;during&lt;/em&gt; the dispatch process:&lt;/p&gt;

&lt;p&gt;&lt;a href="https://res.cloudinary.com/practicaldev/image/fetch/s--mX6PIgQ6--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_880/https://jimmybogardsblog.blob.core.windows.net/jimmybogardsblog/7/2018/Picture0054.png" class="article-body-image-wrapper"&gt;&lt;img src="https://res.cloudinary.com/practicaldev/image/fetch/s--mX6PIgQ6--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_880/https://jimmybogardsblog.blob.core.windows.net/jimmybogardsblog/7/2018/Picture0054.png" alt=""&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;If our dispatcher itself fails, either:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Pulling a message from the outbox&lt;/li&gt;
&lt;li&gt;Sending a message to the receiver&lt;/li&gt;
&lt;li&gt;Saving documents&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;Then our documents are still consistent, but we've lost the execution flow to dispatch. We've mitigated our failures somewhat, but we still can have the possibility of some unrecoverable failure in our dispatcher, and no amount of exception handling can prevent a document with a message sitting in its outbox, waiting for processing.&lt;/p&gt;

&lt;p&gt;If we were able to wrap everything, documents and queues, in a transaction, then we could gracefully recover. However, the point of this series is that we &lt;em&gt;don't&lt;/em&gt; have access to distributed transactions, so that option is out.&lt;/p&gt;

&lt;p&gt;What we need is some kind of background process looking for documents with pending messages in their outbox, ready to process.&lt;/p&gt;

&lt;h3&gt;
  
  
  Designing the dispatch rescuer
&lt;/h3&gt;

&lt;p&gt;We already have the dispatch recovery process using async messaging to retry an individual dispatch, which works great when the failure is in application code. When the failure is environmental, our only clue something is wrong is a document with messages in its outbox.&lt;/p&gt;

&lt;p&gt;The general process to recover from these failures would be:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Find any documents with unprocessed messages (ideally oldest first)&lt;/li&gt;
&lt;li&gt;Retry them one at a time&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;We have the possibility though that we have:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;In flight dispatching&lt;/li&gt;
&lt;li&gt;In flight retries&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;Ideally, we have some sort of lagging processor, so that when we have issues, we don't interfere with normal processing. Luckily for us, Cosmos DB already comes with the ability to be notified that documents have been changed, the &lt;a href="https://docs.microsoft.com/en-us/azure/cosmos-db/change-feed"&gt;Change Feed&lt;/a&gt;, and this change feed even lets us work with a built-in delay. After each document changes, we can wait some amount of time where we assume that dispatching happened, and re-check the document to make sure dispatching occurred.&lt;/p&gt;

&lt;p&gt;Our rescuer will:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Get notified when a document changes&lt;/li&gt;
&lt;li&gt;Check to see if there are still outbox messages to process&lt;/li&gt;
&lt;li&gt;Send a message to reprocess that document&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;It's somewhat naive, as we'll get notified for all document changes. To make our lives a little bit easier, we can turn off immediate processing for document message dispatching and just dispatch through asynchronous processes, but it's not necessary.&lt;/p&gt;

&lt;h3&gt;
  
  
  Creating the change feed processor
&lt;/h3&gt;

&lt;p&gt;Using the &lt;a href="https://docs.microsoft.com/en-us/azure/cosmos-db/change-feed#using-the-change-feed-processor-library"&gt;documentation as our guide&lt;/a&gt;, we need to create two components:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;A document feed observer to receive document change notifications&lt;/li&gt;
&lt;li&gt;A change feed processor to host and invoke our observer&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;Since we already have a background processor in our dispatcher, we can simply host the observer in the same endpoint. The observer won't actually be doing the work, however - we'll still send a message out to process the document. This is because NServiceBus still provides all the logic around retries and poison messages that I don't want to code again. Like most of my integrations, I kick out the work into a durable message and NServiceBus as quickly as possible.&lt;/p&gt;

&lt;p&gt;That makes my observer pretty small:&lt;/p&gt;



&lt;div class="highlight"&gt;&lt;pre class="highlight plaintext"&gt;&lt;code&gt;public class DocumentFeedObserver&amp;lt;T&amp;gt; : IChangeFeedObserver  
    where T : DocumentBase
{
    static ILog log = LogManager.GetLogger&amp;lt;DocumentFeedObserver&amp;lt;T&amp;gt;&amp;gt;();

    public Task OpenAsync(IChangeFeedObserverContext context) 
        =&amp;gt; Task.CompletedTask;

    public Task CloseAsync(
        IChangeFeedObserverContext context, 
        ChangeFeedObserverCloseReason reason)
        =&amp;gt; Task.CompletedTask;

    public async Task ProcessChangesAsync(
        IChangeFeedObserverContext context, 
        IReadOnlyList&amp;lt;Document&amp;gt; docs, 
        CancellationToken cancellationToken)
    {
        foreach (var doc in docs)
        {
            log.Info($"Processing changes for document {doc.Id}");

            var item = (dynamic)doc;

            if (item.Outbox.Count &amp;gt; 0)
            {
                ProcessDocumentMessages message = ProcessDocumentMessages.New&amp;lt;T&amp;gt;(item);

                await Program.Endpoint.SendLocal(message);
            }
        }
    }
}
&lt;/code&gt;&lt;/pre&gt;&lt;/div&gt;



&lt;p&gt;The &lt;code&gt;OpenAsync&lt;/code&gt; and &lt;code&gt;CloseAsync&lt;/code&gt; methods won't do anything; all my logic is in the &lt;code&gt;ProcessChangesAsync&lt;/code&gt; method. In that method, I get a collection of changed documents. I made my &lt;code&gt;DocumentFeedObserver&lt;/code&gt; generic because each observer observes only one collection, so I have to create distinct observer instances per concrete &lt;code&gt;DocumentBase&lt;/code&gt; type.&lt;/p&gt;

&lt;p&gt;In the method, I loop over all the documents passed in and look to see if the document has any messages in the outbox. If so, I'll create a new &lt;code&gt;ProcessDocumentMessages&lt;/code&gt; to send to myself (as I'm also hosting NServiceBus in this application), which will then process the document messages.&lt;/p&gt;

&lt;p&gt;With our simple observer in place, we need to incorporate the observer in our application startup.&lt;/p&gt;

&lt;h3&gt;
  
  
  Configuring the observer
&lt;/h3&gt;

&lt;p&gt;For our observer, we have a couple of choices on how we want to process document changes. Because our observer will get called for &lt;em&gt;every&lt;/em&gt; document change, we want to be careful about the work it does.&lt;/p&gt;

&lt;p&gt;Our original design had document messages dispatched in the same request as the original work. If we keep this, we want to make sure that we minimize the amount of rework for a document with messages. Ideally, our observer only kicks out messages when there is truly something wrong with dispatching. This will also minimize the amount of queue messages, reserving them for the error case.&lt;/p&gt;

&lt;p&gt;So a simple solution would be to just introduce some delay in our processing:&lt;/p&gt;



&lt;div class="highlight"&gt;&lt;pre class="highlight plaintext"&gt;&lt;code&gt;private static ChangeFeedProcessorBuilder CreateBuilder&amp;lt;T&amp;gt;(DocumentClient client)  
    where T : DocumentBase
{
    var builder = new ChangeFeedProcessorBuilder();
    var uri = new Uri(CosmosUrl);
    var dbClient = new ChangeFeedDocumentClient(client);

    builder
        .WithHostName(HostName)
        .WithFeedCollection(new DocumentCollectionInfo
        {
            DatabaseName = typeof(T).Name,
            CollectionName = "Items",
            Uri = uri,
            MasterKey = CosmosKey
        })
        .WithLeaseCollection(new DocumentCollectionInfo
        {
            DatabaseName = typeof(T).Name,
            CollectionName = "Leases",
            Uri = uri,
            MasterKey = CosmosKey
        })
        .WithProcessorOptions(new ChangeFeedProcessorOptions
        {
            FeedPollDelay = TimeSpan.FromSeconds(15),
        })
        .WithFeedDocumentClient(dbClient)
        .WithLeaseDocumentClient(dbClient)
        .WithObserver&amp;lt;DocumentFeedObserver&amp;lt;T&amp;gt;&amp;gt;();

    return builder;
}
&lt;/code&gt;&lt;/pre&gt;&lt;/div&gt;



&lt;p&gt;The &lt;code&gt;ChangeFeedProcessorBuilder&lt;/code&gt; is configured for every document type we want to observe, with a timespan in this example of 15 seconds. I could bump this up a bit - say to an hour or so. It will really depend on the business, the SLAs they expect for work to complete.&lt;/p&gt;

&lt;p&gt;Finally, in our application startup, we need to create the builder, processor, and start it all up:&lt;/p&gt;



&lt;div class="highlight"&gt;&lt;pre class="highlight plaintext"&gt;&lt;code&gt;Endpoint = await NServiceBus.Endpoint.Start(endpointConfiguration)  
    .ConfigureAwait(false);

var builder = CreateBuilder&amp;lt;OrderRequest&amp;gt;(client);  
var processor = await builder.BuildAsync();

await processor.StartAsync();

Console.WriteLine("Press any key to exit");  
Console.ReadKey();

await Endpoint.Stop()  
    .ConfigureAwait(false);

await processor.StopAsync();
&lt;/code&gt;&lt;/pre&gt;&lt;/div&gt;



&lt;p&gt;With this in place, we can have a final guard against failures, assuming that someone completely pulled the plug on our application and all we have left is a document with messages sitting in its outbox.&lt;/p&gt;

&lt;p&gt;In our next post, we'll look at using sagas to coordinate changes between documents - what happens if we want either all, or none of our changes to be processed in our documents?&lt;/p&gt;


</description>
      <category>microservices</category>
      <category>architecture</category>
      <category>distributedsystems</category>
    </item>
    <item>
      <title>Life Beyond Distributed Transactions: An Apostate's Implementation - Failures and Retries</title>
      <dc:creator>Jimmy Bogard</dc:creator>
      <pubDate>Thu, 16 Aug 2018 15:05:49 +0000</pubDate>
      <link>https://dev.to/jbogard/life-beyond-distributed-transactions-an-apostates-implementation---failures-and-retries-2lpe</link>
      <guid>https://dev.to/jbogard/life-beyond-distributed-transactions-an-apostates-implementation---failures-and-retries-2lpe</guid>
      <description>&lt;p&gt;Posts in this series:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;&lt;a href="https://dev.to/jbogard/life-beyond-distributed-transactions-an-apostates-implementation---a-primer-2po6-temp-slug-8261948"&gt;A Primer&lt;/a&gt;&lt;/li&gt;
&lt;li&gt;&lt;a href="https://dev.to/jbogard/life-beyond-distributed-transactions-an-apostates-implementation---document-coordination-leb-temp-slug-1009285"&gt;Document Coordination&lt;/a&gt;&lt;/li&gt;
&lt;li&gt;&lt;a href="https://dev.to/jbogard/life-beyond-distributed-transactions-an-apostates-implementation---document-example-4fpk-temp-slug-5917155"&gt;Document Example&lt;/a&gt;&lt;/li&gt;
&lt;li&gt;&lt;a href="https://dev.to/jbogard/life-beyond-distributed-transactions-an-apostates-implementation---dispatching-example-31i6-temp-slug-684211"&gt;Dispatching Example&lt;/a&gt;&lt;/li&gt;
&lt;li&gt;&lt;a href="https://jimmybogard.com/life-beyond-distributed-transactions-an-apostates-implementation-failures-and-retries/" rel="noopener noreferrer"&gt;Failures and Retries&lt;/a&gt;&lt;/li&gt;
&lt;li&gt;&lt;a href="https://jimmybogard.com/life-beyond-distributed-transactions-an-apostates-implementation-dispatcher-failure-recovery/" rel="noopener noreferrer"&gt;Failure Recovery&lt;/a&gt;&lt;/li&gt;
&lt;li&gt;&lt;a href="https://jimmybogard.com/life-beyond-distributed-transactions-sagas/" rel="noopener noreferrer"&gt;Sagas&lt;/a&gt;&lt;/li&gt;
&lt;li&gt;&lt;a href="https://jimmybogard.com/life-beyond-distributed-transactions-an-apostates-implementation-relational-resources/" rel="noopener noreferrer"&gt;Relational Resources&lt;/a&gt;&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;&lt;a href="https://github.com/jbogard/adventureworkscosmos" rel="noopener noreferrer"&gt;Sample code from this series&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;In the last post, we looked at an example of dispatching document messages to other documents using a central dispatcher. Our example worked well in the happy path scenario, but what happens when something goes wrong? We of course do not want a failure in dispatching messages to make the entire request fail, but what would that mean for us?&lt;/p&gt;

&lt;p&gt;We described a general solution to put the dispatching work aside using queues and messaging, effectively saying "yes, dispatching failed, so let's put it aside to look at in the future". This would allow the overall main request to complete:&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fjimmybogardsblog.blob.core.windows.net%2Fjimmybogardsblog%2F7%2F2018%2FPicture0051.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fjimmybogardsblog.blob.core.windows.net%2Fjimmybogardsblog%2F7%2F2018%2FPicture0051.png"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;Our original example also assumed that we would dispatch our messages &lt;em&gt;immediately&lt;/em&gt; in the context of the same request, which isn't a bad default but maybe isn't always desirable. Let's first look at the scenario of dispatching immediately, and what failures could mean.&lt;/p&gt;

&lt;h3&gt;
  
  
  Characterizing our failures
&lt;/h3&gt;

&lt;p&gt;Dispatching failures could happen for a number of reasons, but I generally see a continuum:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Transient&lt;/li&gt;
&lt;li&gt;Delayed&lt;/li&gt;
&lt;li&gt;Permanent&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;My failures usually have some sort of time component associated with them. A transient failure may mean that if I simply try the action again immediately, it may work. This most often comes up with some sort of concurrency violation against the database.&lt;/p&gt;

&lt;p&gt;Delayed failures are a bit different, where I won't succeed if I try immediately, but I will if I just wait some amount of time.&lt;/p&gt;

&lt;p&gt;Permanent failures mean there's an unrecoverable failure, and no amount of retries will allow the operation to succeed.&lt;/p&gt;

&lt;p&gt;Of course, we could simply ignore failures, but our business and customers might not agree with that approach. How might we handle each of these kinds of failures?&lt;/p&gt;

&lt;h3&gt;
  
  
  Transient failures
&lt;/h3&gt;

&lt;p&gt;If something goes wrong, can we simply retry the operation? That seems fairly straightforward - but we don't want to retry &lt;em&gt;too&lt;/em&gt; many times. We can implement some simple policies, either with a hardcoded number of retries or using something like the &lt;a href="http://www.thepollyproject.org/" rel="noopener noreferrer"&gt;Polly Project&lt;/a&gt; to retry an action.&lt;/p&gt;

&lt;p&gt;To keep things simple, we can have a policy to address the most common transient failure - optimistic concurrency problems. We first want to &lt;a href="https://docs.microsoft.com/en-us/azure/cosmos-db/faq#how-does-the-sql-api-provide-concurrency" rel="noopener noreferrer"&gt;enable OCC checks&lt;/a&gt;, of course:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;public async Task&amp;lt;Document&amp;gt; UpdateItemAsync(T item)  
{
    var ac = new AccessCondition
    {
        Condition = item.ETag,
        Type = AccessConditionType.IfMatch
    };

    return await _client.ReplaceDocumentAsync(
        UriFactory.CreateDocumentUri(DatabaseId, CollectionId, item.Id.ToString()),
        item,
        new RequestOptions { AccessCondition = ac });
}
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;When we get a concurrency violation, this results in a &lt;code&gt;DocumentClientException&lt;/code&gt; with a special status code (hooray HTTP!). We need some way to wrap our request and retry if necessary - time for another MediatR behavior! This one will retry our action some number of times:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;public class RetryUnitOfWorkBehavior&amp;lt;TRequest, TResponse&amp;gt;  
    : IPipelineBehavior&amp;lt;TRequest, TResponse&amp;gt;
{
    private readonly IUnitOfWork _unitOfWork;

    public RetryUnitOfWorkBehavior(IUnitOfWork unitOfWork) 
        =&amp;gt; _unitOfWork = unitOfWork;

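    public async Task&amp;lt;TResponse&amp;gt; Handle(
        TRequest request,
        CancellationToken cancellationToken,
        RequestHandlerDelegate&amp;lt;TResponse&amp;gt; next)
    {
        var retryCount = 0;

        while (true)
        {
            try
            {
                // Await inside the try block so exceptions thrown by the
                // asynchronous pipeline are actually caught below.
                return await next();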
            }
            catch (DocumentClientException e)
            {
                if (e.StatusCode != HttpStatusCode.PreconditionFailed)
                    throw;

                if (retryCount &amp;gt;= 5)
                    throw;

                _unitOfWork.Reset();

                retryCount++;
            }
        }
    }
}
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;If our action fails due to a concurrency problem, we need to clear out our unit of work's identity map and try again:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;public void Reset()  
{
    _identityMap.Clear();
}
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Then we just need to register our behavior with the container like our original unit of work behavior, and we're set. We could have of course modified our original behavior to add retries - but I want to keep them separate because they truly are different concerns.&lt;/p&gt;
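
&lt;p&gt;As a rough sketch of that registration, assuming the container is &lt;code&gt;Microsoft.Extensions.DependencyInjection&lt;/code&gt; and that the original behavior is named &lt;code&gt;UnitOfWorkBehavior&lt;/code&gt; (both are assumptions here, as is the &lt;code&gt;AddDocumentBehaviors&lt;/code&gt; helper name), it might look like this:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;using MediatR;
using Microsoft.Extensions.DependencyInjection;

public static class PipelineRegistration
{
    // Hypothetical helper; the behavior types are the ones described in this series.
    public static IServiceCollection AddDocumentBehaviors(this IServiceCollection services)
    {
        // Registered first so that, assuming behaviors run in registration order,
        // the retry wraps everything that follows it in the pipeline.
        services.AddScoped(typeof(IPipelineBehavior&amp;lt;,&amp;gt;), typeof(RetryUnitOfWorkBehavior&amp;lt;,&amp;gt;));
        services.AddScoped(typeof(IPipelineBehavior&amp;lt;,&amp;gt;), typeof(UnitOfWorkBehavior&amp;lt;,&amp;gt;));

        return services;
    }
}
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;

&lt;p&gt;The ordering is the part worth checking in your own setup: the retry only helps if it sits outside the work that can throw the concurrency exception.&lt;/p&gt;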

&lt;p&gt;That works for immediate failures, but we still haven't looked at failures in our message dispatching. For that, we'll need to involve some messaging.&lt;/p&gt;

&lt;h3&gt;
  
  
  Deferred dispatching with messaging and NServiceBus
&lt;/h3&gt;

&lt;p&gt;The immediate retries can take care of transient failures during a request, but if there's some deeper issue, we want to defer the document message dispatching to some time in the future. To make my life easier, and not have to implement half the &lt;a href="https://www.enterpriseintegrationpatterns.com/" rel="noopener noreferrer"&gt;Enterprise Integration Patterns book&lt;/a&gt; myself, I'll leverage &lt;a href="https://particular.net/nservicebus" rel="noopener noreferrer"&gt;NServiceBus&lt;/a&gt; to manage my messaging.&lt;/p&gt;

&lt;p&gt;Our original dispatcher looped through our unit of work's identity map to find documents with messages that need dispatching. We'll want to augment that behavior to catch any failures, and dispatch those messages offline:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;public interface IOfflineDispatcher  
{
    Task DispatchOffline(DocumentBase document);
}
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Our &lt;code&gt;Complete&lt;/code&gt; method of the unit of work will now take these failed dispatches and instruct our offline dispatcher to dispatch these offline:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;public async Task Complete()  
{
    var toSkip = new HashSet&amp;lt;DocumentBase&amp;gt;(DocumentBaseEqualityComparer.Instance);

    while (_identityMap
        .Except(toSkip, DocumentBaseEqualityComparer.Instance)
        .Any(a =&amp;gt; a.Outbox.Any()))
    {
        var document = _identityMap
            .Except(toSkip, DocumentBaseEqualityComparer.Instance)
            .FirstOrDefault(a =&amp;gt; a.Outbox.Any());

        if (document == null)
            continue;

        var ex = await _dispatcher.Dispatch(document);
        if (ex != null)
        {
            toSkip.Add(document);

            await _offlineDispatcher.DispatchOffline(document);
        }
    }
}
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;This is a somewhat naive implementation - it doesn't allow for partial document message processing. If a document has 3 messages, we retry the entire document instead of one message at a time. We could manage this more granularly by including a "retry" collection on our document. But this introduces more issues - we could still have some system failure after dispatch, and our document message would never make it to retry.&lt;/p&gt;

&lt;p&gt;When our transaction scope is individual operations instead of the entire request, we have to assume failure at &lt;em&gt;every&lt;/em&gt; instance and examine what might go wrong.&lt;/p&gt;

&lt;p&gt;The offline dispatcher uses NServiceBus to send a durable message out:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;public class UniformSessionOfflineDispatcher  
    : IOfflineDispatcher
{
    private readonly IUniformSession _uniformSession;

    public UniformSessionOfflineDispatcher(IUniformSession uniformSession)
        =&amp;gt; _uniformSession = uniformSession;

    public Task DispatchOffline(DocumentBase document)
        =&amp;gt; _uniformSession.Send(ProcessDocumentMessages.New(document));
}
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;The &lt;a href="https://docs.particular.net/nservicebus/messaging/uniformsession" rel="noopener noreferrer"&gt;&lt;code&gt;IUniformSession&lt;/code&gt; piece&lt;/a&gt; from NServiceBus lets us send a message from any context (in a web application, backend service, etc.). Our message just includes the document ID and type:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;public class ProcessDocumentMessages : ICommand  
{
    public Guid DocumentId { get; set; }
    public string DocumentType { get; set; }

    // For NSB
    public ProcessDocumentMessages() { }

    private ProcessDocumentMessages(Guid documentId, string documentType)
    {
        DocumentId = documentId;
        DocumentType = documentType;
    }

    public static ProcessDocumentMessages New&amp;lt;TDocument&amp;gt;(
        TDocument document)
        where TDocument : DocumentBase
    {
        return new ProcessDocumentMessages(
            document.Id, 
            document.GetType().AssemblyQualifiedName);
    }
}
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;We can use this information to load our document from the repository. With this message in place, we now need the component that will &lt;em&gt;receive&lt;/em&gt; our message. For this, it will really depend on our deployment, but for now I'll just make a .NET Core console application that includes our NServiceBus hosting piece and a handler for that message:&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fjimmybogardsblog.blob.core.windows.net%2Fjimmybogardsblog%2F7%2F2018%2FPicture0052.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fjimmybogardsblog.blob.core.windows.net%2Fjimmybogardsblog%2F7%2F2018%2FPicture0052.png"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;I won't dig too much into the NServiceBus configuration as it's really not that germane, but let's look at the handler for that message:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;public class ProcessDocumentMessagesHandler  
    : IHandleMessages&amp;lt;ProcessDocumentMessages&amp;gt;
{
    private readonly IDocumentMessageDispatcher _dispatcher;

    public ProcessDocumentMessagesHandler(IDocumentMessageDispatcher dispatcher) 
        =&amp;gt; _dispatcher = dispatcher;

    public Task Handle(ProcessDocumentMessages message, IMessageHandlerContext context)
        =&amp;gt; _dispatcher.Dispatch(message);
}
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Also not very exciting! This class is what NServiceBus dispatches the durable message to. For our simple example, I'm using RabbitMQ, so if something goes wrong our message goes into a queue:&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fjimmybogardsblog.blob.core.windows.net%2Fjimmybogardsblog%2F7%2F2018%2FPicture0053.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fjimmybogardsblog.blob.core.windows.net%2Fjimmybogardsblog%2F7%2F2018%2FPicture0053.png"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;Our handler receives this message to process it. The dispatcher is slightly different, as it needs to work with a message instead of an actual document, so it needs to load it up first:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;public async Task Dispatch(ProcessDocumentMessages command)  
{
    var documentType = Type.GetType(command.DocumentType);
    var repository = GetRepository(documentType);
    var document = await repository.FindById(command.DocumentId);

    if (document == null)
    {
        return;
    }

    foreach (var message in document.Outbox.ToArray())
    {
        var handler = GetHandler(message);

        await handler.Handle(message, _serviceFactory);

        document.ProcessDocumentMessage(message);

        await repository.Update(document);
    }
}
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;One other key difference in this dispatch method is that we don't wrap anything in any kind of &lt;code&gt;try-catch&lt;/code&gt; to report back errors. In an in-process dispatch mode, we still want the main request to succeed. In the offline processing mode, we're only dealing with document dispatching. And since we're using NServiceBus, we can rely on its &lt;a href="https://docs.particular.net/nservicebus/recoverability/" rel="noopener noreferrer"&gt;built-in recoverability behavior&lt;/a&gt; with immediate and delayed retries, eventually moving messages to an error queue.&lt;/p&gt;
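
&lt;p&gt;As a rough sketch of what that recoverability policy might look like in the endpoint configuration (the endpoint name, retry counts, and delay below are illustrative assumptions, not values taken from the sample code):&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;// Hypothetical endpoint name, for illustration only
var endpointConfiguration =
    new EndpointConfiguration("AdventureWorksCosmos.Dispatcher");

var recoverability = endpointConfiguration.Recoverability();

// Retry right away a few times to ride out transient failures
recoverability.Immediate(immediate =&amp;gt; immediate.NumberOfRetries(3));

// Then back off and try again later
recoverability.Delayed(delayed =&amp;gt;
{
    delayed.NumberOfRetries(2);
    delayed.TimeIncrease(TimeSpan.FromSeconds(10));
});

// Once retries are exhausted, the message is moved to this queue
endpointConfiguration.SendFailedMessagesTo("error");
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;

&lt;p&gt;Because the handler lets exceptions bubble up, any failure inside &lt;code&gt;Dispatch&lt;/code&gt; is what triggers these retries.&lt;/p&gt;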

&lt;p&gt;With this in place, we can put forth an optimistic, try-immediately policy, but fall back on durable messaging if something goes wrong with our immediate dispatch. It's not bulletproof, and in the next post, I'll look at how we can implement some sort of doomsday scenario, where we have a failure between dispatch failure and queuing the retry message.&lt;/p&gt;

</description>
      <category>architecture</category>
      <category>microservices</category>
      <category>distributedsystems</category>
    </item>
    <item>
      <title>Life Beyond Distributed Transactions: An Apostate's Implementation - Dispatching Example</title>
      <dc:creator>Jimmy Bogard</dc:creator>
      <pubDate>Mon, 13 Aug 2018 19:24:06 +0000</pubDate>
      <link>https://dev.to/jbogard/life-beyond-distributed-transactions-an-apostates-implementation---dispatching-example-3nnc</link>
      <guid>https://dev.to/jbogard/life-beyond-distributed-transactions-an-apostates-implementation---dispatching-example-3nnc</guid>
      <description>

&lt;p&gt;Posts in this series:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;&lt;a href="https://dev.to/jbogard/life-beyond-distributed-transactions-an-apostates-implementation---a-primer-2po6-temp-slug-8261948"&gt;A Primer&lt;/a&gt;&lt;/li&gt;
&lt;li&gt;&lt;a href="https://dev.to/jbogard/life-beyond-distributed-transactions-an-apostates-implementation---document-coordination-leb-temp-slug-1009285"&gt;Document Coordination&lt;/a&gt;&lt;/li&gt;
&lt;li&gt;&lt;a href="https://dev.to/jbogard/life-beyond-distributed-transactions-an-apostates-implementation---document-example-4fpk-temp-slug-5917155"&gt;Document Example&lt;/a&gt;&lt;/li&gt;
&lt;li&gt;&lt;a href="https://jimmybogard.com/life-beyond-distributed-transactions-an-apostates-implementation-dispatching-example/"&gt;Dispatching Example&lt;/a&gt;&lt;/li&gt;
&lt;li&gt;&lt;a href="https://jimmybogard.com/life-beyond-distributed-transactions-an-apostates-implementation-failures-and-retries/"&gt;Failures and Retries&lt;/a&gt;&lt;/li&gt;
&lt;li&gt;&lt;a href="https://jimmybogard.com/life-beyond-distributed-transactions-an-apostates-implementation-dispatcher-failure-recovery/"&gt;Failure Recovery&lt;/a&gt;&lt;/li&gt;
&lt;li&gt;&lt;a href="https://jimmybogard.com/life-beyond-distributed-transactions-sagas/"&gt;Sagas&lt;/a&gt;&lt;/li&gt;
&lt;li&gt;&lt;a href="https://jimmybogard.com/life-beyond-distributed-transactions-an-apostates-implementation-relational-resources/"&gt;Relational Resources&lt;/a&gt;&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;&lt;a href="https://github.com/jbogard/adventureworkscosmos"&gt;Sample code from this series&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;In the last post, we looked at refactoring our documents to use messaging to communicate changes. We're still missing something, however - the dispatcher:&lt;/p&gt;

&lt;p&gt;&lt;a href="https://res.cloudinary.com/practicaldev/image/fetch/s--uPSqc1jt--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_880/https://jimmybogardsblog.blob.core.windows.net/jimmybogardsblog/7/2018/Picture0050.png" class="article-body-image-wrapper"&gt;&lt;img src="https://res.cloudinary.com/practicaldev/image/fetch/s--uPSqc1jt--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_880/https://jimmybogardsblog.blob.core.windows.net/jimmybogardsblog/7/2018/Picture0050.png" alt=""&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;Our dispatcher is the main component that facilitates document communication. For a given document, it needs to:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Read messages out of a document's outbox&lt;/li&gt;
&lt;li&gt;Find the document message handler for each, and invoke it&lt;/li&gt;
&lt;li&gt;Manage failures for a document message handler&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;We'll tackle that last piece in a future post. There's one piece we do need to think about first - where does the dispatcher get its list of documents to dispatch messages to?&lt;/p&gt;

&lt;p&gt;Before we get to the dispatcher, we need to solve for this problem - knowing which documents need dispatching!&lt;/p&gt;

&lt;h3&gt;
  
  
  Introducing a unit of work
&lt;/h3&gt;

&lt;p&gt;For a given request, we'll load up a document and effect some change with it. We already have a pinch point in which our documents are loaded - the repository. If we want to dispatch document messages in the same request, we'll need to keep track of the documents that we've loaded in a request. For this, we can use a &lt;a href="https://martinfowler.com/eaaCatalog/unitOfWork.html"&gt;Unit of Work&lt;/a&gt;.&lt;/p&gt;

&lt;p&gt;Any ORM that you use will implement this pattern - for Entity Framework, for example, the DbContext is your Unit of Work. For Cosmos DB's SDK, there really isn't a concept of these ORM patterns. We have to introduce them ourselves.&lt;/p&gt;

&lt;p&gt;Our unit of work will keep track of documents for a given session/request, letting us interact with the loaded documents during the dispatch phase of a request. Our Unit of Work will also serve as an &lt;a href="https://www.martinfowler.com/eaaCatalog/identityMap.html"&gt;identity map&lt;/a&gt; - the thing that makes sure that when we load a document in a request, it's only loaded once. Here's our basic &lt;code&gt;IUnitOfWork&lt;/code&gt; interface:&lt;/p&gt;



&lt;div class="highlight"&gt;&lt;pre class="highlight plaintext"&gt;&lt;code&gt;public interface IUnitOfWork  
{
    T Find&amp;lt;T&amp;gt;(Guid id) where T : DocumentBase;
    void Register(DocumentBase document);
    void Register(IEnumerable&amp;lt;DocumentBase&amp;gt; documents);
    Task Complete();
}
&lt;/code&gt;&lt;/pre&gt;&lt;/div&gt;



&lt;p&gt;The implementation contains the "identity map" as a simple &lt;code&gt;HashSet&lt;/code&gt;:&lt;/p&gt;



&lt;div class="highlight"&gt;&lt;pre class="highlight plaintext"&gt;&lt;code&gt;public class UnitOfWork : IUnitOfWork  
{
    private readonly ISet&amp;lt;DocumentBase&amp;gt; _identityMap 
        = new HashSet&amp;lt;DocumentBase&amp;gt;(DocumentBaseEqualityComparer.Instance);
&lt;/code&gt;&lt;/pre&gt;&lt;/div&gt;
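
&lt;p&gt;The &lt;code&gt;DocumentBaseEqualityComparer&lt;/code&gt; isn't shown in this post. A minimal sketch, assuming it mirrors the ID-based &lt;code&gt;DocumentMessageEqualityComparer&lt;/code&gt; from the previous post and that a document is identified by its concrete type plus its &lt;code&gt;Id&lt;/code&gt;, might look like this:&lt;/p&gt;

&lt;div class="highlight"&gt;&lt;pre class="highlight plaintext"&gt;&lt;code&gt;using System.Collections.Generic;

public class DocumentBaseEqualityComparer
    : IEqualityComparer&amp;lt;DocumentBase&amp;gt;
{
    public static readonly DocumentBaseEqualityComparer Instance
        = new DocumentBaseEqualityComparer();

    // Assumption: two instances represent the same document
    // when they share a concrete type and an Id
    public bool Equals(DocumentBase x, DocumentBase y)
    {
        if (ReferenceEquals(x, y)) return true;
        if (x is null || y is null) return false;

        return x.GetType() == y.GetType() &amp;amp;&amp;amp; x.Id == y.Id;
    }

    public int GetHashCode(DocumentBase obj)
        =&amp;gt; (obj.GetType(), obj.Id).GetHashCode();
}
&lt;/code&gt;&lt;/pre&gt;&lt;/div&gt;

&lt;p&gt;With a comparer along these lines, registering the same document twice leaves a single entry in the set.&lt;/p&gt;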



&lt;p&gt;Then we can register an instance with our &lt;code&gt;UnitOfWork&lt;/code&gt;:&lt;/p&gt;



&lt;div class="highlight"&gt;&lt;pre class="highlight plaintext"&gt;&lt;code&gt;public void Register(DocumentBase document)  
{
    _identityMap.Add(document);
}

public void Register(IEnumerable&amp;lt;DocumentBase&amp;gt; documents)  
{
    foreach (var document in documents)
    {
        Register(document);
    }
}
&lt;/code&gt;&lt;/pre&gt;&lt;/div&gt;



&lt;p&gt;Finding an existing &lt;code&gt;DocumentBase&lt;/code&gt; just searches our identity map:&lt;/p&gt;



&lt;div class="highlight"&gt;&lt;pre class="highlight plaintext"&gt;&lt;code&gt;public T Find&amp;lt;T&amp;gt;(Guid id)  
    where T : DocumentBase
    =&amp;gt; _identityMap.OfType&amp;lt;T&amp;gt;().FirstOrDefault(ab =&amp;gt; ab.Id == id);
&lt;/code&gt;&lt;/pre&gt;&lt;/div&gt;



&lt;p&gt;We'll come back to the &lt;code&gt;Complete&lt;/code&gt; method, because this will be the part where we dispatch. We still need the part where we register our documents in the unit of work, and this will be in our repository implementation:&lt;/p&gt;



&lt;div class="highlight"&gt;&lt;pre class="highlight plaintext"&gt;&lt;code&gt;public async Task&amp;lt;T&amp;gt; GetItemAsync(Guid id)  
{
    try
    {
        var root = _unitOfWork.Find&amp;lt;T&amp;gt;(id);

        if (root != null)
            return root;

        Document document = await _client.ReadDocumentAsync(UriFactory.CreateDocumentUri(DatabaseId, CollectionId, id.ToString()));
        var item = (T)(dynamic)document;

        _unitOfWork.Register(item);

        return item;
    }
    catch (DocumentClientException e)
    {
        if (e.StatusCode == System.Net.HttpStatusCode.NotFound)
        {
            return null;
        }

        throw;
    }
}
&lt;/code&gt;&lt;/pre&gt;&lt;/div&gt;



&lt;p&gt;We'll repeat this for any method in our repository that loads a document, registering and looking up in our unit of work.&lt;/p&gt;
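
&lt;p&gt;For a query method, that could look roughly like the following. This is a sketch that assumes the paging loop from the Azure Portal sample repository and the same &lt;code&gt;_client&lt;/code&gt; and &lt;code&gt;_unitOfWork&lt;/code&gt; fields as above, so the details may differ from the sample code:&lt;/p&gt;

&lt;div class="highlight"&gt;&lt;pre class="highlight plaintext"&gt;&lt;code&gt;public async Task&amp;lt;IEnumerable&amp;lt;T&amp;gt;&amp;gt; GetItemsAsync(
    Expression&amp;lt;Func&amp;lt;T, bool&amp;gt;&amp;gt; predicate)
{
    var query = _client.CreateDocumentQuery&amp;lt;T&amp;gt;(
            UriFactory.CreateDocumentCollectionUri(DatabaseId, CollectionId),
            new FeedOptions { MaxItemCount = -1 })
        .Where(predicate)
        .AsDocumentQuery();

    var results = new List&amp;lt;T&amp;gt;();
    while (query.HasMoreResults)
    {
        foreach (var item in await query.ExecuteNextAsync&amp;lt;T&amp;gt;())
        {
            // Prefer the instance already tracked in this request
            var tracked = _unitOfWork.Find&amp;lt;T&amp;gt;(item.Id);
            if (tracked == null)
            {
                _unitOfWork.Register(item);
                tracked = item;
            }

            results.Add(tracked);
        }
    }

    return results;
}
&lt;/code&gt;&lt;/pre&gt;&lt;/div&gt;

&lt;p&gt;Returning the tracked instance instead of the freshly loaded one keeps in-memory changes (including pending outbox messages) from being shadowed by a second copy of the same document.&lt;/p&gt;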

&lt;p&gt;With a means to track our documents, let's see how we'll dispatch.&lt;/p&gt;

&lt;h3&gt;
  
  
  Dispatching document messages
&lt;/h3&gt;

&lt;p&gt;Our dispatcher's fairly straightforward - the only wrinkle is that we'll need to surface any potential exception. Instead of just crashing in case something goes awry, we'll return the exception and let the caller decide how to handle failures:&lt;/p&gt;



&lt;div class="highlight"&gt;&lt;pre class="highlight plaintext"&gt;&lt;code&gt;public interface IDocumentMessageDispatcher  
{
    Task&amp;lt;Exception&amp;gt; Dispatch(DocumentBase document);
}
&lt;/code&gt;&lt;/pre&gt;&lt;/div&gt;



&lt;p&gt;If I'm dispatching a document message to three handlers, I don't want one handler to prevent dispatching to the others.&lt;/p&gt;

&lt;p&gt;We have another challenge - our interface is not generic for dispatching, but the handlers and repositories are! We'll have to do some generics tricks to unwrap our base type to the correct generic types. The basic flow will be:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;For each document message:
&lt;ul&gt;
&lt;li&gt;Find document message handlers&lt;/li&gt;
&lt;li&gt;Call the handler&lt;/li&gt;
&lt;li&gt;Remove the document message from the outbox&lt;/li&gt;
&lt;li&gt;Save the document&lt;/li&gt;
&lt;/ul&gt;
&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;Here's our basic implementation:&lt;/p&gt;



&lt;div class="highlight"&gt;&lt;pre class="highlight plaintext"&gt;&lt;code&gt;public async Task&amp;lt;Exception&amp;gt; Dispatch(DocumentBase document)  
{
    var repository = GetRepository(document.GetType());
    foreach (var documentMessage in document.Outbox.ToArray())
    {
        try
        {
            var handler = GetHandler(documentMessage);

            await handler.Handle(documentMessage, _serviceFactory);

            document.ProcessDocumentMessage(documentMessage);

            await repository.Update(document);
        }
        catch (Exception ex)
        {
            return ex;
        }
    }
    return null;
}
&lt;/code&gt;&lt;/pre&gt;&lt;/div&gt;



&lt;p&gt;We first build a repository based on the document type. Next, we loop through each document message in the outbox. For each document message, we'll find the handler(s) and call them. Once those succeed, we'll process our document message (removing it from the outbox) and update our document. We want to update for each document message in the outbox - if there are 3 document messages in the outbox, we save 3 times, so that once a message completes we don't have to go back to it if something goes wrong later.&lt;/p&gt;

&lt;p&gt;The &lt;code&gt;GetHandler&lt;/code&gt; method is a bit wonky, because we're bridging generics. Basically, we create a non-generic version of the document message handlers:&lt;/p&gt;



&lt;div class="highlight"&gt;&lt;pre class="highlight plaintext"&gt;&lt;code&gt;private abstract class DomainEventDispatcherHandler  
{
    public abstract Task Handle(
        IDocumentMessage documentMessage, 
        ServiceFactory factory);
}
&lt;/code&gt;&lt;/pre&gt;&lt;/div&gt;



&lt;p&gt;Then create a generic version that inherits from this:&lt;/p&gt;



&lt;div class="highlight"&gt;&lt;pre class="highlight plaintext"&gt;&lt;code&gt;private class DomainEventDispatcherHandler&amp;lt;T&amp;gt; : DomainEventDispatcherHandler  
    where T : IDocumentMessage
{
    public override Task Handle(IDocumentMessage documentMessage, ServiceFactory factory)
    {
        return HandleCore((T)documentMessage, factory);
    }

    private static async Task HandleCore(T domainEvent, ServiceFactory factory)
    {
        var handlers = factory.GetInstances&amp;lt;IDocumentMessageHandler&amp;lt;T&amp;gt;&amp;gt;();
        foreach (var handler in handlers)
        {
            await handler.Handle(domainEvent);
        }
    }
}
&lt;/code&gt;&lt;/pre&gt;&lt;/div&gt;



&lt;p&gt;I've used this pattern countless times, basically to satisfy the compiler. I've tried &lt;code&gt;dynamic&lt;/code&gt; too but it introduces other problems. Then to call this, our &lt;code&gt;GetHandler&lt;/code&gt; instantiates the generic version, but returns the non-generic base class:&lt;/p&gt;



&lt;div class="highlight"&gt;&lt;pre class="highlight plaintext"&gt;&lt;code&gt;private static DomainEventDispatcherHandler GetHandler(  
    IDocumentMessage documentMessage)
{
    var genericDispatcherType = typeof(DomainEventDispatcherHandler&amp;lt;&amp;gt;)
        .MakeGenericType(documentMessage.GetType());

    return (DomainEventDispatcherHandler)
        Activator.CreateInstance(genericDispatcherType);
}
&lt;/code&gt;&lt;/pre&gt;&lt;/div&gt;



&lt;p&gt;With this, I can have non-generic code still call into generics. I'll do something similar with the repository:&lt;/p&gt;



&lt;div class="highlight"&gt;&lt;pre class="highlight plaintext"&gt;&lt;code&gt;private abstract class DocumentDbRepo  
{
    public abstract Task&amp;lt;DocumentBase&amp;gt; FindById(Guid id);
    public abstract Task Update(DocumentBase document);
}
&lt;/code&gt;&lt;/pre&gt;&lt;/div&gt;
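
&lt;p&gt;The generic half of that bridge isn't shown here. As a sketch of these nested members of the dispatcher, assuming the generic repository exposes &lt;code&gt;GetItemAsync&lt;/code&gt; and &lt;code&gt;UpdateItemAsync&lt;/code&gt; as in the controller example from the previous post, and that &lt;code&gt;_serviceFactory&lt;/code&gt; is MediatR's &lt;code&gt;ServiceFactory&lt;/code&gt; delegate, it could look something like this:&lt;/p&gt;

&lt;div class="highlight"&gt;&lt;pre class="highlight plaintext"&gt;&lt;code&gt;private class DocumentDbRepo&amp;lt;T&amp;gt; : DocumentDbRepo
    where T : DocumentBase
{
    private readonly IDocumentDBRepository&amp;lt;T&amp;gt; _repository;

    public DocumentDbRepo(IDocumentDBRepository&amp;lt;T&amp;gt; repository)
        =&amp;gt; _repository = repository;

    public override async Task&amp;lt;DocumentBase&amp;gt; FindById(Guid id)
        =&amp;gt; await _repository.GetItemAsync(id);

    // The cast holds as long as GetRepository was called
    // with the document's runtime type
    public override Task Update(DocumentBase document)
        =&amp;gt; _repository.UpdateItemAsync((T)document);
}

private DocumentDbRepo GetRepository(Type documentType)
{
    // Resolve the closed generic repository from the container
    var repositoryType = typeof(IDocumentDBRepository&amp;lt;&amp;gt;)
        .MakeGenericType(documentType);
    var repository = _serviceFactory(repositoryType);

    // Wrap it in the matching closed generic bridge
    var bridgeType = typeof(DocumentDbRepo&amp;lt;&amp;gt;)
        .MakeGenericType(documentType);

    return (DocumentDbRepo)
        Activator.CreateInstance(bridgeType, repository);
}
&lt;/code&gt;&lt;/pre&gt;&lt;/div&gt;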



&lt;p&gt;With these bridges in place, my dispatcher can interact with the concrete generic repositories and handlers. The final piece is the document cleaning up its outbox:&lt;/p&gt;



&lt;div class="highlight"&gt;&lt;pre class="highlight plaintext"&gt;&lt;code&gt;public void ProcessDocumentMessage(  
    IDocumentMessage documentMessage)
{
    _outbox?.Remove(documentMessage);
}
&lt;/code&gt;&lt;/pre&gt;&lt;/div&gt;



&lt;p&gt;With our dispatcher done, and our unit of work in place, we can now focus on the piece that will &lt;em&gt;invoke&lt;/em&gt; our unit of work.&lt;/p&gt;

&lt;h3&gt;
  
  
  Building a MediatR behavior
&lt;/h3&gt;

&lt;p&gt;We want our unit of work to complete with each request once everything is "done". For ASP.NET Core applications, this might mean some kind of filter. For us, I want the dispatching to work really with any context, so one possibility is to use a &lt;a href="https://github.com/jbogard/MediatR/wiki/Behaviors"&gt;MediatR behavior&lt;/a&gt; to wrap our MediatR handler. A filter would work too of course, but we'd need to mimic our filters in tests if we want everything to still get dispatched appropriately.&lt;/p&gt;

&lt;p&gt;The behavior is pretty straightforward:&lt;/p&gt;



&lt;div class="highlight"&gt;&lt;pre class="highlight plaintext"&gt;&lt;code&gt;public class UnitOfWorkBehavior&amp;lt;TRequest, TResponse&amp;gt;  
    : IPipelineBehavior&amp;lt;TRequest, TResponse&amp;gt;
{
    private readonly IUnitOfWork _unitOfWork;

    public UnitOfWorkBehavior(IUnitOfWork unitOfWork)
    {
        _unitOfWork = unitOfWork;
    }

    public async Task&amp;lt;TResponse&amp;gt; Handle(
        TRequest request, 
        CancellationToken token, 
        RequestHandlerDelegate&amp;lt;TResponse&amp;gt; next)
    {
        var response = await next();

        await _unitOfWork.Complete();

        return response;
    }
}
&lt;/code&gt;&lt;/pre&gt;&lt;/div&gt;



&lt;p&gt;We do the main work, then once that's finished, complete our unit of work.&lt;/p&gt;

&lt;p&gt;That's all of our infrastructure pieces, and the last part is registering these components with the DI container at startup:&lt;/p&gt;



&lt;div class="highlight"&gt;&lt;pre class="highlight plaintext"&gt;&lt;code&gt;services.AddMediatR(typeof(Startup));

services.AddScoped(typeof(IDocumentDBRepository&amp;lt;&amp;gt;), typeof(DocumentDBRepository&amp;lt;&amp;gt;));  
services.AddScoped&amp;lt;IUnitOfWork, UnitOfWork&amp;gt;();  
services.AddScoped&amp;lt;IDocumentMessageDispatcher, DocumentMessageDispatcher&amp;gt;();  
services.AddScoped(typeof(IPipelineBehavior&amp;lt;,&amp;gt;), typeof(UnitOfWorkBehavior&amp;lt;,&amp;gt;));  
services.Scan(c =&amp;gt;  
{
    c.FromAssembliesOf(typeof(Startup))
        .AddClasses(t =&amp;gt; t.AssignableTo(typeof(IDocumentMessageHandler&amp;lt;&amp;gt;)))
        .AsImplementedInterfaces()
        .WithTransientLifetime();
});
&lt;/code&gt;&lt;/pre&gt;&lt;/div&gt;



&lt;p&gt;We add our MediatR handlers using the &lt;a href="https://www.nuget.org/packages/MediatR.Extensions.Microsoft.DependencyInjection"&gt;MediatR.Extensions.Microsoft.DependencyInjection&lt;/a&gt; package, our generic repository, unit of work, dispatcher, and unit of work behavior. Finally, we add all of the &lt;code&gt;IDocumentMessageHandler&lt;/code&gt; implementations using &lt;a href="https://github.com/khellang/Scrutor"&gt;Scrutor&lt;/a&gt;, making our lives much easier to add all the handlers in one go.&lt;/p&gt;

&lt;p&gt;With all this in place, we can run and verify that our handlers fire and we can see the message in the inbox of the Stock item:&lt;/p&gt;



&lt;div class="highlight"&gt;&lt;pre class="highlight plaintext"&gt;&lt;code&gt;{
    "QuantityAvailable": 99,
    "ProductId": 771,
    "id": "cfbb6333-ed9f-49e7-8640-bb920d5c9106",
    "Outbox": {
        "$type": "System.Collections.Generic.HashSet`1[[AdventureWorksCosmos.UI.Infrastructure.IDocumentMessage, AdventureWorksCosmos.UI]], System.Core",
        "$values": []
    },
    "Inbox": {
        "$type": "System.Collections.Generic.HashSet`1[[AdventureWorksCosmos.UI.Infrastructure.IDocumentMessage, AdventureWorksCosmos.UI]], System.Core",
        "$values": [
            {
                "$type": "AdventureWorksCosmos.UI.Models.Orders.ItemPurchased, AdventureWorksCosmos.UI",
                "ProductId": 771,
                "Quantity": 1,
                "Id": "2ab2108c-9698-49e8-93de-a3ced453836a"
            }
        ]
    },
    "_rid": "WQk4AKSQMwACAAAAAAAAAA==",
    "_self": "dbs/WQk4AA==/colls/WQk4AKSQMwA=/docs/WQk4AKSQMwACAAAAAAAAAA==/",
    "_etag": "\"060077c2-0000-0000-0000-5b71d8a10000\"",
    "_attachments": "attachments/",
    "_ts": 1534187681
}
&lt;/code&gt;&lt;/pre&gt;&lt;/div&gt;



&lt;p&gt;We now have effective document messaging between our documents!&lt;/p&gt;

&lt;p&gt;Well, almost.&lt;/p&gt;

&lt;p&gt;In the next post, we'll walk through what to do when things go wrong: failures and retries.&lt;/p&gt;


</description>
      <category>microservices</category>
      <category>distributedsystems</category>
      <category>domaindrivendesign</category>
      <category>mediatr</category>
    </item>
    <item>
      <title>Life Beyond Distributed Transactions: An Apostate's Implementation - Document Example</title>
      <dc:creator>Jimmy Bogard</dc:creator>
      <pubDate>Thu, 09 Aug 2018 15:59:41 +0000</pubDate>
      <link>https://dev.to/jbogard/life-beyond-distributed-transactions-an-apostates-implementation---document-example-1c9</link>
      <guid>https://dev.to/jbogard/life-beyond-distributed-transactions-an-apostates-implementation---document-example-1c9</guid>
      <description>

&lt;p&gt;Posts in this series:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;&lt;a href="https://dev.to/jbogard/life-beyond-distributed-transactions-an-apostates-implementation---a-primer-2po6-temp-slug-8261948"&gt;A Primer&lt;/a&gt;&lt;/li&gt;
&lt;li&gt;&lt;a href="https://dev.to/jbogard/life-beyond-distributed-transactions-an-apostates-implementation---document-coordination-leb-temp-slug-1009285"&gt;Document Coordination&lt;/a&gt;&lt;/li&gt;
&lt;li&gt;&lt;a href="https://jimmybogard.com/life-beyond-distributed-transactions-an-apostates-implementation-document-example/"&gt;Document Example&lt;/a&gt;&lt;/li&gt;
&lt;li&gt;&lt;a href="https://jimmybogard.com/life-beyond-distributed-transactions-an-apostates-implementation-dispatching-example/"&gt;Dispatching Example&lt;/a&gt;&lt;/li&gt;
&lt;li&gt;&lt;a href="https://jimmybogard.com/life-beyond-distributed-transactions-an-apostates-implementation-failures-and-retries/"&gt;Failures and Retries&lt;/a&gt;&lt;/li&gt;
&lt;li&gt;&lt;a href="https://jimmybogard.com/life-beyond-distributed-transactions-an-apostates-implementation-dispatcher-failure-recovery/"&gt;Failure Recovery&lt;/a&gt;&lt;/li&gt;
&lt;li&gt;&lt;a href="https://jimmybogard.com/life-beyond-distributed-transactions-sagas/"&gt;Sagas&lt;/a&gt;&lt;/li&gt;
&lt;li&gt;&lt;a href="https://jimmybogard.com/life-beyond-distributed-transactions-an-apostates-implementation-relational-resources/"&gt;Relational Resources&lt;/a&gt;&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;&lt;a href="https://github.com/jbogard/adventureworkscosmos"&gt;Sample code from this series&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;In the last post, I walked through the "happy path" scenario of coordinated communication/activities between multiple resources that otherwise can't participate in a transaction. In this post, I'll walk through a code example of building out document coordination in &lt;a href="https://azure.microsoft.com/en-us/services/cosmos-db/"&gt;Azure Cosmos DB&lt;/a&gt;. My starting point is this set of code for approving an invoice and updating stock:&lt;/p&gt;



&lt;div class="highlight"&gt;&lt;pre class="highlight plaintext"&gt;&lt;code&gt;[HttpPost]
public async Task&amp;lt;IActionResult&amp;gt; Approve(Guid id)  
{
    var orderRequest = await _orderRepository.GetItemAsync(id);

    orderRequest.Approve();

    await _orderRepository.UpdateItemAsync(orderRequest);

    foreach (var lineItem in orderRequest.Items)
    {
        var stock = (await _stockRepository
            .GetItemsAsync(s =&amp;gt; s.ProductId == lineItem.ProductId))
            .FirstOrDefault();

        stock.QuantityAvailable -= lineItem.Quantity;

        await _stockRepository.UpdateItemAsync(stock);
    }

    return RedirectToPage("/Orders/Show", new { id });
}
&lt;/code&gt;&lt;/pre&gt;&lt;/div&gt;



&lt;p&gt;The repositories in my example are straight from the example code when you download a sample application in the Azure Portal, and just wrap the underlying &lt;a href="https://docs.microsoft.com/en-us/dotnet/api/microsoft.azure.documents.client.documentclient?view=azure-dotnet"&gt;DocumentClient&lt;/a&gt;.&lt;/p&gt;

&lt;h3&gt;
  
  
  Modeling our document
&lt;/h3&gt;

&lt;p&gt;First, we need to baseline our document messages. These objects can be POCOs, but we still need some base information. Since we want to enforce idempotent actions, we need to be able to distinguish between different messages. The easiest way to do so is with a unique identifier per message:&lt;/p&gt;



&lt;div class="highlight"&gt;&lt;pre class="highlight plaintext"&gt;&lt;code&gt;public interface IDocumentMessage  
{
    Guid Id { get; }
}
&lt;/code&gt;&lt;/pre&gt;&lt;/div&gt;



&lt;p&gt;Since our documents need to store and process messages in an inbox/outbox, we need to build out our base Document class to include these items. We could also build a completely separate object for our inbox/outbox, but for simplicity's sake, we'll just use a base class:&lt;/p&gt;



&lt;div class="highlight"&gt;&lt;pre class="highlight plaintext"&gt;&lt;code&gt;public abstract class DocumentBase  
{
    [JsonProperty(PropertyName = "id")]
    public Guid Id { get; set; }

    private HashSet&amp;lt;IDocumentMessage&amp;gt; _outbox 
        = new HashSet&amp;lt;IDocumentMessage&amp;gt;(DocumentMessageEqualityComparer.Instance);
    private HashSet&amp;lt;IDocumentMessage&amp;gt; _inbox
        = new HashSet&amp;lt;IDocumentMessage&amp;gt;(DocumentMessageEqualityComparer.Instance);

    public IEnumerable&amp;lt;IDocumentMessage&amp;gt; Outbox
    {
        get =&amp;gt; _outbox;
        protected set =&amp;gt; _outbox = value == null
            ? new HashSet&amp;lt;IDocumentMessage&amp;gt;(DocumentMessageEqualityComparer.Instance)
            : new HashSet&amp;lt;IDocumentMessage&amp;gt;(value, DocumentMessageEqualityComparer.Instance);
    }

    public IEnumerable&amp;lt;IDocumentMessage&amp;gt; Inbox
    {
        get =&amp;gt; _inbox;
        protected set =&amp;gt; _inbox = value == null
            ? new HashSet&amp;lt;IDocumentMessage&amp;gt;(DocumentMessageEqualityComparer.Instance)
            : new HashSet&amp;lt;IDocumentMessage&amp;gt;(value, DocumentMessageEqualityComparer.Instance);
    }
&lt;/code&gt;&lt;/pre&gt;&lt;/div&gt;



&lt;p&gt;Each of our mailboxes is a &lt;a href="https://docs.microsoft.com/en-us/dotnet/api/system.collections.generic.hashset-1?view=netframework-4.7.2"&gt;HashSet&lt;/a&gt;, to ensure we enforce uniqueness of document messages inside our document. We wrap our mailboxes in a couple of convenience properties for storage purposes (since our documents are serialized using JSON.NET, we have to model appropriately for its serialization needs).&lt;/p&gt;

&lt;p&gt;We're using a custom equality comparer for document messages based on that interface and ID we added earlier:&lt;/p&gt;



&lt;div class="highlight"&gt;&lt;pre class="highlight plaintext"&gt;&lt;code&gt;public class DocumentMessageEqualityComparer  
    : IEqualityComparer&amp;lt;IDocumentMessage&amp;gt;
{
    public static readonly DocumentMessageEqualityComparer Instance
        = new DocumentMessageEqualityComparer();

    public bool Equals(IDocumentMessage x, IDocumentMessage y)
    {
        return x.Id == y.Id;
    }

    public int GetHashCode(IDocumentMessage obj)
    {
        return obj.Id.GetHashCode();
    }
}
&lt;/code&gt;&lt;/pre&gt;&lt;/div&gt;



&lt;p&gt;With this, we can make sure that our document messages only exist in our inbox/outboxes once (assuming we can pick unique GUIDs).&lt;/p&gt;

&lt;p&gt;Next, we need to be able to send a message in our &lt;code&gt;DocumentBase&lt;/code&gt; class:&lt;/p&gt;



&lt;div class="highlight"&gt;&lt;pre class="highlight plaintext"&gt;&lt;code&gt;protected void Send(IDocumentMessage documentMessage)  
{
    if (_outbox == null)
        _outbox = new HashSet&amp;lt;IDocumentMessage&amp;gt;(DocumentMessageEqualityComparer.Instance);

    _outbox.Add(documentMessage);
}
&lt;/code&gt;&lt;/pre&gt;&lt;/div&gt;



&lt;p&gt;We have to check that the outbox exists and create it if it doesn't (due to serialization, it might not exist), then simply add the document message to the outbox.&lt;/p&gt;

&lt;p&gt;To process a document message, we need to make sure this action is idempotent. To check for idempotency, we'll examine our &lt;code&gt;Inbox&lt;/code&gt; before executing the action. We can wrap this all up in a single method that our derived documents will use:&lt;/p&gt;



&lt;div class="highlight"&gt;&lt;pre class="highlight plaintext"&gt;&lt;code&gt;protected void Process&amp;lt;TDocumentMessage&amp;gt;(  
    TDocumentMessage documentMessage, 
    Action&amp;lt;TDocumentMessage&amp;gt; action)
    where TDocumentMessage : IDocumentMessage
{
    if (_inbox == null)
        _inbox = new HashSet&amp;lt;IDocumentMessage&amp;gt;(DocumentMessageEqualityComparer.Instance);

    if (_inbox.Contains(documentMessage))
        return;

    action(documentMessage);

    _inbox.Add(documentMessage);
}
&lt;/code&gt;&lt;/pre&gt;&lt;/div&gt;



&lt;p&gt;Our derived documents will need to call this method to process their messages with the idempotency check. Once a message is processed successfully, we'll add it to the inbox. And since our transaction boundary is the document, if something fails, the action never happened and the message never gets stored to the inbox. Only by keeping our inbox, outbox, and business data inside a single transaction boundary can we guarantee that they all succeed or fail together.&lt;/p&gt;
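
&lt;p&gt;To make the retry behavior concrete, here is a rough in-memory sketch. It is a simplification, not the sample code: the inbox is reduced to a set of message IDs, the &lt;code&gt;simulateFailure&lt;/code&gt; flag is a made-up test hook, and the failure is thrown before any state changes (in the real thing, the "rollback" comes from the document never being saved):&lt;/p&gt;

&lt;div class="highlight"&gt;&lt;pre class="highlight plaintext"&gt;&lt;code&gt;using System;
using System.Collections.Generic;

// Trimmed-down, hypothetical versions of the types in this post.
public class ItemPurchased
{
    public int Quantity { get; set; }
    public Guid Id { get; set; }
}

public class Stock
{
    // Simplification: the inbox only remembers message IDs.
    private readonly HashSet&amp;lt;Guid&amp;gt; _inbox = new HashSet&amp;lt;Guid&amp;gt;();

    public int QuantityAvailable { get; set; }

    public void Handle(ItemPurchased message, bool simulateFailure = false)
    {
        // Idempotency check: a message we've already seen is a no-op.
        if (_inbox.Contains(message.Id))
            return;

        // Made-up hook so the sketch can show a failed attempt.
        if (simulateFailure)
            throw new InvalidOperationException("Simulated failure");

        QuantityAvailable -= message.Quantity;

        // Only record the message once the action has succeeded.
        _inbox.Add(message.Id);
    }
}

public static class Demo
{
    public static void Main()
    {
        var stock = new Stock { QuantityAvailable = 10 };
        var message = new ItemPurchased { Id = Guid.NewGuid(), Quantity = 3 };

        try { stock.Handle(message, simulateFailure: true); }
        catch (InvalidOperationException) { /* nothing recorded, safe to retry */ }

        stock.Handle(message); // the retry applies the change
        stock.Handle(message); // a duplicate delivery is ignored

        Console.WriteLine(stock.QuantityAvailable); // 7
    }
}
&lt;/code&gt;&lt;/pre&gt;&lt;/div&gt;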

&lt;h3&gt;
  
  
  Refactoring our action
&lt;/h3&gt;

&lt;p&gt;Now that we have our basic mechanism for storing and processing messages, we can refactor our original action. It basically consisted of two actions: one approving the order, and another updating stock.&lt;/p&gt;

&lt;p&gt;We need to "send" a message from our Order to Stock. But what should that message look like? A few options:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Command, "update stock"&lt;/li&gt;
&lt;li&gt;Event, "order approved"&lt;/li&gt;
&lt;li&gt;Event, "item purchased"&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;If I go with a command, I'm coupling the primary action with the intended side effect. But what if this side effect needs to change? Be removed? I don't want to burden the main Order logic with that.&lt;/p&gt;

&lt;p&gt;What about the first event, "order approved"? I could go with this - but looking at the work done and the communication involved, Stock doesn't care that an order was approved; it only really cares whether an item was purchased. Approvals are really the internal business rules of an order, but the ultimate side effect is that items finally become "purchased" at this point in time. So if I used "order approved", I'd be coupling Stock to the internal business rules of Order. Even though it's an event, "order approved" concerns internal business processes that other documents/services shouldn't care about.&lt;/p&gt;

&lt;p&gt;Finally, we have "item purchased". This most closely matches what Stock cares about, and removes any kind of process coupling between these two aggregates. If I went with the macro event, "order approved", I'd still have to translate that to what it means for Stock.&lt;/p&gt;

&lt;p&gt;With this in mind, I'll create a document message representing this event:&lt;/p&gt;



&lt;div class="highlight"&gt;&lt;pre class="highlight plaintext"&gt;&lt;code&gt;public class ItemPurchased : IDocumentMessage  
{
    public int ProductId { get; set; }
    public int Quantity { get; set; }
    public Guid Id { get; set; }
}
&lt;/code&gt;&lt;/pre&gt;&lt;/div&gt;



&lt;p&gt;I know how much of which product was purchased, and that's enough for Stock to deal with the consequences of that event.&lt;/p&gt;

&lt;p&gt;My &lt;code&gt;Order&lt;/code&gt; class then models its &lt;code&gt;Approve&lt;/code&gt; method to include sending these new messages:&lt;/p&gt;



&lt;div class="highlight"&gt;&lt;pre class="highlight plaintext"&gt;&lt;code&gt;public void Approve()  
{
    Status = Status.Approved;
    foreach (var lineItem in Items)
    {
        Send(new ItemPurchased
        {
            ProductId = lineItem.ProductId,
            Quantity = lineItem.Quantity,
            Id = Guid.NewGuid()
        });
    }
}
&lt;/code&gt;&lt;/pre&gt;&lt;/div&gt;



&lt;p&gt;I don't have an idempotency check here (if the order is already approved, do nothing), but you get the idea.&lt;/p&gt;
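
&lt;p&gt;For illustration, such a guard could look like the sketch below. The &lt;code&gt;Status&lt;/code&gt; enum values, the &lt;code&gt;LineItem&lt;/code&gt; type, and the plain &lt;code&gt;List&lt;/code&gt; standing in for the outbox are all assumptions made to keep it self-contained. The guard matters because every call generates fresh GUIDs, so the ID-based de-duplication in the outbox wouldn't catch a second batch of messages:&lt;/p&gt;

&lt;div class="highlight"&gt;&lt;pre class="highlight plaintext"&gt;&lt;code&gt;using System;
using System.Collections.Generic;

// Hypothetical enum values; only Approved appears in the post.
public enum Status { New, Submitted, Approved }

public class LineItem
{
    public int ProductId { get; set; }
    public int Quantity { get; set; }
}

public class ItemPurchased
{
    public int ProductId { get; set; }
    public int Quantity { get; set; }
    public Guid Id { get; set; }
}

public class Order
{
    public Status Status { get; set; }
    public List&amp;lt;LineItem&amp;gt; Items { get; } = new List&amp;lt;LineItem&amp;gt;();

    // Stand-in for the real outbox, simplified to a list.
    public List&amp;lt;ItemPurchased&amp;gt; Outbox { get; } = new List&amp;lt;ItemPurchased&amp;gt;();

    public void Approve()
    {
        // Idempotency guard: approving twice must not emit new messages.
        if (Status == Status.Approved)
            return;

        Status = Status.Approved;
        foreach (var lineItem in Items)
        {
            Outbox.Add(new ItemPurchased
            {
                ProductId = lineItem.ProductId,
                Quantity = lineItem.Quantity,
                Id = Guid.NewGuid()
            });
        }
    }
}

public static class Demo
{
    public static void Main()
    {
        var order = new Order();
        order.Items.Add(new LineItem { ProductId = 771, Quantity = 1 });

        order.Approve();
        order.Approve(); // second call is a no-op

        Console.WriteLine(order.Outbox.Count); // 1
    }
}
&lt;/code&gt;&lt;/pre&gt;&lt;/div&gt;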

&lt;p&gt;On the Stock side, I need to add a method to process the &lt;code&gt;ItemPurchased&lt;/code&gt; message:&lt;/p&gt;



&lt;div class="highlight"&gt;&lt;pre class="highlight plaintext"&gt;&lt;code&gt;public void Handle(ItemPurchased message)  
{
    Process(message, e =&amp;gt;
    {
        QuantityAvailable -= e.Quantity;
    });
}
&lt;/code&gt;&lt;/pre&gt;&lt;/div&gt;



&lt;p&gt;Finally, we need some way of linking our &lt;code&gt;ItemPurchased&lt;/code&gt; message with the &lt;code&gt;Stock&lt;/code&gt;, and that's the intent of an &lt;code&gt;IDocumentMessageHandler&lt;/code&gt;:&lt;/p&gt;



&lt;div class="highlight"&gt;&lt;pre class="highlight plaintext"&gt;&lt;code&gt;public interface IDocumentMessageHandler&amp;lt;in T&amp;gt;  
    where T : IDocumentMessage
{
    Task Handle(T message);
}
&lt;/code&gt;&lt;/pre&gt;&lt;/div&gt;



&lt;p&gt;The part of our action that loaded up each &lt;code&gt;Stock&lt;/code&gt; is the code we'll put into our handler:&lt;/p&gt;



&lt;div class="highlight"&gt;&lt;pre class="highlight plaintext"&gt;&lt;code&gt;public class UpdateStockFromItemPurchasedHandler  
    : IDocumentMessageHandler&amp;lt;ItemPurchased&amp;gt;
{
    private readonly IDocumentDBRepository&amp;lt;Stock&amp;gt; _repository;

    public UpdateStockFromItemPurchasedHandler(
        IDocumentDBRepository&amp;lt;Stock&amp;gt; repository) 
        =&amp;gt; _repository = repository;

    public async Task Handle(ItemPurchased message)
    {
        var stock = (await _repository
            .GetItemsAsync(s =&amp;gt; s.ProductId == message.ProductId))
            .Single();

        stock.Handle(message);

        await _repository.UpdateItemAsync(stock);
    }
}
&lt;/code&gt;&lt;/pre&gt;&lt;/div&gt;



&lt;p&gt;Not that exciting, as our document will handle the real business logic of handling the request. This class just connects the dots between an &lt;code&gt;IDocumentMessageHandler&lt;/code&gt; and some &lt;code&gt;DocumentBase&lt;/code&gt; instance.&lt;/p&gt;

&lt;p&gt;With these basic building blocks, we'll modify our action to only update the &lt;code&gt;Order&lt;/code&gt; instance:&lt;/p&gt;



&lt;div class="highlight"&gt;&lt;pre class="highlight plaintext"&gt;&lt;code&gt;[HttpPost]
public async Task&amp;lt;IActionResult&amp;gt; Approve(Guid id)  
{
    var orderRequest = await _orderRepository.GetItemAsync(id);

    orderRequest.Approve();

    await _orderRepository.UpdateItemAsync(orderRequest);

    return RedirectToPage("/Orders/Show", new { id });
}
&lt;/code&gt;&lt;/pre&gt;&lt;/div&gt;



&lt;p&gt;Now when we approve our order, we only create messages in the outbox, which get persisted along with the order. If I look at the saved order in Cosmos DB, I can verify the items are persisted:&lt;/p&gt;



&lt;div class="highlight"&gt;&lt;pre class="highlight plaintext"&gt;&lt;code&gt;{
    "Items": [
        {
            "Quantity": 1,
            "ListPrice": 3399.99,
            "ProductId": 771,
            "ProductName": "Mountain-100 Silver, 38",
            "Subtotal": 3399.99
        }
    ],
    "Status": 2,
    "Total": 3399.99,
    "Customer": {
        "FirstName": "Jane",
        "LastName": "Doe",
        "MiddleName": "Mary"
    },
    "id": "8bf4bda2-3796-431e-9936-8511243352d2",
    "Outbox": {
        "$type": "System.Collections.Generic.HashSet`1[[AdventureWorksCosmos.UI.Infrastructure.IDocumentMessage, AdventureWorksCosmos.UI]], System.Core",
        "$values": [
            {
                "$type": "AdventureWorksCosmos.UI.Models.Orders.ItemPurchased, AdventureWorksCosmos.UI",
                "ProductId": 771,
                "Quantity": 1,
                "Id": "987ce801-e7cf-4abf-aba7-83d7eed00610"
            }
        ]
    },
    "Inbox": {
        "$type": "System.Collections.Generic.HashSet`1[[AdventureWorksCosmos.UI.Infrastructure.IDocumentMessage, AdventureWorksCosmos.UI]], System.Core",
        "$values": []
    },
    "_rid": "lJFnANVMlwADAAAAAAAAAA==",
    "_self": "dbs/lJFnAA==/colls/lJFnANVMlwA=/docs/lJFnANVMlwADAAAAAAAAAA==/",
    "_etag": "\"02002652-0000-0000-0000-5b48f2140000\"",
    "_attachments": "attachments/",
    "_ts": 1531507220
}
&lt;/code&gt;&lt;/pre&gt;&lt;/div&gt;



&lt;p&gt;In order to get that polymorphic behavior for my &lt;code&gt;IDocumentMessage&lt;/code&gt; collections, I needed to configure the JSON serializer settings in my repository:&lt;/p&gt;



&lt;div class="highlight"&gt;&lt;pre class="highlight plaintext"&gt;&lt;code&gt;_client = new DocumentClient(new Uri(Endpoint), Key,  
    new JsonSerializerSettings
    {
        TypeNameHandling = TypeNameHandling.Auto
    });
&lt;/code&gt;&lt;/pre&gt;&lt;/div&gt;



&lt;p&gt;With these pieces in place, I've removed the process coupling between updating an order's status and updating stock items using document messaging. Of course, we don't actually have anything &lt;em&gt;dispatching&lt;/em&gt; our messages. We'll cover the infrastructure for dispatching messages in the next post.&lt;/p&gt;


</description>
      <category>microservices</category>
      <category>distributedsystems</category>
      <category>domaindrivendesign</category>
    </item>
    <item>
      <title>Life Beyond Distributed Transactions: An Apostate's Implementation - Document Coordination</title>
      <dc:creator>Jimmy Bogard</dc:creator>
      <pubDate>Tue, 07 Aug 2018 18:24:19 +0000</pubDate>
      <link>https://dev.to/jbogard/life-beyond-distributed-transactions-an-apostates-implementation---document-coordination-4l8</link>
      <guid>https://dev.to/jbogard/life-beyond-distributed-transactions-an-apostates-implementation---document-coordination-4l8</guid>
      <description>

&lt;p&gt;Posts in this series:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;&lt;a href="https://dev.to/jbogard/life-beyond-distributed-transactions-an-apostates-implementation---a-primer-2po6-temp-slug-8261948"&gt;A Primer&lt;/a&gt;&lt;/li&gt;
&lt;li&gt;&lt;a href="https://jimmybogard.com/life-beyond-distributed-transactions-an-apostates-implementation-aggregate-coordination/"&gt;Document Coordination&lt;/a&gt;&lt;/li&gt;
&lt;li&gt;&lt;a href="https://jimmybogard.com/life-beyond-distributed-transactions-an-apostates-implementation-document-example/"&gt;Document Example&lt;/a&gt;&lt;/li&gt;
&lt;li&gt;&lt;a href="https://jimmybogard.com/life-beyond-distributed-transactions-an-apostates-implementation-dispatching-example/"&gt;Dispatching Example&lt;/a&gt;&lt;/li&gt;
&lt;li&gt;&lt;a href="https://jimmybogard.com/life-beyond-distributed-transactions-an-apostates-implementation-failures-and-retries/"&gt;Failures and Retries&lt;/a&gt;&lt;/li&gt;
&lt;li&gt;&lt;a href="https://jimmybogard.com/life-beyond-distributed-transactions-an-apostates-implementation-dispatcher-failure-recovery/"&gt;Failure Recovery&lt;/a&gt;&lt;/li&gt;
&lt;li&gt;&lt;a href="https://jimmybogard.com/life-beyond-distributed-transactions-sagas/"&gt;Sagas&lt;/a&gt;&lt;/li&gt;
&lt;li&gt;&lt;a href="https://jimmybogard.com/life-beyond-distributed-transactions-an-apostates-implementation-relational-resources/"&gt;Relational Resources&lt;/a&gt;&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;&lt;a href="https://github.com/jbogard/adventureworkscosmos"&gt;Sample code from this series&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;Quick note - I've updated this post to use the more accurate term "Document" rather than the less-accurate, DDD-specific term "Aggregate". The images still use the older term "Aggregate"; let's pretend I fixed all of those.&lt;/p&gt;

&lt;p&gt;In the last post, I walked through the general problem of distributed transactions, and some potential ideas around coordinating activities between similar or disparate resources. In a lot of cases, I would prefer to simply wrap our actions in a transaction, but as the resources I'm trying to transact together move further apart (process-wise or network-wise), transactions between these multiple resources become impossible.&lt;/p&gt;

&lt;p&gt;If I can only perform reliable transactions with a single resource at a time, whether that resource is a database or single record in Cosmos DB, I need to design my interactions so that both the &lt;em&gt;business data&lt;/em&gt; and &lt;em&gt;communications&lt;/em&gt; transact together. That means that my communication needs to be in the same transactional store as my business data!&lt;/p&gt;

&lt;p&gt;In the case of Azure Cosmos DB, I need to place the communication that I both send and receive inside my document. I store &lt;em&gt;incoming&lt;/em&gt; communication because any request to change data needs to be idempotent. I could just have some business rule that "all actions must be idempotent", but I can also achieve this by simply storing the incoming requests and checking whether I've already processed a request before performing the action.&lt;/p&gt;

&lt;p&gt;For outgoing communication, I can't interact with any other resource transactionally, so I store outgoing communication on what I &lt;em&gt;can&lt;/em&gt; control - the document. So our final document looks like:&lt;/p&gt;

&lt;p&gt;&lt;a href="https://res.cloudinary.com/practicaldev/image/fetch/s--77Q-qz1M--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_880/https://jimmybogardsblog.blob.core.windows.net/jimmybogardsblog/7/2018/Picture0034.png" class="article-body-image-wrapper"&gt;&lt;img src="https://res.cloudinary.com/practicaldev/image/fetch/s--77Q-qz1M--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_880/https://jimmybogardsblog.blob.core.windows.net/jimmybogardsblog/7/2018/Picture0034.png" alt=""&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;When I perform an operation, communication and business data either save successfully or the entire operation rolls back.&lt;/p&gt;

&lt;h3&gt;
  
  
  Dispatching Changes
&lt;/h3&gt;

&lt;p&gt;When I find I have an operation that needs to affect more than one resource, I then use my outbox to communicate with those external resources:&lt;/p&gt;

&lt;p&gt;&lt;a href="https://res.cloudinary.com/practicaldev/image/fetch/s--UA0iy2HJ--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_880/https://jimmybogardsblog.blob.core.windows.net/jimmybogardsblog/7/2018/Picture0035.png" class="article-body-image-wrapper"&gt;&lt;img src="https://res.cloudinary.com/practicaldev/image/fetch/s--UA0iy2HJ--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_880/https://jimmybogardsblog.blob.core.windows.net/jimmybogardsblog/7/2018/Picture0035.png" alt=""&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;I can then safely transact that single resource:&lt;/p&gt;

&lt;p&gt;&lt;a href="https://res.cloudinary.com/practicaldev/image/fetch/s--yrY4aKCx--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_880/https://jimmybogardsblog.blob.core.windows.net/jimmybogardsblog/7/2018/Picture0036.png" class="article-body-image-wrapper"&gt;&lt;img src="https://res.cloudinary.com/practicaldev/image/fetch/s--yrY4aKCx--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_880/https://jimmybogardsblog.blob.core.windows.net/jimmybogardsblog/7/2018/Picture0036.png" alt=""&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;Now that I've completed the first action, I can begin my interaction with other resources. I use a dispatcher to do so - something that's responsible for reading messages from a resource's outbox:&lt;/p&gt;

&lt;p&gt;&lt;a href="https://res.cloudinary.com/practicaldev/image/fetch/s--nc4ltB28--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_880/https://jimmybogardsblog.blob.core.windows.net/jimmybogardsblog/7/2018/Picture0037.png" class="article-body-image-wrapper"&gt;&lt;img src="https://res.cloudinary.com/practicaldev/image/fetch/s--nc4ltB28--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_880/https://jimmybogardsblog.blob.core.windows.net/jimmybogardsblog/7/2018/Picture0037.png" alt=""&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;And pass them to the correct resource:&lt;/p&gt;

&lt;p&gt;&lt;a href="https://res.cloudinary.com/practicaldev/image/fetch/s--yN7qYoxQ--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_880/https://jimmybogardsblog.blob.core.windows.net/jimmybogardsblog/7/2018/Picture0039.png" class="article-body-image-wrapper"&gt;&lt;img src="https://res.cloudinary.com/practicaldev/image/fetch/s--yN7qYoxQ--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_880/https://jimmybogardsblog.blob.core.windows.net/jimmybogardsblog/7/2018/Picture0039.png" alt=""&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;Each resource receives the incoming message and performs two actions:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Affect the business data&lt;/li&gt;
&lt;li&gt;Store the incoming message in the inbox&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;This entire operation is transactional:&lt;/p&gt;

&lt;p&gt;&lt;a href="https://res.cloudinary.com/practicaldev/image/fetch/s--DQVV5ipT--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_880/https://jimmybogardsblog.blob.core.windows.net/jimmybogardsblog/7/2018/Picture0040.png" class="article-body-image-wrapper"&gt;&lt;img src="https://res.cloudinary.com/practicaldev/image/fetch/s--DQVV5ipT--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_880/https://jimmybogardsblog.blob.core.windows.net/jimmybogardsblog/7/2018/Picture0040.png" alt=""&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;With that resource successful, the dispatcher moves on to the next resource. However, this operation can fail:&lt;/p&gt;

&lt;p&gt;&lt;a href="https://res.cloudinary.com/practicaldev/image/fetch/s--wq9KknDz--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_880/https://jimmybogardsblog.blob.core.windows.net/jimmybogardsblog/7/2018/Picture0041.png" class="article-body-image-wrapper"&gt;&lt;img src="https://res.cloudinary.com/practicaldev/image/fetch/s--wq9KknDz--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_880/https://jimmybogardsblog.blob.core.windows.net/jimmybogardsblog/7/2018/Picture0041.png" alt=""&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;When this happens, we move the message to an external retry queue:&lt;/p&gt;

&lt;p&gt;&lt;a href="https://res.cloudinary.com/practicaldev/image/fetch/s--ucFCgJVY--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_880/https://jimmybogardsblog.blob.core.windows.net/jimmybogardsblog/7/2018/Picture0042.png" class="article-body-image-wrapper"&gt;&lt;img src="https://res.cloudinary.com/practicaldev/image/fetch/s--ucFCgJVY--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_880/https://jimmybogardsblog.blob.core.windows.net/jimmybogardsblog/7/2018/Picture0042.png" alt=""&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;At this point the dispatcher is complete - but because we still have a failed resource, we can't remove our event from the original document's outbox.&lt;/p&gt;

&lt;h3&gt;
  
  
  Retries
&lt;/h3&gt;

&lt;p&gt;In our half-completed state, we need to re-process the outbox for our document. So our dispatcher wakes up from the Retry message and reads the messages in the original document's outbox:&lt;/p&gt;

&lt;p&gt;&lt;a href="https://res.cloudinary.com/practicaldev/image/fetch/s--KAkTk5Bf--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_880/https://jimmybogardsblog.blob.core.windows.net/jimmybogardsblog/7/2018/Picture0043.png" class="article-body-image-wrapper"&gt;&lt;img src="https://res.cloudinary.com/practicaldev/image/fetch/s--KAkTk5Bf--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_880/https://jimmybogardsblog.blob.core.windows.net/jimmybogardsblog/7/2018/Picture0043.png" alt=""&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;The dispatcher dispatches our message again to each of the receiving resources, processing each one in turn. Our first resource has already processed this message, which it knows because it's checked its Inbox before changing the data (making it idempotent):&lt;/p&gt;

&lt;p&gt;&lt;a href="https://res.cloudinary.com/practicaldev/image/fetch/s--YmDDFqEJ--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_880/https://jimmybogardsblog.blob.core.windows.net/jimmybogardsblog/7/2018/Picture0044.png" class="article-body-image-wrapper"&gt;&lt;img src="https://res.cloudinary.com/practicaldev/image/fetch/s--YmDDFqEJ--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_880/https://jimmybogardsblog.blob.core.windows.net/jimmybogardsblog/7/2018/Picture0044.png" alt=""&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;With that complete, the dispatcher can move on to the (previously) failing document:&lt;/p&gt;

&lt;p&gt;&lt;a href="https://res.cloudinary.com/practicaldev/image/fetch/s--nZ3HbRqY--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_880/https://jimmybogardsblog.blob.core.windows.net/jimmybogardsblog/7/2018/Picture0045.png" class="article-body-image-wrapper"&gt;&lt;img src="https://res.cloudinary.com/practicaldev/image/fetch/s--nZ3HbRqY--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_880/https://jimmybogardsblog.blob.core.windows.net/jimmybogardsblog/7/2018/Picture0045.png" alt=""&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;And that document can now succeed. Of course, there might be a bug or something actively preventing us from succeeding - so in that case we'll need to design explicit failure strategies. For now, let's assume that our operations will &lt;em&gt;eventually&lt;/em&gt; succeed. Once we've confirmed that we've successfully dispatched our message to all receivers, we can go back to our original document and remove the message:&lt;/p&gt;

&lt;p&gt;&lt;a href="https://res.cloudinary.com/practicaldev/image/fetch/s--SssRHMx8--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_880/https://jimmybogardsblog.blob.core.windows.net/jimmybogardsblog/7/2018/Picture0046.png" class="article-body-image-wrapper"&gt;&lt;img src="https://res.cloudinary.com/practicaldev/image/fetch/s--SssRHMx8--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_880/https://jimmybogardsblog.blob.core.windows.net/jimmybogardsblog/7/2018/Picture0046.png" alt=""&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;This is then saved in its own individual transaction:&lt;/p&gt;

&lt;p&gt;&lt;a href="https://res.cloudinary.com/practicaldev/image/fetch/s--rMknyBBM--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_880/https://jimmybogardsblog.blob.core.windows.net/jimmybogardsblog/7/2018/Picture0047.png" class="article-body-image-wrapper"&gt;&lt;img src="https://res.cloudinary.com/practicaldev/image/fetch/s--rMknyBBM--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_880/https://jimmybogardsblog.blob.core.windows.net/jimmybogardsblog/7/2018/Picture0047.png" alt=""&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;At each step of the way, we work in smaller transactional steps, each of which can be retried individually and delivered at least once, since each operation is idempotent. By storing our communication in the same transactional boundary as our business data, we can introduce reliable resource coordination even when our resources aren't able to participate in a single shared transaction.&lt;/p&gt;
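
&lt;p&gt;The flow in the pictures above can be roughed out in memory. This is only a sketch under loud assumptions: there is no Cosmos DB and no real retry queue, and &lt;code&gt;Message&lt;/code&gt;, &lt;code&gt;Receiver&lt;/code&gt;, &lt;code&gt;Dispatcher&lt;/code&gt;, and &lt;code&gt;FailuresRemaining&lt;/code&gt; are hypothetical names. A failed dispatch leaves the message in the outbox, and the retry re-sends it to every receiver, relying on each inbox to skip work already done:&lt;/p&gt;

&lt;div class="highlight"&gt;&lt;pre class="highlight plaintext"&gt;&lt;code&gt;using System;
using System.Collections.Generic;

// All types here are hypothetical stand-ins for illustration.
public class Message
{
    public Guid Id { get; } = Guid.NewGuid();
    public int Quantity { get; set; }
}

public class Receiver
{
    private readonly HashSet&amp;lt;Guid&amp;gt; _inbox = new HashSet&amp;lt;Guid&amp;gt;();

    public int FailuresRemaining { get; set; } // simulates transient errors
    public int Total { get; private set; }

    public void Receive(Message message)
    {
        if (_inbox.Contains(message.Id))
            return; // already processed on an earlier attempt

        if (FailuresRemaining &amp;gt; 0)
        {
            FailuresRemaining--;
            throw new InvalidOperationException("Transient failure");
        }

        Total += message.Quantity;   // affect the business data
        _inbox.Add(message.Id);      // store the message in the inbox
    }
}

public static class Dispatcher
{
    // Returns false when a receiver failed; the message then stays in
    // the outbox so that a later retry can dispatch it again.
    public static bool Dispatch(
        List&amp;lt;Message&amp;gt; outbox, Message message, IEnumerable&amp;lt;Receiver&amp;gt; receivers)
    {
        foreach (var receiver in receivers)
        {
            try { receiver.Receive(message); }
            catch (InvalidOperationException) { return false; }
        }

        outbox.Remove(message); // every receiver succeeded
        return true;
    }
}

public static class Demo
{
    public static void Main()
    {
        var message = new Message { Quantity = 5 };
        var outbox = new List&amp;lt;Message&amp;gt; { message };
        var a = new Receiver();
        var b = new Receiver { FailuresRemaining = 1 };
        var receivers = new[] { a, b };

        bool firstAttempt = Dispatcher.Dispatch(outbox, message, receivers);
        bool retry = Dispatcher.Dispatch(outbox, message, receivers);

        // Prints: False True 5 5 0
        Console.WriteLine(
            $"{firstAttempt} {retry} {a.Total} {b.Total} {outbox.Count}");
    }
}
&lt;/code&gt;&lt;/pre&gt;&lt;/div&gt;

&lt;p&gt;Note that the first receiver sees the message twice but only applies it once, which is exactly what the inbox check is for.&lt;/p&gt;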

&lt;p&gt;In the next post, I'll walk through some code examples of this pattern in Cosmos DB.&lt;/p&gt;


</description>
      <category>distributedsystems</category>
      <category>microservices</category>
      <category>domaindrivendesign</category>
    </item>
    <item>
      <title>Life Beyond Distributed Transactions: An Apostate's Implementation - A Primer</title>
      <dc:creator>Jimmy Bogard</dc:creator>
      <pubDate>Wed, 01 Aug 2018 13:56:53 +0000</pubDate>
      <link>https://dev.to/jbogard/life-beyond-distributed-transactions-an-apostates-implementation---a-primer-37nn</link>
      <guid>https://dev.to/jbogard/life-beyond-distributed-transactions-an-apostates-implementation---a-primer-37nn</guid>
      <description>

&lt;p&gt;Posts in this series:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;&lt;a href="https://jimmybogard.com/life-beyond-transactions-implementation-primer/"&gt;A Primer&lt;/a&gt;&lt;/li&gt;
&lt;li&gt;&lt;a href="https://jimmybogard.com/life-beyond-distributed-transactions-an-apostates-implementation-aggregate-coordination/"&gt;Document Coordination&lt;/a&gt;&lt;/li&gt;
&lt;li&gt;&lt;a href="https://jimmybogard.com/life-beyond-distributed-transactions-an-apostates-implementation-document-example/"&gt;Document Example&lt;/a&gt;&lt;/li&gt;
&lt;li&gt;&lt;a href="https://jimmybogard.com/life-beyond-distributed-transactions-an-apostates-implementation-dispatching-example/"&gt;Dispatching Example&lt;/a&gt;&lt;/li&gt;
&lt;li&gt;&lt;a href="https://jimmybogard.com/life-beyond-distributed-transactions-an-apostates-implementation-failures-and-retries/"&gt;Failures and Retries&lt;/a&gt;&lt;/li&gt;
&lt;li&gt;&lt;a href="https://jimmybogard.com/life-beyond-distributed-transactions-an-apostates-implementation-dispatcher-failure-recovery/"&gt;Failure Recovery&lt;/a&gt;&lt;/li&gt;
&lt;li&gt;&lt;a href="https://jimmybogard.com/life-beyond-distributed-transactions-sagas/"&gt;Sagas&lt;/a&gt;&lt;/li&gt;
&lt;li&gt;&lt;a href="https://jimmybogard.com/life-beyond-distributed-transactions-an-apostates-implementation-relational-resources/"&gt;Relational Resources&lt;/a&gt;&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;&lt;a href="https://github.com/jbogard/adventureworkscosmos"&gt;Sample code from this series&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;For those working with SQL databases, working with transactions is more or less a given. The most we may need to worry about is:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Using the appropriate isolation level&lt;/li&gt;
&lt;li&gt;Not doing too much in a single transaction to prevent excessive locks&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;The vast majority of applications I see can blissfully ignore the inner workings of transactions in the database. We take for granted that our operations on one or more rows are either committed or rolled back.&lt;/p&gt;

&lt;p&gt;Things get more complicated when we start dealing with transactions no longer confined to a single resource, or in the case of many NoSQL databases, multiple entities. In many NoSQL databases, transactions are limited to a single entity/record. And if multi-entity transactions &lt;em&gt;are&lt;/em&gt; supported, there are a number of limitations that might make that choice undesirable.&lt;/p&gt;

&lt;p&gt;When we have two non-transactional resources, we know we have a number of overall patterns to try to coordinate these actions (covered in my &lt;a href="https://jimmybogard.com/refactoring-towards-resilience-a-primer/"&gt;Refactoring Towards Resilience series&lt;/a&gt;):&lt;/p&gt;

&lt;p&gt;&lt;a href="https://res.cloudinary.com/practicaldev/image/fetch/s--jeCxjglZ--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_880/https://jimmybogardsblog.blob.core.windows.net/jimmybogardsblog/0/2017/Picture2.png" class="article-body-image-wrapper"&gt;&lt;img src="https://res.cloudinary.com/practicaldev/image/fetch/s--jeCxjglZ--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_880/https://jimmybogardsblog.blob.core.windows.net/jimmybogardsblog/0/2017/Picture2.png" alt=""&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;All of these options assume "I must have these two actions temporally coupled" and have them happen at the same time.&lt;/p&gt;

&lt;p&gt;But what if that wasn't the case? What if we moved away from trying to coordinate two actions, and had more loose coupling between our resources? What might that look like?&lt;/p&gt;

&lt;p&gt;And that's the main scope of Pat Helland's paper, &lt;a href="https://queue.acm.org/detail.cfm?id=3025012"&gt;Life Beyond Distributed Transactions: An Apostate's Opinion&lt;/a&gt;. In this paper, Pat describes a mechanism to overcome the fundamental issue of coordinating actions between resources when our transactions only cover a single entity - messaging!&lt;/p&gt;

&lt;p&gt;&lt;a href="https://res.cloudinary.com/practicaldev/image/fetch/s--57LuYBjo--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_880/https://jimmybogardsblog.blob.core.windows.net/jimmybogardsblog/7/2018/helland7.png" class="article-body-image-wrapper"&gt;&lt;img src="https://res.cloudinary.com/practicaldev/image/fetch/s--57LuYBjo--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_880/https://jimmybogardsblog.blob.core.windows.net/jimmybogardsblog/7/2018/helland7.png" alt=""&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;We use some sort of messaging where messages directed to other entities are saved &lt;em&gt;inside the entities&lt;/em&gt; that send them:&lt;/p&gt;

&lt;p&gt;&lt;a href="https://res.cloudinary.com/practicaldev/image/fetch/s--x-VNYYkn--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_880/https://jimmybogardsblog.blob.core.windows.net/jimmybogardsblog/7/2018/helland5.png" class="article-body-image-wrapper"&gt;&lt;img src="https://res.cloudinary.com/practicaldev/image/fetch/s--x-VNYYkn--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_880/https://jimmybogardsblog.blob.core.windows.net/jimmybogardsblog/7/2018/helland5.png" alt=""&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;Since the scope of a transaction is a single entity, if we need to affect other entities, we can't do that directly. Instead, we store the &lt;em&gt;intent&lt;/em&gt; to effect change as a message inside our entity. The transaction covers both our business data &lt;em&gt;and&lt;/em&gt; our communication to the outside world.&lt;/p&gt;

&lt;p&gt;This certainly isn't the only way to tackle this problem, as I could use the &lt;a href="http://www.cs.cornell.edu/andru/cs711/2002fa/reading/sagas.pdf"&gt;Saga pattern&lt;/a&gt; as a means to manage failures between multi-entity activities. &lt;a href="https://caitiem.com/"&gt;Caitie McCaffrey&lt;/a&gt; has a &lt;a href="https://www.youtube.com/watch?v=0UTOLRTwOX0"&gt;great talk about this&lt;/a&gt;, but for my situation, I couldn't directly use the Saga pattern, since that implied there was some sort of logical "undo". Instead, I wanted to be able to use any of the many coordination patterns available to me, &lt;em&gt;including&lt;/em&gt; the Saga pattern.&lt;/p&gt;

&lt;p&gt;Before we get into implementation details, let's look at some real code in a real database that doesn't completely allow multi-entity transactions.&lt;/p&gt;

&lt;h3&gt;
  
  
  Real World Example
&lt;/h3&gt;

&lt;p&gt;Let's suppose I have an ecommerce application, where I can view products, add items to a cart, check out, then finally, approve orders. As part of approving an order, I need to decrement stock. We'll skip any complex business rules, like negative stock, reservations, and the like. Just to keep things simple, when we order, we just subtract the quantity ordered from our stock reserve:&lt;/p&gt;



&lt;div class="highlight"&gt;&lt;pre class="highlight plaintext"&gt;&lt;code&gt;var orderRequest = await _orderRepository.GetItemAsync(id);

orderRequest.Approve();

await _orderRepository.UpdateItemAsync(orderRequest);

foreach (var lineItem in orderRequest.Items)  
{
    var stock = (await _stockRepository
        .GetItemsAsync(s =&amp;gt; s.ProductId == lineItem.ProductId))
        .Single();

    stock.QuantityAvailable -= lineItem.Quantity;

    await _stockRepository.UpdateItemAsync(stock);
}
&lt;/code&gt;&lt;/pre&gt;&lt;/div&gt;



&lt;p&gt;In my case, I'm using Azure Cosmos DB, which supports a variety of &lt;a href="https://docs.microsoft.com/en-us/azure/cosmos-db/consistency-levels"&gt;consistency levels&lt;/a&gt;. Azure Cosmos DB also supports multi-document transactions, but only in the form of &lt;a href="https://docs.microsoft.com/en-us/azure/cosmos-db/programming#database-program-transactions"&gt;stored procedures and functions&lt;/a&gt;, and even then, only with some limitations. Once you introduce other resources into the mix, like Azure Service Bus, Azure SQL Database, or really anything outside a single partition key, we can no longer use transactions.&lt;/p&gt;

&lt;p&gt;With this in mind, we look back at our original code, and ask ourselves, "Does this action need to couple these resources, or can we decouple them?" There are certainly cases where we need to coordinate two actions (using a coordinator as we saw before), but there are many cases where we don't, and where we don't want to incur the cost of a larger-scoped transaction.&lt;/p&gt;

&lt;p&gt;For our above case, do we need to deduct stock right at the time we approve an order? Or can it happen later? According to the business, deducting stock doesn't need to happen immediately, but it does need to happen, eventually.&lt;/p&gt;

&lt;p&gt;In the next few posts, I'll walk through building out a mechanism for communication with other entities (and even other resources), to see how we might build out an atomic communication system with Azure Cosmos DB as our example.&lt;/p&gt;


</description>
      <category>distributedsystems</category>
      <category>microservices</category>
      <category>domaindrivendesign</category>
    </item>
  </channel>
</rss>
