<?xml version="1.0" encoding="UTF-8"?>
<rss version="2.0" xmlns:atom="http://www.w3.org/2005/Atom" xmlns:dc="http://purl.org/dc/elements/1.1/">
  <channel>
    <title>DEV Community: Aleksei Popov</title>
    <description>The latest articles on DEV Community by Aleksei Popov (@fairday).</description>
    <link>https://dev.to/fairday</link>
    <image>
      <url>https://media2.dev.to/dynamic/image/width=90,height=90,fit=cover,gravity=auto,format=auto/https:%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Fuser%2Fprofile_image%2F1385943%2Ff8a2728c-9d9b-453a-93be-094e8fbd9624.jpg</url>
      <title>DEV Community: Aleksei Popov</title>
      <link>https://dev.to/fairday</link>
    </image>
    <atom:link rel="self" type="application/rss+xml" href="https://dev.to/feed/fairday"/>
    <language>en</language>
    <item>
      <title>Implementing Horizontally Scalable Transactional Outbox with .NET 8 and Kafka: A Practical Guide</title>
      <dc:creator>Aleksei Popov</dc:creator>
      <pubDate>Sun, 07 Apr 2024 21:21:53 +0000</pubDate>
      <link>https://dev.to/fairday/implementing-horizontally-scalable-transactional-outbox-pattern-with-net-8-and-kafka-a-practical-guide-1l1</link>
      <guid>https://dev.to/fairday/implementing-horizontally-scalable-transactional-outbox-pattern-with-net-8-and-kafka-a-practical-guide-1l1</guid>
      <description>&lt;h2&gt;
  
  
  Introduction
&lt;/h2&gt;

&lt;p&gt;In the world of distributed systems, reliable and performant messaging is key to the scalability of such systems.&lt;/p&gt;

&lt;p&gt;One of the most intricate challenges is the dual write problem.&lt;/p&gt;

&lt;p&gt;In this article, I'm going to provide a practical guide for implementing a transactional outbox solution to achieve 'at least once' guarantees, which can be horizontally scaled across multiple instances of the application.&lt;/p&gt;

&lt;p&gt;Before we dive into the implementation, let's quickly review what the dual write problem is and the challenges it poses.&lt;/p&gt;

&lt;p&gt;Additionally, if you're interested in learning more about achieving reliable messaging in a distributed system, welcome to my article about it on hackernoon &lt;a href="https://hackernoon.com/reliable-messaging-in-distributed-systems" rel="noopener noreferrer"&gt;https://hackernoon.com/reliable-messaging-in-distributed-systems&lt;/a&gt;&lt;/p&gt;

&lt;h2&gt;
  
  
  Dual write problem
&lt;/h2&gt;

&lt;p&gt;To better explain the dual write problem, let's take a look at the following diagrams.&lt;/p&gt;

&lt;p&gt;Imagine we perform an operation by receiving an API request, storing data in the database, and sending events to Kafka to notify other systems about certain events.&lt;/p&gt;

&lt;p&gt;If we first commit the transaction and then send events to Kafka, we could find ourselves in a situation where messages can be lost due to potential network issues, Kafka cluster outages, or even a bug in the code.&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2F1nmrndn8cw2gxcg0usgh.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2F1nmrndn8cw2gxcg0usgh.png" alt="Image description" width="800" height="411"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;In the opposite scenario, where we first send messages and then commit the transaction, we can find ourselves in a situation of data inconsistency. This can occur if, for some reason, the database transaction fails and we have already notified other systems about events that did not actually happen. This may lead to system failures and serious consequences.&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fo040x7sy3kbdoyd6a4py.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fo040x7sy3kbdoyd6a4py.png" alt="Image description" width="800" height="399"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;So, in critical business scenarios, losing messages could harm the business. For instance, this is particularly true in transactions and payment processing, which can be very sensetive.&lt;/p&gt;

&lt;h2&gt;
  
  
  Transactional Outbox
&lt;/h2&gt;

&lt;p&gt;To tackle this problem, the transactional outbox approach can be leveraged. It requires conducting business operations and storing events in an atomic way. To achieve this, a special outbox table can be created to persist messages that occurred during the transaction. Then, by using a special Relay component, data can be transmitted to Kafka. This approach helps to achieve 'at least once' guarantees without losing messages. Why 'at least once'? Because receiving responses from Kafka or updating the status of sent messages could fail, which would lead to retries.&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2F9x95zrz32zuv80xm5c49.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2F9x95zrz32zuv80xm5c49.png" alt="Image description" width="800" height="437"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;Let us quickly take a look at Kafka concepts before proceeding to implementation.&lt;/p&gt;

&lt;h2&gt;
  
  
  Kafka
&lt;/h2&gt;

&lt;p&gt;Kafka is a distributed streaming platform that was originally developed by LinkedIn and later open-sourced as a part of the Apache Software Foundation. It's designed to handle high volumes of data in real-time, making it a powerful tool for building data pipelines, integrating systems, and managing streaming data.&lt;/p&gt;

&lt;p&gt;Kafka operates on the concept of topics, which are essentially categories or feeds to which records are published. Producers publish data to topics, and consumers subscribe to topics to read the data. This architecture allows for high throughput and scalability, enabling Kafka to process and store streams of data from multiple sources efficiently.&lt;/p&gt;

&lt;p&gt;There are the key conceptions you need to know.&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2F4vraj8o3u58xyco34y2h.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2F4vraj8o3u58xyco34y2h.png" alt="Image description" width="800" height="417"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Topics&lt;/strong&gt;&lt;br&gt;
A topic is like a category or a box where you store messages. You can have different topics for different types of messages. For example, one topic for customer orders and another for shipping notifications.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Partitions&lt;/strong&gt;&lt;br&gt;
A partition is a subdivision of a topic, like dividing a big box (topic) into smaller sections (partitions) to organise messages better. This helps in processing messages more efficiently by allowing multiple messages to be read and written in parallel.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Consumers&lt;/strong&gt;&lt;br&gt;
A consumer is like someone who takes messages from the box (topic). Consumers read messages from topics to process them. For example, a service that reads shipping notifications and then updates a tracking system.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Producers&lt;/strong&gt;&lt;br&gt;
A producer is like someone who puts messages into the box (topic). Producers send messages to topics. For example, an online store system that sends a message every time an order is placed.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Consumer Groups&lt;/strong&gt;&lt;br&gt;
A consumer group is a team working together to read messages from a topic. The group ensures that each consumer handles messages from a different partition so they don't read the same message twice. It's like dividing the work of checking a big box's sections among several people, where each person is responsible for one section.&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2F9z3yoffd1muy8afzrdoq.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2F9z3yoffd1muy8afzrdoq.png" alt="Image description" width="710" height="501"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Broker&lt;/strong&gt;&lt;br&gt;
A broker in Kafka is like a server or a librarian in a library. It manages the storage and handling of messages within topics. Just as a librarian organises books and keeps track of who borrows what, a broker stores messages and manages their distribution to consumers. If Kafka is a library, then each broker is a bookshelf in that library.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Cluster&lt;/strong&gt;&lt;br&gt;
A cluster in Kafka is a group of brokers working together. Imagine a library with multiple bookshelves; the library is the cluster, and each bookshelf is a broker. This setup allows Kafka to handle more messages and serve more consumers by spreading the load across multiple servers (brokers). A cluster ensures that if one server (broker) has an issue, the others can take over, keeping the system running smoothly.&lt;/p&gt;
&lt;h2&gt;
  
  
  Implementation
&lt;/h2&gt;

&lt;p&gt;You can find the whole implementation &lt;a href="https://github.com/alex-popov-stenn/SimpleTO" rel="noopener noreferrer"&gt;by this link&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;First, as mentioned above, we need to create an Outbox table to persist messages.&lt;/p&gt;
&lt;h3&gt;
  
  
  Define Outbox table
&lt;/h3&gt;


&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight sql"&gt;&lt;code&gt;&lt;span class="k"&gt;CREATE&lt;/span&gt; &lt;span class="k"&gt;TABLE&lt;/span&gt; &lt;span class="n"&gt;Outbox&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;
&lt;span class="n"&gt;Id&lt;/span&gt;                      &lt;span class="n"&gt;uniqueidentifier&lt;/span&gt; &lt;span class="k"&gt;not&lt;/span&gt; &lt;span class="k"&gt;null&lt;/span&gt; &lt;span class="k"&gt;default&lt;/span&gt; &lt;span class="n"&gt;newsequentialid&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt; &lt;span class="k"&gt;PRIMARY&lt;/span&gt; &lt;span class="k"&gt;KEY&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
&lt;span class="n"&gt;DateTimestamp&lt;/span&gt;           &lt;span class="n"&gt;datetimeoffset&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="mi"&gt;7&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="k"&gt;not&lt;/span&gt; &lt;span class="k"&gt;null&lt;/span&gt; &lt;span class="k"&gt;default&lt;/span&gt; &lt;span class="n"&gt;SYSDATETIMEOFFSET&lt;/span&gt;&lt;span class="p"&gt;(),&lt;/span&gt;
&lt;span class="n"&gt;RawData&lt;/span&gt;                 &lt;span class="n"&gt;NVARCHAR&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="k"&gt;MAX&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="k"&gt;not&lt;/span&gt; &lt;span class="k"&gt;null&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
&lt;span class="n"&gt;MessageType&lt;/span&gt;             &lt;span class="n"&gt;NVARCHAR&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="mi"&gt;255&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="k"&gt;not&lt;/span&gt; &lt;span class="k"&gt;null&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
&lt;span class="n"&gt;Topic&lt;/span&gt;                   &lt;span class="n"&gt;NVARCHAR&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="mi"&gt;255&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="k"&gt;not&lt;/span&gt; &lt;span class="k"&gt;null&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
&lt;span class="n"&gt;PartitionBy&lt;/span&gt;             &lt;span class="n"&gt;NVARCHAR&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="mi"&gt;255&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="k"&gt;null&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
&lt;span class="n"&gt;IsProcessed&lt;/span&gt;             &lt;span class="nb"&gt;INT&lt;/span&gt; &lt;span class="k"&gt;DEFAULT&lt;/span&gt; &lt;span class="mi"&gt;0&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
&lt;span class="n"&gt;IsSequential&lt;/span&gt;            &lt;span class="nb"&gt;INT&lt;/span&gt; &lt;span class="k"&gt;DEFAULT&lt;/span&gt; &lt;span class="mi"&gt;0&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
&lt;span class="n"&gt;Metadata&lt;/span&gt;                &lt;span class="n"&gt;NVARCHAR&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="k"&gt;MAX&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="k"&gt;null&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
&lt;span class="n"&gt;ReservedAt&lt;/span&gt;              &lt;span class="n"&gt;datetimeoffset&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="mi"&gt;7&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="k"&gt;null&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
&lt;span class="n"&gt;ExpiredAt&lt;/span&gt;               &lt;span class="n"&gt;datetimeoffset&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="mi"&gt;7&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="k"&gt;null&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
&lt;span class="n"&gt;IsProcessing&lt;/span&gt;            &lt;span class="nb"&gt;INT&lt;/span&gt; &lt;span class="k"&gt;DEFAULT&lt;/span&gt; &lt;span class="mi"&gt;0&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;


&lt;p&gt;&lt;code&gt;RawData&lt;/code&gt;: JSON message representation.&lt;br&gt;
&lt;code&gt;MessageType&lt;/code&gt;: Name of the message type.&lt;br&gt;
&lt;code&gt;Topic&lt;/code&gt;: Kafka topic name.&lt;br&gt;
&lt;code&gt;PartitionBy&lt;/code&gt;: Partition key for the Kafka topic.&lt;br&gt;
&lt;code&gt;IsSequential&lt;/code&gt;: flag indicating that only a single processing relay node can pick up these messages.&lt;br&gt;
&lt;code&gt;Metadata&lt;/code&gt;: Custom metadata, such as operation ID, trace ID, etc.&lt;/p&gt;
&lt;h3&gt;
  
  
  Create &lt;code&gt;IOutbox&lt;/code&gt; and &lt;code&gt;IRelay&lt;/code&gt; abstractions
&lt;/h3&gt;

&lt;p&gt;&lt;code&gt;IOutbox&lt;/code&gt; performs key operations such as storing messages, reserving them for processing, marking them as processed, and deleting the processed ones&lt;/p&gt;

&lt;p&gt;In the horizontal scalability section, I'm going to delve more deeply into the reserving API.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight csharp"&gt;&lt;code&gt;&lt;span class="k"&gt;public&lt;/span&gt; &lt;span class="k"&gt;interface&lt;/span&gt; &lt;span class="nc"&gt;IOutbox&lt;/span&gt;
&lt;span class="p"&gt;{&lt;/span&gt;
    &lt;span class="n"&gt;Task&lt;/span&gt; &lt;span class="n"&gt;AddAsync&lt;/span&gt;&lt;span class="p"&gt;&amp;lt;&lt;/span&gt;&lt;span class="n"&gt;T&lt;/span&gt;&lt;span class="p"&gt;&amp;gt;(&lt;/span&gt;&lt;span class="n"&gt;T&lt;/span&gt; &lt;span class="n"&gt;data&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="kt"&gt;string&lt;/span&gt; &lt;span class="n"&gt;topic&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;Func&lt;/span&gt;&lt;span class="p"&gt;&amp;lt;&lt;/span&gt;&lt;span class="n"&gt;T&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="kt"&gt;string&lt;/span&gt;&lt;span class="p"&gt;&amp;gt;?&lt;/span&gt; &lt;span class="n"&gt;partitionBy&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="kt"&gt;bool&lt;/span&gt; &lt;span class="n"&gt;isSequential&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;Dictionary&lt;/span&gt;&lt;span class="p"&gt;&amp;lt;&lt;/span&gt;&lt;span class="kt"&gt;string&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="kt"&gt;string&lt;/span&gt;&lt;span class="p"&gt;&amp;gt;?&lt;/span&gt; &lt;span class="n"&gt;metadata&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;CancellationToken&lt;/span&gt; &lt;span class="n"&gt;cancellationToken&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
        &lt;span class="k"&gt;where&lt;/span&gt; &lt;span class="n"&gt;T&lt;/span&gt; &lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="k"&gt;class&lt;/span&gt;&lt;span class="err"&gt;;&lt;/span&gt;
    &lt;span class="nc"&gt;Task&lt;/span&gt;&lt;span class="p"&gt;&amp;lt;&lt;/span&gt;&lt;span class="n"&gt;ImmutableArray&lt;/span&gt;&lt;span class="p"&gt;&amp;lt;&lt;/span&gt;&lt;span class="n"&gt;OutboxRecord&lt;/span&gt;&lt;span class="p"&gt;&amp;gt;&amp;gt;&lt;/span&gt; &lt;span class="nf"&gt;ReserveAsync&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="kt"&gt;int&lt;/span&gt; &lt;span class="n"&gt;top&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;TimeSpan&lt;/span&gt; &lt;span class="n"&gt;reservationTimeout&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;CancellationToken&lt;/span&gt; &lt;span class="n"&gt;cancellationToken&lt;/span&gt;&lt;span class="p"&gt;);&lt;/span&gt;
    &lt;span class="n"&gt;Task&lt;/span&gt; &lt;span class="nf"&gt;MarkAsProcessedAsync&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;ImmutableArray&lt;/span&gt;&lt;span class="p"&gt;&amp;lt;&lt;/span&gt;&lt;span class="n"&gt;OutboxRecord&lt;/span&gt;&lt;span class="p"&gt;&amp;gt;&lt;/span&gt; &lt;span class="n"&gt;data&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;CancellationToken&lt;/span&gt; &lt;span class="n"&gt;cancellationToken&lt;/span&gt;&lt;span class="p"&gt;);&lt;/span&gt;
    &lt;span class="n"&gt;Task&lt;/span&gt; &lt;span class="nf"&gt;DeleteProcessedAsync&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;CancellationToken&lt;/span&gt; &lt;span class="n"&gt;cancellationToken&lt;/span&gt;&lt;span class="p"&gt;);&lt;/span&gt;
&lt;span class="p"&gt;}&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;And &lt;code&gt;IRelay&lt;/code&gt; has two simple operations, &lt;code&gt;Publish&lt;/code&gt; and &lt;code&gt;Cleanup&lt;/code&gt;.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight csharp"&gt;&lt;code&gt;&lt;span class="k"&gt;public&lt;/span&gt; &lt;span class="k"&gt;interface&lt;/span&gt; &lt;span class="nc"&gt;IRelay&lt;/span&gt;
&lt;span class="p"&gt;{&lt;/span&gt;      
    &lt;span class="n"&gt;Task&lt;/span&gt; &lt;span class="nf"&gt;PublishAsync&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;CancellationToken&lt;/span&gt; &lt;span class="n"&gt;cancellationToken&lt;/span&gt;&lt;span class="p"&gt;);&lt;/span&gt;
    &lt;span class="n"&gt;Task&lt;/span&gt; &lt;span class="nf"&gt;CleanupAsync&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;CancellationToken&lt;/span&gt; &lt;span class="n"&gt;cancellationToken&lt;/span&gt;&lt;span class="p"&gt;);&lt;/span&gt;
&lt;span class="p"&gt;}&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Okay, now we have an abstraction for persisting and publishing new messages.&lt;/p&gt;

&lt;h3&gt;
  
  
  Serialization
&lt;/h3&gt;

