<?xml version="1.0" encoding="UTF-8"?>
<rss version="2.0" xmlns:atom="http://www.w3.org/2005/Atom" xmlns:dc="http://purl.org/dc/elements/1.1/">
  <channel>
    <title>DEV Community: Wesley Conde</title>
    <description>The latest articles on DEV Community by Wesley Conde (@wesleyskap).</description>
    <link>https://dev.to/wesleyskap</link>
    <image>
      <url>https://media2.dev.to/dynamic/image/width=90,height=90,fit=cover,gravity=auto,format=auto/https:%2F%2Fdev-to-uploads.s3.us-east-2.amazonaws.com%2Fuploads%2Fuser%2Fprofile_image%2F23645%2Fd9461ff7-fa61-46a1-ae57-cdb987709980.jpeg</url>
      <title>DEV Community: Wesley Conde</title>
      <link>https://dev.to/wesleyskap</link>
    </image>
    <atom:link rel="self" type="application/rss+xml" href="https://dev.to/feed/wesleyskap"/>
    <language>en</language>
    <item>
      <title>Scheduled and delayed messages:Routing with TTL and dead-letter exchange in RabbitMQ</title>
      <dc:creator>Wesley Conde</dc:creator>
      <pubDate>Tue, 30 Jun 2026 17:35:00 +0000</pubDate>
      <link>https://dev.to/wesleyskap/scheduled-and-delayed-messagesrouting-with-ttl-and-dead-letter-exchange-in-rabbitmq-7dg</link>
      <guid>https://dev.to/wesleyskap/scheduled-and-delayed-messagesrouting-with-ttl-and-dead-letter-exchange-in-rabbitmq-7dg</guid>
      <description>&lt;h2&gt;
  
  
  The challenge of scheduling events without plugins
&lt;/h2&gt;

&lt;p&gt;Often, we need to delay message delivery in our messaging architectures: for example, sending a reminder email 2 hours after account creation, or retrying a temporarily failed transaction after a planned 10-minute delay.&lt;/p&gt;

&lt;p&gt;While RabbitMQ offers an official delayed messages plugin (&lt;code&gt;rabbitmq-delayed-message-exchange&lt;/code&gt;), it is not always available or permitted in managed cloud environments (like AWS Amazon MQ).&lt;/p&gt;

&lt;p&gt;The &lt;strong&gt;onkai-unified-bus&lt;/strong&gt; bypasses this limitation by designing a native topology based on message &lt;strong&gt;Time-To-Live (TTL)&lt;/strong&gt; expiration directed toward a &lt;strong&gt;Dead-Letter Exchange (DLX)&lt;/strong&gt;.&lt;/p&gt;

&lt;h2&gt;
  
  
  The delayed message routing flow
&lt;/h2&gt;

&lt;p&gt;To delay a message, we create an indirect routing flow:&lt;/p&gt;

&lt;ol&gt;
&lt;li&gt;Publish the event to a temporary queue that has &lt;strong&gt;no active consumers&lt;/strong&gt;.&lt;/li&gt;
&lt;li&gt;Configure a &lt;code&gt;x-message-ttl&lt;/code&gt; limit on this queue equivalent to the desired delay.&lt;/li&gt;
&lt;li&gt;Configure a Dead-Letter Exchange (&lt;code&gt;DLX&lt;/code&gt;) pointing to the final target exchange.&lt;/li&gt;
&lt;li&gt;When the TTL expires, RabbitMQ automatically evicts the message from the temporary queue and routes it to the exchange configured in the DLX.
&lt;/li&gt;
&lt;/ol&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight go"&gt;&lt;code&gt;&lt;span class="k"&gt;package&lt;/span&gt; &lt;span class="n"&gt;main&lt;/span&gt;

&lt;span class="k"&gt;import&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;
    &lt;span class="s"&gt;"context"&lt;/span&gt;
    &lt;span class="s"&gt;"fmt"&lt;/span&gt;
    &lt;span class="s"&gt;"github.com/rabbitmq/amqp091-go"&lt;/span&gt;
&lt;span class="p"&gt;)&lt;/span&gt;

&lt;span class="c"&gt;// SetupDelayedTopology declares a consumer-less queue with TTL and DLX arguments&lt;/span&gt;
&lt;span class="k"&gt;func&lt;/span&gt; &lt;span class="n"&gt;SetupDelayedTopology&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;ch&lt;/span&gt; &lt;span class="o"&gt;*&lt;/span&gt;&lt;span class="n"&gt;amqp091_go&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Channel&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;delayMs&lt;/span&gt; &lt;span class="kt"&gt;int&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="kt"&gt;string&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="kt"&gt;error&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
    &lt;span class="n"&gt;delayQueueName&lt;/span&gt; &lt;span class="o"&gt;:=&lt;/span&gt; &lt;span class="n"&gt;fmt&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Sprintf&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"delay-queue-%dms"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;delayMs&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;

    &lt;span class="c"&gt;// Declares the queue with specific TTL and DLX configurations&lt;/span&gt;
    &lt;span class="n"&gt;_&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;err&lt;/span&gt; &lt;span class="o"&gt;:=&lt;/span&gt; &lt;span class="n"&gt;ch&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;QueueDeclare&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;
        &lt;span class="n"&gt;delayQueueName&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
        &lt;span class="no"&gt;true&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;  &lt;span class="c"&gt;// durable&lt;/span&gt;
        &lt;span class="no"&gt;false&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="c"&gt;// auto-delete&lt;/span&gt;
        &lt;span class="no"&gt;false&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="c"&gt;// exclusive&lt;/span&gt;
        &lt;span class="no"&gt;false&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="c"&gt;// no-wait&lt;/span&gt;
        &lt;span class="n"&gt;amqp091_go&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Table&lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;
            &lt;span class="s"&gt;"x-message-ttl"&lt;/span&gt;&lt;span class="o"&gt;:&lt;/span&gt;             &lt;span class="kt"&gt;int32&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;delayMs&lt;/span&gt;&lt;span class="p"&gt;),&lt;/span&gt;       &lt;span class="c"&gt;// Retention period&lt;/span&gt;
            &lt;span class="s"&gt;"x-dead-letter-exchange"&lt;/span&gt;&lt;span class="o"&gt;:&lt;/span&gt;    &lt;span class="s"&gt;"main-event-exchange"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="c"&gt;// Destination post expiration&lt;/span&gt;
            &lt;span class="s"&gt;"x-dead-letter-routing-key"&lt;/span&gt;&lt;span class="o"&gt;:&lt;/span&gt; &lt;span class="s"&gt;"notifications.send"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
        &lt;span class="p"&gt;},&lt;/span&gt;
    &lt;span class="p"&gt;)&lt;/span&gt;
    &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="n"&gt;delayQueueName&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;err&lt;/span&gt;
&lt;span class="p"&gt;}&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;h2&gt;
  
  
  Benefits of the native approach
&lt;/h2&gt;

&lt;p&gt;Using this routing pattern:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Eliminates dependencies on third-party broker plugins.&lt;/li&gt;
&lt;li&gt;Leverages the native persistence and reliability guarantees of the RabbitMQ engine.&lt;/li&gt;
&lt;li&gt;Supports varied delay durations by declaring queues with specific TTLs on the fly.&lt;/li&gt;
&lt;/ul&gt;

&lt;h3&gt;
  
  
  Technical terms demystified
&lt;/h3&gt;

&lt;ul&gt;
&lt;li&gt;
&lt;strong&gt;TTL (Time-To-Live):&lt;/strong&gt; The maximum period of time a message can remain in a queue before being discarded or moved to a dead-letter exchange.&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;Dead-Letter Exchange (DLX):&lt;/strong&gt; A normal RabbitMQ exchange where expired, rejected, or failed messages are routed automatically.&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;Delayed Topology:&lt;/strong&gt; An arrangement of exchanges and queues configured to temporarily hold messages for a controlled time.&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;link: &lt;a href="https://github.com/wesleyskap/onkai-unified-bus" rel="noopener noreferrer"&gt;https://github.com/wesleyskap/onkai-unified-bus&lt;/a&gt;&lt;/p&gt;

</description>
      <category>csharp</category>
      <category>scheduling</category>
      <category>events</category>
      <category>rabbitmq</category>
    </item>
    <item>
      <title>Avoiding duplicate processing: The inbox pattern and idempotent consumers</title>
      <dc:creator>Wesley Conde</dc:creator>
      <pubDate>Fri, 15 May 2026 19:39:00 +0000</pubDate>
      <link>https://dev.to/wesleyskap/avoiding-duplicate-processing-the-inbox-pattern-and-idempotent-consumers-4568</link>
      <guid>https://dev.to/wesleyskap/avoiding-duplicate-processing-the-inbox-pattern-and-idempotent-consumers-4568</guid>
      <description>&lt;h2&gt;
  
  
  The at-least-once delivery problem
&lt;/h2&gt;

&lt;p&gt;In high-scale distributed systems, guaranteeing that a message is delivered exactly once (&lt;strong&gt;Exactly-Once&lt;/strong&gt;) is a highly complex and network-inefficient task. For this reason, most message brokers (like RabbitMQ and Kafka) guarantee &lt;strong&gt;At-Least-Once&lt;/strong&gt; delivery.&lt;/p&gt;

&lt;p&gt;This means that during network hiccups or server restarts, the same event may be delivered to your consumer more than once. If your consumer processes this duplicate message (for example, approving a payment again or deducting balance twice), it will cause severe data inconsistency.&lt;/p&gt;

&lt;p&gt;To ensure each event is processed exactly once by the business logic, we implement the &lt;strong&gt;Inbox Pattern&lt;/strong&gt;.&lt;/p&gt;

&lt;h2&gt;
  
  
  The inbox pattern
&lt;/h2&gt;

&lt;p&gt;The Inbox Pattern stores the IDs of all successfully processed messages in a persistent, transactional database table. Before initiating any business logic, the consumer checks if the incoming message ID already exists in the inbox history:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight go"&gt;&lt;code&gt;&lt;span class="k"&gt;package&lt;/span&gt; &lt;span class="n"&gt;main&lt;/span&gt;

&lt;span class="k"&gt;import&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;
    &lt;span class="s"&gt;"context"&lt;/span&gt;
    &lt;span class="s"&gt;"database/sql"&lt;/span&gt;
&lt;span class="p"&gt;)&lt;/span&gt;

&lt;span class="k"&gt;type&lt;/span&gt; &lt;span class="n"&gt;InboxStore&lt;/span&gt; &lt;span class="k"&gt;interface&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
    &lt;span class="n"&gt;HasProcessed&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;ctx&lt;/span&gt; &lt;span class="n"&gt;context&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Context&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;messageID&lt;/span&gt; &lt;span class="kt"&gt;string&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="kt"&gt;bool&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="kt"&gt;error&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
    &lt;span class="n"&gt;MarkAsProcessed&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;ctx&lt;/span&gt; &lt;span class="n"&gt;context&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Context&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;messageID&lt;/span&gt; &lt;span class="kt"&gt;string&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="kt"&gt;error&lt;/span&gt;
&lt;span class="p"&gt;}&lt;/span&gt;

&lt;span class="k"&gt;type&lt;/span&gt; &lt;span class="n"&gt;SQLInboxStore&lt;/span&gt; &lt;span class="k"&gt;struct&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
    &lt;span class="n"&gt;db&lt;/span&gt; &lt;span class="o"&gt;*&lt;/span&gt;&lt;span class="n"&gt;sql&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;DB&lt;/span&gt;
&lt;span class="p"&gt;}&lt;/span&gt;

&lt;span class="k"&gt;func&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;store&lt;/span&gt; &lt;span class="o"&gt;*&lt;/span&gt;&lt;span class="n"&gt;SQLInboxStore&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="n"&gt;HasProcessed&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;ctx&lt;/span&gt; &lt;span class="n"&gt;context&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Context&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;messageID&lt;/span&gt; &lt;span class="kt"&gt;string&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="kt"&gt;bool&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="kt"&gt;error&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
    &lt;span class="k"&gt;var&lt;/span&gt; &lt;span class="n"&gt;exists&lt;/span&gt; &lt;span class="kt"&gt;bool&lt;/span&gt;
    &lt;span class="n"&gt;query&lt;/span&gt; &lt;span class="o"&gt;:=&lt;/span&gt; &lt;span class="s"&gt;"SELECT EXISTS(SELECT 1 FROM inbox_messages WHERE message_id = ?)"&lt;/span&gt;
    &lt;span class="n"&gt;err&lt;/span&gt; &lt;span class="o"&gt;:=&lt;/span&gt; &lt;span class="n"&gt;store&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;db&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;QueryRowContext&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;ctx&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;query&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;messageID&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Scan&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="o"&gt;&amp;amp;&lt;/span&gt;&lt;span class="n"&gt;exists&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
    &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="n"&gt;exists&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;err&lt;/span&gt;
&lt;span class="p"&gt;}&lt;/span&gt;

&lt;span class="k"&gt;func&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;store&lt;/span&gt; &lt;span class="o"&gt;*&lt;/span&gt;&lt;span class="n"&gt;SQLInboxStore&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="n"&gt;MarkAsProcessed&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;ctx&lt;/span&gt; &lt;span class="n"&gt;context&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Context&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;messageID&lt;/span&gt; &lt;span class="kt"&gt;string&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="kt"&gt;error&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
    &lt;span class="n"&gt;_&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;err&lt;/span&gt; &lt;span class="o"&gt;:=&lt;/span&gt; &lt;span class="n"&gt;store&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;db&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;ExecContext&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;ctx&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; 
        &lt;span class="s"&gt;"INSERT INTO inbox_messages (message_id, processed_at) VALUES (?, NOW())"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; 
        &lt;span class="n"&gt;messageID&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="p"&gt;)&lt;/span&gt;
    &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="n"&gt;err&lt;/span&gt;
&lt;span class="p"&gt;}&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;h2&gt;
  
  
  Transactional idempotent consumption
&lt;/h2&gt;