&lt;p&gt;It is better to encapsulate serialization details within an abstraction that will be used in both the Outbox and Kafka components.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight csharp"&gt;&lt;code&gt;&lt;span class="k"&gt;public&lt;/span&gt; &lt;span class="k"&gt;interface&lt;/span&gt; &lt;span class="nc"&gt;ISerializer&lt;/span&gt;
&lt;span class="p"&gt;{&lt;/span&gt;
    &lt;span class="kt"&gt;string&lt;/span&gt; &lt;span class="n"&gt;Serialize&lt;/span&gt;&lt;span class="p"&gt;&amp;lt;&lt;/span&gt;&lt;span class="n"&gt;T&lt;/span&gt;&lt;span class="p"&gt;&amp;gt;(&lt;/span&gt;&lt;span class="n"&gt;T&lt;/span&gt; &lt;span class="n"&gt;data&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="k"&gt;where&lt;/span&gt; &lt;span class="n"&gt;T&lt;/span&gt; &lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="k"&gt;class&lt;/span&gt;&lt;span class="err"&gt;;&lt;/span&gt;
    &lt;span class="nc"&gt;T&lt;/span&gt; &lt;span class="n"&gt;Deserialize&lt;/span&gt;&lt;span class="p"&gt;&amp;lt;&lt;/span&gt;&lt;span class="n"&gt;T&lt;/span&gt;&lt;span class="p"&gt;&amp;gt;(&lt;/span&gt;&lt;span class="kt"&gt;string&lt;/span&gt; &lt;span class="n"&gt;data&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="k"&gt;where&lt;/span&gt; &lt;span class="n"&gt;T&lt;/span&gt; &lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="k"&gt;class&lt;/span&gt;&lt;span class="err"&gt;;&lt;/span&gt;
&lt;span class="err"&gt;}&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;h3&gt;
  
  
  Transaction control
&lt;/h3&gt;

&lt;p&gt;To address concurrency, I'm going to use optimistic concurrency control, which is provided with the snapshot isolation (technically it provides repeatable read safety) level of MS SQL Server.&lt;/p&gt;

&lt;p&gt;I enabled it in the initial migration using the following lines.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight sql"&gt;&lt;code&gt;&lt;span class="n"&gt;migrationBuilder&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="k"&gt;Sql&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="nv"&gt;"ALTER DATABASE CURRENT SET ALLOW_SNAPSHOT_ISOLATION ON;"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="k"&gt;true&lt;/span&gt;&lt;span class="p"&gt;);&lt;/span&gt;
&lt;span class="n"&gt;migrationBuilder&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="k"&gt;Sql&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="nv"&gt;"ALTER DATABASE CURRENT SET READ_COMMITTED_SNAPSHOT ON;"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="k"&gt;true&lt;/span&gt;&lt;span class="p"&gt;);&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Since we need to store new data and messages in an atomic way, we should open a transaction at the beginning of the operation and roll it back in case of any failure.&lt;/p&gt;

&lt;p&gt;Let's define the &lt;code&gt;IUnitOfWork&lt;/code&gt; interface for this purpose.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight csharp"&gt;&lt;code&gt;&lt;span class="k"&gt;public&lt;/span&gt; &lt;span class="k"&gt;interface&lt;/span&gt; &lt;span class="nc"&gt;IUnitOfWork&lt;/span&gt;
&lt;span class="p"&gt;{&lt;/span&gt;
    &lt;span class="n"&gt;IQueryable&lt;/span&gt;&lt;span class="p"&gt;&amp;lt;&lt;/span&gt;&lt;span class="n"&gt;T&lt;/span&gt;&lt;span class="p"&gt;&amp;gt;&lt;/span&gt; &lt;span class="n"&gt;Query&lt;/span&gt;&lt;span class="p"&gt;&amp;lt;&lt;/span&gt;&lt;span class="n"&gt;T&lt;/span&gt;&lt;span class="p"&gt;&amp;gt;()&lt;/span&gt; &lt;span class="k"&gt;where&lt;/span&gt; &lt;span class="n"&gt;T&lt;/span&gt; &lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="k"&gt;class&lt;/span&gt;&lt;span class="err"&gt;;&lt;/span&gt;
    &lt;span class="nc"&gt;Task&lt;/span&gt; &lt;span class="n"&gt;AddAsync&lt;/span&gt;&lt;span class="p"&gt;&amp;lt;&lt;/span&gt;&lt;span class="n"&gt;T&lt;/span&gt;&lt;span class="p"&gt;&amp;gt;(&lt;/span&gt;&lt;span class="n"&gt;T&lt;/span&gt; &lt;span class="n"&gt;entity&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;CancellationToken&lt;/span&gt; &lt;span class="n"&gt;cancellationToken&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="k"&gt;where&lt;/span&gt; &lt;span class="n"&gt;T&lt;/span&gt; &lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="k"&gt;class&lt;/span&gt;&lt;span class="err"&gt;;&lt;/span&gt;
    &lt;span class="nc"&gt;Task&lt;/span&gt; &lt;span class="n"&gt;AddRangeAsync&lt;/span&gt;&lt;span class="p"&gt;&amp;lt;&lt;/span&gt;&lt;span class="n"&gt;T&lt;/span&gt;&lt;span class="p"&gt;&amp;gt;(&lt;/span&gt;&lt;span class="n"&gt;IEnumerable&lt;/span&gt;&lt;span class="p"&gt;&amp;lt;&lt;/span&gt;&lt;span class="n"&gt;T&lt;/span&gt;&lt;span class="p"&gt;&amp;gt;&lt;/span&gt; &lt;span class="n"&gt;entities&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;CancellationToken&lt;/span&gt; &lt;span class="n"&gt;cancellationToken&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="k"&gt;where&lt;/span&gt; &lt;span class="n"&gt;T&lt;/span&gt; &lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="k"&gt;class&lt;/span&gt;&lt;span class="err"&gt;;&lt;/span&gt;
    &lt;span class="nc"&gt;void&lt;/span&gt; &lt;span class="n"&gt;Remove&lt;/span&gt;&lt;span class="p"&gt;&amp;lt;&lt;/span&gt;&lt;span class="n"&gt;T&lt;/span&gt;&lt;span class="p"&gt;&amp;gt;(&lt;/span&gt;&lt;span class="n"&gt;T&lt;/span&gt; &lt;span class="n"&gt;entity&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="k"&gt;where&lt;/span&gt; &lt;span class="n"&gt;T&lt;/span&gt; &lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="k"&gt;class&lt;/span&gt;&lt;span class="err"&gt;;&lt;/span&gt;
    &lt;span class="nc"&gt;void&lt;/span&gt; &lt;span class="n"&gt;RemoveRange&lt;/span&gt;&lt;span class="p"&gt;&amp;lt;&lt;/span&gt;&lt;span class="n"&gt;T&lt;/span&gt;&lt;span class="p"&gt;&amp;gt;(&lt;/span&gt;&lt;span class="n"&gt;IEnumerable&lt;/span&gt;&lt;span class="p"&gt;&amp;lt;&lt;/span&gt;&lt;span class="n"&gt;T&lt;/span&gt;&lt;span class="p"&gt;&amp;gt;&lt;/span&gt; &lt;span class="n"&gt;entities&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="k"&gt;where&lt;/span&gt; &lt;span class="n"&gt;T&lt;/span&gt; &lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="k"&gt;class&lt;/span&gt;&lt;span class="err"&gt;;&lt;/span&gt;
    &lt;span class="nc"&gt;Task&lt;/span&gt; &lt;span class="nf"&gt;CommitAsync&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;CancellationToken&lt;/span&gt; &lt;span class="n"&gt;cancellationToken&lt;/span&gt;&lt;span class="p"&gt;);&lt;/span&gt;
    &lt;span class="n"&gt;Task&lt;/span&gt;&lt;span class="p"&gt;&amp;lt;&lt;/span&gt;&lt;span class="n"&gt;IDbContextTransaction&lt;/span&gt;&lt;span class="p"&gt;&amp;gt;&lt;/span&gt; &lt;span class="nf"&gt;BeginSnapshotTransactionAsync&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;CancellationToken&lt;/span&gt; &lt;span class="n"&gt;cancellationToken&lt;/span&gt;&lt;span class="p"&gt;);&lt;/span&gt;
    &lt;span class="n"&gt;Task&lt;/span&gt;&lt;span class="p"&gt;&amp;lt;&lt;/span&gt;&lt;span class="n"&gt;IDbContextTransaction&lt;/span&gt;&lt;span class="p"&gt;&amp;gt;&lt;/span&gt; &lt;span class="nf"&gt;BeginTransactionAsync&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;CancellationToken&lt;/span&gt; &lt;span class="n"&gt;cancellationToken&lt;/span&gt;&lt;span class="p"&gt;);&lt;/span&gt;
    &lt;span class="n"&gt;Task&lt;/span&gt;&lt;span class="p"&gt;&amp;lt;&lt;/span&gt;&lt;span class="n"&gt;IDbContextTransaction&lt;/span&gt;&lt;span class="p"&gt;&amp;gt;&lt;/span&gt; &lt;span class="nf"&gt;BeginTransactionAsync&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;IsolationLevel&lt;/span&gt; &lt;span class="n"&gt;isolationLevel&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;CancellationToken&lt;/span&gt; &lt;span class="n"&gt;cancellationToken&lt;/span&gt;&lt;span class="p"&gt;);&lt;/span&gt;
&lt;span class="p"&gt;}&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;To minimise the overhead introduced by ORMs, I'm going to utilise the lightweight ORM Dapper for core Outbox queries.&lt;/p&gt;

&lt;p&gt;For instance, this is the &lt;code&gt;AddAsync&lt;/code&gt; implementation, which primarily retrieves a transaction, serializes a new message, and stores new messages by using the same transaction.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight csharp"&gt;&lt;code&gt;&lt;span class="k"&gt;public&lt;/span&gt; &lt;span class="k"&gt;async&lt;/span&gt; &lt;span class="n"&gt;Task&lt;/span&gt; &lt;span class="n"&gt;AddAsync&lt;/span&gt;&lt;span class="p"&gt;&amp;lt;&lt;/span&gt;&lt;span class="n"&gt;T&lt;/span&gt;&lt;span class="p"&gt;&amp;gt;(&lt;/span&gt;&lt;span class="n"&gt;T&lt;/span&gt; &lt;span class="n"&gt;data&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="kt"&gt;string&lt;/span&gt; &lt;span class="n"&gt;topic&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;Func&lt;/span&gt;&lt;span class="p"&gt;&amp;lt;&lt;/span&gt;&lt;span class="n"&gt;T&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="kt"&gt;string&lt;/span&gt;&lt;span class="p"&gt;&amp;gt;?&lt;/span&gt; &lt;span class="n"&gt;partitionBy&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="kt"&gt;bool&lt;/span&gt; &lt;span class="n"&gt;isSequential&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;Dictionary&lt;/span&gt;&lt;span class="p"&gt;&amp;lt;&lt;/span&gt;&lt;span class="kt"&gt;string&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="kt"&gt;string&lt;/span&gt;&lt;span class="p"&gt;&amp;gt;?&lt;/span&gt; &lt;span class="n"&gt;metadata&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;CancellationToken&lt;/span&gt; &lt;span class="n"&gt;cancellationToken&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; 
    &lt;span class="k"&gt;where&lt;/span&gt; &lt;span class="n"&gt;T&lt;/span&gt; &lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="k"&gt;class&lt;/span&gt;
&lt;span class="err"&gt;{&lt;/span&gt;
    &lt;span class="nc"&gt;var&lt;/span&gt; &lt;span class="n"&gt;transaction&lt;/span&gt; &lt;span class="p"&gt;=&lt;/span&gt; &lt;span class="nf"&gt;GetTransaction&lt;/span&gt;&lt;span class="p"&gt;();&lt;/span&gt;
    &lt;span class="kt"&gt;var&lt;/span&gt; &lt;span class="n"&gt;connection&lt;/span&gt; &lt;span class="p"&gt;=&lt;/span&gt; &lt;span class="n"&gt;transaction&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Connection&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
    &lt;span class="kt"&gt;var&lt;/span&gt; &lt;span class="n"&gt;query&lt;/span&gt; &lt;span class="p"&gt;=&lt;/span&gt; &lt;span class="n"&gt;SqlQueriesReader&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;ReadWithCache&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;InsertInOutboxQueryName&lt;/span&gt;&lt;span class="p"&gt;);&lt;/span&gt;
    &lt;span class="kt"&gt;var&lt;/span&gt; &lt;span class="n"&gt;json&lt;/span&gt; &lt;span class="p"&gt;=&lt;/span&gt; &lt;span class="n"&gt;_serializer&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;Serialize&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;data&lt;/span&gt;&lt;span class="p"&gt;);&lt;/span&gt;

    &lt;span class="kt"&gt;var&lt;/span&gt; &lt;span class="n"&gt;commandDefinition&lt;/span&gt; &lt;span class="p"&gt;=&lt;/span&gt; &lt;span class="k"&gt;new&lt;/span&gt; &lt;span class="nf"&gt;CommandDefinition&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;query&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="k"&gt;new&lt;/span&gt;
    &lt;span class="p"&gt;{&lt;/span&gt;
        &lt;span class="n"&gt;RawData&lt;/span&gt; &lt;span class="p"&gt;=&lt;/span&gt; &lt;span class="n"&gt;json&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
        &lt;span class="n"&gt;MessageType&lt;/span&gt; &lt;span class="p"&gt;=&lt;/span&gt; &lt;span class="nf"&gt;GetMessageTypeName&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;data&lt;/span&gt;&lt;span class="p"&gt;),&lt;/span&gt;
        &lt;span class="n"&gt;Topic&lt;/span&gt; &lt;span class="p"&gt;=&lt;/span&gt; &lt;span class="n"&gt;topic&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
        &lt;span class="n"&gt;PartitionBy&lt;/span&gt; &lt;span class="p"&gt;=&lt;/span&gt; &lt;span class="n"&gt;partitionBy&lt;/span&gt;&lt;span class="p"&gt;?.&lt;/span&gt;&lt;span class="nf"&gt;Invoke&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;data&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="p"&gt;??&lt;/span&gt; &lt;span class="k"&gt;null&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
        &lt;span class="n"&gt;IsSequential&lt;/span&gt; &lt;span class="p"&gt;=&lt;/span&gt; &lt;span class="n"&gt;isSequential&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
        &lt;span class="n"&gt;Metadata&lt;/span&gt; &lt;span class="p"&gt;=&lt;/span&gt; &lt;span class="n"&gt;metadata&lt;/span&gt; &lt;span class="p"&gt;!=&lt;/span&gt; &lt;span class="k"&gt;null&lt;/span&gt; &lt;span class="p"&gt;?&lt;/span&gt; &lt;span class="n"&gt;_serializer&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;Serialize&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;metadata&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="k"&gt;null&lt;/span&gt;
    &lt;span class="p"&gt;},&lt;/span&gt; &lt;span class="n"&gt;cancellationToken&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="n"&gt;cancellationToken&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;transaction&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="n"&gt;transaction&lt;/span&gt;&lt;span class="p"&gt;);&lt;/span&gt;

    &lt;span class="k"&gt;await&lt;/span&gt; &lt;span class="n"&gt;connection&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;ExecuteAsync&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;commandDefinition&lt;/span&gt;&lt;span class="p"&gt;);&lt;/span&gt;
&lt;span class="p"&gt;}&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Also, for instance, in the PublishAsync method, a transaction is opened before making any changes.&lt;/p&gt;

&lt;p&gt;Here, messages are first sent to Kafka, and only then are these messages marked as processed, which guarantees at least once delivery.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight csharp"&gt;&lt;code&gt;&lt;span class="k"&gt;public&lt;/span&gt; &lt;span class="k"&gt;async&lt;/span&gt; &lt;span class="n"&gt;Task&lt;/span&gt; &lt;span class="nf"&gt;PublishAsync&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;CancellationToken&lt;/span&gt; &lt;span class="n"&gt;cancellationToken&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
&lt;span class="p"&gt;{&lt;/span&gt;
    &lt;span class="k"&gt;await&lt;/span&gt; &lt;span class="k"&gt;using&lt;/span&gt; &lt;span class="nn"&gt;var&lt;/span&gt; &lt;span class="n"&gt;transaction&lt;/span&gt; &lt;span class="p"&gt;=&lt;/span&gt; &lt;span class="k"&gt;await&lt;/span&gt; &lt;span class="n"&gt;_unitOfWork&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;BeginTransactionAsync&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;IsolationLevel&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;ReadCommitted&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;cancellationToken&lt;/span&gt;&lt;span class="p"&gt;);&lt;/span&gt;

    &lt;span class="k"&gt;try&lt;/span&gt;
    &lt;span class="p"&gt;{&lt;/span&gt;
        &lt;span class="kt"&gt;var&lt;/span&gt; &lt;span class="n"&gt;records&lt;/span&gt; &lt;span class="p"&gt;=&lt;/span&gt; &lt;span class="k"&gt;await&lt;/span&gt; &lt;span class="n"&gt;_outbox&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;ReserveAsync&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;BatchSize&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;ReservationTimeout&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;cancellationToken&lt;/span&gt;&lt;span class="p"&gt;);&lt;/span&gt;

        &lt;span class="kt"&gt;var&lt;/span&gt; &lt;span class="n"&gt;builder&lt;/span&gt; &lt;span class="p"&gt;=&lt;/span&gt; &lt;span class="n"&gt;ImmutableArray&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;CreateBuilder&lt;/span&gt;&lt;span class="p"&gt;&amp;lt;&lt;/span&gt;&lt;span class="n"&gt;MessageEnvelope&lt;/span&gt;&lt;span class="p"&gt;&amp;gt;();&lt;/span&gt;

        &lt;span class="k"&gt;foreach&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="kt"&gt;var&lt;/span&gt; &lt;span class="k"&gt;record&lt;/span&gt; &lt;span class="nc"&gt;in&lt;/span&gt; &lt;span class="n"&gt;records&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
        &lt;span class="p"&gt;{&lt;/span&gt;
            &lt;span class="kt"&gt;var&lt;/span&gt; &lt;span class="n"&gt;message&lt;/span&gt; &lt;span class="p"&gt;=&lt;/span&gt; &lt;span class="k"&gt;new&lt;/span&gt; &lt;span class="n"&gt;MessageEnvelope&lt;/span&gt;
            &lt;span class="p"&gt;{&lt;/span&gt;
                &lt;span class="n"&gt;PayloadType&lt;/span&gt; &lt;span class="p"&gt;=&lt;/span&gt; &lt;span class="k"&gt;record&lt;/span&gt;&lt;span class="err"&gt;.&lt;/span&gt;&lt;span class="nc"&gt;MessageType&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
                &lt;span class="n"&gt;Payload&lt;/span&gt; &lt;span class="p"&gt;=&lt;/span&gt; &lt;span class="k"&gt;record&lt;/span&gt;&lt;span class="err"&gt;.&lt;/span&gt;&lt;span class="nc"&gt;JsonRawData&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
                &lt;span class="n"&gt;Topic&lt;/span&gt; &lt;span class="p"&gt;=&lt;/span&gt; &lt;span class="k"&gt;record&lt;/span&gt;&lt;span class="err"&gt;.&lt;/span&gt;&lt;span class="nc"&gt;Topic&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
                &lt;span class="n"&gt;Key&lt;/span&gt; &lt;span class="p"&gt;=&lt;/span&gt; &lt;span class="k"&gt;record&lt;/span&gt;&lt;span class="err"&gt;.&lt;/span&gt;&lt;span class="nc"&gt;PartitionBy&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
                &lt;span class="n"&gt;Created&lt;/span&gt; &lt;span class="p"&gt;=&lt;/span&gt; &lt;span class="k"&gt;record&lt;/span&gt;&lt;span class="err"&gt;.&lt;/span&gt;&lt;span class="nc"&gt;Timestamp&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
                &lt;span class="n"&gt;Metadata&lt;/span&gt; &lt;span class="p"&gt;=&lt;/span&gt; &lt;span class="k"&gt;record&lt;/span&gt;&lt;span class="err"&gt;.&lt;/span&gt;&lt;span class="nc"&gt;Metadata&lt;/span&gt;
            &lt;span class="p"&gt;};&lt;/span&gt;
            &lt;span class="n"&gt;builder&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;Add&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;message&lt;/span&gt;&lt;span class="p"&gt;);&lt;/span&gt;
        &lt;span class="p"&gt;}&lt;/span&gt;

        &lt;span class="k"&gt;await&lt;/span&gt; &lt;span class="n"&gt;_kafkaMessageSender&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;SendAsync&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;builder&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;ToImmutable&lt;/span&gt;&lt;span class="p"&gt;(),&lt;/span&gt; &lt;span class="n"&gt;cancellationToken&lt;/span&gt;&lt;span class="p"&gt;);&lt;/span&gt;

        &lt;span class="k"&gt;await&lt;/span&gt; &lt;span class="n"&gt;_outbox&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;MarkAsProcessedAsync&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;records&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;cancellationToken&lt;/span&gt;&lt;span class="p"&gt;);&lt;/span&gt;
        &lt;span class="k"&gt;await&lt;/span&gt; &lt;span class="n"&gt;transaction&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;CommitAsync&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;cancellationToken&lt;/span&gt;&lt;span class="p"&gt;);&lt;/span&gt;
    &lt;span class="p"&gt;}&lt;/span&gt;
    &lt;span class="k"&gt;catch&lt;/span&gt;
    &lt;span class="p"&gt;{&lt;/span&gt;
        &lt;span class="k"&gt;await&lt;/span&gt; &lt;span class="n"&gt;transaction&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;RollbackAsync&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;cancellationToken&lt;/span&gt;&lt;span class="p"&gt;);&lt;/span&gt;
        &lt;span class="k"&gt;throw&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
    &lt;span class="p"&gt;}&lt;/span&gt;
&lt;span class="p"&gt;}&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;h3&gt;
  
  
  Kafka Producer
&lt;/h3&gt;

&lt;p&gt;The Kafka production implementation is simple. To transfer data to consumers, the following message structure is used.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight csharp"&gt;&lt;code&gt;&lt;span class="k"&gt;public&lt;/span&gt; &lt;span class="k"&gt;sealed&lt;/span&gt; &lt;span class="k"&gt;class&lt;/span&gt; &lt;span class="nc"&gt;MessageEnvelope&lt;/span&gt;
&lt;span class="p"&gt;{&lt;/span&gt;
    &lt;span class="k"&gt;public&lt;/span&gt; &lt;span class="kt"&gt;string&lt;/span&gt; &lt;span class="n"&gt;Topic&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt; &lt;span class="k"&gt;get&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt; &lt;span class="k"&gt;init&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt; &lt;span class="p"&gt;}&lt;/span&gt; &lt;span class="p"&gt;=&lt;/span&gt; &lt;span class="k"&gt;default&lt;/span&gt;&lt;span class="p"&gt;!;&lt;/span&gt;
    &lt;span class="k"&gt;public&lt;/span&gt; &lt;span class="kt"&gt;string&lt;/span&gt;&lt;span class="p"&gt;?&lt;/span&gt; &lt;span class="n"&gt;Key&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt; &lt;span class="k"&gt;get&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt; &lt;span class="k"&gt;init&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt; &lt;span class="p"&gt;}&lt;/span&gt;
    &lt;span class="k"&gt;public&lt;/span&gt; &lt;span class="kt"&gt;string&lt;/span&gt; &lt;span class="n"&gt;Payload&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt; &lt;span class="k"&gt;get&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt; &lt;span class="k"&gt;init&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt; &lt;span class="p"&gt;}&lt;/span&gt; &lt;span class="p"&gt;=&lt;/span&gt; &lt;span class="k"&gt;default&lt;/span&gt;&lt;span class="p"&gt;!;&lt;/span&gt;
    &lt;span class="k"&gt;public&lt;/span&gt; &lt;span class="kt"&gt;string&lt;/span&gt; &lt;span class="n"&gt;PayloadType&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt; &lt;span class="k"&gt;get&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt; &lt;span class="k"&gt;init&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt; &lt;span class="p"&gt;}&lt;/span&gt; &lt;span class="p"&gt;=&lt;/span&gt; &lt;span class="k"&gt;default&lt;/span&gt;&lt;span class="p"&gt;!;&lt;/span&gt;
    &lt;span class="k"&gt;public&lt;/span&gt; &lt;span class="n"&gt;Dictionary&lt;/span&gt;&lt;span class="p"&gt;&amp;lt;&lt;/span&gt;&lt;span class="kt"&gt;string&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="kt"&gt;string&lt;/span&gt;&lt;span class="p"&gt;&amp;gt;?&lt;/span&gt; &lt;span class="n"&gt;Metadata&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt; &lt;span class="k"&gt;get&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt; &lt;span class="k"&gt;init&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt; &lt;span class="p"&gt;}&lt;/span&gt;
    &lt;span class="k"&gt;public&lt;/span&gt; &lt;span class="n"&gt;DateTimeOffset&lt;/span&gt; &lt;span class="n"&gt;Created&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt; &lt;span class="k"&gt;get&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt; &lt;span class="k"&gt;init&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt; &lt;span class="p"&gt;}&lt;/span&gt;
&lt;span class="p"&gt;}&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;And it sends asynchronously using the synchronous version of Produce, which yields better performance results in high-load data transferring.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight csharp"&gt;&lt;code&gt;&lt;span class="k"&gt;public&lt;/span&gt; &lt;span class="n"&gt;Task&lt;/span&gt; &lt;span class="nf"&gt;SendAsync&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;ImmutableArray&lt;/span&gt;&lt;span class="p"&gt;&amp;lt;&lt;/span&gt;&lt;span class="n"&gt;MessageEnvelope&lt;/span&gt;&lt;span class="p"&gt;&amp;gt;&lt;/span&gt; &lt;span class="n"&gt;messages&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;CancellationToken&lt;/span&gt; &lt;span class="n"&gt;cancellationToken&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
    &lt;span class="p"&gt;{&lt;/span&gt;
        &lt;span class="k"&gt;foreach&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="kt"&gt;var&lt;/span&gt; &lt;span class="n"&gt;message&lt;/span&gt; &lt;span class="k"&gt;in&lt;/span&gt; &lt;span class="n"&gt;messages&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
        &lt;span class="p"&gt;{&lt;/span&gt;
            &lt;span class="k"&gt;try&lt;/span&gt;
            &lt;span class="p"&gt;{&lt;/span&gt;
                &lt;span class="kt"&gt;var&lt;/span&gt; &lt;span class="n"&gt;kafkaPayload&lt;/span&gt; &lt;span class="p"&gt;=&lt;/span&gt; &lt;span class="n"&gt;_serializer&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;Serialize&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;message&lt;/span&gt;&lt;span class="p"&gt;);&lt;/span&gt;

                &lt;span class="n"&gt;_producer&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;Produce&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;
                    &lt;span class="n"&gt;message&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Topic&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
                    &lt;span class="k"&gt;new&lt;/span&gt; &lt;span class="n"&gt;Message&lt;/span&gt;&lt;span class="p"&gt;&amp;lt;&lt;/span&gt;&lt;span class="kt"&gt;string&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="kt"&gt;string&lt;/span&gt;&lt;span class="p"&gt;&amp;gt;&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt; &lt;span class="n"&gt;Key&lt;/span&gt; &lt;span class="p"&gt;=&lt;/span&gt; &lt;span class="n"&gt;message&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Key&lt;/span&gt; &lt;span class="p"&gt;??&lt;/span&gt; &lt;span class="n"&gt;_defaultKey&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;Value&lt;/span&gt; &lt;span class="p"&gt;=&lt;/span&gt; &lt;span class="n"&gt;kafkaPayload&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;Headers&lt;/span&gt; &lt;span class="p"&gt;=&lt;/span&gt; &lt;span class="nf"&gt;PrepareHeaders&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;message&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Metadata&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;message&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Created&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;message&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;PayloadType&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="p"&gt;},&lt;/span&gt; &lt;span class="n"&gt;report&lt;/span&gt; &lt;span class="p"&gt;=&amp;gt;&lt;/span&gt;
                    &lt;span class="p"&gt;{&lt;/span&gt;
                        &lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;report&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Status&lt;/span&gt; &lt;span class="p"&gt;!=&lt;/span&gt; &lt;span class="n"&gt;PersistenceStatus&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Persisted&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
                            &lt;span class="c1"&gt;//throw new Exception($"Failed to send message to Kafka, Id: {message.Id}, Topic: {topic}");&lt;/span&gt;
                            &lt;span class="n"&gt;_logger&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;LogError&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"Failed kafka message producing with Key {Key}, Error: {error}"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;report&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Message&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Key&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;report&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Error&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Code&lt;/span&gt;&lt;span class="p"&gt;);&lt;/span&gt;
                    &lt;span class="p"&gt;});&lt;/span&gt;

                &lt;span class="n"&gt;_logger&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;LogInformation&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"Message sent to Kafka, Id: {Id}, Topic: {Topic}"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;message&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Key&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;message&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Topic&lt;/span&gt;&lt;span class="p"&gt;);&lt;/span&gt;
            &lt;span class="p"&gt;}&lt;/span&gt;
            &lt;span class="k"&gt;catch&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;ProduceException&lt;/span&gt;&lt;span class="p"&gt;&amp;lt;&lt;/span&gt;&lt;span class="n"&gt;Null&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="kt"&gt;string&lt;/span&gt;&lt;span class="p"&gt;&amp;gt;&lt;/span&gt; &lt;span class="n"&gt;ex&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
            &lt;span class="p"&gt;{&lt;/span&gt;
                &lt;span class="k"&gt;throw&lt;/span&gt; &lt;span class="k"&gt;new&lt;/span&gt; &lt;span class="nf"&gt;Exception&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;$"Failed to send message to Kafka, Id: &lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="n"&gt;message&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Key&lt;/span&gt;&lt;span class="p"&gt;}&lt;/span&gt;&lt;span class="s"&gt;, Topic: &lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="n"&gt;message&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Topic&lt;/span&gt;&lt;span class="p"&gt;}&lt;/span&gt;&lt;span class="s"&gt;"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;ex&lt;/span&gt;&lt;span class="p"&gt;);&lt;/span&gt;
            &lt;span class="p"&gt;}&lt;/span&gt;
        &lt;span class="p"&gt;}&lt;/span&gt;

        &lt;span class="n"&gt;_producer&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;Flush&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;cancellationToken&lt;/span&gt;&lt;span class="p"&gt;);&lt;/span&gt;
        &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="n"&gt;Task&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;CompletedTask&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
    &lt;span class="p"&gt;}&lt;/span&gt;
&lt;span class="p"&gt;}&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;h3&gt;
  
  
  Kafka Consumer
&lt;/h3&gt;

&lt;p&gt;We need to subscribe to a specific topic on the consumer side first.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight csharp"&gt;&lt;code&gt;&lt;span class="k"&gt;using&lt;/span&gt; &lt;span class="nn"&gt;var&lt;/span&gt; &lt;span class="n"&gt;consumer&lt;/span&gt; &lt;span class="p"&gt;=&lt;/span&gt; &lt;span class="k"&gt;new&lt;/span&gt; &lt;span class="n"&gt;ConsumerBuilder&lt;/span&gt;&lt;span class="p"&gt;&amp;lt;&lt;/span&gt;&lt;span class="kt"&gt;string&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="kt"&gt;string&lt;/span&gt;&lt;span class="p"&gt;&amp;gt;(&lt;/span&gt;&lt;span class="n"&gt;_consumerConfig&lt;/span&gt;&lt;span class="p"&gt;).&lt;/span&gt;&lt;span class="nf"&gt;Build&lt;/span&gt;&lt;span class="p"&gt;();&lt;/span&gt;
&lt;span class="n"&gt;consumer&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;Subscribe&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;_topic&lt;/span&gt;&lt;span class="p"&gt;);&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;It also improves performance by consuming messages in batch with the following hack.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight csharp"&gt;&lt;code&gt;&lt;span class="k"&gt;public&lt;/span&gt; &lt;span class="k"&gt;static&lt;/span&gt; &lt;span class="n"&gt;IReadOnlyCollection&lt;/span&gt;&lt;span class="p"&gt;&amp;lt;&lt;/span&gt;&lt;span class="n"&gt;ConsumeResult&lt;/span&gt;&lt;span class="p"&gt;&amp;lt;&lt;/span&gt;&lt;span class="n"&gt;TKey&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;TValue&lt;/span&gt;&lt;span class="p"&gt;&amp;gt;&amp;gt;&lt;/span&gt; &lt;span class="n"&gt;ConsumeBatch&lt;/span&gt;&lt;span class="p"&gt;&amp;lt;&lt;/span&gt;&lt;span class="n"&gt;TKey&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;TValue&lt;/span&gt;&lt;span class="p"&gt;&amp;gt;(&lt;/span&gt;
    &lt;span class="k"&gt;this&lt;/span&gt; &lt;span class="n"&gt;IConsumer&lt;/span&gt;&lt;span class="p"&gt;&amp;lt;&lt;/span&gt;&lt;span class="n"&gt;TKey&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;TValue&lt;/span&gt;&lt;span class="p"&gt;&amp;gt;&lt;/span&gt; &lt;span class="n"&gt;consumer&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;TimeSpan&lt;/span&gt; &lt;span class="n"&gt;consumeTimeout&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="kt"&gt;int&lt;/span&gt; &lt;span class="n"&gt;maxBatchSize&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;CancellationToken&lt;/span&gt; &lt;span class="n"&gt;stoppingToken&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
&lt;span class="p"&gt;{&lt;/span&gt;
    &lt;span class="kt"&gt;var&lt;/span&gt; &lt;span class="n"&gt;message&lt;/span&gt; &lt;span class="p"&gt;=&lt;/span&gt; &lt;span class="n"&gt;consumer&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;Consume&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;consumeTimeout&lt;/span&gt;&lt;span class="p"&gt;);&lt;/span&gt;

    &lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;message&lt;/span&gt;&lt;span class="p"&gt;?.&lt;/span&gt;&lt;span class="n"&gt;Message&lt;/span&gt; &lt;span class="k"&gt;is&lt;/span&gt; &lt;span class="k"&gt;null&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
        &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="n"&gt;Array&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Empty&lt;/span&gt;&lt;span class="p"&gt;&amp;lt;&lt;/span&gt;&lt;span class="n"&gt;ConsumeResult&lt;/span&gt;&lt;span class="p"&gt;&amp;lt;&lt;/span&gt;&lt;span class="n"&gt;TKey&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;TValue&lt;/span&gt;&lt;span class="p"&gt;&amp;gt;&amp;gt;();&lt;/span&gt;

    &lt;span class="kt"&gt;var&lt;/span&gt; &lt;span class="n"&gt;messageBatch&lt;/span&gt; &lt;span class="p"&gt;=&lt;/span&gt; &lt;span class="k"&gt;new&lt;/span&gt; &lt;span class="n"&gt;List&lt;/span&gt;&lt;span class="p"&gt;&amp;lt;&lt;/span&gt;&lt;span class="n"&gt;ConsumeResult&lt;/span&gt;&lt;span class="p"&gt;&amp;lt;&lt;/span&gt;&lt;span class="n"&gt;TKey&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;TValue&lt;/span&gt;&lt;span class="p"&gt;&amp;gt;&amp;gt;&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt; &lt;span class="n"&gt;message&lt;/span&gt; &lt;span class="p"&gt;};&lt;/span&gt;

    &lt;span class="k"&gt;while&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;messageBatch&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Count&lt;/span&gt; &lt;span class="p"&gt;&amp;lt;&lt;/span&gt; &lt;span class="n"&gt;maxBatchSize&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
    &lt;span class="p"&gt;{&lt;/span&gt;
        &lt;span class="n"&gt;message&lt;/span&gt; &lt;span class="p"&gt;=&lt;/span&gt; &lt;span class="n"&gt;consumer&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;Consume&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;TimeSpan&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Zero&lt;/span&gt;&lt;span class="p"&gt;);&lt;/span&gt;
        &lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;message&lt;/span&gt;&lt;span class="p"&gt;?.&lt;/span&gt;&lt;span class="n"&gt;Message&lt;/span&gt; &lt;span class="k"&gt;is&lt;/span&gt; &lt;span class="k"&gt;null&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
            &lt;span class="k"&gt;break&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;

        &lt;span class="n"&gt;messageBatch&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;Add&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;message&lt;/span&gt;&lt;span class="p"&gt;);&lt;/span&gt;
    &lt;span class="p"&gt;}&lt;/span&gt;

    &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="n"&gt;messageBatch&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
&lt;span class="p"&gt;}&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;The main body of the consuming logic is related to handling payloads and moving the offset. It is very necessary to commit the offset only when messages have been handled, to achieve an 'at least once' guarantee.&lt;/p&gt;

&lt;p&gt;Consumer logic must be idempotent to handle duplicate messages.&lt;/p&gt;

&lt;p&gt;I was also writing about why network failure is unavoidable and why consumers must be idempotent here. &lt;/p&gt;

&lt;p&gt;&lt;a href="https://dev.to/fairday/techniques-for-building-predictable-and-reliable-api-part-1-45bf"&gt;https://dev.to/fairday/techniques-for-building-predictable-and-reliable-api-part-1-45bf&lt;/a&gt;&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight csharp"&gt;&lt;code&gt;&lt;span class="k"&gt;while&lt;/span&gt; &lt;span class="p"&gt;(!&lt;/span&gt;&lt;span class="n"&gt;stoppingToken&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;IsCancellationRequested&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
&lt;span class="p"&gt;{&lt;/span&gt;
    &lt;span class="k"&gt;try&lt;/span&gt;
    &lt;span class="p"&gt;{&lt;/span&gt;
        &lt;span class="kt"&gt;var&lt;/span&gt; &lt;span class="n"&gt;payloads&lt;/span&gt; &lt;span class="p"&gt;=&lt;/span&gt; &lt;span class="n"&gt;consumer&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;ConsumeBatch&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;TimeSpan&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;FromMinutes&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="m"&gt;1&lt;/span&gt;&lt;span class="p"&gt;),&lt;/span&gt; &lt;span class="n"&gt;_maxConsumeBatchSize&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;stoppingToken&lt;/span&gt;&lt;span class="p"&gt;);&lt;/span&gt;

        &lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;payloads&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Count&lt;/span&gt; &lt;span class="p"&gt;==&lt;/span&gt; &lt;span class="m"&gt;0&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
            &lt;span class="k"&gt;continue&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;

        &lt;span class="k"&gt;foreach&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="kt"&gt;var&lt;/span&gt; &lt;span class="n"&gt;payload&lt;/span&gt; &lt;span class="k"&gt;in&lt;/span&gt; &lt;span class="n"&gt;payloads&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
        &lt;span class="p"&gt;{&lt;/span&gt;
            &lt;span class="kt"&gt;var&lt;/span&gt; &lt;span class="n"&gt;message&lt;/span&gt; &lt;span class="p"&gt;=&lt;/span&gt; &lt;span class="n"&gt;serializer&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Deserialize&lt;/span&gt;&lt;span class="p"&gt;&amp;lt;&lt;/span&gt;&lt;span class="n"&gt;MessageEnvelope&lt;/span&gt;&lt;span class="p"&gt;&amp;gt;(&lt;/span&gt;&lt;span class="n"&gt;payload&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Message&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Value&lt;/span&gt;&lt;span class="p"&gt;);&lt;/span&gt;

            &lt;span class="k"&gt;foreach&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="kt"&gt;var&lt;/span&gt; &lt;span class="n"&gt;handler&lt;/span&gt; &lt;span class="k"&gt;in&lt;/span&gt; &lt;span class="n"&gt;messageHandlers&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
            &lt;span class="p"&gt;{&lt;/span&gt;
                &lt;span class="k"&gt;await&lt;/span&gt; &lt;span class="n"&gt;handler&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;HandleAsync&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;message&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;stoppingToken&lt;/span&gt;&lt;span class="p"&gt;);&lt;/span&gt;
            &lt;span class="p"&gt;}&lt;/span&gt;
        &lt;span class="p"&gt;}&lt;/span&gt;

        &lt;span class="n"&gt;consumer&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;Commit&lt;/span&gt;&lt;span class="p"&gt;();&lt;/span&gt;
    &lt;span class="p"&gt;}&lt;/span&gt;
    &lt;span class="k"&gt;catch&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;OperationCanceledException&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
    &lt;span class="p"&gt;{&lt;/span&gt;
        &lt;span class="k"&gt;break&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
    &lt;span class="p"&gt;}&lt;/span&gt;
    &lt;span class="k"&gt;catch&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;ConsumeException&lt;/span&gt; &lt;span class="n"&gt;e&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
    &lt;span class="p"&gt;{&lt;/span&gt;
        &lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;e&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Error&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;IsFatal&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
        &lt;span class="p"&gt;{&lt;/span&gt;
            &lt;span class="c1"&gt;// https://github.com/edenhill/librdkafka/blob/master/INTRODUCTION.md#fatal-consumer-errors&lt;/span&gt;
            &lt;span class="n"&gt;logger&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;LogCritical&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;e&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="s"&gt;"Fatal error consuming message"&lt;/span&gt;&lt;span class="p"&gt;);&lt;/span&gt;
            &lt;span class="k"&gt;break&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
        &lt;span class="p"&gt;}&lt;/span&gt;
        &lt;span class="k"&gt;else&lt;/span&gt;
        &lt;span class="p"&gt;{&lt;/span&gt;
            &lt;span class="n"&gt;logger&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;LogError&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;e&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="s"&gt;"Error consuming message"&lt;/span&gt;&lt;span class="p"&gt;);&lt;/span&gt;
        &lt;span class="p"&gt;}&lt;/span&gt;
    &lt;span class="p"&gt;}&lt;/span&gt;
    &lt;span class="k"&gt;catch&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;Exception&lt;/span&gt; &lt;span class="n"&gt;e&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
    &lt;span class="p"&gt;{&lt;/span&gt;
        &lt;span class="n"&gt;logger&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;LogError&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;e&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="s"&gt;"Error consuming message"&lt;/span&gt;&lt;span class="p"&gt;);&lt;/span&gt;
    &lt;span class="p"&gt;}&lt;/span&gt;