&lt;p&gt;We couple the Inbox verification and recording inside the same database transaction that executes the business logic. If the message is already present, the flow terminates gracefully without executing the business logic again:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight go"&gt;&lt;code&gt;&lt;span class="k"&gt;func&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;c&lt;/span&gt; &lt;span class="o"&gt;*&lt;/span&gt;&lt;span class="n"&gt;OrderCompletedConsumer&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="n"&gt;Consume&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;ctx&lt;/span&gt; &lt;span class="n"&gt;context&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Context&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;msg&lt;/span&gt; &lt;span class="n"&gt;Message&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="kt"&gt;error&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
    &lt;span class="n"&gt;tx&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;err&lt;/span&gt; &lt;span class="o"&gt;:=&lt;/span&gt; &lt;span class="n"&gt;c&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;db&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;BeginTx&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;ctx&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="no"&gt;nil&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
    &lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="n"&gt;err&lt;/span&gt; &lt;span class="o"&gt;!=&lt;/span&gt; &lt;span class="no"&gt;nil&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
        &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="n"&gt;err&lt;/span&gt;
    &lt;span class="p"&gt;}&lt;/span&gt;
    &lt;span class="k"&gt;defer&lt;/span&gt; &lt;span class="n"&gt;tx&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Rollback&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;

    &lt;span class="c"&gt;// 1. Checks duplicate state using the InboxStore within the transaction&lt;/span&gt;
    &lt;span class="n"&gt;alreadyProcessed&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;err&lt;/span&gt; &lt;span class="o"&gt;:=&lt;/span&gt; &lt;span class="n"&gt;c&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;inboxStore&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;HasProcessed&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;ctx&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;msg&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;ID&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
    &lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="n"&gt;err&lt;/span&gt; &lt;span class="o"&gt;!=&lt;/span&gt; &lt;span class="no"&gt;nil&lt;/span&gt; &lt;span class="o"&gt;||&lt;/span&gt; &lt;span class="n"&gt;alreadyProcessed&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
        &lt;span class="c"&gt;// Already processed! Exit silently to avoid processing again&lt;/span&gt;
        &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="no"&gt;nil&lt;/span&gt;
    &lt;span class="p"&gt;}&lt;/span&gt;

    &lt;span class="c"&gt;// 2. Executes the core business logic&lt;/span&gt;
    &lt;span class="n"&gt;err&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;c&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;executeOrderBusinessLogic&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;tx&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;msg&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Payload&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
    &lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="n"&gt;err&lt;/span&gt; &lt;span class="o"&gt;!=&lt;/span&gt; &lt;span class="no"&gt;nil&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
        &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="n"&gt;err&lt;/span&gt;
    &lt;span class="p"&gt;}&lt;/span&gt;

    &lt;span class="c"&gt;// 3. Marks the message as processed in the Inbox&lt;/span&gt;
    &lt;span class="n"&gt;err&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;c&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;inboxStore&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;MarkAsProcessed&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;ctx&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;msg&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;ID&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
    &lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="n"&gt;err&lt;/span&gt; &lt;span class="o"&gt;!=&lt;/span&gt; &lt;span class="no"&gt;nil&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
        &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="n"&gt;err&lt;/span&gt;
    &lt;span class="p"&gt;}&lt;/span&gt;

    &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="n"&gt;tx&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Commit&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;
&lt;span class="p"&gt;}&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;h3&gt;
  
  
  Technical terms demystified
&lt;/h3&gt;

&lt;ul&gt;
&lt;li&gt;
&lt;strong&gt;Idempotency:&lt;/strong&gt; The property of certain operations where they can be applied multiple times without changing the result beyond the initial application.&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;Inbox Pattern:&lt;/strong&gt; A pattern where incoming messages are logged into a local table/database to check and filter out duplicates before executing business logic.&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;Silent Ignore:&lt;/strong&gt; The practice of ignoring a duplicate request without returning a failure, avoiding unnecessary error logs or retries.&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;link: &lt;a href="https://github.com/wesleyskap/onkai-unified-bus" rel="noopener noreferrer"&gt;https://github.com/wesleyskap/onkai-unified-bus&lt;/a&gt;&lt;/p&gt;

</description>
      <category>chsarp</category>
      <category>idempotent</category>
      <category>pattern</category>
      <category>brokers</category>
    </item>
    <item>
      <title>Zero-allocation dispatcher: Routing millions of events without heap allocations in C#</title>
      <dc:creator>Wesley Conde</dc:creator>
      <pubDate>Mon, 20 Apr 2026 17:09:00 +0000</pubDate>
      <link>https://dev.to/wesleyskap/zero-allocation-dispatcherrouting-millions-of-events-without-heap-allocations-in-go-26ok</link>
      <guid>https://dev.to/wesleyskap/zero-allocation-dispatcherrouting-millions-of-events-without-heap-allocations-in-go-26ok</guid>
      <description>&lt;h2&gt;
  
  
  The bottleneck of dynamic allocations in high-throughput event buses
&lt;/h2&gt;

&lt;p&gt;In high-performance event-driven architectures, every microsecond counts. When implementing a local in-memory Event Bus in .NET, the most common pattern is allocating new message containers and wrapping delegates dynamically. While simple, this approach creates a large volume of short-lived objects on the heap. As a consequence, the Garbage Collector (GC) runs aggressively, causing gen-0/gen-1 collection overhead that degrades overall latency.&lt;/p&gt;

&lt;p&gt;To solve this cleanly, &lt;strong&gt;onkai-unified-bus&lt;/strong&gt; adopts a virtual zero-allocation strategy. By utilizing a high-performance, structured dispatch engine, it processes events via internally pooled message envelopes and concurrent channels, giving .NET developers maximum throughput with minimal memory footprint.&lt;/p&gt;

&lt;h2&gt;
  
  
  Bootstrapping the event bus
&lt;/h2&gt;

&lt;p&gt;The library exposes a fluent registration API for Microsoft Dependency Injection. Under the hood, this sets up the zero-allocation transport pipelines, pooled resources, and background workers automatically.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight csharp"&gt;&lt;code&gt;&lt;span class="k"&gt;using&lt;/span&gt; &lt;span class="nn"&gt;Microsoft.Extensions.DependencyInjection&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
&lt;span class="k"&gt;using&lt;/span&gt; &lt;span class="nn"&gt;Onkai.EventBus.Core.Extensions&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
&lt;span class="k"&gt;using&lt;/span&gt; &lt;span class="nn"&gt;Onkai.EventBus.RabbitMQ.Extensions&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;

&lt;span class="kt"&gt;var&lt;/span&gt; &lt;span class="n"&gt;builder&lt;/span&gt; &lt;span class="p"&gt;=&lt;/span&gt; &lt;span class="n"&gt;WebApplication&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;CreateBuilder&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;args&lt;/span&gt;&lt;span class="p"&gt;);&lt;/span&gt;

&lt;span class="c1"&gt;// Register Core EventBus infrastructure&lt;/span&gt;
&lt;span class="n"&gt;builder&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Services&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;AddEventBus&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;
                &lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;UseRabbitMq&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;config&lt;/span&gt; &lt;span class="p"&gt;=&amp;gt;&lt;/span&gt;
                &lt;span class="p"&gt;{&lt;/span&gt;
                    &lt;span class="n"&gt;config&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;HostName&lt;/span&gt; &lt;span class="p"&gt;=&lt;/span&gt; &lt;span class="s"&gt;"localhost"&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
                    &lt;span class="n"&gt;config&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;UserName&lt;/span&gt; &lt;span class="p"&gt;=&lt;/span&gt; &lt;span class="s"&gt;"guest"&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
                    &lt;span class="n"&gt;config&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Password&lt;/span&gt; &lt;span class="p"&gt;=&lt;/span&gt; &lt;span class="s"&gt;"guest"&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
                &lt;span class="p"&gt;});&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;h2&gt;
  
  
  High-speed event publishing
&lt;/h2&gt;

&lt;p&gt;Once configured, developers simply inject the thread-safe &lt;code&gt;IEventPublisher&lt;/code&gt; interface to dispatch events. The framework takes care of acquiring a pooled wrapper envelope, serializing the event payload, and scheduling delivery via optimized pipelines:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight csharp"&gt;&lt;code&gt;&lt;span class="k"&gt;using&lt;/span&gt; &lt;span class="nn"&gt;Onkai.EventBus.Abstractions&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;

&lt;span class="k"&gt;public&lt;/span&gt; &lt;span class="k"&gt;sealed&lt;/span&gt; &lt;span class="k"&gt;class&lt;/span&gt; &lt;span class="nc"&gt;OrderService&lt;/span&gt;
&lt;span class="p"&gt;{&lt;/span&gt;
    &lt;span class="k"&gt;private&lt;/span&gt; &lt;span class="k"&gt;readonly&lt;/span&gt; &lt;span class="n"&gt;IEventPublisher&lt;/span&gt; &lt;span class="n"&gt;_publisher&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;

    &lt;span class="k"&gt;public&lt;/span&gt; &lt;span class="nf"&gt;OrderService&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;IEventPublisher&lt;/span&gt; &lt;span class="n"&gt;publisher&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
    &lt;span class="p"&gt;{&lt;/span&gt;
        &lt;span class="n"&gt;_publisher&lt;/span&gt; &lt;span class="p"&gt;=&lt;/span&gt; &lt;span class="n"&gt;publisher&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
    &lt;span class="p"&gt;}&lt;/span&gt;

    &lt;span class="k"&gt;public&lt;/span&gt; &lt;span class="k"&gt;async&lt;/span&gt; &lt;span class="n"&gt;Task&lt;/span&gt; &lt;span class="nf"&gt;CheckoutAsync&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;Guid&lt;/span&gt; &lt;span class="n"&gt;orderId&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;CancellationToken&lt;/span&gt; &lt;span class="n"&gt;cancellationToken&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
    &lt;span class="p"&gt;{&lt;/span&gt;
        &lt;span class="kt"&gt;var&lt;/span&gt; &lt;span class="n"&gt;orderEvent&lt;/span&gt; &lt;span class="p"&gt;=&lt;/span&gt; &lt;span class="k"&gt;new&lt;/span&gt; &lt;span class="nf"&gt;OrderCreatedEvent&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;orderId&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="m"&gt;199.99m&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="s"&gt;"customer@example.com"&lt;/span&gt;&lt;span class="p"&gt;);&lt;/span&gt;

        &lt;span class="c1"&gt;// Dispatches the event through onkai-unified-bus high-performance engine&lt;/span&gt;
        &lt;span class="k"&gt;await&lt;/span&gt; &lt;span class="n"&gt;_publisher&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;PublishAsync&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;orderEvent&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;cancellationToken&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="n"&gt;cancellationToken&lt;/span&gt;&lt;span class="p"&gt;);&lt;/span&gt;
    &lt;span class="p"&gt;}&lt;/span&gt;
&lt;span class="p"&gt;}&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;h3&gt;
  
  
  Technical terms demystified
&lt;/h3&gt;

&lt;ul&gt;
&lt;li&gt;
&lt;strong&gt;Heap:&lt;/strong&gt; The region of system memory where dynamically allocated data lives at runtime. Managing the heap and performing Garbage Collection is far more expensive than using the Stack.&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;ObjectPool:&lt;/strong&gt; A design pattern and .NET class that caches temporary objects for future reuse, reducing allocations.&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;System.Threading.Channels:&lt;/strong&gt; A set of APIs in .NET that provides a high-performance, zero-allocation producer-consumer queue for concurrent architectures.&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;link: &lt;a href="https://github.com/wesleyskap/onkai-unified-bus" rel="noopener noreferrer"&gt;https://github.com/wesleyskap/onkai-unified-bus&lt;/a&gt;&lt;/p&gt;

</description>
      <category>csharp</category>
      <category>allocations</category>
      <category>throughput</category>
      <category>buses</category>
    </item>
    <item>
      <title>Distributed transactions with sagas: Orchestrating steps and compensations</title>
      <dc:creator>Wesley Conde</dc:creator>
      <pubDate>Sat, 18 Apr 2026 18:01:00 +0000</pubDate>
      <link>https://dev.to/wesleyskap/distributed-transactions-with-sagas-orchestrating-steps-and-compensations-3kk6</link>
      <guid>https://dev.to/wesleyskap/distributed-transactions-with-sagas-orchestrating-steps-and-compensations-3kk6</guid>
      <description>&lt;h2&gt;
  
  
  Distributed transactions in microservices
&lt;/h2&gt;

&lt;p&gt;In a monolithic application, maintaining data consistency is straightforward: open a database transaction, modify the tables, and commit the changes. If any step fails, the database rolls back everything.&lt;/p&gt;

&lt;p&gt;However, in a microservices architecture, each service owns its database. We cannot execute a single database transaction across the Order service, Inventory service, and Billing service.&lt;/p&gt;

&lt;p&gt;To maintain consistency across multiple microservice boundaries without slow distributed locks (like two-phase commits - 2PC), we implement the &lt;strong&gt;Orchestrated Saga&lt;/strong&gt; pattern.&lt;/p&gt;

&lt;h2&gt;
  
  
  The saga pattern and compensations
&lt;/h2&gt;

&lt;p&gt;A Saga is a sequence of local transactions executed by individual microservices. If a local transaction fails, the orchestrator triggers &lt;strong&gt;Compensation&lt;/strong&gt; transactions in reverse order to undo the side effects of previous steps. Here is how &lt;code&gt;SagaOrchestrator&amp;lt;TState&amp;gt;&lt;/code&gt; is built in &lt;code&gt;onkai-unified-bus&lt;/code&gt;:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight csharp"&gt;&lt;code&gt;&lt;span class="k"&gt;using&lt;/span&gt; &lt;span class="nn"&gt;System.Collections.Concurrent&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
&lt;span class="k"&gt;using&lt;/span&gt; &lt;span class="nn"&gt;Onkai.EventBus.Abstractions&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;

&lt;span class="k"&gt;namespace&lt;/span&gt; &lt;span class="nn"&gt;Onkai.EventBus.Core.Sagas&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;

&lt;span class="k"&gt;public&lt;/span&gt; &lt;span class="k"&gt;sealed&lt;/span&gt; &lt;span class="k"&gt;class&lt;/span&gt; &lt;span class="nc"&gt;SagaOrchestrator&lt;/span&gt;&lt;span class="p"&gt;&amp;lt;&lt;/span&gt;&lt;span class="n"&gt;TState&lt;/span&gt;&lt;span class="p"&gt;&amp;gt;&lt;/span&gt;
    &lt;span class="k"&gt;where&lt;/span&gt; &lt;span class="n"&gt;TState&lt;/span&gt; &lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="k"&gt;class&lt;/span&gt;&lt;span class="err"&gt;,&lt;/span&gt; &lt;span class="nc"&gt;new&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;