&lt;span class="p"&gt;}&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;And do whatever we need in the message handler.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight csharp"&gt;&lt;code&gt;&lt;span class="k"&gt;internal&lt;/span&gt; &lt;span class="k"&gt;sealed&lt;/span&gt; &lt;span class="k"&gt;class&lt;/span&gt; &lt;span class="nc"&gt;RootMessageHandler&lt;/span&gt; &lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="n"&gt;IMessageHandler&lt;/span&gt;
&lt;span class="p"&gt;{&lt;/span&gt;
    &lt;span class="k"&gt;private&lt;/span&gt; &lt;span class="k"&gt;readonly&lt;/span&gt; &lt;span class="n"&gt;ILogger&lt;/span&gt;&lt;span class="p"&gt;&amp;lt;&lt;/span&gt;&lt;span class="n"&gt;RootMessageHandler&lt;/span&gt;&lt;span class="p"&gt;&amp;gt;&lt;/span&gt; &lt;span class="n"&gt;_logger&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
    &lt;span class="k"&gt;private&lt;/span&gt; &lt;span class="k"&gt;readonly&lt;/span&gt; &lt;span class="n"&gt;ISerializer&lt;/span&gt; &lt;span class="n"&gt;_serializer&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;

    &lt;span class="k"&gt;public&lt;/span&gt; &lt;span class="nf"&gt;RootMessageHandler&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;ILogger&lt;/span&gt;&lt;span class="p"&gt;&amp;lt;&lt;/span&gt;&lt;span class="n"&gt;RootMessageHandler&lt;/span&gt;&lt;span class="p"&gt;&amp;gt;&lt;/span&gt; &lt;span class="n"&gt;logger&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;ISerializer&lt;/span&gt; &lt;span class="n"&gt;serializer&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
    &lt;span class="p"&gt;{&lt;/span&gt;
        &lt;span class="n"&gt;_logger&lt;/span&gt; &lt;span class="p"&gt;=&lt;/span&gt; &lt;span class="n"&gt;logger&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
        &lt;span class="n"&gt;_serializer&lt;/span&gt; &lt;span class="p"&gt;=&lt;/span&gt; &lt;span class="n"&gt;serializer&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
    &lt;span class="p"&gt;}&lt;/span&gt;

    &lt;span class="k"&gt;public&lt;/span&gt; &lt;span class="n"&gt;Task&lt;/span&gt; &lt;span class="nf"&gt;HandleAsync&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;MessageEnvelope&lt;/span&gt; &lt;span class="n"&gt;message&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;CancellationToken&lt;/span&gt; &lt;span class="n"&gt;cancellationToken&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
    &lt;span class="p"&gt;{&lt;/span&gt;
        &lt;span class="kt"&gt;var&lt;/span&gt; &lt;span class="n"&gt;type&lt;/span&gt; &lt;span class="p"&gt;=&lt;/span&gt; &lt;span class="n"&gt;Type&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;GetType&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;message&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;PayloadType&lt;/span&gt;&lt;span class="p"&gt;);&lt;/span&gt;

        &lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;type&lt;/span&gt; &lt;span class="p"&gt;==&lt;/span&gt; &lt;span class="k"&gt;typeof&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;InvoiceCreatedEvent&lt;/span&gt;&lt;span class="p"&gt;))&lt;/span&gt;
        &lt;span class="p"&gt;{&lt;/span&gt;
            &lt;span class="kt"&gt;var&lt;/span&gt; &lt;span class="n"&gt;invoiceCreatedEvent&lt;/span&gt; &lt;span class="p"&gt;=&lt;/span&gt; &lt;span class="n"&gt;_serializer&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Deserialize&lt;/span&gt;&lt;span class="p"&gt;&amp;lt;&lt;/span&gt;&lt;span class="n"&gt;InvoiceCreatedEvent&lt;/span&gt;&lt;span class="p"&gt;&amp;gt;(&lt;/span&gt;&lt;span class="n"&gt;message&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Payload&lt;/span&gt;&lt;span class="p"&gt;);&lt;/span&gt;
            &lt;span class="n"&gt;_logger&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;LogInformation&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"Received invoice created event with ID {InvoiceId}"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;invoiceCreatedEvent&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;InvoiceId&lt;/span&gt;&lt;span class="p"&gt;);&lt;/span&gt;
        &lt;span class="p"&gt;}&lt;/span&gt;

        &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="n"&gt;Task&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;CompletedTask&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
    &lt;span class="p"&gt;}&lt;/span&gt;
&lt;span class="p"&gt;}&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;h2&gt;
  
  
  Horizontal Scalability
&lt;/h2&gt;

&lt;p&gt;In order to achieve higher throughput and handle more requests, there is a standard way to scale services horizontally or vertically by increasing CPU and memory resources. Vertical scaling reaches hardware limits faster, where horizontal scalability involves cloning servers to handle the increased load.&lt;/p&gt;

&lt;p&gt;To make the Outbox architecture suitable for horizontal scalability, several nodes need to read records exclusively from the &lt;code&gt;Outbox&lt;/code&gt; table.&lt;/p&gt;

&lt;h3&gt;
  
  
  What is Select for update?
&lt;/h3&gt;

&lt;p&gt;"SELECT FOR UPDATE" is a SQL command used within a transaction to lock selected rows against concurrent access by other transactions. Essentially, it's a way to say, "I'm working with this data, and I don't want anyone else to change it until I'm done." This command ensures that any selected data remains consistent and unchanged from the moment it's read to the moment a transaction involving it is completed.&lt;/p&gt;

&lt;p&gt;The following statement helps to achieve Select for Update behaviour in MS SQL Server by combining &lt;code&gt;UPDLOCK&lt;/code&gt;, &lt;code&gt;ROWLOCK&lt;/code&gt; and &lt;code&gt;READPAST&lt;/code&gt; table hints.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight sql"&gt;&lt;code&gt;&lt;span class="n"&gt;IF&lt;/span&gt; &lt;span class="k"&gt;EXISTS&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="k"&gt;SELECT&lt;/span&gt; &lt;span class="mi"&gt;1&lt;/span&gt; &lt;span class="k"&gt;FROM&lt;/span&gt; &lt;span class="n"&gt;Outbox&lt;/span&gt; &lt;span class="k"&gt;WHERE&lt;/span&gt; &lt;span class="n"&gt;IsProcessed&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="mi"&gt;0&lt;/span&gt; &lt;span class="k"&gt;AND&lt;/span&gt; &lt;span class="n"&gt;IsSequential&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="mi"&gt;0&lt;/span&gt; &lt;span class="k"&gt;AND&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;IsProcessing&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="mi"&gt;0&lt;/span&gt; &lt;span class="k"&gt;OR&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;IsProcessing&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="mi"&gt;1&lt;/span&gt; &lt;span class="k"&gt;AND&lt;/span&gt; &lt;span class="n"&gt;SYSDATETIMEOFFSET&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt; &lt;span class="o"&gt;&amp;gt;=&lt;/span&gt; &lt;span class="n"&gt;ExpiredAt&lt;/span&gt;&lt;span class="p"&gt;)))&lt;/span&gt;
&lt;span class="k"&gt;BEGIN&lt;/span&gt;
&lt;span class="k"&gt;WITH&lt;/span&gt; &lt;span class="n"&gt;forReservation&lt;/span&gt; &lt;span class="k"&gt;AS&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;
    &lt;span class="k"&gt;SELECT&lt;/span&gt; &lt;span class="n"&gt;TOP&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="o"&gt;@&lt;/span&gt;&lt;span class="n"&gt;MaxLimit&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="o"&gt;*&lt;/span&gt; &lt;span class="k"&gt;FROM&lt;/span&gt; &lt;span class="n"&gt;Outbox&lt;/span&gt;
    &lt;span class="k"&gt;WITH&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;UPDLOCK&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;ROWLOCK&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;READPAST&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
    &lt;span class="k"&gt;WHERE&lt;/span&gt; &lt;span class="n"&gt;IsProcessed&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="mi"&gt;0&lt;/span&gt; &lt;span class="k"&gt;AND&lt;/span&gt; &lt;span class="n"&gt;IsSequential&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="mi"&gt;0&lt;/span&gt; &lt;span class="k"&gt;AND&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;IsProcessing&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="mi"&gt;0&lt;/span&gt; &lt;span class="k"&gt;OR&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;IsProcessing&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="mi"&gt;1&lt;/span&gt; &lt;span class="k"&gt;AND&lt;/span&gt; &lt;span class="n"&gt;SYSDATETIMEOFFSET&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt; &lt;span class="o"&gt;&amp;gt;=&lt;/span&gt; &lt;span class="n"&gt;ExpiredAt&lt;/span&gt;&lt;span class="p"&gt;))&lt;/span&gt;
&lt;span class="p"&gt;)&lt;/span&gt;
&lt;span class="k"&gt;UPDATE&lt;/span&gt; &lt;span class="n"&gt;forReservation&lt;/span&gt;
&lt;span class="k"&gt;SET&lt;/span&gt; &lt;span class="n"&gt;IsProcessing&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="mi"&gt;1&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;ReservedAt&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;SYSDATETIMEOFFSET&lt;/span&gt;&lt;span class="p"&gt;(),&lt;/span&gt; &lt;span class="n"&gt;ExpiredAt&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;DATEADD&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="k"&gt;SECOND&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="o"&gt;@&lt;/span&gt;&lt;span class="n"&gt;ReservationSeconds&lt;/span&gt;&lt;span class="p"&gt;),&lt;/span&gt; &lt;span class="n"&gt;SYSDATETIMEOFFSET&lt;/span&gt;&lt;span class="p"&gt;())&lt;/span&gt;
&lt;span class="k"&gt;OUTPUT&lt;/span&gt; &lt;span class="n"&gt;INSERTED&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="o"&gt;*&lt;/span&gt;
&lt;span class="k"&gt;END&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;As you can see, it reserves records exclusively by setting &lt;code&gt;ExpiredAt&lt;/code&gt; date.&lt;/p&gt;

&lt;h3&gt;
  
  
  How to deal with cases when preserving ordering is essential?
&lt;/h3&gt;

&lt;p&gt;Although 'SELECT FOR UPDATE' offers many advantages in terms of scalability, it has a major drawback in situations where ordering must be preserved. This is because many workers will process data in parallel, and it's very likely that the chronological order will be affected.&lt;/p&gt;

&lt;p&gt;In such scenarios, distributed locks might be helpful for transferring data in a single-threaded manner by a specific attribute. For instance, 'IsSequential' is a flag indicating that this message must be processed by a specific node, which pulls only records with such a flag.&lt;/p&gt;

&lt;h2&gt;
  
  
  Example
&lt;/h2&gt;

&lt;p&gt;Let us look at a straightforward example where we need to publish an &lt;code&gt;InvoiceCreatedEvent&lt;/code&gt; event after creating a new invoice.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight csharp"&gt;&lt;code&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="n"&gt;ApiController&lt;/span&gt;&lt;span class="p"&gt;]&lt;/span&gt;
&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="nf"&gt;Route&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"[controller]"&lt;/span&gt;&lt;span class="p"&gt;)]&lt;/span&gt;
&lt;span class="k"&gt;public&lt;/span&gt; &lt;span class="k"&gt;class&lt;/span&gt; &lt;span class="nc"&gt;InvoicesController&lt;/span&gt; &lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="n"&gt;ControllerBase&lt;/span&gt;
&lt;span class="p"&gt;{&lt;/span&gt;
    &lt;span class="k"&gt;private&lt;/span&gt; &lt;span class="k"&gt;readonly&lt;/span&gt; &lt;span class="n"&gt;IInvoiceService&lt;/span&gt; &lt;span class="n"&gt;_invoiceService&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
    &lt;span class="k"&gt;private&lt;/span&gt; &lt;span class="k"&gt;readonly&lt;/span&gt; &lt;span class="n"&gt;IUnitOfWork&lt;/span&gt; &lt;span class="n"&gt;_unitOfWork&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;

    &lt;span class="k"&gt;public&lt;/span&gt; &lt;span class="nf"&gt;InvoicesController&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;IInvoiceService&lt;/span&gt; &lt;span class="n"&gt;invoiceService&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;IUnitOfWork&lt;/span&gt; &lt;span class="n"&gt;unitOfWork&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
    &lt;span class="p"&gt;{&lt;/span&gt;
        &lt;span class="n"&gt;_invoiceService&lt;/span&gt; &lt;span class="p"&gt;=&lt;/span&gt; &lt;span class="n"&gt;invoiceService&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
        &lt;span class="n"&gt;_unitOfWork&lt;/span&gt; &lt;span class="p"&gt;=&lt;/span&gt; &lt;span class="n"&gt;unitOfWork&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
    &lt;span class="p"&gt;}&lt;/span&gt;

    &lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="n"&gt;HttpPost&lt;/span&gt;&lt;span class="p"&gt;]&lt;/span&gt;
    &lt;span class="k"&gt;public&lt;/span&gt; &lt;span class="k"&gt;async&lt;/span&gt; &lt;span class="n"&gt;Task&lt;/span&gt;&lt;span class="p"&gt;&amp;lt;&lt;/span&gt;&lt;span class="n"&gt;InvoicePayload&lt;/span&gt;&lt;span class="p"&gt;&amp;gt;&lt;/span&gt; &lt;span class="nf"&gt;CreateInvoice&lt;/span&gt;&lt;span class="p"&gt;([&lt;/span&gt;&lt;span class="n"&gt;FromBody&lt;/span&gt;&lt;span class="p"&gt;]&lt;/span&gt; &lt;span class="n"&gt;InvoiceModel&lt;/span&gt; &lt;span class="n"&gt;model&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;CancellationToken&lt;/span&gt; &lt;span class="n"&gt;cancellationToken&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
    &lt;span class="p"&gt;{&lt;/span&gt;
        &lt;span class="k"&gt;await&lt;/span&gt; &lt;span class="k"&gt;using&lt;/span&gt; &lt;span class="nn"&gt;var&lt;/span&gt; &lt;span class="n"&gt;transaction&lt;/span&gt; &lt;span class="p"&gt;=&lt;/span&gt; &lt;span class="k"&gt;await&lt;/span&gt; &lt;span class="n"&gt;_unitOfWork&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;BeginSnapshotTransactionAsync&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;cancellationToken&lt;/span&gt;&lt;span class="p"&gt;);&lt;/span&gt;

        &lt;span class="k"&gt;try&lt;/span&gt;
        &lt;span class="p"&gt;{&lt;/span&gt;
            &lt;span class="kt"&gt;var&lt;/span&gt; &lt;span class="n"&gt;invoiceId&lt;/span&gt; &lt;span class="p"&gt;=&lt;/span&gt; &lt;span class="k"&gt;await&lt;/span&gt; &lt;span class="n"&gt;_invoiceService&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;CreateInvoice&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;model&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;cancellationToken&lt;/span&gt;&lt;span class="p"&gt;);&lt;/span&gt;
            &lt;span class="k"&gt;await&lt;/span&gt; &lt;span class="n"&gt;transaction&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;CommitAsync&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;cancellationToken&lt;/span&gt;&lt;span class="p"&gt;);&lt;/span&gt;
            &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="k"&gt;new&lt;/span&gt; &lt;span class="n"&gt;InvoicePayload&lt;/span&gt;
            &lt;span class="p"&gt;{&lt;/span&gt;
                &lt;span class="n"&gt;Id&lt;/span&gt; &lt;span class="p"&gt;=&lt;/span&gt; &lt;span class="n"&gt;invoiceId&lt;/span&gt;
            &lt;span class="p"&gt;};&lt;/span&gt;
        &lt;span class="p"&gt;}&lt;/span&gt;
        &lt;span class="k"&gt;catch&lt;/span&gt;
        &lt;span class="p"&gt;{&lt;/span&gt;
            &lt;span class="k"&gt;await&lt;/span&gt; &lt;span class="n"&gt;transaction&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;RollbackAsync&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;cancellationToken&lt;/span&gt;&lt;span class="p"&gt;);&lt;/span&gt;
            &lt;span class="k"&gt;throw&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
        &lt;span class="p"&gt;}&lt;/span&gt;
    &lt;span class="p"&gt;}&lt;/span&gt;
&lt;span class="p"&gt;}&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;In this service, a new message is persisted in &lt;code&gt;Outbox&lt;/code&gt; together with creating a new invoice.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight csharp"&gt;&lt;code&gt;&lt;span class="k"&gt;internal&lt;/span&gt; &lt;span class="k"&gt;sealed&lt;/span&gt; &lt;span class="k"&gt;class&lt;/span&gt; &lt;span class="nc"&gt;InvoiceService&lt;/span&gt; &lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="n"&gt;IInvoiceService&lt;/span&gt;
&lt;span class="p"&gt;{&lt;/span&gt;
    &lt;span class="k"&gt;private&lt;/span&gt; &lt;span class="k"&gt;readonly&lt;/span&gt; &lt;span class="n"&gt;IInvoiceRepository&lt;/span&gt; &lt;span class="n"&gt;_invoiceRepository&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
    &lt;span class="k"&gt;private&lt;/span&gt; &lt;span class="k"&gt;readonly&lt;/span&gt; &lt;span class="n"&gt;IOutbox&lt;/span&gt; &lt;span class="n"&gt;_outbox&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;

    &lt;span class="k"&gt;public&lt;/span&gt; &lt;span class="nf"&gt;InvoiceService&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;IInvoiceRepository&lt;/span&gt; &lt;span class="n"&gt;invoiceRepository&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;IOutbox&lt;/span&gt; &lt;span class="n"&gt;outbox&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
    &lt;span class="p"&gt;{&lt;/span&gt;
        &lt;span class="n"&gt;_invoiceRepository&lt;/span&gt; &lt;span class="p"&gt;=&lt;/span&gt; &lt;span class="n"&gt;invoiceRepository&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
        &lt;span class="n"&gt;_outbox&lt;/span&gt; &lt;span class="p"&gt;=&lt;/span&gt; &lt;span class="n"&gt;outbox&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
    &lt;span class="p"&gt;}&lt;/span&gt;

    &lt;span class="k"&gt;public&lt;/span&gt; &lt;span class="k"&gt;async&lt;/span&gt; &lt;span class="n"&gt;Task&lt;/span&gt;&lt;span class="p"&gt;&amp;lt;&lt;/span&gt;&lt;span class="n"&gt;Guid&lt;/span&gt;&lt;span class="p"&gt;&amp;gt;&lt;/span&gt; &lt;span class="nf"&gt;CreateInvoice&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;InvoiceModel&lt;/span&gt; &lt;span class="n"&gt;model&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;CancellationToken&lt;/span&gt; &lt;span class="n"&gt;cancellationToken&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
    &lt;span class="p"&gt;{&lt;/span&gt;
        &lt;span class="kt"&gt;var&lt;/span&gt; &lt;span class="n"&gt;invoice&lt;/span&gt; &lt;span class="p"&gt;=&lt;/span&gt; &lt;span class="n"&gt;Invoice&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;Create&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;model&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Amount&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;model&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;DueDate&lt;/span&gt;&lt;span class="p"&gt;);&lt;/span&gt;

        &lt;span class="k"&gt;await&lt;/span&gt; &lt;span class="n"&gt;_invoiceRepository&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;AddEntityAsync&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;invoice&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;cancellationToken&lt;/span&gt;&lt;span class="p"&gt;);&lt;/span&gt;
        &lt;span class="k"&gt;await&lt;/span&gt; &lt;span class="n"&gt;_invoiceRepository&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;SaveChangesAsync&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;cancellationToken&lt;/span&gt;&lt;span class="p"&gt;);&lt;/span&gt;

        &lt;span class="kt"&gt;var&lt;/span&gt; &lt;span class="n"&gt;operationId&lt;/span&gt; &lt;span class="p"&gt;=&lt;/span&gt; &lt;span class="n"&gt;Guid&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;NewGuid&lt;/span&gt;&lt;span class="p"&gt;();&lt;/span&gt;

        &lt;span class="k"&gt;await&lt;/span&gt; &lt;span class="n"&gt;_outbox&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;AddAsync&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;
            &lt;span class="n"&gt;data&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="k"&gt;new&lt;/span&gt; &lt;span class="n"&gt;InvoiceCreatedEvent&lt;/span&gt;
            &lt;span class="p"&gt;{&lt;/span&gt;
                &lt;span class="n"&gt;InvoiceId&lt;/span&gt; &lt;span class="p"&gt;=&lt;/span&gt; &lt;span class="n"&gt;invoice&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Id&lt;/span&gt;
            &lt;span class="p"&gt;},&lt;/span&gt; 
            &lt;span class="n"&gt;topic&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="n"&gt;InvoiceCreatedEvent&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Topic&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
            &lt;span class="n"&gt;partitionBy&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="n"&gt;i&lt;/span&gt; &lt;span class="p"&gt;=&amp;gt;&lt;/span&gt; &lt;span class="n"&gt;i&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;InvoiceId&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;ToString&lt;/span&gt;&lt;span class="p"&gt;(),&lt;/span&gt;
            &lt;span class="n"&gt;isSequential&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="k"&gt;false&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
            &lt;span class="n"&gt;metadata&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="k"&gt;new&lt;/span&gt; &lt;span class="n"&gt;Dictionary&lt;/span&gt;&lt;span class="p"&gt;&amp;lt;&lt;/span&gt;&lt;span class="kt"&gt;string&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="kt"&gt;string&lt;/span&gt;&lt;span class="p"&gt;&amp;gt;&lt;/span&gt;
            &lt;span class="p"&gt;{&lt;/span&gt;
                &lt;span class="p"&gt;{&lt;/span&gt; &lt;span class="s"&gt;"OperationId"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;operationId&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;ToString&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt; &lt;span class="p"&gt;}&lt;/span&gt;
            &lt;span class="p"&gt;},&lt;/span&gt; 
            &lt;span class="n"&gt;cancellationToken&lt;/span&gt;&lt;span class="p"&gt;);&lt;/span&gt;

        &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="n"&gt;invoice&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Id&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
    &lt;span class="p"&gt;}&lt;/span&gt;