&lt;span class="p"&gt;{&lt;/span&gt;
    &lt;span class="k"&gt;private&lt;/span&gt; &lt;span class="k"&gt;readonly&lt;/span&gt; &lt;span class="n"&gt;ISagaStateStore&lt;/span&gt;&lt;span class="p"&gt;&amp;lt;&lt;/span&gt;&lt;span class="n"&gt;TState&lt;/span&gt;&lt;span class="p"&gt;&amp;gt;&lt;/span&gt; &lt;span class="n"&gt;_store&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
    &lt;span class="k"&gt;private&lt;/span&gt; &lt;span class="k"&gt;readonly&lt;/span&gt; &lt;span class="n"&gt;ConcurrentDictionary&lt;/span&gt;&lt;span class="p"&gt;&amp;lt;&lt;/span&gt;&lt;span class="kt"&gt;string&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;Func&lt;/span&gt;&lt;span class="p"&gt;&amp;lt;&lt;/span&gt;&lt;span class="n"&gt;SagaContext&lt;/span&gt;&lt;span class="p"&gt;&amp;lt;&lt;/span&gt;&lt;span class="n"&gt;TState&lt;/span&gt;&lt;span class="p"&gt;&amp;gt;,&lt;/span&gt; &lt;span class="n"&gt;CancellationToken&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;Task&lt;/span&gt;&lt;span class="p"&gt;&amp;gt;&amp;gt;&lt;/span&gt; &lt;span class="n"&gt;_compensations&lt;/span&gt; &lt;span class="p"&gt;=&lt;/span&gt; &lt;span class="k"&gt;new&lt;/span&gt;&lt;span class="p"&gt;();&lt;/span&gt;

    &lt;span class="k"&gt;public&lt;/span&gt; &lt;span class="nf"&gt;SagaOrchestrator&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;ISagaStateStore&lt;/span&gt;&lt;span class="p"&gt;&amp;lt;&lt;/span&gt;&lt;span class="n"&gt;TState&lt;/span&gt;&lt;span class="p"&gt;&amp;gt;&lt;/span&gt; &lt;span class="n"&gt;store&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
    &lt;span class="p"&gt;{&lt;/span&gt;
        &lt;span class="n"&gt;_store&lt;/span&gt; &lt;span class="p"&gt;=&lt;/span&gt; &lt;span class="n"&gt;store&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
    &lt;span class="p"&gt;}&lt;/span&gt;

    &lt;span class="c1"&gt;// Registers a compensation callback for a given step name&lt;/span&gt;
    &lt;span class="k"&gt;public&lt;/span&gt; &lt;span class="k"&gt;void&lt;/span&gt; &lt;span class="nf"&gt;RegisterCompensation&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="kt"&gt;string&lt;/span&gt; &lt;span class="n"&gt;stepName&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;Func&lt;/span&gt;&lt;span class="p"&gt;&amp;lt;&lt;/span&gt;&lt;span class="n"&gt;SagaContext&lt;/span&gt;&lt;span class="p"&gt;&amp;lt;&lt;/span&gt;&lt;span class="n"&gt;TState&lt;/span&gt;&lt;span class="p"&gt;&amp;gt;,&lt;/span&gt; &lt;span class="n"&gt;CancellationToken&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;Task&lt;/span&gt;&lt;span class="p"&gt;&amp;gt;&lt;/span&gt; &lt;span class="n"&gt;compensation&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
    &lt;span class="p"&gt;{&lt;/span&gt;
        &lt;span class="n"&gt;_compensations&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="n"&gt;stepName&lt;/span&gt;&lt;span class="p"&gt;]&lt;/span&gt; &lt;span class="p"&gt;=&lt;/span&gt; &lt;span class="n"&gt;compensation&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
    &lt;span class="p"&gt;}&lt;/span&gt;
&lt;span class="p"&gt;}&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;h2&gt;
  
  
  Running saga steps safely
&lt;/h2&gt;

&lt;p&gt;As business steps execute, the orchestrator updates the saga state persistently. If a step fails, the orchestrator automatically triggers the rollback flow, invoking registered compensations in reverse chronological order:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight csharp"&gt;&lt;code&gt;&lt;span class="k"&gt;public&lt;/span&gt; &lt;span class="k"&gt;async&lt;/span&gt; &lt;span class="n"&gt;Task&lt;/span&gt; &lt;span class="n"&gt;ExecuteStepAsync&lt;/span&gt;&lt;span class="p"&gt;&amp;lt;&lt;/span&gt;&lt;span class="n"&gt;TEvent&lt;/span&gt;&lt;span class="p"&gt;&amp;gt;(&lt;/span&gt;
    &lt;span class="kt"&gt;string&lt;/span&gt; &lt;span class="n"&gt;sagaId&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="n"&gt;TEvent&lt;/span&gt; &lt;span class="n"&gt;@event&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="n"&gt;Func&lt;/span&gt;&lt;span class="p"&gt;&amp;lt;&lt;/span&gt;&lt;span class="n"&gt;SagaContext&lt;/span&gt;&lt;span class="p"&gt;&amp;lt;&lt;/span&gt;&lt;span class="n"&gt;TState&lt;/span&gt;&lt;span class="p"&gt;&amp;gt;,&lt;/span&gt; &lt;span class="n"&gt;TEvent&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;CancellationToken&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;Task&lt;/span&gt;&lt;span class="p"&gt;&amp;gt;&lt;/span&gt; &lt;span class="n"&gt;stepAction&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="n"&gt;CancellationToken&lt;/span&gt; &lt;span class="n"&gt;cancellationToken&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
    &lt;span class="k"&gt;where&lt;/span&gt; &lt;span class="n"&gt;TEvent&lt;/span&gt; &lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="n"&gt;IEvent&lt;/span&gt;
&lt;span class="p"&gt;{&lt;/span&gt;
    &lt;span class="kt"&gt;var&lt;/span&gt; &lt;span class="n"&gt;context&lt;/span&gt; &lt;span class="p"&gt;=&lt;/span&gt; &lt;span class="k"&gt;await&lt;/span&gt; &lt;span class="n"&gt;_store&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;GetAsync&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;sagaId&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;cancellationToken&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="p"&gt;??&lt;/span&gt; &lt;span class="k"&gt;new&lt;/span&gt; &lt;span class="n"&gt;SagaContext&lt;/span&gt;&lt;span class="p"&gt;&amp;lt;&lt;/span&gt;&lt;span class="n"&gt;TState&lt;/span&gt;&lt;span class="p"&gt;&amp;gt;&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt; &lt;span class="n"&gt;SagaId&lt;/span&gt; &lt;span class="p"&gt;=&lt;/span&gt; &lt;span class="n"&gt;sagaId&lt;/span&gt; &lt;span class="p"&gt;};&lt;/span&gt;

    &lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;context&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Status&lt;/span&gt; &lt;span class="k"&gt;is&lt;/span&gt; &lt;span class="s"&gt;"Failed"&lt;/span&gt; &lt;span class="k"&gt;or&lt;/span&gt; &lt;span class="s"&gt;"Compensated"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
    &lt;span class="p"&gt;{&lt;/span&gt;
        &lt;span class="k"&gt;return&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
    &lt;span class="p"&gt;}&lt;/span&gt;

    &lt;span class="k"&gt;try&lt;/span&gt;
    &lt;span class="p"&gt;{&lt;/span&gt;
        &lt;span class="k"&gt;await&lt;/span&gt; &lt;span class="nf"&gt;stepAction&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;context&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;@event&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;cancellationToken&lt;/span&gt;&lt;span class="p"&gt;);&lt;/span&gt;
        &lt;span class="n"&gt;context&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;CompletedSteps&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;Add&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="k"&gt;typeof&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;TEvent&lt;/span&gt;&lt;span class="p"&gt;).&lt;/span&gt;&lt;span class="n"&gt;Name&lt;/span&gt;&lt;span class="p"&gt;);&lt;/span&gt;
        &lt;span class="k"&gt;await&lt;/span&gt; &lt;span class="n"&gt;_store&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;SaveAsync&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;context&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;cancellationToken&lt;/span&gt;&lt;span class="p"&gt;);&lt;/span&gt;
    &lt;span class="p"&gt;}&lt;/span&gt;
    &lt;span class="k"&gt;catch&lt;/span&gt;
    &lt;span class="p"&gt;{&lt;/span&gt;
        &lt;span class="k"&gt;await&lt;/span&gt; &lt;span class="nf"&gt;RollbackSagaAsync&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;context&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="k"&gt;typeof&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;TEvent&lt;/span&gt;&lt;span class="p"&gt;).&lt;/span&gt;&lt;span class="n"&gt;Name&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;cancellationToken&lt;/span&gt;&lt;span class="p"&gt;);&lt;/span&gt;
    &lt;span class="p"&gt;}&lt;/span&gt;
&lt;span class="p"&gt;}&lt;/span&gt;

&lt;span class="k"&gt;private&lt;/span&gt; &lt;span class="k"&gt;async&lt;/span&gt; &lt;span class="n"&gt;Task&lt;/span&gt; &lt;span class="nf"&gt;RollbackSagaAsync&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;SagaContext&lt;/span&gt;&lt;span class="p"&gt;&amp;lt;&lt;/span&gt;&lt;span class="n"&gt;TState&lt;/span&gt;&lt;span class="p"&gt;&amp;gt;&lt;/span&gt; &lt;span class="n"&gt;context&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="kt"&gt;string&lt;/span&gt; &lt;span class="n"&gt;failingStep&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;CancellationToken&lt;/span&gt; &lt;span class="n"&gt;cancellationToken&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
&lt;span class="p"&gt;{&lt;/span&gt;
    &lt;span class="n"&gt;context&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Status&lt;/span&gt; &lt;span class="p"&gt;=&lt;/span&gt; &lt;span class="s"&gt;"Failed"&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
    &lt;span class="k"&gt;await&lt;/span&gt; &lt;span class="n"&gt;_store&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;SaveAsync&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;context&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;cancellationToken&lt;/span&gt;&lt;span class="p"&gt;);&lt;/span&gt;

    &lt;span class="c1"&gt;// Compensate the failing step and all previously completed steps&lt;/span&gt;
    &lt;span class="k"&gt;await&lt;/span&gt; &lt;span class="nf"&gt;CompensateStepAsync&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;context&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;failingStep&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;cancellationToken&lt;/span&gt;&lt;span class="p"&gt;);&lt;/span&gt;
    &lt;span class="k"&gt;for&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="kt"&gt;var&lt;/span&gt; &lt;span class="n"&gt;i&lt;/span&gt; &lt;span class="p"&gt;=&lt;/span&gt; &lt;span class="n"&gt;context&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;CompletedSteps&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Count&lt;/span&gt; &lt;span class="p"&gt;-&lt;/span&gt; &lt;span class="m"&gt;1&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt; &lt;span class="n"&gt;i&lt;/span&gt; &lt;span class="p"&gt;&amp;gt;=&lt;/span&gt; &lt;span class="m"&gt;0&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt; &lt;span class="n"&gt;i&lt;/span&gt;&lt;span class="p"&gt;--)&lt;/span&gt;
    &lt;span class="p"&gt;{&lt;/span&gt;
        &lt;span class="kt"&gt;var&lt;/span&gt; &lt;span class="n"&gt;completedStep&lt;/span&gt; &lt;span class="p"&gt;=&lt;/span&gt; &lt;span class="n"&gt;context&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;CompletedSteps&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="n"&gt;i&lt;/span&gt;&lt;span class="p"&gt;];&lt;/span&gt;
        &lt;span class="k"&gt;await&lt;/span&gt; &lt;span class="nf"&gt;CompensateStepAsync&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;context&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;completedStep&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;cancellationToken&lt;/span&gt;&lt;span class="p"&gt;);&lt;/span&gt;
    &lt;span class="p"&gt;}&lt;/span&gt;

    &lt;span class="n"&gt;context&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Status&lt;/span&gt; &lt;span class="p"&gt;=&lt;/span&gt; &lt;span class="s"&gt;"Compensated"&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
    &lt;span class="k"&gt;await&lt;/span&gt; &lt;span class="n"&gt;_store&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;SaveAsync&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;context&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;cancellationToken&lt;/span&gt;&lt;span class="p"&gt;);&lt;/span&gt;
&lt;span class="p"&gt;}&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;h3&gt;
  
  
  Technical terms demystified
&lt;/h3&gt;

&lt;ul&gt;
&lt;li&gt;
&lt;strong&gt;Saga Pattern:&lt;/strong&gt; A design pattern to coordinate distributed local transactions using structured compensations to restore consistency.&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;Compensation:&lt;/strong&gt; A logical transaction that rolls back or undoes the changes made by a previously successful operation.&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;Saga State Store:&lt;/strong&gt; A persistent or in-memory database that tracks the active steps and context of ongoing sagas.&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;link: &lt;a href="https://github.com/wesleyskap/onkai-unified-bus" rel="noopener noreferrer"&gt;https://github.com/wesleyskap/onkai-unified-bus&lt;/a&gt;&lt;/p&gt;

</description>
      <category>cshap</category>
      <category>pattern</category>
      <category>orchestrating</category>
      <category>saga</category>
    </item>
    <item>
      <title>Reflection-free dispatcher: Optimizing performance and supporting native AOT</title>
      <dc:creator>Wesley Conde</dc:creator>
      <pubDate>Thu, 16 Apr 2026 17:41:00 +0000</pubDate>
      <link>https://dev.to/wesleyskap/reflection-free-dispatcher-optimizing-performance-and-supporting-native-aot-3ljh</link>
      <guid>https://dev.to/wesleyskap/reflection-free-dispatcher-optimizing-performance-and-supporting-native-aot-3ljh</guid>
      <description>&lt;h2&gt;
  
  
  The costs of dynamic dispatch and AOT compilation
&lt;/h2&gt;

&lt;p&gt;In traditional messaging frameworks, when a new event arrives, the router must dynamically find which class or struct handles that message type. In most runtimes, this is done using &lt;strong&gt;Reflection&lt;/strong&gt; to inspect types at runtime.&lt;/p&gt;

&lt;p&gt;While simple and flexible, reflection introduces severe issues:&lt;/p&gt;

&lt;ol&gt;
&lt;li&gt;
&lt;strong&gt;Performance Overhead:&lt;/strong&gt; Dynamic inspections and method invocations allocate metadata on the &lt;em&gt;heap&lt;/em&gt;, adding load to the garbage collector.&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;Incompatibility with Native AOT (Ahead-of-Time):&lt;/strong&gt; AOT compilers prune unused code and metadata during build time to produce tiny binaries. Invoking code via dynamic reflection often fails or crashes in production because the compiler cannot predict what will be dynamically inspected.&lt;/li&gt;
&lt;/ol&gt;

&lt;p&gt;The &lt;strong&gt;onkai-unified-bus&lt;/strong&gt; solves this by replacing dynamic invocations with a statically-typed dispatcher using a concurrent cache of typed executors.&lt;/p&gt;

&lt;h2&gt;
  
  
  Statically-typed consumer executors
&lt;/h2&gt;

&lt;p&gt;Instead of using reflection to find and invoke the consumer method on every message, the framework registers generic executor objects during startup. The dispatcher delegates execution through static interfaces:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight csharp"&gt;&lt;code&gt;&lt;span class="k"&gt;using&lt;/span&gt; &lt;span class="nn"&gt;Onkai.EventBus.Abstractions&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;

&lt;span class="k"&gt;namespace&lt;/span&gt; &lt;span class="nn"&gt;Onkai.EventBus.Core.Subscription&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;

&lt;span class="c1"&gt;// Defines a contract to dispatch events to untyped consumer instances without using reflection invocation.&lt;/span&gt;
&lt;span class="k"&gt;internal&lt;/span&gt; &lt;span class="k"&gt;interface&lt;/span&gt; &lt;span class="nc"&gt;IEventConsumerExecutor&lt;/span&gt;
&lt;span class="p"&gt;{&lt;/span&gt;
    &lt;span class="n"&gt;Task&lt;/span&gt; &lt;span class="nf"&gt;ExecuteAsync&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="kt"&gt;object&lt;/span&gt; &lt;span class="n"&gt;consumer&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;IEvent&lt;/span&gt; &lt;span class="n"&gt;@event&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;ConsumeContext&lt;/span&gt; &lt;span class="n"&gt;context&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;CancellationToken&lt;/span&gt; &lt;span class="n"&gt;cancellationToken&lt;/span&gt;&lt;span class="p"&gt;);&lt;/span&gt;
&lt;span class="p"&gt;}&lt;/span&gt;

&lt;span class="c1"&gt;// A generic helper that casts untyped inputs and executes the strongly-typed ConsumeAsync method directly.&lt;/span&gt;
&lt;span class="k"&gt;internal&lt;/span&gt; &lt;span class="k"&gt;sealed&lt;/span&gt; &lt;span class="k"&gt;class&lt;/span&gt; &lt;span class="nc"&gt;EventConsumerExecutor&lt;/span&gt;&lt;span class="p"&gt;&amp;lt;&lt;/span&gt;&lt;span class="n"&gt;TEvent&lt;/span&gt;&lt;span class="p"&gt;&amp;gt;&lt;/span&gt; &lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="n"&gt;IEventConsumerExecutor&lt;/span&gt;
    &lt;span class="k"&gt;where&lt;/span&gt; &lt;span class="n"&gt;TEvent&lt;/span&gt; &lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="n"&gt;IEvent&lt;/span&gt;
&lt;span class="p"&gt;{&lt;/span&gt;
    &lt;span class="k"&gt;public&lt;/span&gt; &lt;span class="n"&gt;Task&lt;/span&gt; &lt;span class="nf"&gt;ExecuteAsync&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="kt"&gt;object&lt;/span&gt; &lt;span class="n"&gt;consumer&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;IEvent&lt;/span&gt; &lt;span class="n"&gt;@event&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;ConsumeContext&lt;/span&gt; &lt;span class="n"&gt;context&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;CancellationToken&lt;/span&gt; &lt;span class="n"&gt;cancellationToken&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
    &lt;span class="p"&gt;{&lt;/span&gt;
        &lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;consumer&lt;/span&gt; &lt;span class="p"&gt;==&lt;/span&gt; &lt;span class="k"&gt;null&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="k"&gt;throw&lt;/span&gt; &lt;span class="k"&gt;new&lt;/span&gt; &lt;span class="nf"&gt;ArgumentNullException&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="k"&gt;nameof&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;consumer&lt;/span&gt;&lt;span class="p"&gt;));&lt;/span&gt;
        &lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;@event&lt;/span&gt; &lt;span class="p"&gt;==&lt;/span&gt; &lt;span class="k"&gt;null&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="k"&gt;throw&lt;/span&gt; &lt;span class="k"&gt;new&lt;/span&gt; &lt;span class="nf"&gt;ArgumentNullException&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="k"&gt;nameof&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;@event&lt;/span&gt;&lt;span class="p"&gt;));&lt;/span&gt;

        &lt;span class="kt"&gt;var&lt;/span&gt; &lt;span class="n"&gt;typedConsumer&lt;/span&gt; &lt;span class="p"&gt;=&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;IEventConsumer&lt;/span&gt;&lt;span class="p"&gt;&amp;lt;&lt;/span&gt;&lt;span class="n"&gt;TEvent&lt;/span&gt;&lt;span class="p"&gt;&amp;gt;)&lt;/span&gt;&lt;span class="n"&gt;consumer&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
        &lt;span class="kt"&gt;var&lt;/span&gt; &lt;span class="n"&gt;typedEvent&lt;/span&gt; &lt;span class="p"&gt;=&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;TEvent&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;&lt;span class="n"&gt;@event&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;

        &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="n"&gt;typedConsumer&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;ConsumeAsync&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;typedEvent&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;context&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;cancellationToken&lt;/span&gt;&lt;span class="p"&gt;);&lt;/span&gt;
    &lt;span class="p"&gt;}&lt;/span&gt;
&lt;span class="p"&gt;}&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;h2&gt;
  
  
  The reflection-free dispatch engine
&lt;/h2&gt;

&lt;p&gt;Upon receiving a message, the dispatcher loads the matching executor from a thread-safe map (&lt;code&gt;ConcurrentDictionary&lt;/code&gt;) cached by event type, executing the interface call directly in nanoseconds:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight csharp"&gt;&lt;code&gt;&lt;span class="k"&gt;using&lt;/span&gt; &lt;span class="nn"&gt;System.Collections.Concurrent&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
&lt;span class="k"&gt;using&lt;/span&gt; &lt;span class="nn"&gt;Onkai.EventBus.Abstractions&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;

&lt;span class="k"&gt;public&lt;/span&gt; &lt;span class="k"&gt;sealed&lt;/span&gt; &lt;span class="k"&gt;class&lt;/span&gt; &lt;span class="nc"&gt;RabbitMqConsumer&lt;/span&gt;
&lt;span class="p"&gt;{&lt;/span&gt;
    &lt;span class="k"&gt;private&lt;/span&gt; &lt;span class="k"&gt;readonly&lt;/span&gt; &lt;span class="n"&gt;ConcurrentDictionary&lt;/span&gt;&lt;span class="p"&gt;&amp;lt;&lt;/span&gt;&lt;span class="n"&gt;Type&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;IEventConsumerExecutor&lt;/span&gt;&lt;span class="p"&gt;&amp;gt;&lt;/span&gt; &lt;span class="n"&gt;_executors&lt;/span&gt; &lt;span class="p"&gt;=&lt;/span&gt; &lt;span class="k"&gt;new&lt;/span&gt;&lt;span class="p"&gt;();&lt;/span&gt;

    &lt;span class="k"&gt;private&lt;/span&gt; &lt;span class="k"&gt;async&lt;/span&gt; &lt;span class="n"&gt;Task&lt;/span&gt; &lt;span class="nf"&gt;ExecuteConsumerAsync&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;Type&lt;/span&gt; &lt;span class="n"&gt;eventType&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="kt"&gt;object&lt;/span&gt; &lt;span class="n"&gt;consumerInstance&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;IEvent&lt;/span&gt; &lt;span class="n"&gt;eventData&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;ConsumeContext&lt;/span&gt; &lt;span class="n"&gt;context&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;CancellationToken&lt;/span&gt; &lt;span class="n"&gt;token&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
    &lt;span class="p"&gt;{&lt;/span&gt;
        &lt;span class="c1"&gt;// Get or add the cached compiled executor statically without reflection lookup&lt;/span&gt;
        &lt;span class="kt"&gt;var&lt;/span&gt; &lt;span class="n"&gt;executor&lt;/span&gt; &lt;span class="p"&gt;=&lt;/span&gt; &lt;span class="n"&gt;_executors&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;GetOrAdd&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;eventType&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;t&lt;/span&gt; &lt;span class="p"&gt;=&amp;gt;&lt;/span&gt;
        &lt;span class="p"&gt;{&lt;/span&gt;
            &lt;span class="kt"&gt;var&lt;/span&gt; &lt;span class="n"&gt;executorType&lt;/span&gt; &lt;span class="p"&gt;=&lt;/span&gt; &lt;span class="k"&gt;typeof&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;EventConsumerExecutor&lt;/span&gt;&lt;span class="p"&gt;&amp;lt;&amp;gt;).&lt;/span&gt;&lt;span class="nf"&gt;MakeGenericType&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;t&lt;/span&gt;&lt;span class="p"&gt;);&lt;/span&gt;
            &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;IEventConsumerExecutor&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;&lt;span class="n"&gt;Activator&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;CreateInstance&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;executorType&lt;/span&gt;&lt;span class="p"&gt;)!;&lt;/span&gt;
        &lt;span class="p"&gt;});&lt;/span&gt;

        &lt;span class="k"&gt;await&lt;/span&gt; &lt;span class="n"&gt;executor&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;ExecuteAsync&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;consumerInstance&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;eventData&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;context&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;token&lt;/span&gt;&lt;span class="p"&gt;);&lt;/span&gt;
    &lt;span class="p"&gt;}&lt;/span&gt;
&lt;span class="p"&gt;}&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;h3&gt;
  
  
  Technical terms demystified
&lt;/h3&gt;

&lt;ul&gt;
&lt;li&gt;
&lt;strong&gt;Native AOT (Ahead-of-Time):&lt;/strong&gt; A compilation technology that compiles source code directly to native machine code at build time, bypassing JIT compilers or runtime interpreters.&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;Reflection-Free Dispatcher:&lt;/strong&gt; A routing pattern that uses static interfaces or pre-compiled delegates to invoke handlers without inspecting objects at runtime.&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;Type Casting:&lt;/strong&gt; Explicitly converting a generic interface or variable to its concrete structural type.&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;link: &lt;a href="https://github.com/wesleyskap/onkai-unified-bus" rel="noopener noreferrer"&gt;https://github.com/wesleyskap/onkai-unified-bus&lt;/a&gt;&lt;/p&gt;

</description>
      <category>csharp</category>
      <category>aot</category>
      <category>heap</category>
      <category>bus</category>
    </item>
    <item>
      <title>Reliable messaging:Guaranteeing at-least-once with transactional outbox pattern</title>
      <dc:creator>Wesley Conde</dc:creator>
      <pubDate>Sun, 12 Apr 2026 19:46:00 +0000</pubDate>
      <link>https://dev.to/wesleyskap/reliable-messagingguaranteeing-at-least-once-with-transactional-outbox-pattern-1d3d</link>
      <guid>https://dev.to/wesleyskap/reliable-messagingguaranteeing-at-least-once-with-transactional-outbox-pattern-1d3d</guid>
      <description>&lt;h2&gt;
  
  
  The two-phase distributed transaction dilemma
&lt;/h2&gt;

&lt;p&gt;Consider this classic microservice scenario: your payment service completes a payment and saves the record in its local database. Next, it publishes a &lt;code&gt;PaymentApprovedEvent&lt;/code&gt; to your message broker to notify the shipping service.&lt;/p&gt;

&lt;p&gt;What happens if the database write fails but the message is already published? The customer didn't pay, but the product will be shipped. And what if the database write succeeds, but the network drops before publishing the message to the broker? The customer paid, but the product will never be dispatched.&lt;/p&gt;

&lt;p&gt;The &lt;strong&gt;Transactional Outbox&lt;/strong&gt; pattern resolves this data consistency dilemma, ensuring &lt;strong&gt;At-Least-Once&lt;/strong&gt; message delivery.&lt;/p&gt;

&lt;h2&gt;
  
  
  Outbox pattern design
&lt;/h2&gt;

&lt;p&gt;Instead of publishing the event directly to the broker, we save the message in a special table called &lt;code&gt;outbox&lt;/code&gt; within the same database, using the &lt;strong&gt;same database transaction&lt;/strong&gt; that writes the business data. In .NET, this is naturally achieved using Entity Framework Core's &lt;code&gt;DbContext&lt;/code&gt; within a transaction scope:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight csharp"&gt;&lt;code&gt;&lt;span class="k"&gt;using&lt;/span&gt; &lt;span class="nn"&gt;System.Text.Json&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
&lt;span class="k"&gt;using&lt;/span&gt; &lt;span class="nn"&gt;Microsoft.EntityFrameworkCore&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
&lt;span class="k"&gt;using&lt;/span&gt; &lt;span class="nn"&gt;Onkai.EventBus.Abstractions&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;