&lt;span class="p"&gt;}&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;If we open the database, we will see a new message added to Outbox after committing a transaction.&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fgfwf9ylfye33wnysht8u.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fgfwf9ylfye33wnysht8u.png" alt="Image description" width="800" height="256"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;Here we see the same consumed message sent by &lt;code&gt;Relay&lt;/code&gt; to Kafka&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fpsxx5hpd7mzwz7k7wtey.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fpsxx5hpd7mzwz7k7wtey.png" alt="Image description" width="800" height="278"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;Also, I added debug logs to the visibility of messaging transferring&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2F331o1w8q8hlsia19v5j5.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2F331o1w8q8hlsia19v5j5.png" alt="Image description" width="800" height="88"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;h2&gt;
  
  
  What is missed?
&lt;/h2&gt;

&lt;p&gt;Although this solution is almost ready for production, there are many enhancements that can be added, such as:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Splitting between internal and public data contracts&lt;/li&gt;
&lt;li&gt;Integrating open telemetry with Jaeger and Prometheus&lt;/li&gt;
&lt;li&gt;Schema Registry integration to guard against breaking changes in data contracts&lt;/li&gt;
&lt;li&gt;Dead Letter Queue&lt;/li&gt;
&lt;li&gt;Batch data transferring&lt;/li&gt;
&lt;li&gt;Housekeeping to store all messages for future investigation and auditing purposes&lt;/li&gt;
&lt;li&gt;Admin Dashboard for managing the outbox state&lt;/li&gt;
&lt;li&gt;Performance tests&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;And many other potential improvements.&lt;/p&gt;

&lt;h2&gt;
  
  
  Conclusion
&lt;/h2&gt;

&lt;p&gt;In this article, I demonstrated how to implement scalable, production-ready, reliable messaging with a transactional outbox approach and Kafka.&lt;/p&gt;

&lt;p&gt;See you next time!&lt;/p&gt;

</description>
    </item>
    <item>
      <title>Techniques for building predictable and reliable API: Part 2</title>
      <dc:creator>Aleksei Popov</dc:creator>
      <pubDate>Tue, 26 Mar 2024 16:22:22 +0000</pubDate>
      <link>https://dev.to/fairday/techniques-for-building-predictable-and-reliable-api-part-2-57hi</link>
      <guid>https://dev.to/fairday/techniques-for-building-predictable-and-reliable-api-part-2-57hi</guid>
      <description>&lt;h2&gt;
  
  
  Introduction
&lt;/h2&gt;

&lt;p&gt;&lt;a href="https://dev.to/fairday/techniques-for-building-predictable-and-reliable-api-part-1-45bf"&gt;In the first part,&lt;/a&gt; I talked about how to apply the idempotency technique to achieve exactly once guarantee for API invocation.&lt;/p&gt;

&lt;p&gt;In this one, I plan on talking about asynchronous processing and API techniques to build scalable and reliable.&lt;/p&gt;

&lt;h2&gt;
  
  
  Asynchronous processing
&lt;/h2&gt;

&lt;p&gt;There are operations that are better processed asynchronously, meaning that instead of doing the job right now, we push it into the task queue and then notify the originators about the result once it is ready.&lt;/p&gt;

&lt;p&gt;It could be useful if interaction is required with external systems that may be slow, operate asynchronously, having strict rate limits, or even have problems with availability.&lt;/p&gt;

&lt;h3&gt;
  
  
  Okay, what's the problem?
&lt;/h3&gt;

&lt;p&gt;Let us imagine that we have the following system where the main components are:&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;(Originator) Revenue calculation service&lt;/strong&gt; - This service is part of an analytics product that periodically requests data for the purpose of calculating revenue. It might analyze sales transactions, payments, or other financial data to provide insights into revenue trends, performance against forecasts, and areas for financial improvement.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;(Originator) Risk assessment service&lt;/strong&gt; - This service evaluates the risk associated with various operations or transactions. It could be assessing credit risk, fraud risk, operational risks, or other types of financial or non-financial risks. Its purpose is to mitigate potential losses by identifying high-risk scenarios and suggesting preventive measures.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Transaction Categorisation service&lt;/strong&gt; - This service schedules transactions classifications into categories such as groceries, utilities, entertainment, revenue, etc.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Categorization Processor service&lt;/strong&gt; - This service is purely a backend service responsible for the actual processing and categorization logic of transactions.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Webhooks delivery service&lt;/strong&gt; - This service manages the sending of webhooks, which are automated messages sent from one app to another when something happens.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Bank transactions provider service&lt;/strong&gt; - This service acts as an interface to bank systems for retrieving transaction data. It may authenticate, fetch, and format bank transaction data for use by other components in the financial technology ecosystem.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Queue (Kafka / Azure Service Bus / RabbitMq / etc)&lt;/strong&gt;&lt;/p&gt;

&lt;p&gt;In the following diagram, you can see a blueprint of use cases. Both use cases consist of seven steps, starting with &lt;code&gt;10 Pull transactions for the last period (month or year)&lt;/code&gt; and ending with &lt;code&gt;retrieving processing results by ID.&lt;/code&gt;&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media.dev.to/cdn-cgi/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fuvbscpyl0wmt6i5ctin3.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media.dev.to/cdn-cgi/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fuvbscpyl0wmt6i5ctin3.png" alt="Image description" width="800" height="741"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;The Transaction Categorization service API requires the provision of an external ID. Under this external ID, all the transactions will be held and categorized. Since the architecture of the transaction categorization implies that it learns over time from data associated with a specific external ID, sending transactions for the same entity (for example, a B2C company) to different external IDs will lead to categorization issues and poor quality. Therefore, the main recommendation is to accumulate all transactions for the same entity together. Consequently, both the Revenue Calculation and Risk Assessment services provide a shared CustomerId throughout the entire system.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;{
 "event": "processed",
    "created": "2024-05-20T09:23:53+00:00",
    "data": {
        "externalId": "0cf9e867-9970-4a76-915",
        "status": "processed"
    },
}
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;This is the webhook payload example received by both originators (Revenue Calculation and Risk Assessment services). However, since they use the same external ID, how can we distinguish between the two originators? Relying on the date? Unlikely, as it is not a reliable approach.&lt;/p&gt;

&lt;h3&gt;
  
  
  Okay, what if we prohibited having multiple processings in progress for a specific external ID?
&lt;/h3&gt;

&lt;p&gt;This approach might slightly improve the situation if the originators somehow track that they have started processing. However, a problem arises if the first originator begins processing and finishes, but the webhook doesn't reach the originator for any reason. Meanwhile, another originator starts and successfully completes another processing, and the webhook is delivered to both. In this case, we will face an issue because the first originator might mistakenly pick up the second webhook.&lt;/p&gt;

&lt;h2&gt;
  
  
  Technique 1: Metadata
&lt;/h2&gt;

&lt;p&gt;In order to distinguish webhooks somehow, we may utilize attaching custom metadata to the send transaction to process the request.&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media.dev.to/cdn-cgi/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2F5j4vmh30x65uliggczfs.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media.dev.to/cdn-cgi/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2F5j4vmh30x65uliggczfs.png" alt="Image description" width="800" height="741"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;As you can see here, in this example, the Revenue Calculation service attached specific metadata in order to identify its processing result and skip others.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;{
 "event": "processed",
    "created": "2024-05-20T09:23:53+00:00",
    "data": {
        "externalId": "0cf9e867-9970-4a76-915",
        "status": "processed"
    },
    "metadata": {
    "id": "revenue_calc_id"
   }
}
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;However, since the categorisation processing service does not support searching by metadata due to potential complexity, we still face a situation with a race condition when webhooks are delayed, sent out of processing order, or when originators are pulling stale data.&lt;/p&gt;

&lt;p&gt;Okay, then we need something much simpler!&lt;/p&gt;

&lt;h2&gt;
  
  
  Technique 2: Two-Steps processing
&lt;/h2&gt;

&lt;p&gt;To completely avoid race conditions due to the distributed nature of the system and network failures, we can improve the architecture by introducing the following features.&lt;/p&gt;

&lt;ol&gt;
&lt;li&gt;Sending transactions to the Process API will return a task ID related to this processing.&lt;/li&gt;
&lt;li&gt;The Send Transactions to Process API will accept an idempotency key to avoid starting duplicate tasks. It won't affect consumers but will save resources and reduce operational complexity.&lt;/li&gt;
&lt;li&gt;Task IDs will be added to the webhook payload.&lt;/li&gt;
&lt;li&gt;The Retrieve Processing Results API will be improved to allow searching by task ID.&lt;/li&gt;
&lt;/ol&gt;

&lt;p&gt;&lt;a href="https://media.dev.to/cdn-cgi/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fq7yfd6itti0gqaz7iawy.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media.dev.to/cdn-cgi/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fq7yfd6itti0gqaz7iawy.png" alt="Image description" width="800" height="741"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;This is the payload example contains taskid value. Thus, originators can distinguish between webhooks and identify only those that are related to them.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;{
 "event": "processed",
    "created": "2024-05-20T09:23:53+00:00",
    "data": {
        "taskid": "task_Eqio3Y4ohyNiMphrXwG58p",
        "externalId": "0cf9e867-9970-4a76-915",
        "status": "processed"
    }
}
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;h2&gt;
  
  
  Asynchronous processing and timeouts
&lt;/h2&gt;

&lt;p&gt;The last thing I would like to talk about in this article is related to timeouts.&lt;/p&gt;

&lt;p&gt;In nature, there are two main types of communication between two services or components: Fire and forget and Request and Reply.&lt;/p&gt;

&lt;h3&gt;
  
  
  Fire and forget
&lt;/h3&gt;

&lt;p&gt;This is a communication pattern where a client sends a message (or request) to a server and immediately continues its execution without waiting for a response. The server might process the message at some point in the future, but the client does not know when or indeed if the message has been successfully received or processed. Technically, it even doesn't care wherther it was processed or not.&lt;/p&gt;

&lt;p&gt;Usually, this pattern refers to asynchronous nature and is used when the client does not require an immediate response or confirmation. It's often used for logging, sending notifications, or other tasks where receiving a response is not critical to the continuation of the client's processing.&lt;/p&gt;

&lt;h3&gt;
  
  
  Request and Reply
&lt;/h3&gt;

&lt;p&gt;This is a communication pattern where a client sends a request to a server and waits for a response. The server processes the request and then sends back a reply to the client.&lt;/p&gt;

&lt;p&gt;Usaully, this pattern refers to synchronous nature, meaning the client blocks and waits until the response is received before continuing its execution.&lt;/p&gt;

&lt;p&gt;However, the interesting thing is that even when I use asynchronous (non-blocking for current execution) communication for account charging or order delivery, I am still interested in a reply. If there is no reply after some time, what should the system do? Cancel? Retry? Send an alert to the on-call team?&lt;/p&gt;

&lt;p&gt;The main idea is that despite the communication approach you use (synchronous or asynchronous), you should be concerned about possible network failures and plan for them before they happen.&lt;/p&gt;

&lt;p&gt;The simplest approach would be to incorporate a timeout on the caller side, similar to what we have in a blocking request. To implement this, we need to understand where to store the expiration date. The answer lies in the domain objects. Timeout behavior is usually part of their lifecycle. For instance, if a document request for a loan application isn't completed within 14 days, we should close this document request and probably try again.&lt;/p&gt;

&lt;h2&gt;
  
  
  Conclusion
&lt;/h2&gt;

&lt;p&gt;In this article, I have covered several problems related to the nature of distributed systems and the complexity of asynchronous operations, unavoidable network failures, and possible techniques to mitigate them, thereby improving the quality attributes of the system and experience for both customers and internal users.&lt;/p&gt;

&lt;p&gt;See you next time!&lt;/p&gt;

</description>
      <category>microservices</category>
      <category>api</category>
      <category>csharp</category>
      <category>architecture</category>
    </item>
    <item>
      <title>Techniques for building predictable and reliable API</title>
      <dc:creator>Aleksei Popov</dc:creator>
      <pubDate>Tue, 26 Mar 2024 15:30:20 +0000</pubDate>
      <link>https://dev.to/fairday/techniques-for-building-predictable-and-reliable-api-part-1-45bf</link>
      <guid>https://dev.to/fairday/techniques-for-building-predictable-and-reliable-api-part-1-45bf</guid>
      <description>&lt;h2&gt;
  
  
  Introduction
&lt;/h2&gt;

&lt;p&gt;Application Programming Interfaces (APIs) facilitate communication and data sharing between various applications, empowering developers to construct integrated and sophisticated systems. Nonetheless, crafting APIs that are scalable, reliable and user-friendly presents significant challenges.&lt;/p&gt;

&lt;p&gt;There are many useful design patterns that can guide you in building scalable APIs to serve the needs of your consumers, such as:  &lt;/p&gt;

&lt;ol&gt;
&lt;li&gt;Versioning&lt;/li&gt;
&lt;li&gt;Caching&lt;/li&gt;
&lt;li&gt;Pagination&lt;/li&gt;
&lt;li&gt;Rate liming&lt;/li&gt;
&lt;li&gt;Authentication and authorisation&lt;/li&gt;
&lt;li&gt;Circuit Breaker&lt;/li&gt;
&lt;/ol&gt;

&lt;p&gt;However, in this article, I am going to focus on a less popular topic related to the predictability and reliability of APIs by providing examples of such challenges and techniques to tackle them. Most of these techniques are very basic and obvious, yet they are periodically missing in implementations. From time to time, I will provide examples using C# and .NET, but all of these are language agnostic. &lt;/p&gt;

&lt;p&gt;Before jumping into the problems, let us talk a bit about networks and their reliability. &lt;/p&gt;

&lt;h3&gt;
  
  
  What is network?
&lt;/h3&gt;

&lt;blockquote&gt;
&lt;p&gt;A network, in the context of computing and telecommunications, is a collection of interconnected devices that can communicate and share data with each other. These devices can range from computers, mobile devices, and servers to IoT (Internet of Things) devices, routers, and switches. Networks can be small and localized, like a home Wi‑Fi network, or vast and global, like the internet.&lt;/p&gt;
&lt;/blockquote&gt;

&lt;p&gt;As we can see from this definition, there are so many things that can go wrong. This leads us to the conclusion that networks are unreliable and failure is unavoidable  &lt;/p&gt;

&lt;h3&gt;
  
  
  But why is this the case? Is it because of the well-known principle, Murphy's Law?
&lt;/h3&gt;

&lt;p&gt;Murphy's Law states that:&lt;/p&gt;

&lt;blockquote&gt;
&lt;p&gt;Whatever can go wrong, will go wrong. So a solution is better the less possibilities there are for something to go wrong.  &lt;/p&gt;
&lt;/blockquote&gt;

&lt;p&gt;Partly, yes :) However, the reality is somewhat different. There are many obvious reasons why it might fail, such as:  &lt;/p&gt;

&lt;ol&gt;
&lt;li&gt;Physical Disruptions &lt;/li&gt;
&lt;li&gt;Interference and Capacity Issues
&lt;/li&gt;
&lt;li&gt;Cyber Attacks
&lt;/li&gt;
&lt;li&gt;Configuration and Human Errors&lt;/li&gt;
&lt;li&gt;Propagation Delays and Routing Issues&lt;/li&gt;
&lt;li&gt;Compatibility and Standardization Issues&lt;/li&gt;
&lt;li&gt;Maintenance and Upgrades&lt;/li&gt;
&lt;li&gt;Technical Limitation&lt;/li&gt;
&lt;/ol&gt;

&lt;p&gt;And there are many other reasons that we, as a software engineers, are unlikely to be able to impact drastically.&lt;/p&gt;

&lt;h2&gt;
  
  
  Retry
&lt;/h2&gt;

&lt;p&gt;Okay, if networks are so unreliable, what can we do about that? &lt;/p&gt;

&lt;p&gt;&lt;a href="https://media.dev.to/cdn-cgi/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Ftszgvyj2fual1xus2y96.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media.dev.to/cdn-cgi/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Ftszgvyj2fual1xus2y96.png" alt="Image description" width="800" height="197"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;The simplest solution is to retry. If it fails, it is very likely due to a temporary issue. For example, someone might have temporarily disconnected the cable from the server and then reconnected it.&lt;/p&gt;

&lt;p&gt;So, we decided to add retry policies with &lt;a href="https://en.wikipedia.org/wiki/Exponential_backoff"&gt;expotential backoff&lt;/a&gt; to our HttpClient using &lt;a href="https://github.com/App-vNext/Polly"&gt;Polly&lt;/a&gt;&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight csharp"&gt;&lt;code&gt;&lt;span class="k"&gt;using&lt;/span&gt; &lt;span class="nn"&gt;Microsoft.Extensions.DependencyInjection&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
&lt;span class="k"&gt;using&lt;/span&gt; &lt;span class="nn"&gt;Polly&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
&lt;span class="k"&gt;using&lt;/span&gt; &lt;span class="nn"&gt;Polly.Contrib.WaitAndRetry&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
&lt;span class="k"&gt;using&lt;/span&gt; &lt;span class="nn"&gt;Polly.Extensions.Http&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;