&lt;span class="k"&gt;public&lt;/span&gt; &lt;span class="k"&gt;sealed&lt;/span&gt; &lt;span class="k"&gt;class&lt;/span&gt; &lt;span class="nc"&gt;OrderService&lt;/span&gt;
&lt;span class="p"&gt;{&lt;/span&gt;
    &lt;span class="k"&gt;private&lt;/span&gt; &lt;span class="k"&gt;readonly&lt;/span&gt; &lt;span class="n"&gt;DbContext&lt;/span&gt; &lt;span class="n"&gt;_dbContext&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
    &lt;span class="k"&gt;private&lt;/span&gt; &lt;span class="k"&gt;readonly&lt;/span&gt; &lt;span class="n"&gt;IOutboxStore&lt;/span&gt; &lt;span class="n"&gt;_outboxStore&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;

    &lt;span class="k"&gt;public&lt;/span&gt; &lt;span class="nf"&gt;OrderService&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;DbContext&lt;/span&gt; &lt;span class="n"&gt;dbContext&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;IOutboxStore&lt;/span&gt; &lt;span class="n"&gt;outboxStore&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
    &lt;span class="p"&gt;{&lt;/span&gt;
        &lt;span class="n"&gt;_dbContext&lt;/span&gt; &lt;span class="p"&gt;=&lt;/span&gt; &lt;span class="n"&gt;dbContext&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
        &lt;span class="n"&gt;_outboxStore&lt;/span&gt; &lt;span class="p"&gt;=&lt;/span&gt; &lt;span class="n"&gt;outboxStore&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
    &lt;span class="p"&gt;}&lt;/span&gt;

    &lt;span class="k"&gt;public&lt;/span&gt; &lt;span class="k"&gt;async&lt;/span&gt; &lt;span class="n"&gt;Task&lt;/span&gt; &lt;span class="nf"&gt;CreateOrderAsync&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;Guid&lt;/span&gt; &lt;span class="n"&gt;orderId&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="kt"&gt;decimal&lt;/span&gt; &lt;span class="n"&gt;amount&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;CancellationToken&lt;/span&gt; &lt;span class="n"&gt;token&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
    &lt;span class="p"&gt;{&lt;/span&gt;
        &lt;span class="c1"&gt;// Execute both business write and outbox write inside the same atomic database transaction&lt;/span&gt;
        &lt;span class="k"&gt;using&lt;/span&gt; &lt;span class="nn"&gt;var&lt;/span&gt; &lt;span class="n"&gt;transaction&lt;/span&gt; &lt;span class="p"&gt;=&lt;/span&gt; &lt;span class="k"&gt;await&lt;/span&gt; &lt;span class="n"&gt;_dbContext&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Database&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;BeginTransactionAsync&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;token&lt;/span&gt;&lt;span class="p"&gt;);&lt;/span&gt;
        &lt;span class="k"&gt;try&lt;/span&gt;
        &lt;span class="p"&gt;{&lt;/span&gt;
            &lt;span class="c1"&gt;// 1. Writes business data&lt;/span&gt;
            &lt;span class="kt"&gt;var&lt;/span&gt; &lt;span class="n"&gt;order&lt;/span&gt; &lt;span class="p"&gt;=&lt;/span&gt; &lt;span class="k"&gt;new&lt;/span&gt; &lt;span class="n"&gt;Order&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt; &lt;span class="n"&gt;Id&lt;/span&gt; &lt;span class="p"&gt;=&lt;/span&gt; &lt;span class="n"&gt;orderId&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;Amount&lt;/span&gt; &lt;span class="p"&gt;=&lt;/span&gt; &lt;span class="n"&gt;amount&lt;/span&gt; &lt;span class="p"&gt;};&lt;/span&gt;
            &lt;span class="n"&gt;_dbContext&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;Add&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;order&lt;/span&gt;&lt;span class="p"&gt;);&lt;/span&gt;
            &lt;span class="k"&gt;await&lt;/span&gt; &lt;span class="n"&gt;_dbContext&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;SaveChangesAsync&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;token&lt;/span&gt;&lt;span class="p"&gt;);&lt;/span&gt;

            &lt;span class="c1"&gt;// 2. Serializes and registers event in the outbox table via the outbox store within the transaction&lt;/span&gt;
            &lt;span class="kt"&gt;var&lt;/span&gt; &lt;span class="n"&gt;orderEvent&lt;/span&gt; &lt;span class="p"&gt;=&lt;/span&gt; &lt;span class="k"&gt;new&lt;/span&gt; &lt;span class="nf"&gt;OrderCreatedEvent&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;orderId&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;amount&lt;/span&gt;&lt;span class="p"&gt;);&lt;/span&gt;
            &lt;span class="k"&gt;await&lt;/span&gt; &lt;span class="n"&gt;_outboxStore&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;SaveMessageAsync&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;orderEvent&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;token&lt;/span&gt;&lt;span class="p"&gt;);&lt;/span&gt;

            &lt;span class="c1"&gt;// Commits the transaction. If it fails, everything is rolled back.&lt;/span&gt;
            &lt;span class="k"&gt;await&lt;/span&gt; &lt;span class="n"&gt;transaction&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;CommitAsync&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;token&lt;/span&gt;&lt;span class="p"&gt;);&lt;/span&gt;
        &lt;span class="p"&gt;}&lt;/span&gt;
        &lt;span class="k"&gt;catch&lt;/span&gt;
        &lt;span class="p"&gt;{&lt;/span&gt;
            &lt;span class="k"&gt;await&lt;/span&gt; &lt;span class="n"&gt;transaction&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;RollbackAsync&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;token&lt;/span&gt;&lt;span class="p"&gt;);&lt;/span&gt;
            &lt;span class="k"&gt;throw&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
        &lt;span class="p"&gt;}&lt;/span&gt;
    &lt;span class="p"&gt;}&lt;/span&gt;
&lt;span class="p"&gt;}&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;h2&gt;
  
  
  The outbox processor (relay)
&lt;/h2&gt;

&lt;p&gt;A background hosted service (&lt;code&gt;OutboxProcessor&lt;/code&gt;) periodically polls unpublished messages from the outbox store, attempts to publish them to the broker transport, and marks them as published upon receiving confirmations (ACKs):&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight csharp"&gt;&lt;code&gt;&lt;span class="k"&gt;using&lt;/span&gt; &lt;span class="nn"&gt;Microsoft.Extensions.Hosting&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
&lt;span class="k"&gt;using&lt;/span&gt; &lt;span class="nn"&gt;Onkai.EventBus.Core.Transport&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;

&lt;span class="k"&gt;public&lt;/span&gt; &lt;span class="k"&gt;sealed&lt;/span&gt; &lt;span class="k"&gt;class&lt;/span&gt; &lt;span class="nc"&gt;OutboxProcessor&lt;/span&gt; &lt;span class="p"&gt;:&lt;/span&gt; &lt;span class="n"&gt;BackgroundService&lt;/span&gt;
&lt;span class="p"&gt;{&lt;/span&gt;
    &lt;span class="k"&gt;private&lt;/span&gt; &lt;span class="k"&gt;readonly&lt;/span&gt; &lt;span class="n"&gt;IServiceProvider&lt;/span&gt; &lt;span class="n"&gt;_serviceProvider&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;

    &lt;span class="k"&gt;public&lt;/span&gt; &lt;span class="nf"&gt;OutboxProcessor&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;IServiceProvider&lt;/span&gt; &lt;span class="n"&gt;serviceProvider&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
    &lt;span class="p"&gt;{&lt;/span&gt;
        &lt;span class="n"&gt;_serviceProvider&lt;/span&gt; &lt;span class="p"&gt;=&lt;/span&gt; &lt;span class="n"&gt;serviceProvider&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
    &lt;span class="p"&gt;}&lt;/span&gt;

    &lt;span class="k"&gt;protected&lt;/span&gt; &lt;span class="k"&gt;override&lt;/span&gt; &lt;span class="k"&gt;async&lt;/span&gt; &lt;span class="n"&gt;Task&lt;/span&gt; &lt;span class="nf"&gt;ExecuteAsync&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;CancellationToken&lt;/span&gt; &lt;span class="n"&gt;stoppingToken&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
    &lt;span class="p"&gt;{&lt;/span&gt;
        &lt;span class="k"&gt;while&lt;/span&gt; &lt;span class="p"&gt;(!&lt;/span&gt;&lt;span class="n"&gt;stoppingToken&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;IsCancellationRequested&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
        &lt;span class="p"&gt;{&lt;/span&gt;
            &lt;span class="k"&gt;using&lt;/span&gt; &lt;span class="nn"&gt;var&lt;/span&gt; &lt;span class="n"&gt;scope&lt;/span&gt; &lt;span class="p"&gt;=&lt;/span&gt; &lt;span class="n"&gt;_serviceProvider&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;CreateScope&lt;/span&gt;&lt;span class="p"&gt;();&lt;/span&gt;
            &lt;span class="kt"&gt;var&lt;/span&gt; &lt;span class="n"&gt;store&lt;/span&gt; &lt;span class="p"&gt;=&lt;/span&gt; &lt;span class="n"&gt;scope&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;ServiceProvider&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;GetRequiredService&lt;/span&gt;&lt;span class="p"&gt;&amp;lt;&lt;/span&gt;&lt;span class="n"&gt;IOutboxStore&lt;/span&gt;&lt;span class="p"&gt;&amp;gt;();&lt;/span&gt;
            &lt;span class="kt"&gt;var&lt;/span&gt; &lt;span class="n"&gt;transport&lt;/span&gt; &lt;span class="p"&gt;=&lt;/span&gt; &lt;span class="n"&gt;scope&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;ServiceProvider&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;GetRequiredService&lt;/span&gt;&lt;span class="p"&gt;&amp;lt;&lt;/span&gt;&lt;span class="n"&gt;IMessageTransport&lt;/span&gt;&lt;span class="p"&gt;&amp;gt;();&lt;/span&gt;

            &lt;span class="kt"&gt;var&lt;/span&gt; &lt;span class="n"&gt;pendingMessages&lt;/span&gt; &lt;span class="p"&gt;=&lt;/span&gt; &lt;span class="k"&gt;await&lt;/span&gt; &lt;span class="n"&gt;store&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;GetUnpublishedMessagesAsync&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;stoppingToken&lt;/span&gt;&lt;span class="p"&gt;);&lt;/span&gt;
            &lt;span class="k"&gt;foreach&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="kt"&gt;var&lt;/span&gt; &lt;span class="n"&gt;message&lt;/span&gt; &lt;span class="k"&gt;in&lt;/span&gt; &lt;span class="n"&gt;pendingMessages&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
            &lt;span class="p"&gt;{&lt;/span&gt;
                &lt;span class="kt"&gt;var&lt;/span&gt; &lt;span class="n"&gt;envelope&lt;/span&gt; &lt;span class="p"&gt;=&lt;/span&gt; &lt;span class="k"&gt;new&lt;/span&gt; &lt;span class="n"&gt;TransportEnvelope&lt;/span&gt;
                &lt;span class="p"&gt;{&lt;/span&gt;
                    &lt;span class="n"&gt;EventId&lt;/span&gt; &lt;span class="p"&gt;=&lt;/span&gt; &lt;span class="n"&gt;message&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;EventId&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
                    &lt;span class="n"&gt;EventName&lt;/span&gt; &lt;span class="p"&gt;=&lt;/span&gt; &lt;span class="n"&gt;message&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;EventName&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
                    &lt;span class="n"&gt;Body&lt;/span&gt; &lt;span class="p"&gt;=&lt;/span&gt; &lt;span class="n"&gt;message&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Body&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
                    &lt;span class="n"&gt;CorrelationId&lt;/span&gt; &lt;span class="p"&gt;=&lt;/span&gt; &lt;span class="n"&gt;message&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;CorrelationId&lt;/span&gt;
                &lt;span class="p"&gt;};&lt;/span&gt;

                &lt;span class="c1"&gt;// Publishes the envelope to the broker&lt;/span&gt;
                &lt;span class="k"&gt;await&lt;/span&gt; &lt;span class="n"&gt;transport&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;SendAsync&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;envelope&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;stoppingToken&lt;/span&gt;&lt;span class="p"&gt;);&lt;/span&gt;

                &lt;span class="c1"&gt;// Updates status in the outbox table&lt;/span&gt;
                &lt;span class="k"&gt;await&lt;/span&gt; &lt;span class="n"&gt;store&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;MarkAsPublishedAsync&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;message&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Id&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;stoppingToken&lt;/span&gt;&lt;span class="p"&gt;);&lt;/span&gt;
            &lt;span class="p"&gt;}&lt;/span&gt;

            &lt;span class="k"&gt;await&lt;/span&gt; &lt;span class="n"&gt;Task&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;Delay&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;TimeSpan&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="nf"&gt;FromSeconds&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="m"&gt;5&lt;/span&gt;&lt;span class="p"&gt;),&lt;/span&gt; &lt;span class="n"&gt;stoppingToken&lt;/span&gt;&lt;span class="p"&gt;);&lt;/span&gt;
        &lt;span class="p"&gt;}&lt;/span&gt;
    &lt;span class="p"&gt;}&lt;/span&gt;
&lt;span class="p"&gt;}&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;h3&gt;
  
  
  Technical terms demystified
&lt;/h3&gt;

&lt;ul&gt;
&lt;li&gt;
&lt;strong&gt;At-Least-Once Delivery:&lt;/strong&gt; A guarantee that all messages will be delivered to their destination at least once, accepting the possibility of duplicates.&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;Database Transaction:&lt;/strong&gt; A set of operations executed as a single, atomic logical unit of work (all or nothing).&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;Outbox Relay:&lt;/strong&gt; The component responsible for reading the database outbox table and reliably publishing the records to the network.&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;link: &lt;a href="https://github.com/wesleyskap/onkai-unified-eventbus" rel="noopener noreferrer"&gt;https://github.com/wesleyskap/onkai-unified-eventbus&lt;/a&gt;&lt;/p&gt;

</description>
      <category>csharp</category>
      <category>microsservice</category>
      <category>outbox</category>
      <category>database</category>
    </item>
    <item>
      <title>Background message consumption: Designing a resilient hosted service</title>
      <dc:creator>Wesley Conde</dc:creator>
      <pubDate>Wed, 08 Apr 2026 14:42:00 +0000</pubDate>
      <link>https://dev.to/wesleyskap/background-message-consumption-designing-a-resilient-hosted-service-54ll</link>
      <guid>https://dev.to/wesleyskap/background-message-consumption-designing-a-resilient-hosted-service-54ll</guid>
      <description>&lt;h2&gt;
  
  
  The importance of isolated asynchronous processing
&lt;/h2&gt;

&lt;p&gt;In modern microservices, web servers handle fast, short-lived HTTP requests. However, consuming messages from queues or topics (such as RabbitMQ, Kafka, or NATS) requires a different runtime model: continuous and asynchronous listening. This message consumption loop cannot run on the same thread as web requests, otherwise, it would freeze the main web server.&lt;/p&gt;

&lt;p&gt;To solve this, &lt;strong&gt;onkai-unified-bus&lt;/strong&gt; implements a background execution engine (Worker/Background Service) that manages the message consumption lifecycle in an isolated and resilient manner.&lt;/p&gt;

&lt;h2&gt;
  
  
  Implementing a background consumption worker
&lt;/h2&gt;

&lt;p&gt;The background worker encapsulates the infinite listening loop of the transport broker. It boots alongside the application container startup and ensures correct handling of termination signals using cancellation contexts (&lt;code&gt;context.Context&lt;/code&gt;):&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight go"&gt;&lt;code&gt;&lt;span class="k"&gt;package&lt;/span&gt; &lt;span class="n"&gt;main&lt;/span&gt;

&lt;span class="k"&gt;import&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;
    &lt;span class="s"&gt;"context"&lt;/span&gt;
    &lt;span class="s"&gt;"log"&lt;/span&gt;
    &lt;span class="s"&gt;"sync"&lt;/span&gt;
    &lt;span class="s"&gt;"time"&lt;/span&gt;
&lt;span class="p"&gt;)&lt;/span&gt;

&lt;span class="k"&gt;type&lt;/span&gt; &lt;span class="n"&gt;MessageConsumer&lt;/span&gt; &lt;span class="k"&gt;struct&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
    &lt;span class="n"&gt;driver&lt;/span&gt;    &lt;span class="n"&gt;Driver&lt;/span&gt;
    &lt;span class="n"&gt;topic&lt;/span&gt;     &lt;span class="kt"&gt;string&lt;/span&gt;
    &lt;span class="n"&gt;stopChan&lt;/span&gt;  &lt;span class="k"&gt;chan&lt;/span&gt; &lt;span class="k"&gt;struct&lt;/span&gt;&lt;span class="p"&gt;{}&lt;/span&gt;
    &lt;span class="n"&gt;waitGroup&lt;/span&gt; &lt;span class="n"&gt;sync&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;WaitGroup&lt;/span&gt;
&lt;span class="p"&gt;}&lt;/span&gt;

&lt;span class="k"&gt;func&lt;/span&gt; &lt;span class="n"&gt;NewMessageConsumer&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;driver&lt;/span&gt; &lt;span class="n"&gt;Driver&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;topic&lt;/span&gt; &lt;span class="kt"&gt;string&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="o"&gt;*&lt;/span&gt;&lt;span class="n"&gt;MessageConsumer&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
    &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="o"&gt;&amp;amp;&lt;/span&gt;&lt;span class="n"&gt;MessageConsumer&lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;
        &lt;span class="n"&gt;driver&lt;/span&gt;&lt;span class="o"&gt;:&lt;/span&gt;   &lt;span class="n"&gt;driver&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
        &lt;span class="n"&gt;topic&lt;/span&gt;&lt;span class="o"&gt;:&lt;/span&gt;    &lt;span class="n"&gt;topic&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
        &lt;span class="n"&gt;stopChan&lt;/span&gt;&lt;span class="o"&gt;:&lt;/span&gt; &lt;span class="nb"&gt;make&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="k"&gt;chan&lt;/span&gt; &lt;span class="k"&gt;struct&lt;/span&gt;&lt;span class="p"&gt;{}),&lt;/span&gt;
    &lt;span class="p"&gt;}&lt;/span&gt;
&lt;span class="p"&gt;}&lt;/span&gt;

&lt;span class="c"&gt;// Start spawns the goroutine that consumes messages asynchronously&lt;/span&gt;
&lt;span class="k"&gt;func&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;mc&lt;/span&gt; &lt;span class="o"&gt;*&lt;/span&gt;&lt;span class="n"&gt;MessageConsumer&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="n"&gt;Start&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;ctx&lt;/span&gt; &lt;span class="n"&gt;context&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Context&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
    &lt;span class="n"&gt;mc&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;waitGroup&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Add&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="m"&gt;1&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
    &lt;span class="k"&gt;go&lt;/span&gt; &lt;span class="k"&gt;func&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
        &lt;span class="k"&gt;defer&lt;/span&gt; &lt;span class="n"&gt;mc&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;waitGroup&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Done&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;
        &lt;span class="n"&gt;log&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Printf&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"[Consumer] Starting subscription on topic: %s"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;mc&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;topic&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;

        &lt;span class="n"&gt;err&lt;/span&gt; &lt;span class="o"&gt;:=&lt;/span&gt; &lt;span class="n"&gt;mc&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;driver&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Subscribe&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;ctx&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;mc&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;topic&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="k"&gt;func&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;msg&lt;/span&gt; &lt;span class="n"&gt;Message&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="kt"&gt;error&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
            &lt;span class="c"&gt;// Simulates message processing logic&lt;/span&gt;
            &lt;span class="n"&gt;log&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Printf&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"[Consumer] Processing message: %s"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;msg&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;ID&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
            &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="no"&gt;nil&lt;/span&gt;
        &lt;span class="p"&gt;})&lt;/span&gt;

        &lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="n"&gt;err&lt;/span&gt; &lt;span class="o"&gt;!=&lt;/span&gt; &lt;span class="no"&gt;nil&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
            &lt;span class="n"&gt;log&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Printf&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"[Consumer] Topic subscription error: %v"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;err&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
        &lt;span class="p"&gt;}&lt;/span&gt;
    &lt;span class="p"&gt;}()&lt;/span&gt;
&lt;span class="p"&gt;}&lt;/span&gt;

&lt;span class="c"&gt;// Stop gracefully stops the consumer, draining active processing execution&lt;/span&gt;
&lt;span class="k"&gt;func&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;mc&lt;/span&gt; &lt;span class="o"&gt;*&lt;/span&gt;&lt;span class="n"&gt;MessageConsumer&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="n"&gt;Stop&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
    &lt;span class="nb"&gt;close&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;mc&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;stopChan&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
    &lt;span class="n"&gt;mc&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;waitGroup&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Wait&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;
    &lt;span class="n"&gt;log&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Println&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"[Consumer] Graceful shutdown completed."&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
&lt;span class="p"&gt;}&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;h2&gt;
  
  
  Failure handling and connection lifecycle
&lt;/h2&gt;

&lt;p&gt;A production background worker must not crash the application if the network connection with the broker flickers. The consumption layer integrates with the driver to automatically re-establish the subscription after disconnections:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Continuous physical connection monitoring.&lt;/li&gt;
&lt;li&gt;On-demand topology declaration (re-declaring queues if deleted).&lt;/li&gt;
&lt;li&gt;Cooperative cancellation with &lt;code&gt;context.Context&lt;/code&gt; during machine shutdowns.&lt;/li&gt;
&lt;/ul&gt;

&lt;h3&gt;
  
  
  Technical terms demystified
&lt;/h3&gt;

&lt;ul&gt;
&lt;li&gt;
&lt;strong&gt;Background Service:&lt;/strong&gt; A process executed in the background that performs continuous tasks (like reading a queue) without direct client interaction.&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;Graceful Drain:&lt;/strong&gt; The process of stopping the intake of new events and waiting for active messages to complete before terminating the process.&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;Cooperative Cancellation:&lt;/strong&gt; A pattern where parallel routines monitor a centralized signal (like a context or channel) to immediately stop execution.&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;link: &lt;a href="https://github.com/wesleyskap/onkai-unified-bus" rel="noopener noreferrer"&gt;https://github.com/wesleyskap/onkai-unified-bus&lt;/a&gt;&lt;/p&gt;

</description>
      <category>csharp</category>
      <category>asynchronous</category>
      <category>rabbitmq</category>
      <category>kafka</category>
    </item>
    <item>
      <title>Asynchronous tracing: Propagating telemetry context across event boundaries</title>
      <dc:creator>Wesley Conde</dc:creator>
      <pubDate>Sat, 28 Mar 2026 11:15:00 +0000</pubDate>
      <link>https://dev.to/wesleyskap/asynchronous-tracing-propagating-telemetry-context-across-event-boundaries-2k9j</link>
      <guid>https://dev.to/wesleyskap/asynchronous-tracing-propagating-telemetry-context-across-event-boundaries-2k9j</guid>
      <description>&lt;h2&gt;
  
  
  The challenge of tracing asynchronous pipelines
&lt;/h2&gt;

&lt;p&gt;In traditional synchronous microservice architectures (like HTTP or gRPC), propagating trace contexts to assemble call trees is straightforward. Requests carry trace data directly inside standard headers.&lt;/p&gt;

&lt;p&gt;However, in event-driven systems, this execution flow is decoupled. A producer publishes a message to a broker and immediately finishes its task. Seconds or minutes later, a consumer pulls the message and processes it. How do you correlate the telemetry of the consumer with the original transaction initiated by the producer?&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;onkai-unified-bus&lt;/strong&gt; solves this by injecting and extracting W3C Trace Context headers transparently inside the physical metadata headers of every message.&lt;/p&gt;

&lt;h2&gt;
  
  
  Injecting and extracting trace context
&lt;/h2&gt;

&lt;p&gt;To ensure compatibility with industry-leading observability tools (such as OpenTelemetry, Jaeger, and Zipkin), the bus exposes injection and extraction adapters conforming to the W3C standard.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight go"&gt;&lt;code&gt;&lt;span class="k"&gt;import&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;
    &lt;span class="s"&gt;"context"&lt;/span&gt;
    &lt;span class="s"&gt;"go.opentelemetry.io/otel"&lt;/span&gt;
    &lt;span class="s"&gt;"go.opentelemetry.io/otel/propagation"&lt;/span&gt;
&lt;span class="p"&gt;)&lt;/span&gt;

&lt;span class="c"&gt;// InjectTrace injects active trace context data into the message headers&lt;/span&gt;
&lt;span class="k"&gt;func&lt;/span&gt; &lt;span class="n"&gt;InjectTrace&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;ctx&lt;/span&gt; &lt;span class="n"&gt;context&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Context&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;msg&lt;/span&gt; &lt;span class="o"&gt;*&lt;/span&gt;&lt;span class="n"&gt;Message&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
    &lt;span class="n"&gt;propagator&lt;/span&gt; &lt;span class="o"&gt;:=&lt;/span&gt; &lt;span class="n"&gt;otel&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;GetTextMapPropagator&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;
    &lt;span class="n"&gt;carrier&lt;/span&gt; &lt;span class="o"&gt;:=&lt;/span&gt; &lt;span class="n"&gt;propagation&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;MapCarrier&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;msg&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Headers&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
    &lt;span class="n"&gt;propagator&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Inject&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;ctx&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;carrier&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
&lt;span class="p"&gt;}&lt;/span&gt;

&lt;span class="c"&gt;// ExtractTrace extracts trace context data from the message headers and returns a new context.Context&lt;/span&gt;
&lt;span class="k"&gt;func&lt;/span&gt; &lt;span class="n"&gt;ExtractTrace&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;ctx&lt;/span&gt; &lt;span class="n"&gt;context&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Context&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;msg&lt;/span&gt; &lt;span class="n"&gt;Message&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="n"&gt;context&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Context&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
    &lt;span class="n"&gt;propagator&lt;/span&gt; &lt;span class="o"&gt;:=&lt;/span&gt; &lt;span class="n"&gt;otel&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;GetTextMapPropagator&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;
    &lt;span class="n"&gt;carrier&lt;/span&gt; &lt;span class="o"&gt;:=&lt;/span&gt; &lt;span class="n"&gt;propagation&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;MapCarrier&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;msg&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Headers&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
    &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="n"&gt;propagator&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Extract&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;ctx&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;carrier&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
&lt;span class="p"&gt;}&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;h2&gt;
  
  
  End-to-end visual correlation
&lt;/h2&gt;

&lt;p&gt;When a producer creates an event, the active Span ID is serialized into the message's &lt;code&gt;traceparent&lt;/code&gt; header. When the event reaches the transport driver (e.g. RabbitMQ), the consumer's middleware intercepts it, extracts the context using &lt;code&gt;ExtractTrace&lt;/code&gt;, and spawns a child Span under the original parent trace.&lt;/p&gt;

&lt;p&gt;Thanks to this mechanism, even if an event is deferred and processed long after it was generated, your APM dashboards will show the exact correlation graph across all distributed systems.&lt;/p&gt;

&lt;h3&gt;
  
  
  Technical terms demystified
&lt;/h3&gt;

&lt;ul&gt;
&lt;li&gt;
&lt;strong&gt;W3C Trace Context:&lt;/strong&gt; A unified standard defining mandatory headers (&lt;code&gt;traceparent&lt;/code&gt;, &lt;code&gt;tracestate&lt;/code&gt;) to guarantee tracing interoperability across heterogeneous services.&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;Span ID:&lt;/strong&gt; A unique identifier representing a single logical unit of work (such as a database query or message handling block).&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;Map Carrier:&lt;/strong&gt; A utility adapter that maps OpenTelemetry fields into standard string-to-string dictionary headers.&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;link: &lt;a href="https://github.com/wesleyskap/onkai-unified-bus" rel="noopener noreferrer"&gt;https://github.com/wesleyskap/onkai-unified-bus&lt;/a&gt;&lt;/p&gt;

</description>
      <category>csharp</category>
      <category>tracing</category>
      <category>bus</category>
      <category>broker</category>
    </item>
    <item>
      <title>Error state machines:Designing dead-letter queues (DLQ) and jittered retries</title>
      <dc:creator>Wesley Conde</dc:creator>
      <pubDate>Fri, 27 Mar 2026 17:19:00 +0000</pubDate>
      <link>https://dev.to/wesleyskap/error-state-machinesdesigning-dead-letter-queues-dlq-and-jittered-retries-8im</link>
      <guid>https://dev.to/wesleyskap/error-state-machinesdesigning-dead-letter-queues-dlq-and-jittered-retries-8im</guid>
      <description>&lt;h2&gt;
  
  
  Transient vs. permanent errors
&lt;/h2&gt;

&lt;p&gt;When a consumer fails to process an event, the worst action is retrying it immediately and infinitely. If the error is transient (e.g., temporary network blip), rapid retries will overwhelm the failing downstream service. If the error is permanent (e.g., corrupted payload or schema validation failure), repeating the operation wastes CPU cycles and stalls progress.&lt;/p&gt;

&lt;p&gt;To handle this &lt;strong&gt;onkai-unified-bus&lt;/strong&gt; implements a robust error state machine combining &lt;strong&gt;Exponential Backoff with Jitter&lt;/strong&gt; retries and a &lt;strong&gt;Dead-Letter Queue (DLQ)&lt;/strong&gt;.&lt;/p&gt;

&lt;h2&gt;
  
  
  Exponential backoff with Jitter
&lt;/h2&gt;