&lt;span class="k"&gt;namespace&lt;/span&gt; &lt;span class="nn"&gt;MyService&lt;/span&gt;
&lt;span class="p"&gt;{&lt;/span&gt;
    &lt;span class="k"&gt;public&lt;/span&gt; &lt;span class="k"&gt;static&lt;/span&gt; &lt;span class="k"&gt;class&lt;/span&gt; &lt;span class="nc"&gt;ServiceCollectionExtensions&lt;/span&gt;
    &lt;span class="p"&gt;{&lt;/span&gt;
        &lt;span class="k"&gt;public&lt;/span&gt; &lt;span class="k"&gt;static&lt;/span&gt; &lt;span class="n"&gt;IServiceCollection&lt;/span&gt; &lt;span class="nf"&gt;AddMyService&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="k"&gt;this&lt;/span&gt; &lt;span class="n"&gt;IServiceCollection&lt;/span&gt; &lt;span class="n"&gt;services&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
        &lt;span class="p"&gt;{&lt;/span&gt;
            &lt;span class="n"&gt;services&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;AddHttpClient&lt;/span&gt;&lt;span class="p"&gt;&amp;lt;&lt;/span&gt;&lt;span class="n"&gt;IMyService&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;MyService&lt;/span&gt;&lt;span class="p"&gt;&amp;gt;((&lt;/span&gt;&lt;span class="n"&gt;sp&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;client&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="p"&gt;=&amp;gt;&lt;/span&gt;
                &lt;span class="p"&gt;{&lt;/span&gt;
                    &lt;span class="c1"&gt;// ... configure http client&lt;/span&gt;
                &lt;span class="p"&gt;})&lt;/span&gt;
                &lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;AddPolicyHandler&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="nf"&gt;GetRetryPolicy&lt;/span&gt;&lt;span class="p"&gt;());&lt;/span&gt;

            &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="n"&gt;services&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
        &lt;span class="p"&gt;}&lt;/span&gt;

        &lt;span class="k"&gt;private&lt;/span&gt; &lt;span class="k"&gt;static&lt;/span&gt; &lt;span class="n"&gt;IAsyncPolicy&lt;/span&gt;&lt;span class="p"&gt;&amp;lt;&lt;/span&gt;&lt;span class="n"&gt;HttpResponseMessage&lt;/span&gt;&lt;span class="p"&gt;&amp;gt;&lt;/span&gt; &lt;span class="nf"&gt;GetRetryPolicy&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;
        &lt;span class="p"&gt;{&lt;/span&gt;
            &lt;span class="kt"&gt;var&lt;/span&gt; &lt;span class="n"&gt;delay&lt;/span&gt; &lt;span class="p"&gt;=&lt;/span&gt; &lt;span class="n"&gt;Backoff&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;DecorrelatedJitterBackoffV2&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;medianFirstRetryDelay&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="n"&gt;TimeSpan&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;FromSeconds&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="m"&gt;1&lt;/span&gt;&lt;span class="p"&gt;),&lt;/span&gt; &lt;span class="n"&gt;retryCount&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="m"&gt;5&lt;/span&gt;&lt;span class="p"&gt;);&lt;/span&gt;

            &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="n"&gt;HttpPolicyExtensions&lt;/span&gt;
                &lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;HandleTransientHttpError&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;
                &lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;WaitAndRetryAsync&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;delay&lt;/span&gt;&lt;span class="p"&gt;);&lt;/span&gt;
        &lt;span class="p"&gt;}&lt;/span&gt;
    &lt;span class="p"&gt;}&lt;/span&gt;
&lt;span class="p"&gt;}&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;and are happy to be deploying this stuff to production. Now it works better because it helps to avoid temporary issues.&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media.dev.to/cdn-cgi/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2F9ywim6vqxe0lj682gvd6.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media.dev.to/cdn-cgi/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2F9ywim6vqxe0lj682gvd6.png" alt="Image description" width="800" height="197"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;However, let us imagine that the situation is a bit more tricky because we didn't receive a response from the server. This could be due to a network issue, a timeout, a proxy or firewall issue, HTTP version incompatibilities, or just a client-side bug. &lt;/p&gt;

&lt;p&gt;&lt;a href="https://media.dev.to/cdn-cgi/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fbhrqi0qodc8oou89tivh.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media.dev.to/cdn-cgi/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fbhrqi0qodc8oou89tivh.png" alt="Image description" width="800" height="197"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;It it wasn't processed by server and we didn't get response from the server, in this case retry will work help us to avoid this issue.&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media.dev.to/cdn-cgi/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fk7rqpuapf12uy8q6q4xw.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media.dev.to/cdn-cgi/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fk7rqpuapf12uy8q6q4xw.png" alt="Image description" width="800" height="190"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;However, the situation becomes more interesting if it was processed successfully. So, the question is, what will a retry do in this case? Will it cause any harm?&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media.dev.to/cdn-cgi/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2F6pmv0hjmwuzbpdsmg3od.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media.dev.to/cdn-cgi/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2F6pmv0hjmwuzbpdsmg3od.png" alt="Image description" width="800" height="190"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;The answer is simple: it depends on the implementation. In some cases, it will throw an error, in others, it can produce unexpected changes and side effects in the system.  &lt;/p&gt;

&lt;h3&gt;
  
  
  Real world use cases
&lt;/h3&gt;

&lt;p&gt;In theory, it looks dangerous, but do we exactly have systems that might be seriously affected by this? &lt;/p&gt;

&lt;p&gt;First of all, based on statistic from different sources and just personal feelings, service oriented and microservices architectures are adopted by vast majority of companies which automatically puts them in situation of having distributed system.&lt;/p&gt;

&lt;p&gt;Although microservices are gaining popularity due to their advantages, such as independent deployment, a heterogeneous development stack, simplified architecture control, and the ability to scale independently, they also bring a lot of challenges.&lt;/p&gt;

&lt;p&gt;All of these companies are engaged in the development and maintenance of complex and highly available systems. These systems are designed to provide a wide array of services, including but not limited to payment processing, delivery logistics, file management, and the operation of online storefronts, among others. Their goal is to ensure uninterrupted service and optimal performance across a variety of sectors, catering to the diverse needs of their customer base while maintaining security, efficiency, and scalability  &lt;/p&gt;

&lt;p&gt;Who would be happy if they received two taxis, were charged twice, or purchased more than the desired amount of things?  &lt;/p&gt;

&lt;p&gt;No one, so, we must have solutions in place to avoid such situations.&lt;/p&gt;

&lt;h2&gt;
  
  
  Idempotency
&lt;/h2&gt;

&lt;p&gt;Idempotency is a concept where doing the same thing multiple times has the same effect as doing it just once. In computing, an idempotent operation can be repeated many times without changing the result beyond the initial application.  &lt;/p&gt;

&lt;p&gt;It is believed that the HTTP methods GET, PUT, and DELETE are idempotent, whereas POST and PATCH are not. Formally, that's true, but what's to stop you from implementing DELETE in a non-idempotent way? It also depends on the approach used to define the API. If REST Level 0 is utilized, everything uses POST. Even an operation like RetrieveMyInvoices would be exposed as POST. In addition, if RetrieveMyInvoices returns a new set of invoices after a second call, does that mean it is not idempotent? &lt;/p&gt;

&lt;h3&gt;
  
  
  API and Exactly Once Semantics
&lt;/h3&gt;

&lt;p&gt;It refers to the concept and implementation within APIs ensuring that an operation or a message is processed exactly once. For example, if an order is submitted to the delivery system, it will be delivered only once. The same principle applies to charges or transactions.&lt;/p&gt;

&lt;h3&gt;
  
  
  Idempotency key
&lt;/h3&gt;

&lt;p&gt;To achieve exactly-once semantics, we can leverage a solution that attaches an idempotency key to the request, either in the headers or explicitly in the payload.  &lt;/p&gt;

&lt;p&gt;&lt;a href="https://media.dev.to/cdn-cgi/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fqvl4zr43pzhdjfyjmsvm.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media.dev.to/cdn-cgi/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fqvl4zr43pzhdjfyjmsvm.png" alt="Image description" width="800" height="451"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;After retrying the same request with the identical idempotency key, Service B will verify that a request with such a key has already been processed and will simply return the previous response. &lt;/p&gt;

&lt;h3&gt;
  
  
  How to choose idempotency key?
&lt;/h3&gt;

&lt;p&gt;Initially, we need to ensure the absence of duplicate idempotency keys to guarantee unique request identification. UUIDs or GUIDs can be employed for this purpose. Referring to the Wikipedia definition&lt;/p&gt;

&lt;blockquote&gt;
&lt;p&gt;While each generated GUID is not guaranteed to be unique, the total number of unique keys (2128 or 3.4×1038) is so large that the probability of the same number being generated twice is very small. For example, consider the observable universe, which contains about 5×1022 stars; every star could then have 6.8×1015 universally unique GUIDs.  &lt;/p&gt;
&lt;/blockquote&gt;

&lt;p&gt;Okay, now we need to set a reasonable expiration time for the idempotency key to clean up already processed requests. For instance, Stripe uses the following &lt;/p&gt;

&lt;blockquote&gt;
&lt;p&gt;You can remove keys from the system automatically after they’re at least 24 hours old. We generate a new request if a key is reused after the original is pruned. The idempotency layer compares incoming parameters to those of the original request and errors if they’re the same to prevent accidental misuse.  &lt;/p&gt;
&lt;/blockquote&gt;

&lt;h3&gt;
  
  
  How to store idempotency keys?
&lt;/h3&gt;

&lt;p&gt;The simplest solution would be to create a separate table for processed idempotency keys.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;CREATE TABLE [dbo].[IdempotencyKey] (
    [Key] VARCHAR(255) PRIMARY KEY,
    [Namespace] VARCHAR(255) NOT NULL,
    ExpiresAt DATETIME NOT NULL,
    Payload NVARCHAR(MAX)
);
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;And during the processing of the message or request, insert the value in the same transaction.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;INSERT INTO IdempotencyKey (Key, Namespace, ExpiresAt, Payload)
VALUES ('uniqueKey123', 'myservice', '2024-03-25 10:00:00', '{"status": "processed", "details": "Transaction completed successfully."}');
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Let us look at what we need to configure on the application level.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;By the way, the whole implementation can be found in the GitHub repository&lt;/strong&gt; &lt;a href="https://github.com/alex-popov-stenn/SimpleIdempotency"&gt;by this link&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;First of all, we need to create an entity and configure mapping with EF Core. There are many cases where frameworks with less overhead, for instance, &lt;a href="https://github.com/DapperLib/Dapper"&gt;Dapper&lt;/a&gt;, could be more efficient, but for simplicity and familiarity, let us stick with EF Core.&lt;/p&gt;

&lt;p&gt;Let's create entity an first&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight csharp"&gt;&lt;code&gt;&lt;span class="k"&gt;public&lt;/span&gt; &lt;span class="k"&gt;class&lt;/span&gt; &lt;span class="nc"&gt;IdempotencyKey&lt;/span&gt;
&lt;span class="p"&gt;{&lt;/span&gt;
    &lt;span class="k"&gt;protected&lt;/span&gt; &lt;span class="nf"&gt;IdempotencyKey&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt; &lt;span class="p"&gt;}&lt;/span&gt;

    &lt;span class="k"&gt;public&lt;/span&gt; &lt;span class="k"&gt;static&lt;/span&gt; &lt;span class="n"&gt;IdempotencyKey&lt;/span&gt; &lt;span class="nf"&gt;Create&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="kt"&gt;string&lt;/span&gt; &lt;span class="n"&gt;@namespace&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="kt"&gt;string&lt;/span&gt; &lt;span class="n"&gt;key&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="kt"&gt;string&lt;/span&gt;&lt;span class="p"&gt;?&lt;/span&gt; &lt;span class="n"&gt;payload&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;TimeSpan&lt;/span&gt; &lt;span class="n"&gt;expiration&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
    &lt;span class="p"&gt;{&lt;/span&gt;
        &lt;span class="kt"&gt;var&lt;/span&gt; &lt;span class="n"&gt;expiresAt&lt;/span&gt; &lt;span class="p"&gt;=&lt;/span&gt; &lt;span class="n"&gt;DateTime&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;UtcNow&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;Add&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;expiration&lt;/span&gt;&lt;span class="p"&gt;);&lt;/span&gt;
        &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="nf"&gt;Create&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;@namespace&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;key&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;payload&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;expiresAt&lt;/span&gt;&lt;span class="p"&gt;);&lt;/span&gt;
    &lt;span class="p"&gt;}&lt;/span&gt;

    &lt;span class="k"&gt;public&lt;/span&gt; &lt;span class="k"&gt;static&lt;/span&gt; &lt;span class="n"&gt;IdempotencyKey&lt;/span&gt; &lt;span class="nf"&gt;Create&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="kt"&gt;string&lt;/span&gt; &lt;span class="n"&gt;@namespace&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="kt"&gt;string&lt;/span&gt; &lt;span class="n"&gt;key&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="kt"&gt;string&lt;/span&gt;&lt;span class="p"&gt;?&lt;/span&gt; &lt;span class="n"&gt;payload&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;DateTime&lt;/span&gt; &lt;span class="n"&gt;absoluteExpirationTime&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
    &lt;span class="p"&gt;{&lt;/span&gt;
        &lt;span class="kt"&gt;var&lt;/span&gt; &lt;span class="n"&gt;idempotencyKey&lt;/span&gt; &lt;span class="p"&gt;=&lt;/span&gt; &lt;span class="k"&gt;new&lt;/span&gt; &lt;span class="nf"&gt;IdempotencyKey&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;@namespace&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;key&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;v&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;absoluteExpirationTime&lt;/span&gt;&lt;span class="p"&gt;);&lt;/span&gt;
        &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="n"&gt;idempotencyKey&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
    &lt;span class="p"&gt;}&lt;/span&gt;

    &lt;span class="k"&gt;private&lt;/span&gt; &lt;span class="nf"&gt;IdempotencyKey&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="kt"&gt;string&lt;/span&gt; &lt;span class="n"&gt;@namespace&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="kt"&gt;string&lt;/span&gt; &lt;span class="n"&gt;key&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="kt"&gt;string&lt;/span&gt;&lt;span class="p"&gt;?&lt;/span&gt; &lt;span class="n"&gt;payload&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;DateTime&lt;/span&gt; &lt;span class="n"&gt;expiresAt&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
    &lt;span class="p"&gt;{&lt;/span&gt;
        &lt;span class="n"&gt;Key&lt;/span&gt; &lt;span class="p"&gt;=&lt;/span&gt; &lt;span class="n"&gt;key&lt;/span&gt; &lt;span class="p"&gt;??&lt;/span&gt; &lt;span class="k"&gt;throw&lt;/span&gt; &lt;span class="k"&gt;new&lt;/span&gt; &lt;span class="nf"&gt;ArgumentNullException&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="k"&gt;nameof&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;key&lt;/span&gt;&lt;span class="p"&gt;));&lt;/span&gt;
        &lt;span class="n"&gt;Payload&lt;/span&gt; &lt;span class="p"&gt;=&lt;/span&gt; &lt;span class="n"&gt;payload&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
        &lt;span class="n"&gt;Namespace&lt;/span&gt; &lt;span class="p"&gt;=&lt;/span&gt; &lt;span class="n"&gt;@namespace&lt;/span&gt; &lt;span class="p"&gt;??&lt;/span&gt; &lt;span class="k"&gt;throw&lt;/span&gt; &lt;span class="k"&gt;new&lt;/span&gt; &lt;span class="nf"&gt;ArgumentNullException&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="k"&gt;nameof&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;@namespace&lt;/span&gt;&lt;span class="p"&gt;));&lt;/span&gt;
        &lt;span class="n"&gt;ExpiresAt&lt;/span&gt; &lt;span class="p"&gt;=&lt;/span&gt; &lt;span class="n"&gt;expiresAt&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
    &lt;span class="p"&gt;}&lt;/span&gt;

    &lt;span class="k"&gt;public&lt;/span&gt; &lt;span class="kt"&gt;string&lt;/span&gt; &lt;span class="n"&gt;Namespace&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt; &lt;span class="k"&gt;get&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt; &lt;span class="k"&gt;protected&lt;/span&gt; &lt;span class="k"&gt;set&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt; &lt;span class="p"&gt;}&lt;/span&gt; &lt;span class="p"&gt;=&lt;/span&gt; &lt;span class="k"&gt;null&lt;/span&gt;&lt;span class="p"&gt;!;&lt;/span&gt;
    &lt;span class="k"&gt;public&lt;/span&gt; &lt;span class="kt"&gt;string&lt;/span&gt; &lt;span class="n"&gt;Key&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt; &lt;span class="k"&gt;get&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt; &lt;span class="k"&gt;protected&lt;/span&gt; &lt;span class="k"&gt;set&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt; &lt;span class="p"&gt;}&lt;/span&gt; &lt;span class="p"&gt;=&lt;/span&gt; &lt;span class="k"&gt;null&lt;/span&gt;&lt;span class="p"&gt;!;&lt;/span&gt;
    &lt;span class="k"&gt;public&lt;/span&gt; &lt;span class="kt"&gt;string&lt;/span&gt;&lt;span class="p"&gt;?&lt;/span&gt; &lt;span class="n"&gt;Payload&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt; &lt;span class="k"&gt;get&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt; &lt;span class="k"&gt;protected&lt;/span&gt; &lt;span class="k"&gt;set&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt; &lt;span class="p"&gt;}&lt;/span&gt;
    &lt;span class="k"&gt;public&lt;/span&gt; &lt;span class="n"&gt;DateTime&lt;/span&gt; &lt;span class="n"&gt;ExpiresAt&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt; &lt;span class="k"&gt;get&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt; &lt;span class="k"&gt;protected&lt;/span&gt; &lt;span class="k"&gt;set&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt; &lt;span class="p"&gt;}&lt;/span&gt;
&lt;span class="p"&gt;}&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Then mapping&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight csharp"&gt;&lt;code&gt;&lt;span class="k"&gt;internal&lt;/span&gt; &lt;span class="k"&gt;sealed&lt;/span&gt; &lt;span class="k"&gt;class&lt;/span&gt; &lt;span class="nc"&gt;IdempotencyKeyEntityMap&lt;/span&gt; &lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="n"&gt;IEntityTypeConfiguration&lt;/span&gt;&lt;span class="p"&gt;&amp;lt;&lt;/span&gt;&lt;span class="n"&gt;IdempotencyKey&lt;/span&gt;&lt;span class="p"&gt;&amp;gt;&lt;/span&gt;
&lt;span class="p"&gt;{&lt;/span&gt;
    &lt;span class="k"&gt;public&lt;/span&gt; &lt;span class="k"&gt;void&lt;/span&gt; &lt;span class="nf"&gt;Configure&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;EntityTypeBuilder&lt;/span&gt;&lt;span class="p"&gt;&amp;lt;&lt;/span&gt;&lt;span class="n"&gt;IdempotencyKey&lt;/span&gt;&lt;span class="p"&gt;&amp;gt;&lt;/span&gt; &lt;span class="n"&gt;builder&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
    &lt;span class="p"&gt;{&lt;/span&gt;
        &lt;span class="n"&gt;builder&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;HasKey&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;x&lt;/span&gt; &lt;span class="p"&gt;=&amp;gt;&lt;/span&gt; &lt;span class="n"&gt;x&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Key&lt;/span&gt;&lt;span class="p"&gt;);&lt;/span&gt;
        &lt;span class="n"&gt;builder&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;Property&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;x&lt;/span&gt; &lt;span class="p"&gt;=&amp;gt;&lt;/span&gt; &lt;span class="n"&gt;x&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Key&lt;/span&gt;&lt;span class="p"&gt;).&lt;/span&gt;&lt;span class="nf"&gt;HasMaxLength&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="m"&gt;128&lt;/span&gt;&lt;span class="p"&gt;).&lt;/span&gt;&lt;span class="nf"&gt;IsRequired&lt;/span&gt;&lt;span class="p"&gt;();&lt;/span&gt;
        &lt;span class="n"&gt;builder&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;HasIndex&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;o&lt;/span&gt; &lt;span class="p"&gt;=&amp;gt;&lt;/span&gt; &lt;span class="k"&gt;new&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt; &lt;span class="n"&gt;o&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Namespace&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;o&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Key&lt;/span&gt; &lt;span class="p"&gt;})&lt;/span&gt;
            &lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;HasDatabaseName&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"IX_IdempotencyKey_Uniqueness"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
            &lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;IsUnique&lt;/span&gt;&lt;span class="p"&gt;();&lt;/span&gt;
    &lt;span class="p"&gt;}&lt;/span&gt;