&lt;p&gt;To prevent dozens of disconnected consumers from retrying or reconnecting at the exact same millisecond (the "thundering herd" effect), we inject random delay variance (Jitter) combined with an exponentially increasing interval between retries.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight go"&gt;&lt;code&gt;&lt;span class="k"&gt;func&lt;/span&gt; &lt;span class="n"&gt;CalculateBackoff&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;attempt&lt;/span&gt; &lt;span class="kt"&gt;int&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;baseDelay&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;maxDelay&lt;/span&gt; &lt;span class="n"&gt;time&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Duration&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="n"&gt;time&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Duration&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
    &lt;span class="c"&gt;// Exponential calculation: base * 2^attempt&lt;/span&gt;
    &lt;span class="n"&gt;backoff&lt;/span&gt; &lt;span class="o"&gt;:=&lt;/span&gt; &lt;span class="kt"&gt;float64&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;baseDelay&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="o"&gt;*&lt;/span&gt; &lt;span class="n"&gt;math&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Pow&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="m"&gt;2&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="kt"&gt;float64&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;attempt&lt;/span&gt;&lt;span class="p"&gt;))&lt;/span&gt;
    &lt;span class="n"&gt;duration&lt;/span&gt; &lt;span class="o"&gt;:=&lt;/span&gt; &lt;span class="n"&gt;time&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Duration&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;backoff&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
    &lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="n"&gt;duration&lt;/span&gt; &lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="n"&gt;maxDelay&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
        &lt;span class="n"&gt;duration&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;maxDelay&lt;/span&gt;
    &lt;span class="p"&gt;}&lt;/span&gt;
    &lt;span class="c"&gt;// Add +/- 10% Jitter (random noise) to spread out retry load&lt;/span&gt;
    &lt;span class="n"&gt;jitter&lt;/span&gt; &lt;span class="o"&gt;:=&lt;/span&gt; &lt;span class="n"&gt;rand&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Float64&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt; &lt;span class="o"&gt;*&lt;/span&gt; &lt;span class="m"&gt;0.2&lt;/span&gt; &lt;span class="o"&gt;-&lt;/span&gt; &lt;span class="m"&gt;0.1&lt;/span&gt;
    &lt;span class="n"&gt;duration&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;time&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Duration&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="kt"&gt;float64&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;duration&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="o"&gt;*&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="m"&gt;1.0&lt;/span&gt; &lt;span class="o"&gt;+&lt;/span&gt; &lt;span class="n"&gt;jitter&lt;/span&gt;&lt;span class="p"&gt;))&lt;/span&gt;
    &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="n"&gt;duration&lt;/span&gt;
&lt;span class="p"&gt;}&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;h2&gt;
  
  
  Isolation via dead-letter queue (DLQ)
&lt;/h2&gt;

&lt;p&gt;If an event fails after reaching the maximum number of retry attempts (e.g., 5 attempts), it is classified as a permanent logical failure. To prevent stalling the rest of the queue, the event bus intercepts the offending message, attaches the error's stack trace to the message headers, and forwards it to a designated isolation queue called the &lt;strong&gt;Dead-Letter Queue (DLQ)&lt;/strong&gt;.&lt;/p&gt;

&lt;p&gt;From the DLQ, faulty messages can be audited manually or processed through administrative tools once the underlying business logic bug is fixed.&lt;/p&gt;

&lt;h3&gt;
  
  
  Technical terms demystified
&lt;/h3&gt;

&lt;ul&gt;
&lt;li&gt;
&lt;strong&gt;Dead-Letter Queue (DLQ):&lt;/strong&gt; A secondary queue dedicated to holding messages that couldn't be delivered or processed successfully by consumers.&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;Thundering Herd Effect:&lt;/strong&gt; A situation where many concurrent processes wake up at the same time to handle a single event, overwhelming system resources.&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;Jitter:&lt;/strong&gt; The deliberate introduction of small, random time variations to prevent processes from synchronizing their retries.&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;link: &lt;a href="https://github.com/wesleyskap/onkai-unified-bus" rel="noopener noreferrer"&gt;https://github.com/wesleyskap/onkai-unified-bus&lt;/a&gt;&lt;/p&gt;

</description>
      <category>csharp</category>
      <category>errors</category>
      <category>transient</category>
      <category>jitter</category>
    </item>
    <item>
      <title>Decoupled drivers: Abstracting NATS, RabbitMQ, and Kafka with a unified interface</title>
      <dc:creator>Wesley Conde</dc:creator>
      <pubDate>Sun, 22 Mar 2026 17:05:00 +0000</pubDate>
      <link>https://dev.to/wesleyskap/decoupled-drivers-abstracting-nats-rabbitmq-and-kafka-with-a-unified-interface-98o</link>
      <guid>https://dev.to/wesleyskap/decoupled-drivers-abstracting-nats-rabbitmq-and-kafka-with-a-unified-interface-98o</guid>
      <description>&lt;h2&gt;
  
  
  The problem of broker coupling
&lt;/h2&gt;

&lt;p&gt;Many engineering teams start their projects by coupling their business logic directly to a specific messaging client library (such as RabbitMQ's amqp driver or Kafka's sarama client). As the ecosystem grows, needs shift: perhaps Kafka is overkill for small microservices and you want NATS, or you need disk durability that simple NATS setups lack.&lt;/p&gt;

&lt;p&gt;Migrating when your business logic is heavily coupled with the specific types and SDK methods of a particular broker is a tedious, error-prone refactoring nightmare.&lt;/p&gt;

&lt;p&gt;To prevent this, &lt;strong&gt;onkai-unified-bus&lt;/strong&gt; introduces a structured driver abstraction layer behind clean Go interfaces.&lt;/p&gt;

&lt;h2&gt;
  
  
  Defining the unified contract
&lt;/h2&gt;

&lt;p&gt;The key to transport decoupling is establishing cohesive behavioral contracts. The unified interface exposes the minimum common denominator of requirements for asynchronous messaging:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight go"&gt;&lt;code&gt;&lt;span class="k"&gt;type&lt;/span&gt; &lt;span class="n"&gt;Message&lt;/span&gt; &lt;span class="k"&gt;struct&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
    &lt;span class="n"&gt;ID&lt;/span&gt;      &lt;span class="kt"&gt;string&lt;/span&gt;
    &lt;span class="n"&gt;Payload&lt;/span&gt; &lt;span class="p"&gt;[]&lt;/span&gt;&lt;span class="kt"&gt;byte&lt;/span&gt;
    &lt;span class="n"&gt;Headers&lt;/span&gt; &lt;span class="k"&gt;map&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="kt"&gt;string&lt;/span&gt;&lt;span class="p"&gt;]&lt;/span&gt;&lt;span class="kt"&gt;string&lt;/span&gt;
&lt;span class="p"&gt;}&lt;/span&gt;

&lt;span class="k"&gt;type&lt;/span&gt; &lt;span class="n"&gt;Driver&lt;/span&gt; &lt;span class="k"&gt;interface&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
    &lt;span class="n"&gt;Connect&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;ctx&lt;/span&gt; &lt;span class="n"&gt;context&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Context&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="kt"&gt;error&lt;/span&gt;
    &lt;span class="n"&gt;Publish&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;ctx&lt;/span&gt; &lt;span class="n"&gt;context&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Context&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;topic&lt;/span&gt; &lt;span class="kt"&gt;string&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;msg&lt;/span&gt; &lt;span class="n"&gt;Message&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="kt"&gt;error&lt;/span&gt;
    &lt;span class="n"&gt;Subscribe&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;ctx&lt;/span&gt; &lt;span class="n"&gt;context&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Context&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;topic&lt;/span&gt; &lt;span class="kt"&gt;string&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;handler&lt;/span&gt; &lt;span class="k"&gt;func&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;msg&lt;/span&gt; &lt;span class="n"&gt;Message&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="kt"&gt;error&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="kt"&gt;error&lt;/span&gt;
    &lt;span class="n"&gt;Close&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt; &lt;span class="kt"&gt;error&lt;/span&gt;
&lt;span class="p"&gt;}&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;h2&gt;
  
  
  Managing physical connections and reconnections
&lt;/h2&gt;

&lt;p&gt;Each concrete driver (e.g. &lt;code&gt;RabbitMQDriver&lt;/code&gt;, &lt;code&gt;NATSDriver&lt;/code&gt;) implements the interface above and takes full responsibility for managing its physical network connections.&lt;/p&gt;

&lt;p&gt;For instance, the RabbitMQ driver listens to channel closed events (&lt;code&gt;amqp.Channel.NotifyClose&lt;/code&gt;) in a background monitoring routine to automatically attempt reconnections on network dropouts, without exposing any of these internal mechanics to the application layers.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight go"&gt;&lt;code&gt;&lt;span class="k"&gt;func&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;d&lt;/span&gt; &lt;span class="o"&gt;*&lt;/span&gt;&lt;span class="n"&gt;RabbitMQDriver&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="n"&gt;handleReconnect&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;ctx&lt;/span&gt; &lt;span class="n"&gt;context&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Context&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
    &lt;span class="k"&gt;for&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
        &lt;span class="k"&gt;select&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
        &lt;span class="k"&gt;case&lt;/span&gt; &lt;span class="o"&gt;&amp;lt;-&lt;/span&gt;&lt;span class="n"&gt;d&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;closeChan&lt;/span&gt;&lt;span class="o"&gt;:&lt;/span&gt;
            &lt;span class="k"&gt;return&lt;/span&gt;
        &lt;span class="k"&gt;case&lt;/span&gt; &lt;span class="n"&gt;err&lt;/span&gt; &lt;span class="o"&gt;:=&lt;/span&gt; &lt;span class="o"&gt;&amp;lt;-&lt;/span&gt;&lt;span class="n"&gt;d&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;conn&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;NotifyClose&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="nb"&gt;make&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="k"&gt;chan&lt;/span&gt; &lt;span class="o"&gt;*&lt;/span&gt;&lt;span class="n"&gt;amqp&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Error&lt;/span&gt;&lt;span class="p"&gt;))&lt;/span&gt;&lt;span class="o"&gt;:&lt;/span&gt;
            &lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="n"&gt;err&lt;/span&gt; &lt;span class="o"&gt;!=&lt;/span&gt; &lt;span class="no"&gt;nil&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
                &lt;span class="n"&gt;log&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Println&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"RabbitMQ connection lost, attempting reconnect..."&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
                &lt;span class="n"&gt;d&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;reconnect&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;ctx&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
            &lt;span class="p"&gt;}&lt;/span&gt;
        &lt;span class="p"&gt;}&lt;/span&gt;
    &lt;span class="p"&gt;}&lt;/span&gt;
&lt;span class="p"&gt;}&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;h3&gt;
  
  
  Technical terms demystified
&lt;/h3&gt;

&lt;ul&gt;
&lt;li&gt;
&lt;strong&gt;Go Interfaces:&lt;/strong&gt; Types that define method signatures (behaviors). Any struct that implements these methods implicitly satisfies the interface.&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;Message Broker:&lt;/strong&gt; A middleware agent that translates and routes messages across distributed components (e.g., RabbitMQ, NATS, Kafka).&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;Decoupling:&lt;/strong&gt; The design practice of ensuring software components have little or no direct dependency on one another.&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;link: &lt;a href="https://github.com/wesleyskap/onkai-unified-bus" rel="noopener noreferrer"&gt;https://github.com/wesleyskap/onkai-unified-bus&lt;/a&gt;&lt;/p&gt;

</description>
      <category>csharp</category>
      <category>messagebroker</category>
      <category>bus</category>
      <category>eventdriven</category>
    </item>
    <item>
      <title>Flow control and backpressure: Preventing memory saturation in event consumption</title>
      <dc:creator>Wesley Conde</dc:creator>
      <pubDate>Sat, 21 Mar 2026 12:23:00 +0000</pubDate>
      <link>https://dev.to/wesleyskap/flow-control-and-backpressure-preventing-memory-saturation-in-event-consumption-5cb0</link>
      <guid>https://dev.to/wesleyskap/flow-control-and-backpressure-preventing-memory-saturation-in-event-consumption-5cb0</guid>
      <description>&lt;h2&gt;
  
  
  The phenomenon of consumer saturation
&lt;/h2&gt;

&lt;p&gt;In any distributed or event-driven architecture, mismatching speeds between producers and consumers is inevitable. If your producer service generates 50,000 messages per second, but your downstream database or API integrated into the consumer can only handle 5,000 requests per second, system memory will quickly become a point of failure. Without structured flow control, local buffers will accumulate data indefinitely until an Out Of Memory (OOM) crash occurs.&lt;/p&gt;

&lt;p&gt;To shield applications against these traffic surges, &lt;strong&gt;onkai-unified-bus&lt;/strong&gt; implements robust &lt;strong&gt;Backpressure&lt;/strong&gt; policies and non-blocking circular buffers.&lt;/p&gt;

&lt;h2&gt;
  
  
  Control policies: Blocking vs. dropping
&lt;/h2&gt;

&lt;p&gt;When an event reception channel reaches its maximum capacity (saturation), the event bus offers three configurable policies:&lt;/p&gt;

&lt;ol&gt;
&lt;li&gt; &lt;strong&gt;Block:&lt;/strong&gt; The producer goroutine is temporarily paused when attempting to publish new events. This naturally throttles the production rate until consumers drain the queue.&lt;/li&gt;
&lt;li&gt; &lt;strong&gt;Drop Oldest:&lt;/strong&gt; The bus discards the oldest message in the queue to insert the new one, prioritizing fresh events and avoiding unbounded latency.&lt;/li&gt;
&lt;li&gt; &lt;strong&gt;Drop Newest:&lt;/strong&gt; The current event is discarded immediately, returning a saturation error to the caller.
&lt;/li&gt;
&lt;/ol&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight go"&gt;&lt;code&gt;&lt;span class="k"&gt;type&lt;/span&gt; &lt;span class="n"&gt;BackpressurePolicy&lt;/span&gt; &lt;span class="kt"&gt;int&lt;/span&gt;

&lt;span class="k"&gt;const&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;
    &lt;span class="n"&gt;PolicyBlock&lt;/span&gt; &lt;span class="n"&gt;BackpressurePolicy&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="no"&gt;iota&lt;/span&gt;
    &lt;span class="n"&gt;PolicyDropOldest&lt;/span&gt;
    &lt;span class="n"&gt;PolicyDropNewest&lt;/span&gt;
&lt;span class="p"&gt;)&lt;/span&gt;