&lt;span class="p"&gt;}&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Ensure you register it with your DbContext&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight csharp"&gt;&lt;code&gt;&lt;span class="k"&gt;public&lt;/span&gt; &lt;span class="k"&gt;class&lt;/span&gt; &lt;span class="nc"&gt;MyDbContext&lt;/span&gt; &lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="n"&gt;DbContext&lt;/span&gt;
&lt;span class="p"&gt;{&lt;/span&gt;
    &lt;span class="k"&gt;public&lt;/span&gt; &lt;span class="nf"&gt;MyDbContext&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;DbContextOptions&lt;/span&gt; &lt;span class="n"&gt;options&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; 
        &lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="k"&gt;base&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;options&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
    &lt;span class="p"&gt;{&lt;/span&gt;
    &lt;span class="p"&gt;}&lt;/span&gt;

    &lt;span class="k"&gt;protected&lt;/span&gt; &lt;span class="k"&gt;override&lt;/span&gt; &lt;span class="k"&gt;void&lt;/span&gt; &lt;span class="nf"&gt;OnModelCreating&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;ModelBuilder&lt;/span&gt; &lt;span class="n"&gt;modelBuilder&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
    &lt;span class="p"&gt;{&lt;/span&gt;
        &lt;span class="k"&gt;base&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;OnModelCreating&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;modelBuilder&lt;/span&gt;&lt;span class="p"&gt;);&lt;/span&gt;
        &lt;span class="n"&gt;modelBuilder&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;ApplyConfigurationsFromAssembly&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;
            &lt;span class="k"&gt;typeof&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;SafeAssemblySearchAncestor&lt;/span&gt;&lt;span class="p"&gt;).&lt;/span&gt;&lt;span class="n"&gt;Assembly&lt;/span&gt;&lt;span class="p"&gt;);&lt;/span&gt;
    &lt;span class="p"&gt;}&lt;/span&gt;
&lt;span class="p"&gt;}&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;&lt;code&gt;SafeAssemblySearchAncestor&lt;/code&gt; is simply an empty type located in the assembly where all &lt;code&gt;IEntityTypeConfiguration&lt;/code&gt; implementations are found.&lt;/p&gt;

&lt;p&gt;We also need a repository to store keys. I won't dive into the implementation of this one since it heavily depends on how you work with the &lt;code&gt;DbContext&lt;/code&gt;.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight csharp"&gt;&lt;code&gt;&lt;span class="k"&gt;public&lt;/span&gt; &lt;span class="k"&gt;interface&lt;/span&gt; &lt;span class="nc"&gt;IIdempotencyKeyRepository&lt;/span&gt;
&lt;span class="p"&gt;{&lt;/span&gt;
    &lt;span class="n"&gt;Task&lt;/span&gt; &lt;span class="nf"&gt;RemoveAsync&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="kt"&gt;string&lt;/span&gt; &lt;span class="n"&gt;@namespace&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="kt"&gt;string&lt;/span&gt; &lt;span class="n"&gt;key&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;CancellationToken&lt;/span&gt; &lt;span class="n"&gt;token&lt;/span&gt;&lt;span class="p"&gt;);&lt;/span&gt;
    &lt;span class="n"&gt;Task&lt;/span&gt; &lt;span class="nf"&gt;AddEntityAsync&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;IdempotencyKey&lt;/span&gt; &lt;span class="n"&gt;idempotencyKey&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;CancellationToken&lt;/span&gt; &lt;span class="n"&gt;token&lt;/span&gt;&lt;span class="p"&gt;);&lt;/span&gt;
    &lt;span class="n"&gt;Task&lt;/span&gt; &lt;span class="nf"&gt;SaveChangesAsync&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;CancellationToken&lt;/span&gt; &lt;span class="n"&gt;token&lt;/span&gt;&lt;span class="p"&gt;);&lt;/span&gt;
    &lt;span class="n"&gt;Task&lt;/span&gt;&lt;span class="p"&gt;&amp;lt;&lt;/span&gt;&lt;span class="n"&gt;IdempotencyKey&lt;/span&gt;&lt;span class="p"&gt;?&amp;gt;&lt;/span&gt; &lt;span class="nf"&gt;GetFirstOrDefaultAsync&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="kt"&gt;string&lt;/span&gt; &lt;span class="n"&gt;@namespace&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="kt"&gt;string&lt;/span&gt; &lt;span class="n"&gt;key&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;CancellationToken&lt;/span&gt; &lt;span class="n"&gt;token&lt;/span&gt;&lt;span class="p"&gt;);&lt;/span&gt;
    &lt;span class="n"&gt;Task&lt;/span&gt;&lt;span class="p"&gt;&amp;lt;&lt;/span&gt;&lt;span class="n"&gt;IdempotencyKey&lt;/span&gt;&lt;span class="p"&gt;?&amp;gt;&lt;/span&gt; &lt;span class="nf"&gt;GetNotExpiredFirstOrDefaultAsync&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="kt"&gt;string&lt;/span&gt; &lt;span class="n"&gt;@namespace&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="kt"&gt;string&lt;/span&gt; &lt;span class="n"&gt;key&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;CancellationToken&lt;/span&gt; &lt;span class="n"&gt;token&lt;/span&gt;&lt;span class="p"&gt;);&lt;/span&gt;
    &lt;span class="n"&gt;Task&lt;/span&gt;&lt;span class="p"&gt;&amp;lt;&lt;/span&gt;&lt;span class="n"&gt;IdempotencyKey&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="k"&gt;]&amp;gt;&lt;/span&gt; &lt;span class="nf"&gt;GetExpiredAsync&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="kt"&gt;int&lt;/span&gt; &lt;span class="n"&gt;count&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;CancellationToken&lt;/span&gt; &lt;span class="n"&gt;token&lt;/span&gt;&lt;span class="p"&gt;);&lt;/span&gt;
    &lt;span class="n"&gt;Task&lt;/span&gt; &lt;span class="nf"&gt;RemoveRangeAsync&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;IdempotencyKey&lt;/span&gt;&lt;span class="p"&gt;[]&lt;/span&gt; &lt;span class="n"&gt;expired&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;CancellationToken&lt;/span&gt; &lt;span class="n"&gt;token&lt;/span&gt;&lt;span class="p"&gt;);&lt;/span&gt;
&lt;span class="p"&gt;}&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;I prefer to separate business logic from persistence details to ensure it remains agnostic about storage methods, thereby enhancing maintainability. Therefore, let's define several behaviours that interact with keys.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight csharp"&gt;&lt;code&gt;&lt;span class="k"&gt;public&lt;/span&gt; &lt;span class="k"&gt;interface&lt;/span&gt; &lt;span class="nc"&gt;IIdempotencyCache&lt;/span&gt;
&lt;span class="p"&gt;{&lt;/span&gt;
    &lt;span class="n"&gt;Task&lt;/span&gt;&lt;span class="p"&gt;&amp;lt;&lt;/span&gt;&lt;span class="n"&gt;CacheEntry&lt;/span&gt;&lt;span class="p"&gt;?&amp;gt;&lt;/span&gt; &lt;span class="nf"&gt;GetAsync&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="kt"&gt;string&lt;/span&gt; &lt;span class="n"&gt;@namespace&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="kt"&gt;string&lt;/span&gt; &lt;span class="n"&gt;key&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;CancellationToken&lt;/span&gt; &lt;span class="n"&gt;token&lt;/span&gt;&lt;span class="p"&gt;);&lt;/span&gt;
    &lt;span class="n"&gt;Task&lt;/span&gt; &lt;span class="nf"&gt;SetAsync&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="kt"&gt;string&lt;/span&gt; &lt;span class="n"&gt;@namespace&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="kt"&gt;string&lt;/span&gt; &lt;span class="n"&gt;key&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="kt"&gt;object&lt;/span&gt;&lt;span class="p"&gt;?&lt;/span&gt; &lt;span class="n"&gt;payload&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;TimeSpan&lt;/span&gt; &lt;span class="n"&gt;expiration&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;CancellationToken&lt;/span&gt; &lt;span class="n"&gt;token&lt;/span&gt;&lt;span class="p"&gt;);&lt;/span&gt;
    &lt;span class="n"&gt;Task&lt;/span&gt; &lt;span class="nf"&gt;RemoveAsync&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="kt"&gt;string&lt;/span&gt; &lt;span class="n"&gt;@namespace&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="kt"&gt;string&lt;/span&gt; &lt;span class="n"&gt;key&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;CancellationToken&lt;/span&gt; &lt;span class="n"&gt;token&lt;/span&gt;&lt;span class="p"&gt;);&lt;/span&gt;
&lt;span class="p"&gt;}&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;And for cleaning expired ones&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight csharp"&gt;&lt;code&gt;&lt;span class="k"&gt;public&lt;/span&gt; &lt;span class="k"&gt;interface&lt;/span&gt; &lt;span class="nc"&gt;IIdempotencyCacheCleaner&lt;/span&gt;
&lt;span class="p"&gt;{&lt;/span&gt;
    &lt;span class="n"&gt;Task&lt;/span&gt; &lt;span class="nf"&gt;CleanExpiredAsync&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="kt"&gt;int&lt;/span&gt; &lt;span class="n"&gt;maxSize&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;CancellationToken&lt;/span&gt; &lt;span class="n"&gt;token&lt;/span&gt;&lt;span class="p"&gt;);&lt;/span&gt;
&lt;span class="p"&gt;}&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;The only step missing is attaching this logic to Web API.&lt;br&gt;
To achieve this, we can utilise &lt;a href="https://learn.microsoft.com/en-us/aspnet/core/mvc/controllers/filters?view=aspnetcore-8.0#action-filters"&gt;ASP.NET Core Filters&lt;/a&gt;.&lt;/p&gt;

&lt;p&gt;I developed a very primitive &lt;code&gt;IdempotentFilter&lt;/code&gt; to achieve the desired behaviour.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight csharp"&gt;&lt;code&gt;&lt;span class="k"&gt;public&lt;/span&gt; &lt;span class="k"&gt;sealed&lt;/span&gt; &lt;span class="k"&gt;class&lt;/span&gt; &lt;span class="nc"&gt;IdempotentFilter&lt;/span&gt; &lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="n"&gt;IAsyncActionFilter&lt;/span&gt;
&lt;span class="p"&gt;{&lt;/span&gt;
    &lt;span class="k"&gt;private&lt;/span&gt; &lt;span class="k"&gt;readonly&lt;/span&gt; &lt;span class="n"&gt;IIdempotencyCache&lt;/span&gt; &lt;span class="n"&gt;_idempotencyCache&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
    &lt;span class="k"&gt;private&lt;/span&gt; &lt;span class="k"&gt;readonly&lt;/span&gt; &lt;span class="kt"&gt;string&lt;/span&gt; &lt;span class="n"&gt;_namespace&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
    &lt;span class="k"&gt;private&lt;/span&gt; &lt;span class="k"&gt;readonly&lt;/span&gt; &lt;span class="n"&gt;TimeSpan&lt;/span&gt; &lt;span class="n"&gt;_expiration&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;

    &lt;span class="k"&gt;public&lt;/span&gt; &lt;span class="nf"&gt;IdempotentFilter&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;IIdempotencyCache&lt;/span&gt; &lt;span class="n"&gt;idempotencyCache&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;IConfiguration&lt;/span&gt; &lt;span class="n"&gt;configuration&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
    &lt;span class="p"&gt;{&lt;/span&gt;
        &lt;span class="n"&gt;_idempotencyCache&lt;/span&gt; &lt;span class="p"&gt;=&lt;/span&gt; &lt;span class="n"&gt;idempotencyCache&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
        &lt;span class="n"&gt;_namespace&lt;/span&gt; &lt;span class="p"&gt;=&lt;/span&gt; &lt;span class="n"&gt;configuration&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="s"&gt;"IdempotencyNamespace"&lt;/span&gt;&lt;span class="p"&gt;]&lt;/span&gt; &lt;span class="p"&gt;??&lt;/span&gt; &lt;span class="k"&gt;throw&lt;/span&gt; &lt;span class="k"&gt;new&lt;/span&gt; &lt;span class="nf"&gt;ArgumentException&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"There is no such key in the configuration"&lt;/span&gt;&lt;span class="p"&gt;);&lt;/span&gt;
        &lt;span class="n"&gt;_expiration&lt;/span&gt; &lt;span class="p"&gt;=&lt;/span&gt; &lt;span class="n"&gt;TimeSpan&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;FromSeconds&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="kt"&gt;int&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;Parse&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;configuration&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="s"&gt;"IdempotencyKeyExpirationInSeconds"&lt;/span&gt;&lt;span class="p"&gt;]&lt;/span&gt; &lt;span class="p"&gt;??&lt;/span&gt; &lt;span class="k"&gt;throw&lt;/span&gt; &lt;span class="k"&gt;new&lt;/span&gt; &lt;span class="nf"&gt;ArgumentException&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"There is no such key in the configuration"&lt;/span&gt;&lt;span class="p"&gt;)));&lt;/span&gt;
    &lt;span class="p"&gt;}&lt;/span&gt;

    &lt;span class="k"&gt;public&lt;/span&gt; &lt;span class="k"&gt;async&lt;/span&gt; &lt;span class="n"&gt;Task&lt;/span&gt; &lt;span class="nf"&gt;OnActionExecutionAsync&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;ActionExecutingContext&lt;/span&gt; &lt;span class="n"&gt;context&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;ActionExecutionDelegate&lt;/span&gt; &lt;span class="n"&gt;next&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
    &lt;span class="p"&gt;{&lt;/span&gt;
        &lt;span class="kt"&gt;var&lt;/span&gt; &lt;span class="n"&gt;idempotencyKey&lt;/span&gt; &lt;span class="p"&gt;=&lt;/span&gt; &lt;span class="n"&gt;context&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;HttpContext&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Request&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Headers&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="n"&gt;IdempotencyHeaders&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;IdempotencyKey&lt;/span&gt;&lt;span class="p"&gt;].&lt;/span&gt;&lt;span class="nf"&gt;SingleOrDefault&lt;/span&gt;&lt;span class="p"&gt;();&lt;/span&gt;
        &lt;span class="kt"&gt;var&lt;/span&gt; &lt;span class="n"&gt;cancellationToken&lt;/span&gt; &lt;span class="p"&gt;=&lt;/span&gt; &lt;span class="n"&gt;context&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;HttpContext&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;RequestAborted&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;

        &lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;idempotencyKey&lt;/span&gt; &lt;span class="k"&gt;is&lt;/span&gt; &lt;span class="k"&gt;null&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
        &lt;span class="p"&gt;{&lt;/span&gt;
            &lt;span class="k"&gt;await&lt;/span&gt; &lt;span class="nf"&gt;next&lt;/span&gt;&lt;span class="p"&gt;();&lt;/span&gt;
            &lt;span class="k"&gt;return&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
        &lt;span class="p"&gt;}&lt;/span&gt;

        &lt;span class="kt"&gt;var&lt;/span&gt; &lt;span class="n"&gt;cacheEntry&lt;/span&gt; &lt;span class="p"&gt;=&lt;/span&gt; &lt;span class="k"&gt;await&lt;/span&gt; &lt;span class="n"&gt;_idempotencyCache&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;GetAsync&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;_namespace&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;idempotencyKey&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;cancellationToken&lt;/span&gt;&lt;span class="p"&gt;);&lt;/span&gt;
        &lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;cacheEntry&lt;/span&gt; &lt;span class="p"&gt;!=&lt;/span&gt; &lt;span class="k"&gt;null&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
        &lt;span class="p"&gt;{&lt;/span&gt;
            &lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;cacheEntry&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Payload&lt;/span&gt; &lt;span class="k"&gt;is&lt;/span&gt; &lt;span class="k"&gt;not&lt;/span&gt; &lt;span class="k"&gt;null&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
            &lt;span class="p"&gt;{&lt;/span&gt;
                &lt;span class="c1"&gt;//Requires more complicated logic to handle different types of MVC results&lt;/span&gt;
                &lt;span class="n"&gt;context&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Result&lt;/span&gt; &lt;span class="p"&gt;=&lt;/span&gt; &lt;span class="k"&gt;new&lt;/span&gt; &lt;span class="nf"&gt;ObjectResult&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;cacheEntry&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Payload&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt; &lt;span class="n"&gt;StatusCode&lt;/span&gt; &lt;span class="p"&gt;=&lt;/span&gt; &lt;span class="m"&gt;200&lt;/span&gt; &lt;span class="p"&gt;};&lt;/span&gt;
                &lt;span class="k"&gt;return&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
            &lt;span class="p"&gt;}&lt;/span&gt;
            &lt;span class="k"&gt;else&lt;/span&gt;
            &lt;span class="p"&gt;{&lt;/span&gt;
                &lt;span class="n"&gt;context&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Result&lt;/span&gt; &lt;span class="p"&gt;=&lt;/span&gt; &lt;span class="k"&gt;new&lt;/span&gt; &lt;span class="nf"&gt;OkResult&lt;/span&gt;&lt;span class="p"&gt;();&lt;/span&gt;
                &lt;span class="k"&gt;return&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
            &lt;span class="p"&gt;}&lt;/span&gt;
        &lt;span class="p"&gt;}&lt;/span&gt;

        &lt;span class="kt"&gt;var&lt;/span&gt; &lt;span class="n"&gt;executedContext&lt;/span&gt; &lt;span class="p"&gt;=&lt;/span&gt; &lt;span class="k"&gt;await&lt;/span&gt; &lt;span class="nf"&gt;next&lt;/span&gt;&lt;span class="p"&gt;();&lt;/span&gt;

        &lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;executedContext&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Result&lt;/span&gt; &lt;span class="k"&gt;is&lt;/span&gt; &lt;span class="n"&gt;ObjectResult&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt; &lt;span class="n"&gt;Value&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="k"&gt;not&lt;/span&gt; &lt;span class="k"&gt;null&lt;/span&gt; &lt;span class="p"&gt;}&lt;/span&gt; &lt;span class="n"&gt;result&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
        &lt;span class="p"&gt;{&lt;/span&gt;
            &lt;span class="k"&gt;await&lt;/span&gt; &lt;span class="n"&gt;_idempotencyCache&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;SetAsync&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;_namespace&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;idempotencyKey&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;result&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Value&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;_expiration&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;cancellationToken&lt;/span&gt;&lt;span class="p"&gt;);&lt;/span&gt;
        &lt;span class="p"&gt;}&lt;/span&gt;
        &lt;span class="k"&gt;else&lt;/span&gt;
        &lt;span class="p"&gt;{&lt;/span&gt;
            &lt;span class="k"&gt;await&lt;/span&gt; &lt;span class="n"&gt;_idempotencyCache&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;SetAsync&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;_namespace&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;idempotencyKey&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="k"&gt;null&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;_expiration&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;cancellationToken&lt;/span&gt;&lt;span class="p"&gt;);&lt;/span&gt;
        &lt;span class="p"&gt;}&lt;/span&gt;
    &lt;span class="p"&gt;}&lt;/span&gt;