&lt;span class="k"&gt;func&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;b&lt;/span&gt; &lt;span class="o"&gt;*&lt;/span&gt;&lt;span class="n"&gt;Bus&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="n"&gt;Publish&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;topic&lt;/span&gt; &lt;span class="kt"&gt;string&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;data&lt;/span&gt; &lt;span class="p"&gt;[]&lt;/span&gt;&lt;span class="kt"&gt;byte&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;policy&lt;/span&gt; &lt;span class="n"&gt;BackpressurePolicy&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="kt"&gt;error&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
    &lt;span class="k"&gt;select&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
    &lt;span class="k"&gt;case&lt;/span&gt; &lt;span class="n"&gt;b&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;inbox&lt;/span&gt; &lt;span class="o"&gt;&amp;lt;-&lt;/span&gt; &lt;span class="n"&gt;data&lt;/span&gt;&lt;span class="o"&gt;:&lt;/span&gt;
        &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="no"&gt;nil&lt;/span&gt;
    &lt;span class="k"&gt;default&lt;/span&gt;&lt;span class="o"&gt;:&lt;/span&gt;
        &lt;span class="k"&gt;switch&lt;/span&gt; &lt;span class="n"&gt;policy&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
        &lt;span class="k"&gt;case&lt;/span&gt; &lt;span class="n"&gt;PolicyBlock&lt;/span&gt;&lt;span class="o"&gt;:&lt;/span&gt;
            &lt;span class="n"&gt;b&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;inbox&lt;/span&gt; &lt;span class="o"&gt;&amp;lt;-&lt;/span&gt; &lt;span class="n"&gt;data&lt;/span&gt; &lt;span class="c"&gt;// Blocks the producer goroutine until space is freed&lt;/span&gt;
            &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="no"&gt;nil&lt;/span&gt;
        &lt;span class="k"&gt;case&lt;/span&gt; &lt;span class="n"&gt;PolicyDropOldest&lt;/span&gt;&lt;span class="o"&gt;:&lt;/span&gt;
            &lt;span class="k"&gt;select&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
            &lt;span class="k"&gt;case&lt;/span&gt; &lt;span class="o"&gt;&amp;lt;-&lt;/span&gt;&lt;span class="n"&gt;b&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;inbox&lt;/span&gt;&lt;span class="o"&gt;:&lt;/span&gt; &lt;span class="c"&gt;// Discards the oldest item in the queue&lt;/span&gt;
            &lt;span class="k"&gt;default&lt;/span&gt;&lt;span class="o"&gt;:&lt;/span&gt;
            &lt;span class="p"&gt;}&lt;/span&gt;
            &lt;span class="n"&gt;b&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;inbox&lt;/span&gt; &lt;span class="o"&gt;&amp;lt;-&lt;/span&gt; &lt;span class="n"&gt;data&lt;/span&gt; &lt;span class="c"&gt;// Insert the new data&lt;/span&gt;
            &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="no"&gt;nil&lt;/span&gt;
        &lt;span class="k"&gt;case&lt;/span&gt; &lt;span class="n"&gt;PolicyDropNewest&lt;/span&gt;&lt;span class="o"&gt;:&lt;/span&gt;
            &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="n"&gt;ErrQueueSaturated&lt;/span&gt;
        &lt;span class="p"&gt;}&lt;/span&gt;
    &lt;span class="p"&gt;}&lt;/span&gt;
    &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="no"&gt;nil&lt;/span&gt;
&lt;span class="p"&gt;}&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;h2&gt;
  
  
  Channel-based circular buffers
&lt;/h2&gt;

&lt;p&gt;By leveraging Go's native buffered channels, the event bus performs these checks extremely efficiently without complex locks or slow custom queue structures. Using a simple &lt;code&gt;select&lt;/code&gt; block without a default channel allows backpressure decisions to occur at the CPU level in nanoseconds.&lt;/p&gt;

&lt;h3&gt;
  
  
  Technical terms demystified
&lt;/h3&gt;

&lt;ul&gt;
&lt;li&gt;
&lt;strong&gt;Backpressure:&lt;/strong&gt; The resistance or force exerted in opposition to data flow, forcing the sending system to throttle its transmission rate.&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;Out Of Memory (OOM):&lt;/strong&gt; A critical state where the operating system terminates a process because it has consumed all available physical memory.&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;Buffered Channels:&lt;/strong&gt; Native Go queues with pre-allocated capacity that allow sending messages without blocking the sender until the buffer is full.&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;link: &lt;a href="https://github.com/wesleyskap/onkai-unified-bus" rel="noopener noreferrer"&gt;https://github.com/wesleyskap/onkai-unified-bus&lt;/a&gt;&lt;/p&gt;

</description>
      <category>csharp</category>
      <category>eventdriven</category>
      <category>channels</category>
      <category>backpressure</category>
    </item>
    <item>
      <title>From custom to industry standard: Histogram percentiles and the OpenTelemetry semantic bridge</title>
      <dc:creator>Wesley Conde</dc:creator>
      <pubDate>Sat, 14 Mar 2026 12:05:00 +0000</pubDate>
      <link>https://dev.to/wesleyskap/from-custom-to-industry-standard-histogram-percentiles-and-the-opentelemetry-semantic-bridge-873</link>
      <guid>https://dev.to/wesleyskap/from-custom-to-industry-standard-histogram-percentiles-and-the-opentelemetry-semantic-bridge-873</guid>
      <description>&lt;h2&gt;
  
  
  The statistical illusion of the mean and the mathematics of percentiles
&lt;/h2&gt;

&lt;p&gt;Metrics calculated solely on the arithmetic mean lie heavily to reliability engineers. A web server can display an excellent average latency of 15ms, masking that 1% of its users face catastrophic delays of 5 seconds due to Garbage Collector pauses. To map long-tail quality of service, we must compute precise statistical percentiles (p50, p90, p99).&lt;/p&gt;

&lt;p&gt;Computing exact percentiles over time would require storing all physical measurements indefinitely in RAM, leading to memory leaks and resource exhaustion in production.&lt;/p&gt;

&lt;p&gt;To solve this problem, Orkai utilizes a memory-bounded structure called a &lt;strong&gt;latencyReservoir&lt;/strong&gt;. It stores a fixed sliding window of samples (capped at 2000 float64 entries) in a concurrency-safe manner. To compute a percentile (e.g. p99), we isolate the internal slice using a thread-safe copy (&lt;code&gt;copy(sorted, r.samples)&lt;/code&gt;) to avoid race conditions, sort the copied slice, and extract the value at the proportional index.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight go"&gt;&lt;code&gt;&lt;span class="k"&gt;func&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;r&lt;/span&gt; &lt;span class="o"&gt;*&lt;/span&gt;&lt;span class="n"&gt;latencyReservoir&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="n"&gt;extractPercentile&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;p&lt;/span&gt; &lt;span class="kt"&gt;float64&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="kt"&gt;float64&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
    &lt;span class="n"&gt;r&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;mu&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Lock&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;
    &lt;span class="k"&gt;defer&lt;/span&gt; &lt;span class="n"&gt;r&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;mu&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Unlock&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;

    &lt;span class="n"&gt;n&lt;/span&gt; &lt;span class="o"&gt;:=&lt;/span&gt; &lt;span class="nb"&gt;len&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;r&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;samples&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
    &lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="n"&gt;n&lt;/span&gt; &lt;span class="o"&gt;==&lt;/span&gt; &lt;span class="m"&gt;0&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
        &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="m"&gt;0.0&lt;/span&gt;
    &lt;span class="p"&gt;}&lt;/span&gt;
    &lt;span class="c"&gt;// Create an isolated copy for safe sorting without corrupting concurrent memory&lt;/span&gt;
    &lt;span class="n"&gt;sorted&lt;/span&gt; &lt;span class="o"&gt;:=&lt;/span&gt; &lt;span class="nb"&gt;make&lt;/span&gt;&lt;span class="p"&gt;([]&lt;/span&gt;&lt;span class="kt"&gt;float64&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;n&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
    &lt;span class="nb"&gt;copy&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;sorted&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;r&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;samples&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
    &lt;span class="n"&gt;sort&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Float64s&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;sorted&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;

    &lt;span class="n"&gt;idx&lt;/span&gt; &lt;span class="o"&gt;:=&lt;/span&gt; &lt;span class="kt"&gt;int&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;math&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Ceil&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;p&lt;/span&gt;&lt;span class="o"&gt;*&lt;/span&gt;&lt;span class="kt"&gt;float64&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;n&lt;/span&gt;&lt;span class="p"&gt;)))&lt;/span&gt; &lt;span class="o"&gt;-&lt;/span&gt; &lt;span class="m"&gt;1&lt;/span&gt;
    &lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="n"&gt;idx&lt;/span&gt; &lt;span class="o"&gt;&amp;lt;&lt;/span&gt; &lt;span class="m"&gt;0&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
        &lt;span class="n"&gt;idx&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="m"&gt;0&lt;/span&gt;
    &lt;span class="p"&gt;}&lt;/span&gt; &lt;span class="k"&gt;else&lt;/span&gt; &lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="n"&gt;idx&lt;/span&gt; &lt;span class="o"&gt;&amp;gt;=&lt;/span&gt; &lt;span class="n"&gt;n&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
        &lt;span class="n"&gt;idx&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;n&lt;/span&gt; &lt;span class="o"&gt;-&lt;/span&gt; &lt;span class="m"&gt;1&lt;/span&gt;
    &lt;span class="p"&gt;}&lt;/span&gt;
    &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="n"&gt;sorted&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="n"&gt;idx&lt;/span&gt;&lt;span class="p"&gt;]&lt;/span&gt;
&lt;span class="p"&gt;}&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;h2&gt;
  
  
  Semantic bridges and double routing in openTelemetry
&lt;/h2&gt;

&lt;p&gt;Most modern companies have standardized their observability stacks using the CNCF's &lt;strong&gt;OpenTelemetry (OTel)&lt;/strong&gt; specification. However, refactoring every single legacy application to interface with the verbose OTel APIs is economically and technically unviable.&lt;/p&gt;

&lt;p&gt;In Orkai, we solved this by developing lightweight semantic adapters (&lt;code&gt;NewOTelTracer&lt;/code&gt; and &lt;code&gt;NewOTelMetrics&lt;/code&gt;). They cleanly translate calls from our lightweight unified facades into native OpenTelemetry SDK structures.&lt;/p&gt;

&lt;p&gt;Furthermore, we introduced &lt;strong&gt;Double Routing&lt;/strong&gt;: when a latency metric is registered, our bridge forwards it simultaneously to both the remote OpenTelemetry Collector and the local &lt;code&gt;InMemoryMetrics&lt;/code&gt; engine. This design allows you to keep scraping traditional &lt;code&gt;/metrics&lt;/code&gt; via Prometheus for fast, low-overhead local dashboards, without degrading CPU performance or doubling processing latency.&lt;/p&gt;

&lt;p&gt;[DIAGRAM_DOUBLE_ROUTING]&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight go"&gt;&lt;code&gt;&lt;span class="k"&gt;type&lt;/span&gt; &lt;span class="n"&gt;otelMetrics&lt;/span&gt; &lt;span class="k"&gt;struct&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
    &lt;span class="n"&gt;mu&lt;/span&gt;          &lt;span class="n"&gt;sync&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Mutex&lt;/span&gt;
    &lt;span class="n"&gt;meter&lt;/span&gt;       &lt;span class="n"&gt;metric&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Meter&lt;/span&gt;
    &lt;span class="n"&gt;instruments&lt;/span&gt; &lt;span class="k"&gt;map&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="kt"&gt;string&lt;/span&gt;&lt;span class="p"&gt;]&lt;/span&gt;&lt;span class="k"&gt;interface&lt;/span&gt;&lt;span class="p"&gt;{}&lt;/span&gt;
    &lt;span class="n"&gt;localEngine&lt;/span&gt; &lt;span class="o"&gt;*&lt;/span&gt;&lt;span class="n"&gt;InMemoryMetrics&lt;/span&gt; &lt;span class="c"&gt;// Primary local Prometheus collector&lt;/span&gt;
&lt;span class="p"&gt;}&lt;/span&gt;

&lt;span class="k"&gt;func&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;o&lt;/span&gt; &lt;span class="o"&gt;*&lt;/span&gt;&lt;span class="n"&gt;otelMetrics&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="n"&gt;RecordLatency&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;name&lt;/span&gt; &lt;span class="kt"&gt;string&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;val&lt;/span&gt; &lt;span class="kt"&gt;float64&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;labels&lt;/span&gt; &lt;span class="k"&gt;map&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="kt"&gt;string&lt;/span&gt;&lt;span class="p"&gt;]&lt;/span&gt;&lt;span class="kt"&gt;string&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
    &lt;span class="n"&gt;o&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;mu&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Lock&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;
    &lt;span class="c"&gt;// 1. Primary local routing&lt;/span&gt;
    &lt;span class="n"&gt;o&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;localEngine&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;RecordLatency&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;name&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;val&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;labels&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
    &lt;span class="n"&gt;histogram&lt;/span&gt; &lt;span class="o"&gt;:=&lt;/span&gt; &lt;span class="n"&gt;o&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;getOrCreateHistogram&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;name&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
    &lt;span class="n"&gt;o&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;mu&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Unlock&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;

    &lt;span class="c"&gt;// 2. Concurrent propagation to the native OpenTelemetry SDK&lt;/span&gt;
    &lt;span class="n"&gt;ctx&lt;/span&gt; &lt;span class="o"&gt;:=&lt;/span&gt; &lt;span class="n"&gt;context&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Background&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;
    &lt;span class="n"&gt;histogram&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Record&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;ctx&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;val&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;metric&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;WithAttributes&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;convertLabels&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;labels&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;&lt;span class="o"&gt;...&lt;/span&gt;&lt;span class="p"&gt;))&lt;/span&gt;
&lt;span class="p"&gt;}&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;h3&gt;
  
  
  Technical terms explained
&lt;/h3&gt;

&lt;ul&gt;
&lt;li&gt;
&lt;strong&gt;Outliers:&lt;/strong&gt; Experimental measurements or telemetry records that deviate drastically from the typical operational envelope (e.g. a 10s database query in a cluster of 2ms requests).&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;Percentile:&lt;/strong&gt; A statistical measure representing the threshold below which a given percentage of data falls. For instance, a p95 latency of 200ms means 95% of requests completed under 200ms.&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;Double Routing:&lt;/strong&gt; The technique of multiplexing a single stream of telemetry data to multiple independent endpoints concurrently without blocking the main execution path.&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;link: &lt;a href="https://github.com/wesleyskap/orkai-observability" rel="noopener noreferrer"&gt;https://github.com/wesleyskap/orkai-observability&lt;/a&gt;&lt;/p&gt;

</description>
      <category>go</category>
      <category>observability</category>
      <category>opentelemetry</category>
      <category>performance</category>
    </item>
  </channel>
</rss>