&lt;span class="p"&gt;}&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;And then used it for the controller's action.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight csharp"&gt;&lt;code&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="n"&gt;ApiController&lt;/span&gt;&lt;span class="p"&gt;]&lt;/span&gt;
&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="nf"&gt;Route&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"[controller]"&lt;/span&gt;&lt;span class="p"&gt;)]&lt;/span&gt;
&lt;span class="k"&gt;public&lt;/span&gt; &lt;span class="k"&gt;class&lt;/span&gt; &lt;span class="nc"&gt;InvoicesController&lt;/span&gt; &lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="n"&gt;ControllerBase&lt;/span&gt;
&lt;span class="p"&gt;{&lt;/span&gt;
    &lt;span class="k"&gt;private&lt;/span&gt; &lt;span class="k"&gt;readonly&lt;/span&gt; &lt;span class="n"&gt;IInvoiceService&lt;/span&gt; &lt;span class="n"&gt;_invoiceService&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;

    &lt;span class="k"&gt;public&lt;/span&gt; &lt;span class="nf"&gt;InvoicesController&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;IInvoiceService&lt;/span&gt; &lt;span class="n"&gt;invoiceService&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
    &lt;span class="p"&gt;{&lt;/span&gt;
        &lt;span class="n"&gt;_invoiceService&lt;/span&gt; &lt;span class="p"&gt;=&lt;/span&gt; &lt;span class="n"&gt;invoiceService&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
    &lt;span class="p"&gt;}&lt;/span&gt;

    &lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="n"&gt;HttpPost&lt;/span&gt;&lt;span class="p"&gt;]&lt;/span&gt;
    &lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="n"&gt;Transactional&lt;/span&gt;&lt;span class="p"&gt;]&lt;/span&gt;
    &lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="n"&gt;Idempotent&lt;/span&gt;&lt;span class="p"&gt;]&lt;/span&gt;
    &lt;span class="k"&gt;public&lt;/span&gt; &lt;span class="k"&gt;async&lt;/span&gt; &lt;span class="n"&gt;Task&lt;/span&gt;&lt;span class="p"&gt;&amp;lt;&lt;/span&gt;&lt;span class="n"&gt;InvoicePayload&lt;/span&gt;&lt;span class="p"&gt;&amp;gt;&lt;/span&gt; &lt;span class="nf"&gt;CreateInvoice&lt;/span&gt;&lt;span class="p"&gt;([&lt;/span&gt;&lt;span class="n"&gt;FromBody&lt;/span&gt;&lt;span class="p"&gt;]&lt;/span&gt; &lt;span class="n"&gt;InvoiceModel&lt;/span&gt; &lt;span class="n"&gt;model&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;CancellationToken&lt;/span&gt; &lt;span class="n"&gt;cancellationToken&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
    &lt;span class="p"&gt;{&lt;/span&gt;
        &lt;span class="kt"&gt;var&lt;/span&gt; &lt;span class="n"&gt;invoiceId&lt;/span&gt; &lt;span class="p"&gt;=&lt;/span&gt; &lt;span class="k"&gt;await&lt;/span&gt; &lt;span class="n"&gt;_invoiceService&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;CreateInvoice&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;model&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;cancellationToken&lt;/span&gt;&lt;span class="p"&gt;);&lt;/span&gt;

        &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="k"&gt;new&lt;/span&gt; &lt;span class="n"&gt;InvoicePayload&lt;/span&gt;
        &lt;span class="p"&gt;{&lt;/span&gt;
            &lt;span class="n"&gt;Id&lt;/span&gt; &lt;span class="p"&gt;=&lt;/span&gt; &lt;span class="n"&gt;invoiceId&lt;/span&gt;
        &lt;span class="p"&gt;};&lt;/span&gt;
    &lt;span class="p"&gt;}&lt;/span&gt;
&lt;span class="p"&gt;}&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;It is noteworthy to say that we need to apply both filters in specific order to achieve the desired behaviour: &lt;strong&gt;Transactional&lt;/strong&gt; and &lt;strong&gt;Idempotent&lt;/strong&gt;.&lt;/p&gt;

&lt;p&gt;Why? &lt;/p&gt;

&lt;p&gt;Because we need to have a global (shared) transaction for both operations to be atomic: creating an invoice and storing the idempotency key.&lt;/p&gt;

&lt;p&gt;TransactionFilter is very simple, and it utilises a transaction with a snapshot isolation type (optimistic locking) to gain better performance.&lt;/p&gt;

&lt;p&gt;This is the sequence diagram that explains how it works.&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media.dev.to/cdn-cgi/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2F3mo7z6hxnz9fsm2t1b0q.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media.dev.to/cdn-cgi/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2F3mo7z6hxnz9fsm2t1b0q.png" alt="Image description" width="800" height="287"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;And very simple &lt;code&gt;TransactionFilter&lt;/code&gt;&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight csharp"&gt;&lt;code&gt;&lt;span class="k"&gt;public&lt;/span&gt; &lt;span class="k"&gt;class&lt;/span&gt; &lt;span class="nc"&gt;TransactionFilter&lt;/span&gt; &lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="n"&gt;IAsyncActionFilter&lt;/span&gt;
&lt;span class="p"&gt;{&lt;/span&gt;
    &lt;span class="k"&gt;private&lt;/span&gt; &lt;span class="k"&gt;readonly&lt;/span&gt; &lt;span class="n"&gt;IUnitOfWork&lt;/span&gt; &lt;span class="n"&gt;_unitOfWork&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;

    &lt;span class="k"&gt;public&lt;/span&gt; &lt;span class="nf"&gt;TransactionFilter&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;IUnitOfWork&lt;/span&gt; &lt;span class="n"&gt;unitOfWork&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
    &lt;span class="p"&gt;{&lt;/span&gt;
        &lt;span class="n"&gt;_unitOfWork&lt;/span&gt; &lt;span class="p"&gt;=&lt;/span&gt; &lt;span class="n"&gt;unitOfWork&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
    &lt;span class="p"&gt;}&lt;/span&gt;

    &lt;span class="k"&gt;public&lt;/span&gt; &lt;span class="k"&gt;async&lt;/span&gt; &lt;span class="n"&gt;Task&lt;/span&gt; &lt;span class="nf"&gt;OnActionExecutionAsync&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;ActionExecutingContext&lt;/span&gt; &lt;span class="n"&gt;context&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;ActionExecutionDelegate&lt;/span&gt; &lt;span class="n"&gt;next&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
    &lt;span class="p"&gt;{&lt;/span&gt;
        &lt;span class="kt"&gt;var&lt;/span&gt; &lt;span class="n"&gt;cancellationToken&lt;/span&gt; &lt;span class="p"&gt;=&lt;/span&gt; &lt;span class="n"&gt;context&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;HttpContext&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;RequestAborted&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;

        &lt;span class="k"&gt;await&lt;/span&gt; &lt;span class="k"&gt;using&lt;/span&gt; &lt;span class="nn"&gt;var&lt;/span&gt; &lt;span class="n"&gt;transaction&lt;/span&gt; &lt;span class="p"&gt;=&lt;/span&gt; &lt;span class="k"&gt;await&lt;/span&gt; &lt;span class="n"&gt;_unitOfWork&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;BeginSnapshotTransactionAsync&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;cancellationToken&lt;/span&gt;&lt;span class="p"&gt;);&lt;/span&gt;

        &lt;span class="kt"&gt;var&lt;/span&gt; &lt;span class="n"&gt;executedContext&lt;/span&gt; &lt;span class="p"&gt;=&lt;/span&gt; &lt;span class="k"&gt;await&lt;/span&gt; &lt;span class="nf"&gt;next&lt;/span&gt;&lt;span class="p"&gt;();&lt;/span&gt;

        &lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;executedContext&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Exception&lt;/span&gt; &lt;span class="p"&gt;==&lt;/span&gt; &lt;span class="k"&gt;null&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
        &lt;span class="p"&gt;{&lt;/span&gt;
            &lt;span class="k"&gt;await&lt;/span&gt; &lt;span class="n"&gt;transaction&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;CommitAsync&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;cancellationToken&lt;/span&gt;&lt;span class="p"&gt;);&lt;/span&gt;
        &lt;span class="p"&gt;}&lt;/span&gt;
        &lt;span class="k"&gt;else&lt;/span&gt;
        &lt;span class="p"&gt;{&lt;/span&gt;
            &lt;span class="k"&gt;await&lt;/span&gt; &lt;span class="n"&gt;transaction&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;RollbackAsync&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;cancellationToken&lt;/span&gt;&lt;span class="p"&gt;);&lt;/span&gt;
        &lt;span class="p"&gt;}&lt;/span&gt;
    &lt;span class="p"&gt;}&lt;/span&gt;
&lt;span class="p"&gt;}&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Now, let us try to call it.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;curl -X 'POST' \
  'https://localhost:7174/Invoices' \
  -H 'accept: text/plain' \
  -H 'x-idempotency-key: test-key' \
  -H 'Content-Type: application/json' \
  -d '{
  "amount": 150000,
  "dueDate": "2024-03-28T15:48:43.991Z"
}'
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;





&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;{
  "Id": "01381c12-c3de-4f45-9b47-be7908b6c528"
}
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;If the idempotency key already exists, we will immediately return the response for processing. Let's try to call it again with the same key.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;{
  "Id": "01381c12-c3de-4f45-9b47-be7908b6c528"
}
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;But if a duplicate request happens simultaneously, we will receive a 'duplicate key value violates unique constraint' error.&lt;/p&gt;

&lt;p&gt;&lt;em&gt;Microsoft.Data.SqlClient.SqlException (0x80131904): Violation of PRIMARY KEY constraint 'PK_IdempotencyKey'. Cannot insert duplicate key in object 'dbo.IdempotencyKey'. The duplicate key value is (id-2).&lt;/em&gt;&lt;/p&gt;

&lt;p&gt;In such cases, it is better to have retries on the client side to smooth such situations.&lt;/p&gt;

&lt;p&gt;Implementing this strategy significantly helps us in preventing race conditions. By ensuring that each message or request is processed in a synchronized manner within the same transaction, we effectively eliminate the possibility of concurrent processes interfering with each other. &lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Can Redis be utilised instead of SQL Database?&lt;/strong&gt;&lt;/p&gt;

&lt;p&gt;Some time ago, I was pretty sure that it could and could be used for all operations. However, &lt;a href="https://martin.kleppmann.com/2016/02/08/how-to-do-distributed-locking.html"&gt;this article&lt;/a&gt; from Martin Kleppmann made me doubt about it. The main danger is that to achieve exactly once execution, we need to acquire a lock using the idempotency key, execute the operation and then store the result. However, due to possible garbage collection (GC) pauses and clock desynchronization, we might find ourselves in a situation where two threads, operating under the same key, execute the same operation.&lt;/p&gt;

&lt;p&gt;Although this problem is unlikely to occur often, we should avoid using it for critical operations.&lt;/p&gt;

&lt;h3&gt;
  
  
  What else can be chosen as the idempotency key?
&lt;/h3&gt;

&lt;p&gt;Let us take a look on the situation with Order, Payment, and Delivery.&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media.dev.to/cdn-cgi/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Ftqilduzppoaejt1rinkx.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media.dev.to/cdn-cgi/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Ftqilduzppoaejt1rinkx.png" alt="Image description" width="800" height="402"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;In this very simple use case, a customer submits an order, which automatically starts the order processing. It involves two simple steps: charging the customer's account and then initiating delivery. Of course, in real life, there are many more steps, with potential compensation in case of failure at some stage, but for simplicity, I will omit them.  &lt;/p&gt;

&lt;p&gt;So, to avoid submitting an order multiple times, an idempotency key is utilized. This key is generated on the client side and provided in the 'submit order' command.&lt;/p&gt;

&lt;p&gt;It then creates a new order, stores it in the database, and initiates order processing. Now, we need to charge the customer for this order idempotently.&lt;/p&gt;

&lt;p&gt;What should be used in this situation for idempotency? Surely, the same offer ID. Why? Because all the subsequent steps revolve around a certain process, which is represented by the order ID.&lt;/p&gt;

&lt;p&gt;If the order is cancelled, or if we don't receive a response at some step, we will retry the operation for this order. &lt;/p&gt;

&lt;p&gt;Technically, both the delivery service and the payment service are aware of the OrderID in their respective domains, and they are likely to require the OrderID in commands to charge the customer and to initiate delivery. &lt;/p&gt;

&lt;p&gt;Thus, a custom idempotency key is redundant since the idempotency key is represented as part of the business domain objects.&lt;/p&gt;

&lt;p&gt;Therefore, idempotency is one of the pivotal things in building reliable and predictable distributed systems.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Now, let us take a look at another problem related to asynchronous processing.&lt;/strong&gt;&lt;/p&gt;

&lt;h2&gt;
  
  
  Asynchronous processing
&lt;/h2&gt;

&lt;p&gt;There are operations that are better processed asynchronously, meaning that instead of doing the job right now, we push it into the task queue and then notify the originators about the result once it is ready.&lt;/p&gt;

&lt;p&gt;It could be useful if interaction is required with external systems that may be slow, operate asynchronously, having strict rate limits, or even have problems with availability.&lt;/p&gt;

&lt;h3&gt;
  
  
  Okay, what's the problem?
&lt;/h3&gt;

&lt;p&gt;Let us imagine that we have the following system where the main components are:&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;(Originator) Revenue calculation service&lt;/strong&gt; - This service is part of an analytics product that periodically requests data for the purpose of calculating revenue. It might analyze sales transactions, payments, or other financial data to provide insights into revenue trends, performance against forecasts, and areas for financial improvement.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;(Originator) Risk assessment service&lt;/strong&gt; - This service evaluates the risk associated with various operations or transactions. It could be assessing credit risk, fraud risk, operational risks, or other types of financial or non-financial risks. Its purpose is to mitigate potential losses by identifying high-risk scenarios and suggesting preventive measures.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Transaction Categorisation service&lt;/strong&gt; - This service schedules transactions classifications into categories such as groceries, utilities, entertainment, revenue, etc.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Categorization Processor service&lt;/strong&gt; - This service is purely a backend service responsible for the actual processing and categorization logic of transactions.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Webhooks delivery service&lt;/strong&gt; - This service manages the sending of webhooks, which are automated messages sent from one app to another when something happens.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Bank transactions provider service&lt;/strong&gt; - This service acts as an interface to bank systems for retrieving transaction data. It may authenticate, fetch, and format bank transaction data for use by other components in the financial technology ecosystem.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Queue (Kafka / Azure Service Bus / RabbitMq / etc)&lt;/strong&gt;&lt;/p&gt;

&lt;p&gt;In the following diagram, you can see a blueprint of use cases. Both use cases consist of seven steps, starting with &lt;code&gt;10 Pull transactions for the last period (month or year)&lt;/code&gt; and ending with &lt;code&gt;retrieving processing results by ID.&lt;/code&gt;&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media.dev.to/cdn-cgi/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fuvbscpyl0wmt6i5ctin3.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media.dev.to/cdn-cgi/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fuvbscpyl0wmt6i5ctin3.png" alt="Image description" width="800" height="741"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;The Transaction Categorization service API requires the provision of an external ID. Under this external ID, all the transactions will be held and categorized. Since the architecture of the transaction categorization implies that it learns over time from data associated with a specific external ID, sending transactions for the same entity (for example, a B2C company) to different external IDs will lead to categorization issues and poor quality. Therefore, the main recommendation is to accumulate all transactions for the same entity together. Consequently, both the Revenue Calculation and Risk Assessment services provide a shared CustomerId throughout the entire system.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;{
 "event": "processed",
    "created": "2024-05-20T09:23:53+00:00",
    "data": {
        "externalId": "0cf9e867-9970-4a76-915",
        "status": "processed"
    },
}
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;This is the webhook payload example received by both originators (Revenue Calculation and Risk Assessment services). However, since they use the same external ID, how can we distinguish between the two originators? Relying on the date? Unlikely, as it is not a reliable approach.&lt;/p&gt;

&lt;h3&gt;
  
  
  Okay, what if we prohibited having multiple processings in progress for a specific external ID?
&lt;/h3&gt;

&lt;p&gt;This approach might slightly improve the situation if the originators somehow track that they have started processing. However, a problem arises if the first originator begins processing and finishes, but the webhook doesn't reach the originator for any reason. Meanwhile, another originator starts and successfully completes another processing, and the webhook is delivered to both. In this case, we will face an issue because the first originator might mistakenly pick up the second webhook.&lt;/p&gt;

&lt;h2&gt;
  
  
  Technique 1: Metadata
&lt;/h2&gt;

&lt;p&gt;In order to distinguish webhooks somehow, we may utilize attaching custom metadata to the send transaction to process the request.&lt;/p&gt;

&lt;p&gt;&lt;a href="https://media.dev.to/cdn-cgi/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2F5j4vmh30x65uliggczfs.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media.dev.to/cdn-cgi/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2F5j4vmh30x65uliggczfs.png" alt="Image description" width="800" height="741"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;As you can see here, in this example, the Revenue Calculation service attached specific metadata in order to identify its processing result and skip others.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;{
 "event": "processed",
    "created": "2024-05-20T09:23:53+00:00",
    "data": {
        "externalId": "0cf9e867-9970-4a76-915",
        "status": "processed"
    },
    "metadata": {
    "id": "revenue_calc_id"
   }
}
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;However, since the categorisation processing service does not support searching by metadata due to potential complexity, we still face a situation with a race condition when webhooks are delayed, sent out of processing order, or when originators are pulling stale data.&lt;/p&gt;

&lt;p&gt;Okay, then we need something much simpler!&lt;/p&gt;

&lt;h2&gt;
  
  
  Technique 2: Two-Steps processing
&lt;/h2&gt;

&lt;p&gt;To completely avoid race conditions due to the distributed nature of the system and network failures, we can improve the architecture by introducing the following features.&lt;/p&gt;

&lt;ol&gt;
&lt;li&gt;Sending transactions to the Process API will return a task ID related to this processing.&lt;/li&gt;
&lt;li&gt;The Send Transactions to Process API will accept an idempotency key to avoid starting duplicate tasks. It won't affect consumers but will save resources and reduce operational complexity.&lt;/li&gt;
&lt;li&gt;Task IDs will be added to the webhook payload.&lt;/li&gt;
&lt;li&gt;The Retrieve Processing Results API will be improved to allow searching by task ID.&lt;/li&gt;
&lt;/ol&gt;

&lt;p&gt;&lt;a href="https://media.dev.to/cdn-cgi/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fq7yfd6itti0gqaz7iawy.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media.dev.to/cdn-cgi/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fq7yfd6itti0gqaz7iawy.png" alt="Image description" width="800" height="741"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;This is the payload example contains taskid value. Thus, originators can distinguish between webhooks and identify only those that are related to them.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;{
 "event": "processed",
    "created": "2024-05-20T09:23:53+00:00",
    "data": {
        "taskid": "task_Eqio3Y4ohyNiMphrXwG58p",
        "externalId": "0cf9e867-9970-4a76-915",
        "status": "processed"
    }
}
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;h2&gt;
  
  
  Asynchronous processing and timeouts
&lt;/h2&gt;

&lt;p&gt;The last thing I would like to talk about in this article is related to timeouts.&lt;/p&gt;

&lt;p&gt;In nature, there are two main types of communication between two services or components: Fire and forget and Request and Reply.&lt;/p&gt;

&lt;h3&gt;
  
  
  Fire and forget
&lt;/h3&gt;

&lt;p&gt;This is a communication pattern where a client sends a message (or request) to a server and immediately continues its execution without waiting for a response. The server might process the message at some point in the future, but the client does not know when or indeed if the message has been successfully received or processed. Technically, it even doesn't care wherther it was processed or not.&lt;/p&gt;

&lt;p&gt;Usually, this pattern refers to asynchronous nature and is used when the client does not require an immediate response or confirmation. It's often used for logging, sending notifications, or other tasks where receiving a response is not critical to the continuation of the client's processing.&lt;/p&gt;

&lt;h3&gt;
  
  
  Request and Reply
&lt;/h3&gt;

&lt;p&gt;This is a communication pattern where a client sends a request to a server and waits for a response. The server processes the request and then sends back a reply to the client.&lt;/p&gt;

&lt;p&gt;Usaully, this pattern refers to synchronous nature, meaning the client blocks and waits until the response is received before continuing its execution.&lt;/p&gt;

&lt;p&gt;However, the interesting thing is that even when I use asynchronous (non-blocking for current execution) communication for account charging or order delivery, I am still interested in a reply. If there is no reply after some time, what should the system do? Cancel? Retry? Send an alert to the on-call team?&lt;/p&gt;

&lt;p&gt;The main idea is that despite the communication approach you use (synchronous or asynchronous), you should be concerned about possible network failures and plan for them before they happen.&lt;/p&gt;

&lt;p&gt;The simplest approach would be to incorporate a timeout on the caller side, similar to what we have in a blocking request. To implement this, we need to understand where to store the expiration date. The answer lies in the domain objects. Timeout behavior is usually part of their lifecycle. For instance, if a document request for a loan application isn't completed within 14 days, we should close this document request and probably try again.&lt;/p&gt;

&lt;h2&gt;
  
  
  Conclusion
&lt;/h2&gt;

&lt;p&gt;In this article, I talked about one of the necessary techniques for building a predictable and reliable API. Also I have covered several problems related to the nature of distributed systems and the complexity of asynchronous operations, unavoidable network failures, and possible techniques to mitigate them, thereby improving the quality attributes of the system and experience for both customers and internal users.&lt;/p&gt;

&lt;p&gt;See you next time! &lt;/p&gt;

</description>
      <category>microservices</category>
      <category>csharp</category>
      <category>architecture</category>
      <category>api</category>
    </item>
  </channel>
</rss>
