<?xml version="1.0" encoding="UTF-8"?>
<rss version="2.0" xmlns:atom="http://www.w3.org/2005/Atom" xmlns:dc="http://purl.org/dc/elements/1.1/">
  <channel>
    <title>DEV Community: Eder Matos</title>
    <description>The latest articles on DEV Community by Eder Matos (@ederfmatos).</description>
    <link>https://dev.to/ederfmatos</link>
    <image>
      <url>https://media2.dev.to/dynamic/image/width=90,height=90,fit=cover,gravity=auto,format=auto/https:%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Fuser%2Fprofile_image%2F660661%2Fa95a1a2b-17b8-44ca-8cd6-240e7956f6aa.jpg</url>
      <title>DEV Community: Eder Matos</title>
      <link>https://dev.to/ederfmatos</link>
    </image>
    <atom:link rel="self" type="application/rss+xml" href="https://dev.to/feed/ederfmatos"/>
    <language>en</language>
    <item>
      <title>Implementando Transactional Outbox com Go, DynamoDB, MongoDB, Kafka e RabbitMq</title>
      <dc:creator>Eder Matos</dc:creator>
      <pubDate>Wed, 10 Jul 2024 22:26:06 +0000</pubDate>
      <link>https://dev.to/ederfmatos/implementando-transactional-outbox-com-go-dynamodb-e-mongodb-1kn3</link>
      <guid>https://dev.to/ederfmatos/implementando-transactional-outbox-com-go-dynamodb-e-mongodb-1kn3</guid>
      <description>&lt;p&gt;&lt;strong&gt;Introdução&lt;/strong&gt;&lt;/p&gt;

&lt;p&gt;O padrão Transactional Outbox é uma solução de arquitetura que ajuda a garantir a consistência de dados entre um banco de dados e um sistema de mensageria. Ele é especialmente útil em sistemas distribuídos onde é necessário garantir que uma mensagem seja enviada somente se a transação do banco de dados for bem-sucedida. Este artigo tem como objetivo introduzir esse padrão e mostrar como implementar utilizando a linguagem Go, Apacke Kafka, DynamoDB ou MongoDB, sem grandes esforços.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Teoria por Trás do Padrão Transactional Outbox&lt;/strong&gt;&lt;br&gt;
O padrão Transactional Outbox resolve o problema de consistência de dados em sistemas distribuídos. Quando uma transação envolve tanto uma atualização no banco de dados quanto o envio de uma mensagem para um sistema de mensageria, o padrão Transactional Outbox garante que essas duas operações sejam executadas de forma atômica. Ou seja, ambas são concluídas com sucesso ou nenhuma delas é concluída. Isso é alcançado ao primeiro salvar o evento em uma tabela no banco de dados e, posteriormente, ler e processar esses eventos para enviá-los ao sistema de mensageria.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Apache Kafka&lt;/strong&gt;&lt;br&gt;
Apache Kafka é uma plataforma de streaming de eventos distribuída que permite publicar, armazenar e consumir fluxos de registros em tempo real. Ele é frequentemente usado para construir pipelines de dados e sistemas de mensageria resilientes. Kafka é conhecido por sua alta taxa de transferência, baixa latência e capacidade de armazenamento durável.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Amazon DynamoDB&lt;/strong&gt;&lt;br&gt;
Amazon DynamoDB é um serviço de banco de dados NoSQL totalmente gerenciado que oferece desempenho previsível e alta escalabilidade. DynamoDB Streams é um recurso que captura todas as alterações feitas nas tabelas do DynamoDB, permitindo que aplicações consumam e processem essas alterações em tempo real. Isso é especialmente útil para implementar o padrão Transactional Outbox, pois permite que as mudanças sejam monitoradas e processadas de forma assíncrona.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;MongoDB&lt;/strong&gt;&lt;br&gt;
MongoDB é um banco de dados NoSQL orientado a documentos que utiliza documentos semelhantes a JSON com esquemas. Ele é altamente escalável e flexível, permitindo que desenvolvedores armazenem dados em estruturas complexas de maneira eficiente. MongoDB é amplamente utilizado para aplicações que exigem grande flexibilidade e desempenho em consultas, como aplicações web e móveis.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;MongoDB Change Streams&lt;/strong&gt;&lt;br&gt;
O MongoDB Change Streams permite que as aplicações recebam notificações em tempo real sobre mudanças em documentos e coleções no banco de dados. Isso é particularmente útil para construir sistemas reativos e pipelines de dados que precisam processar eventos à medida que eles ocorrem no banco de dados.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Streams&lt;/strong&gt;&lt;br&gt;
Streams são fluxos de dados contínuos que podem ser processados em tempo real. Em sistemas de mensageria e banco de dados, streams permitem a captura e processamento de eventos, como alterações em registros de banco de dados ou mensagens publicadas em um tópico. &lt;/p&gt;
&lt;h2&gt;
  
  
  Implementação Prática
&lt;/h2&gt;

&lt;p&gt;Vamos implementar uma aplicação simples em Go que utiliza o padrão Transactional Outbox para garantir a consistência de dados. Nessa aplicação, teremos um caso de uso onde iremos processar um pagamento. Para realizar essa ação, vamos receber os dados de pagamento, como número de cartão de crédito, data de expiração, CVV, nome do dono do cartão e um ID da ordem de compra. Nesta aplicação, o que vamos fazer é chamar um gateway de pagamento, para realizar o processamento do pagamento.&lt;/p&gt;

&lt;p&gt;Caso o gateway retorne com sucesso, vamos emitir um evento informando que o pagamento foi processado com sucesso. Caso o gateway retorne com erro, vamos emitir outro evento dizendo que ocorreu uma falha no pagamento, informando no evento o motivo da falha, literalmente a mensagem de erro. Para isso, vamos iniciar com o seguinte código.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight go"&gt;&lt;code&gt;&lt;span class="k"&gt;type&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;
    &lt;span class="n"&gt;ProcessPaymentUseCase&lt;/span&gt; &lt;span class="k"&gt;struct&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
        &lt;span class="n"&gt;eventEmitter&lt;/span&gt;   &lt;span class="n"&gt;event&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Emitter&lt;/span&gt;
        &lt;span class="n"&gt;paymentGateway&lt;/span&gt; &lt;span class="n"&gt;payment&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Gateway&lt;/span&gt;
    &lt;span class="p"&gt;}&lt;/span&gt;

    &lt;span class="n"&gt;Input&lt;/span&gt; &lt;span class="k"&gt;struct&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
        &lt;span class="n"&gt;PurchaseId&lt;/span&gt;         &lt;span class="kt"&gt;string&lt;/span&gt;
        &lt;span class="n"&gt;Amount&lt;/span&gt;             &lt;span class="kt"&gt;float64&lt;/span&gt;
        &lt;span class="n"&gt;CardNumber&lt;/span&gt;         &lt;span class="kt"&gt;string&lt;/span&gt;
        &lt;span class="n"&gt;CardHolderName&lt;/span&gt;     &lt;span class="kt"&gt;string&lt;/span&gt;
        &lt;span class="n"&gt;CardExpirationDate&lt;/span&gt; &lt;span class="kt"&gt;string&lt;/span&gt;
        &lt;span class="n"&gt;CardCVV&lt;/span&gt;            &lt;span class="kt"&gt;string&lt;/span&gt;
    &lt;span class="p"&gt;}&lt;/span&gt;
&lt;span class="p"&gt;)&lt;/span&gt;

&lt;span class="k"&gt;func&lt;/span&gt; &lt;span class="n"&gt;New&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;eventEmitter&lt;/span&gt; &lt;span class="n"&gt;event&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Emitter&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;paymentGateway&lt;/span&gt; &lt;span class="n"&gt;payment&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Gateway&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="o"&gt;*&lt;/span&gt;&lt;span class="n"&gt;ProcessPaymentUseCase&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
    &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="o"&gt;&amp;amp;&lt;/span&gt;&lt;span class="n"&gt;ProcessPaymentUseCase&lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="n"&gt;eventEmitter&lt;/span&gt;&lt;span class="o"&gt;:&lt;/span&gt; &lt;span class="n"&gt;eventEmitter&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;paymentGateway&lt;/span&gt;&lt;span class="o"&gt;:&lt;/span&gt; &lt;span class="n"&gt;paymentGateway&lt;/span&gt;&lt;span class="p"&gt;}&lt;/span&gt;
&lt;span class="p"&gt;}&lt;/span&gt;

&lt;span class="k"&gt;func&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;uc&lt;/span&gt; &lt;span class="o"&gt;*&lt;/span&gt;&lt;span class="n"&gt;ProcessPaymentUseCase&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="n"&gt;Execute&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;input&lt;/span&gt; &lt;span class="n"&gt;Input&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="kt"&gt;error&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
    &lt;span class="n"&gt;paymentInput&lt;/span&gt; &lt;span class="o"&gt;:=&lt;/span&gt; &lt;span class="n"&gt;payment&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Input&lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;
        &lt;span class="n"&gt;CardNumber&lt;/span&gt;&lt;span class="o"&gt;:&lt;/span&gt;         &lt;span class="n"&gt;input&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;CardNumber&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
        &lt;span class="n"&gt;CardHolderName&lt;/span&gt;&lt;span class="o"&gt;:&lt;/span&gt;     &lt;span class="n"&gt;input&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;CardHolderName&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
        &lt;span class="n"&gt;CardExpirationDate&lt;/span&gt;&lt;span class="o"&gt;:&lt;/span&gt; &lt;span class="n"&gt;input&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;CardExpirationDate&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
        &lt;span class="n"&gt;CardCVV&lt;/span&gt;&lt;span class="o"&gt;:&lt;/span&gt;            &lt;span class="n"&gt;input&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;CardCVV&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
        &lt;span class="n"&gt;Amount&lt;/span&gt;&lt;span class="o"&gt;:&lt;/span&gt;             &lt;span class="n"&gt;input&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Amount&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="p"&gt;}&lt;/span&gt;
    &lt;span class="n"&gt;paymentOutput&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;err&lt;/span&gt; &lt;span class="o"&gt;:=&lt;/span&gt; &lt;span class="n"&gt;uc&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;paymentGateway&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Pay&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;paymentInput&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
    &lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="n"&gt;err&lt;/span&gt; &lt;span class="o"&gt;!=&lt;/span&gt; &lt;span class="no"&gt;nil&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
        &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="n"&gt;uc&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;eventEmitter&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Emit&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;events&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;NewPaymentFailedEvent&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;input&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;PurchaseId&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;err&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Error&lt;/span&gt;&lt;span class="p"&gt;()))&lt;/span&gt;
    &lt;span class="p"&gt;}&lt;/span&gt;
    &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="n"&gt;uc&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;eventEmitter&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Emit&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;events&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;NewPaymentProcessedEvent&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;input&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;PurchaseId&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;paymentOutput&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;TransactionId&lt;/span&gt;&lt;span class="p"&gt;))&lt;/span&gt;
&lt;span class="p"&gt;}&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Temos o nosso caso de uso, onde ele recebe duas dependências: a primeira é o EventEmitter e a segunda é o PaymentGateway. O EventEmitter é uma interface com a seguinte forma:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight go"&gt;&lt;code&gt;
&lt;span class="k"&gt;type&lt;/span&gt; &lt;span class="n"&gt;Emitter&lt;/span&gt; &lt;span class="k"&gt;interface&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
    &lt;span class="n"&gt;Emit&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;event&lt;/span&gt; &lt;span class="o"&gt;*&lt;/span&gt;&lt;span class="n"&gt;events&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Event&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="kt"&gt;error&lt;/span&gt;
&lt;span class="p"&gt;}&lt;/span&gt;

&lt;span class="k"&gt;type&lt;/span&gt; &lt;span class="n"&gt;Event&lt;/span&gt; &lt;span class="k"&gt;struct&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
    &lt;span class="n"&gt;ID&lt;/span&gt;      &lt;span class="kt"&gt;string&lt;/span&gt;            &lt;span class="s"&gt;`json:"id,omitempty"`&lt;/span&gt;
    &lt;span class="n"&gt;Name&lt;/span&gt;    &lt;span class="kt"&gt;string&lt;/span&gt;            &lt;span class="s"&gt;`json:"name,omitempty"`&lt;/span&gt;
    &lt;span class="n"&gt;Payload&lt;/span&gt; &lt;span class="k"&gt;map&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="kt"&gt;string&lt;/span&gt;&lt;span class="p"&gt;]&lt;/span&gt;&lt;span class="kt"&gt;string&lt;/span&gt; &lt;span class="s"&gt;`json:"payload,omitempty"`&lt;/span&gt;
&lt;span class="p"&gt;}&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Já o PaymentGateway tem a seguinte forma:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight go"&gt;&lt;code&gt;&lt;span class="k"&gt;type&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;
    &lt;span class="n"&gt;Input&lt;/span&gt; &lt;span class="k"&gt;struct&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
        &lt;span class="n"&gt;CardNumber&lt;/span&gt;         &lt;span class="kt"&gt;string&lt;/span&gt;
        &lt;span class="n"&gt;CardHolderName&lt;/span&gt;     &lt;span class="kt"&gt;string&lt;/span&gt;
        &lt;span class="n"&gt;CardExpirationDate&lt;/span&gt; &lt;span class="kt"&gt;string&lt;/span&gt;
        &lt;span class="n"&gt;CardCVV&lt;/span&gt;            &lt;span class="kt"&gt;string&lt;/span&gt;
        &lt;span class="n"&gt;Amount&lt;/span&gt;             &lt;span class="kt"&gt;float64&lt;/span&gt;
    &lt;span class="p"&gt;}&lt;/span&gt;

    &lt;span class="n"&gt;Output&lt;/span&gt; &lt;span class="k"&gt;struct&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
        &lt;span class="n"&gt;TransactionId&lt;/span&gt; &lt;span class="kt"&gt;string&lt;/span&gt;
    &lt;span class="p"&gt;}&lt;/span&gt;

    &lt;span class="n"&gt;Gateway&lt;/span&gt; &lt;span class="k"&gt;interface&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
        &lt;span class="n"&gt;Pay&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;payment&lt;/span&gt; &lt;span class="n"&gt;Input&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="o"&gt;*&lt;/span&gt;&lt;span class="n"&gt;Output&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="kt"&gt;error&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
    &lt;span class="p"&gt;}&lt;/span&gt;
&lt;span class="p"&gt;)&lt;/span&gt;

&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Para criar uma implementação fictícia, vamos criar duas implementações do PaymentGateway. Uma que aceita pagamentos acima de R$ 20,00 e outra que aceita pagamentos apenas abaixo de R$ 20,00, como o código a seguir:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight go"&gt;&lt;code&gt;
&lt;span class="k"&gt;type&lt;/span&gt; &lt;span class="n"&gt;MasterCardPaymentGateway&lt;/span&gt; &lt;span class="k"&gt;struct&lt;/span&gt;&lt;span class="p"&gt;{}&lt;/span&gt;

&lt;span class="k"&gt;func&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;m&lt;/span&gt; &lt;span class="o"&gt;*&lt;/span&gt;&lt;span class="n"&gt;MasterCardPaymentGateway&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="n"&gt;Pay&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;input&lt;/span&gt; &lt;span class="n"&gt;payment&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Input&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="o"&gt;*&lt;/span&gt;&lt;span class="n"&gt;payment&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Output&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="kt"&gt;error&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
    &lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="n"&gt;input&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Amount&lt;/span&gt; &lt;span class="o"&gt;&amp;lt;=&lt;/span&gt; &lt;span class="m"&gt;20&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
        &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="no"&gt;nil&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;errors&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;New&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"amount too high"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
    &lt;span class="p"&gt;}&lt;/span&gt;
    &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="o"&gt;&amp;amp;&lt;/span&gt;&lt;span class="n"&gt;payment&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Output&lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;
        &lt;span class="n"&gt;TransactionId&lt;/span&gt;&lt;span class="o"&gt;:&lt;/span&gt; &lt;span class="n"&gt;uuid&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;NewString&lt;/span&gt;&lt;span class="p"&gt;(),&lt;/span&gt;
    &lt;span class="p"&gt;},&lt;/span&gt; &lt;span class="no"&gt;nil&lt;/span&gt;
&lt;span class="p"&gt;}&lt;/span&gt;

&lt;span class="k"&gt;type&lt;/span&gt; &lt;span class="n"&gt;VisaPaymentGateway&lt;/span&gt; &lt;span class="k"&gt;struct&lt;/span&gt;&lt;span class="p"&gt;{}&lt;/span&gt;

&lt;span class="k"&gt;func&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;v&lt;/span&gt; &lt;span class="o"&gt;*&lt;/span&gt;&lt;span class="n"&gt;VisaPaymentGateway&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="n"&gt;Pay&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;input&lt;/span&gt; &lt;span class="n"&gt;payment&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Input&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="o"&gt;*&lt;/span&gt;&lt;span class="n"&gt;payment&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Output&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="kt"&gt;error&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
    &lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="n"&gt;input&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Amount&lt;/span&gt; &lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="m"&gt;20&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
        &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="no"&gt;nil&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;errors&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;New&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"amount too high"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
    &lt;span class="p"&gt;}&lt;/span&gt;
    &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="o"&gt;&amp;amp;&lt;/span&gt;&lt;span class="n"&gt;payment&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Output&lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;
        &lt;span class="n"&gt;TransactionId&lt;/span&gt;&lt;span class="o"&gt;:&lt;/span&gt; &lt;span class="n"&gt;uuid&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;NewString&lt;/span&gt;&lt;span class="p"&gt;(),&lt;/span&gt;
    &lt;span class="p"&gt;},&lt;/span&gt; &lt;span class="no"&gt;nil&lt;/span&gt;
&lt;span class="p"&gt;}&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Então, como o intuito não é realmente enviar essa mensagem para o sistema de mensageria aqui, mas apenas salvar essa mensagem no banco de dados, temos uma implementação que faz exatamente isso. Simplesmente salvamos esse evento no banco. Utilizando o padrão repositório, temos o OutboxRepository, com duas implementações para este nosso artigo. A primeira utilizando MongoDB, a segunda utilizando DynamoDB. Abaixo, então, vemos o código tanto da implementação do EventEmitter como do OutboxRepository.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight go"&gt;&lt;code&gt;&lt;span class="k"&gt;type&lt;/span&gt; &lt;span class="n"&gt;OutboxEventEmitter&lt;/span&gt; &lt;span class="k"&gt;struct&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
    &lt;span class="n"&gt;outboxRepository&lt;/span&gt; &lt;span class="n"&gt;repository&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;OutboxRepository&lt;/span&gt;
&lt;span class="p"&gt;}&lt;/span&gt;

&lt;span class="k"&gt;func&lt;/span&gt; &lt;span class="n"&gt;NewOutboxEventEmitter&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;outboxRepository&lt;/span&gt; &lt;span class="n"&gt;repository&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;OutboxRepository&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="o"&gt;*&lt;/span&gt;&lt;span class="n"&gt;OutboxEventEmitter&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
    &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="o"&gt;&amp;amp;&lt;/span&gt;&lt;span class="n"&gt;OutboxEventEmitter&lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="n"&gt;outboxRepository&lt;/span&gt;&lt;span class="o"&gt;:&lt;/span&gt; &lt;span class="n"&gt;outboxRepository&lt;/span&gt;&lt;span class="p"&gt;}&lt;/span&gt;
&lt;span class="p"&gt;}&lt;/span&gt;

&lt;span class="k"&gt;func&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;d&lt;/span&gt; &lt;span class="o"&gt;*&lt;/span&gt;&lt;span class="n"&gt;OutboxEventEmitter&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="n"&gt;Emit&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;event&lt;/span&gt; &lt;span class="o"&gt;*&lt;/span&gt;&lt;span class="n"&gt;events&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Event&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="kt"&gt;error&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
    &lt;span class="n"&gt;payload&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;err&lt;/span&gt; &lt;span class="o"&gt;:=&lt;/span&gt; &lt;span class="n"&gt;json&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Marshal&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;event&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
    &lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="n"&gt;err&lt;/span&gt; &lt;span class="o"&gt;!=&lt;/span&gt; &lt;span class="no"&gt;nil&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
        &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="n"&gt;err&lt;/span&gt;
    &lt;span class="p"&gt;}&lt;/span&gt;
    &lt;span class="n"&gt;outbox&lt;/span&gt; &lt;span class="o"&gt;:=&lt;/span&gt; &lt;span class="n"&gt;repository&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;NewOutbox&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;event&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;ID&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;event&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Name&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="kt"&gt;string&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;payload&lt;/span&gt;&lt;span class="p"&gt;))&lt;/span&gt;
    &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="n"&gt;d&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;outboxRepository&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Save&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;outbox&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
&lt;span class="p"&gt;}&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;&lt;strong&gt;Outbox&lt;/strong&gt;&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight go"&gt;&lt;code&gt;&lt;span class="k"&gt;type&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;
    &lt;span class="n"&gt;Outbox&lt;/span&gt; &lt;span class="k"&gt;struct&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
        &lt;span class="n"&gt;Id&lt;/span&gt;          &lt;span class="kt"&gt;string&lt;/span&gt;     &lt;span class="s"&gt;`json:"id" bson:"_id"`&lt;/span&gt;
        &lt;span class="n"&gt;Name&lt;/span&gt;        &lt;span class="kt"&gt;string&lt;/span&gt;     &lt;span class="s"&gt;`json:"name" bson:"name"`&lt;/span&gt;
        &lt;span class="n"&gt;Payload&lt;/span&gt;     &lt;span class="kt"&gt;string&lt;/span&gt;     &lt;span class="s"&gt;`json:"payload" bson:"payload"`&lt;/span&gt;
        &lt;span class="n"&gt;Status&lt;/span&gt;      &lt;span class="kt"&gt;string&lt;/span&gt;     &lt;span class="s"&gt;`json:"status" bson:"status"`&lt;/span&gt;
        &lt;span class="n"&gt;CreatedAt&lt;/span&gt;   &lt;span class="n"&gt;time&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Time&lt;/span&gt;  &lt;span class="s"&gt;`json:"created_at" bson:"created_at"`&lt;/span&gt;
        &lt;span class="n"&gt;ProcessedAt&lt;/span&gt; &lt;span class="o"&gt;*&lt;/span&gt;&lt;span class="n"&gt;time&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Time&lt;/span&gt; &lt;span class="s"&gt;`json:"processed_at" bson:"processed_at"`&lt;/span&gt;
    &lt;span class="p"&gt;}&lt;/span&gt;

    &lt;span class="n"&gt;OutboxRepository&lt;/span&gt; &lt;span class="k"&gt;interface&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
        &lt;span class="n"&gt;Save&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;outbox&lt;/span&gt; &lt;span class="o"&gt;*&lt;/span&gt;&lt;span class="n"&gt;Outbox&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="kt"&gt;error&lt;/span&gt;
    &lt;span class="p"&gt;}&lt;/span&gt;
&lt;span class="p"&gt;)&lt;/span&gt;

&lt;span class="k"&gt;func&lt;/span&gt; &lt;span class="n"&gt;NewOutbox&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;id&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;name&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;payload&lt;/span&gt; &lt;span class="kt"&gt;string&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="o"&gt;*&lt;/span&gt;&lt;span class="n"&gt;Outbox&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
    &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="o"&gt;&amp;amp;&lt;/span&gt;&lt;span class="n"&gt;Outbox&lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;
        &lt;span class="n"&gt;Id&lt;/span&gt;&lt;span class="o"&gt;:&lt;/span&gt;        &lt;span class="n"&gt;id&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
        &lt;span class="n"&gt;Name&lt;/span&gt;&lt;span class="o"&gt;:&lt;/span&gt;      &lt;span class="n"&gt;name&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
        &lt;span class="n"&gt;Payload&lt;/span&gt;&lt;span class="o"&gt;:&lt;/span&gt;   &lt;span class="n"&gt;payload&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
        &lt;span class="n"&gt;Status&lt;/span&gt;&lt;span class="o"&gt;:&lt;/span&gt;    &lt;span class="s"&gt;"PENDING"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
        &lt;span class="n"&gt;CreatedAt&lt;/span&gt;&lt;span class="o"&gt;:&lt;/span&gt; &lt;span class="n"&gt;time&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Now&lt;/span&gt;&lt;span class="p"&gt;(),&lt;/span&gt;
    &lt;span class="p"&gt;}&lt;/span&gt;
&lt;span class="p"&gt;}&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;&lt;strong&gt;Implementação do outbox repository com MongoDB&lt;/strong&gt;&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight go"&gt;&lt;code&gt;&lt;span class="k"&gt;type&lt;/span&gt; &lt;span class="n"&gt;mongoOutboxRepository&lt;/span&gt; &lt;span class="k"&gt;struct&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
    &lt;span class="n"&gt;collection&lt;/span&gt; &lt;span class="o"&gt;*&lt;/span&gt;&lt;span class="n"&gt;mongo&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Collection&lt;/span&gt;
&lt;span class="p"&gt;}&lt;/span&gt;

&lt;span class="k"&gt;func&lt;/span&gt; &lt;span class="n"&gt;NewMongoOutboxRepository&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;collection&lt;/span&gt; &lt;span class="o"&gt;*&lt;/span&gt;&lt;span class="n"&gt;mongo&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Collection&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="n"&gt;OutboxRepository&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
    &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="o"&gt;&amp;amp;&lt;/span&gt;&lt;span class="n"&gt;mongoOutboxRepository&lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="n"&gt;collection&lt;/span&gt;&lt;span class="o"&gt;:&lt;/span&gt; &lt;span class="n"&gt;collection&lt;/span&gt;&lt;span class="p"&gt;}&lt;/span&gt;
&lt;span class="p"&gt;}&lt;/span&gt;

&lt;span class="k"&gt;func&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;r&lt;/span&gt; &lt;span class="o"&gt;*&lt;/span&gt;&lt;span class="n"&gt;mongoOutboxRepository&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="n"&gt;Save&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;outbox&lt;/span&gt; &lt;span class="o"&gt;*&lt;/span&gt;&lt;span class="n"&gt;Outbox&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="kt"&gt;error&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
    &lt;span class="n"&gt;_&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;err&lt;/span&gt; &lt;span class="o"&gt;:=&lt;/span&gt; &lt;span class="n"&gt;r&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;collection&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;InsertOne&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;context&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;TODO&lt;/span&gt;&lt;span class="p"&gt;(),&lt;/span&gt; &lt;span class="n"&gt;outbox&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
    &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="n"&gt;err&lt;/span&gt;
&lt;span class="p"&gt;}&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;&lt;strong&gt;Implementação do outbox repository com DynamoDB&lt;/strong&gt;&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight go"&gt;&lt;code&gt;
&lt;span class="k"&gt;type&lt;/span&gt; &lt;span class="n"&gt;dynamoDBOutboxRepository&lt;/span&gt; &lt;span class="k"&gt;struct&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
    &lt;span class="n"&gt;tableName&lt;/span&gt;    &lt;span class="kt"&gt;string&lt;/span&gt;
    &lt;span class="n"&gt;dynamoClient&lt;/span&gt; &lt;span class="o"&gt;*&lt;/span&gt;&lt;span class="n"&gt;dynamodb&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;DynamoDB&lt;/span&gt;
&lt;span class="p"&gt;}&lt;/span&gt;

&lt;span class="k"&gt;func&lt;/span&gt; &lt;span class="n"&gt;NewDynamoDBOutboxRepository&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;tableName&lt;/span&gt; &lt;span class="kt"&gt;string&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;dynamoClient&lt;/span&gt; &lt;span class="o"&gt;*&lt;/span&gt;&lt;span class="n"&gt;dynamodb&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;DynamoDB&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="n"&gt;OutboxRepository&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
    &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="o"&gt;&amp;amp;&lt;/span&gt;&lt;span class="n"&gt;dynamoDBOutboxRepository&lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="n"&gt;tableName&lt;/span&gt;&lt;span class="o"&gt;:&lt;/span&gt; &lt;span class="n"&gt;tableName&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;dynamoClient&lt;/span&gt;&lt;span class="o"&gt;:&lt;/span&gt; &lt;span class="n"&gt;dynamoClient&lt;/span&gt;&lt;span class="p"&gt;}&lt;/span&gt;
&lt;span class="p"&gt;}&lt;/span&gt;

&lt;span class="k"&gt;func&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;r&lt;/span&gt; &lt;span class="o"&gt;*&lt;/span&gt;&lt;span class="n"&gt;dynamoDBOutboxRepository&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="n"&gt;Save&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;outbox&lt;/span&gt; &lt;span class="o"&gt;*&lt;/span&gt;&lt;span class="n"&gt;Outbox&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="kt"&gt;error&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
    &lt;span class="n"&gt;item&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;err&lt;/span&gt; &lt;span class="o"&gt;:=&lt;/span&gt; &lt;span class="n"&gt;dynamodbattribute&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;MarshalMap&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;outbox&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
    &lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="n"&gt;err&lt;/span&gt; &lt;span class="o"&gt;!=&lt;/span&gt; &lt;span class="no"&gt;nil&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
        &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="n"&gt;err&lt;/span&gt;
    &lt;span class="p"&gt;}&lt;/span&gt;
    &lt;span class="n"&gt;input&lt;/span&gt; &lt;span class="o"&gt;:=&lt;/span&gt; &lt;span class="o"&gt;&amp;amp;&lt;/span&gt;&lt;span class="n"&gt;dynamodb&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;PutItemInput&lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;
        &lt;span class="n"&gt;TableName&lt;/span&gt;&lt;span class="o"&gt;:&lt;/span&gt; &lt;span class="n"&gt;aws&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;String&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;r&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;tableName&lt;/span&gt;&lt;span class="p"&gt;),&lt;/span&gt;
        &lt;span class="n"&gt;Item&lt;/span&gt;&lt;span class="o"&gt;:&lt;/span&gt;      &lt;span class="n"&gt;item&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="p"&gt;}&lt;/span&gt;
    &lt;span class="n"&gt;_&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;err&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;r&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;dynamoClient&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;PutItem&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;input&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
    &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="n"&gt;err&lt;/span&gt;
&lt;span class="p"&gt;}&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;E com isso, temos o nosso serviço pronto. Dentro das suas responsabilidades, ele fez tudo o que estava proposto. Para verificarmos, temos o seguinte código de exemplo:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight go"&gt;&lt;code&gt;&lt;span class="k"&gt;func&lt;/span&gt; &lt;span class="n"&gt;main&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
    &lt;span class="n"&gt;outboxRepository&lt;/span&gt; &lt;span class="o"&gt;:=&lt;/span&gt; &lt;span class="n"&gt;mongoOutboxRepository&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;
    &lt;span class="n"&gt;outboxEventEmitter&lt;/span&gt; &lt;span class="o"&gt;:=&lt;/span&gt; &lt;span class="n"&gt;events&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;NewOutboxEventEmitter&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;outboxRepository&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
    &lt;span class="n"&gt;paymentGateway&lt;/span&gt; &lt;span class="o"&gt;:=&lt;/span&gt; &lt;span class="o"&gt;&amp;amp;&lt;/span&gt;&lt;span class="n"&gt;gateway&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;VisaPaymentGateway&lt;/span&gt;&lt;span class="p"&gt;{}&lt;/span&gt;
    &lt;span class="n"&gt;processPayment&lt;/span&gt; &lt;span class="o"&gt;:=&lt;/span&gt; &lt;span class="n"&gt;process_payment&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;New&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;outboxEventEmitter&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;paymentGateway&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
    &lt;span class="n"&gt;input&lt;/span&gt; &lt;span class="o"&gt;:=&lt;/span&gt; &lt;span class="n"&gt;process_payment&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Input&lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;
        &lt;span class="n"&gt;PurchaseId&lt;/span&gt;&lt;span class="o"&gt;:&lt;/span&gt;         &lt;span class="n"&gt;uuid&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;NewString&lt;/span&gt;&lt;span class="p"&gt;(),&lt;/span&gt;
        &lt;span class="n"&gt;Amount&lt;/span&gt;&lt;span class="o"&gt;:&lt;/span&gt;             &lt;span class="m"&gt;10&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
        &lt;span class="n"&gt;CardNumber&lt;/span&gt;&lt;span class="o"&gt;:&lt;/span&gt;         &lt;span class="s"&gt;"1234123412341234"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
        &lt;span class="n"&gt;CardHolderName&lt;/span&gt;&lt;span class="o"&gt;:&lt;/span&gt;     &lt;span class="s"&gt;"Any name"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
        &lt;span class="n"&gt;CardExpirationDate&lt;/span&gt;&lt;span class="o"&gt;:&lt;/span&gt; &lt;span class="s"&gt;"10/2024"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
        &lt;span class="n"&gt;CardCVV&lt;/span&gt;&lt;span class="o"&gt;:&lt;/span&gt;            &lt;span class="s"&gt;"123"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="p"&gt;}&lt;/span&gt;
    &lt;span class="n"&gt;err&lt;/span&gt; &lt;span class="o"&gt;:=&lt;/span&gt; &lt;span class="n"&gt;processPayment&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Execute&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;input&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
    &lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="n"&gt;err&lt;/span&gt; &lt;span class="o"&gt;!=&lt;/span&gt; &lt;span class="no"&gt;nil&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
        &lt;span class="n"&gt;slog&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Error&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"Payment process is failed"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;err&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
        &lt;span class="k"&gt;return&lt;/span&gt;
    &lt;span class="p"&gt;}&lt;/span&gt;
    &lt;span class="n"&gt;slog&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Info&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"Payment process is done"&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
&lt;span class="p"&gt;}&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Agora então, caso executemos o programa utilizando o MongoDB, podemos ver o registro inserido na coleção do MongoDB. O mesmo vale para o DynamoDB.&lt;/p&gt;

&lt;h2&gt;
  
  
  Envio das mensagens para o serviço de mensageria
&lt;/h2&gt;

&lt;p&gt;Agora, então, temos que ter alguma forma de ler esses dados da tabela e enviá-los realmente para o serviço de mensageria. Neste caso, vamos utilizar o Kafka e o RabbitMQ. Para isso, vamos criar uma outra aplicação que também terá o Outbox repositório. Terá um emissor de evento para o RabbitMQ e um para o Kafka, e terá que, de alguma forma, obter os dados tanto do MongoDB quanto do DynamoDB, dependendo da estratégia.&lt;/p&gt;

&lt;p&gt;Para isso, vamos definir uma interface chamada OutboxStream. Essa interface terá o método FetchEvents, que nos devolverá um canal do Go, onde, nesse canal, teremos o ID do registro que foi inserido ou do registro que deve ser processado neste momento. E, então, teremos uma implementação dessa interface para o MongoDBStreams e uma para o DynamoDBStreams.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight go"&gt;&lt;code&gt;&lt;span class="k"&gt;type&lt;/span&gt; &lt;span class="n"&gt;OutboxStream&lt;/span&gt; &lt;span class="k"&gt;interface&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
    &lt;span class="n"&gt;FetchEvents&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="k"&gt;chan&lt;/span&gt; &lt;span class="kt"&gt;string&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="kt"&gt;error&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
&lt;span class="p"&gt;}&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Uma vez que temos este canal de eventos, podemos percorrer este canal e, para cada item que estiver no canal, podemos obter o registro que está no valor de dados a partir do repositório. Então, começamos o processamento, verificando se o registro já foi processado. Se já foi processado, simplesmente o ignoramos. Agora, se ele não foi processado ou foi processado com erro, seguimos com o processamento.&lt;/p&gt;

&lt;p&gt;O processamento consiste basicamente em obter os dados do evento que estão no próprio registro, no campo payload, e, a partir desses dados, enviar ao sistema de mensageria normalmente. Caso o sistema de mensageria retorne algum erro, marcamos esse registro como falho, com erro, e salvamos novamente no banco para ser processado depois. Caso o envio da mensagem para o sistema de mensageria tenha sido bem-sucedido, marcamos esse registro como processado e salvamos no banco de dados.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight go"&gt;&lt;code&gt;&lt;span class="k"&gt;func&lt;/span&gt; &lt;span class="n"&gt;main&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
    &lt;span class="n"&gt;eventEmitter&lt;/span&gt; &lt;span class="o"&gt;:=&lt;/span&gt; &lt;span class="n"&gt;NewRabbitMqEventEmitter&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;RabbitMqServer&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
    &lt;span class="n"&gt;outboxRepository&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;outboxStream&lt;/span&gt; &lt;span class="o"&gt;:=&lt;/span&gt; &lt;span class="n"&gt;mongoOutbox&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;
    &lt;span class="n"&gt;outboxHandler&lt;/span&gt; &lt;span class="o"&gt;:=&lt;/span&gt; &lt;span class="n"&gt;NewOutboxHandler&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;outboxRepository&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;eventEmitter&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;

    &lt;span class="n"&gt;events&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;err&lt;/span&gt; &lt;span class="o"&gt;:=&lt;/span&gt; &lt;span class="n"&gt;outboxStream&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;FetchEvents&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;
    &lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="n"&gt;err&lt;/span&gt; &lt;span class="o"&gt;!=&lt;/span&gt; &lt;span class="no"&gt;nil&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
        &lt;span class="nb"&gt;panic&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;err&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
    &lt;span class="p"&gt;}&lt;/span&gt;

    &lt;span class="k"&gt;for&lt;/span&gt; &lt;span class="n"&gt;id&lt;/span&gt; &lt;span class="o"&gt;:=&lt;/span&gt; &lt;span class="k"&gt;range&lt;/span&gt; &lt;span class="n"&gt;events&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
        &lt;span class="n"&gt;outbox&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;err&lt;/span&gt; &lt;span class="o"&gt;:=&lt;/span&gt; &lt;span class="n"&gt;outboxRepository&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Get&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;id&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
        &lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="n"&gt;err&lt;/span&gt; &lt;span class="o"&gt;!=&lt;/span&gt; &lt;span class="no"&gt;nil&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
            &lt;span class="k"&gt;continue&lt;/span&gt;
        &lt;span class="p"&gt;}&lt;/span&gt;
        &lt;span class="n"&gt;outboxHandler&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Handle&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;outbox&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
    &lt;span class="p"&gt;}&lt;/span&gt;
&lt;span class="p"&gt;}&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;&lt;strong&gt;Struct responsável pelo processamento do registro&lt;/strong&gt;&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight go"&gt;&lt;code&gt;
&lt;span class="k"&gt;type&lt;/span&gt; &lt;span class="n"&gt;OutboxHandler&lt;/span&gt; &lt;span class="k"&gt;struct&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
    &lt;span class="n"&gt;outboxRepository&lt;/span&gt; &lt;span class="n"&gt;OutboxRepository&lt;/span&gt;
    &lt;span class="n"&gt;eventEmitter&lt;/span&gt;     &lt;span class="n"&gt;EventEmitter&lt;/span&gt;
&lt;span class="p"&gt;}&lt;/span&gt;

&lt;span class="k"&gt;func&lt;/span&gt; &lt;span class="n"&gt;NewOutboxHandler&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;outboxRepository&lt;/span&gt; &lt;span class="n"&gt;OutboxRepository&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;eventEmitter&lt;/span&gt; &lt;span class="n"&gt;EventEmitter&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="o"&gt;*&lt;/span&gt;&lt;span class="n"&gt;OutboxHandler&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
    &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="o"&gt;&amp;amp;&lt;/span&gt;&lt;span class="n"&gt;OutboxHandler&lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="n"&gt;outboxRepository&lt;/span&gt;&lt;span class="o"&gt;:&lt;/span&gt; &lt;span class="n"&gt;outboxRepository&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;eventEmitter&lt;/span&gt;&lt;span class="o"&gt;:&lt;/span&gt; &lt;span class="n"&gt;eventEmitter&lt;/span&gt;&lt;span class="p"&gt;}&lt;/span&gt;
&lt;span class="p"&gt;}&lt;/span&gt;

&lt;span class="k"&gt;func&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;handler&lt;/span&gt; &lt;span class="n"&gt;OutboxHandler&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="n"&gt;Handle&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;outbox&lt;/span&gt; &lt;span class="o"&gt;*&lt;/span&gt;&lt;span class="n"&gt;Outbox&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
    &lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="n"&gt;outbox&lt;/span&gt; &lt;span class="o"&gt;==&lt;/span&gt; &lt;span class="no"&gt;nil&lt;/span&gt; &lt;span class="o"&gt;||&lt;/span&gt; &lt;span class="n"&gt;outbox&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Status&lt;/span&gt; &lt;span class="o"&gt;==&lt;/span&gt; &lt;span class="s"&gt;"PROCESSED"&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
        &lt;span class="k"&gt;return&lt;/span&gt;
    &lt;span class="p"&gt;}&lt;/span&gt;
    &lt;span class="k"&gt;var&lt;/span&gt; &lt;span class="n"&gt;messageEvent&lt;/span&gt; &lt;span class="n"&gt;Event&lt;/span&gt;
    &lt;span class="n"&gt;err&lt;/span&gt; &lt;span class="o"&gt;:=&lt;/span&gt; &lt;span class="n"&gt;json&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Unmarshal&lt;/span&gt;&lt;span class="p"&gt;([]&lt;/span&gt;&lt;span class="kt"&gt;byte&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;outbox&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Payload&lt;/span&gt;&lt;span class="p"&gt;),&lt;/span&gt; &lt;span class="o"&gt;&amp;amp;&lt;/span&gt;&lt;span class="n"&gt;messageEvent&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
    &lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="n"&gt;err&lt;/span&gt; &lt;span class="o"&gt;!=&lt;/span&gt; &lt;span class="no"&gt;nil&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
        &lt;span class="n"&gt;_&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;handler&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;outboxRepository&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Update&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;outbox&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
        &lt;span class="n"&gt;slog&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Error&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"Error unmarshalling message event: "&lt;/span&gt; &lt;span class="o"&gt;+&lt;/span&gt; &lt;span class="n"&gt;err&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Error&lt;/span&gt;&lt;span class="p"&gt;())&lt;/span&gt;
        &lt;span class="k"&gt;return&lt;/span&gt;
    &lt;span class="p"&gt;}&lt;/span&gt;
    &lt;span class="n"&gt;err&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;handler&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;eventEmitter&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Emit&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="o"&gt;&amp;amp;&lt;/span&gt;&lt;span class="n"&gt;messageEvent&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
    &lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="n"&gt;err&lt;/span&gt; &lt;span class="o"&gt;!=&lt;/span&gt; &lt;span class="no"&gt;nil&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
        &lt;span class="n"&gt;outbox&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;MarkAsError&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;
        &lt;span class="n"&gt;_&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;handler&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;outboxRepository&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Update&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;outbox&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
        &lt;span class="k"&gt;return&lt;/span&gt;
    &lt;span class="p"&gt;}&lt;/span&gt;
    &lt;span class="n"&gt;outbox&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;MarkAsProcessed&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;
    &lt;span class="n"&gt;_&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;handler&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;outboxRepository&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Update&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;outbox&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
&lt;span class="p"&gt;}&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Portanto, agora, a única coisa que nos resta é criar a implementação para o Outbox Stream, tanto para o DynamoDB Stream quanto para o MongoDB Stream. Para isso, vamos começar, primeiramente, com o DynamoDB.&lt;/p&gt;

&lt;h2&gt;
  
  
  DynamoDB Streams
&lt;/h2&gt;

&lt;p&gt;Aqui, basicamente, temos que entender, então, como o SDK da AWS para Go nos permite consumir os dados do stream. Abaixo, temos o código para consumir esses dados do stream&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight go"&gt;&lt;code&gt;&lt;span class="k"&gt;type&lt;/span&gt; &lt;span class="n"&gt;DynamoStream&lt;/span&gt; &lt;span class="k"&gt;struct&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
    &lt;span class="n"&gt;dynamoStreamClient&lt;/span&gt; &lt;span class="o"&gt;*&lt;/span&gt;&lt;span class="n"&gt;dynamodbstreams&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;DynamoDBStreams&lt;/span&gt;
    &lt;span class="n"&gt;awsSession&lt;/span&gt;         &lt;span class="o"&gt;*&lt;/span&gt;&lt;span class="n"&gt;session&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Session&lt;/span&gt;
    &lt;span class="n"&gt;tableName&lt;/span&gt;          &lt;span class="kt"&gt;string&lt;/span&gt;
    &lt;span class="n"&gt;dynamoDB&lt;/span&gt;           &lt;span class="o"&gt;*&lt;/span&gt;&lt;span class="n"&gt;dynamodb&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;DynamoDB&lt;/span&gt;
&lt;span class="p"&gt;}&lt;/span&gt;

&lt;span class="k"&gt;func&lt;/span&gt; &lt;span class="n"&gt;NewDynamoStream&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;awsSession&lt;/span&gt; &lt;span class="o"&gt;*&lt;/span&gt;&lt;span class="n"&gt;session&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Session&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;tableName&lt;/span&gt; &lt;span class="kt"&gt;string&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;dynamoDB&lt;/span&gt; &lt;span class="o"&gt;*&lt;/span&gt;&lt;span class="n"&gt;dynamodb&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;DynamoDB&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="n"&gt;OutboxStream&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
    &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="o"&gt;&amp;amp;&lt;/span&gt;&lt;span class="n"&gt;DynamoStream&lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;
        &lt;span class="n"&gt;dynamoStreamClient&lt;/span&gt;&lt;span class="o"&gt;:&lt;/span&gt; &lt;span class="n"&gt;dynamodbstreams&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;New&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;awsSession&lt;/span&gt;&lt;span class="p"&gt;),&lt;/span&gt;
        &lt;span class="n"&gt;dynamoDB&lt;/span&gt;&lt;span class="o"&gt;:&lt;/span&gt;           &lt;span class="n"&gt;dynamoDB&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
        &lt;span class="n"&gt;awsSession&lt;/span&gt;&lt;span class="o"&gt;:&lt;/span&gt;         &lt;span class="n"&gt;awsSession&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
        &lt;span class="n"&gt;tableName&lt;/span&gt;&lt;span class="o"&gt;:&lt;/span&gt;          &lt;span class="n"&gt;tableName&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="p"&gt;}&lt;/span&gt;
&lt;span class="p"&gt;}&lt;/span&gt;

&lt;span class="k"&gt;func&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;stream&lt;/span&gt; &lt;span class="o"&gt;*&lt;/span&gt;&lt;span class="n"&gt;DynamoStream&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="n"&gt;getStreamArn&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="kt"&gt;string&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="kt"&gt;error&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
    &lt;span class="n"&gt;result&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;err&lt;/span&gt; &lt;span class="o"&gt;:=&lt;/span&gt; &lt;span class="n"&gt;stream&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;dynamoDB&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;DescribeTable&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="o"&gt;&amp;amp;&lt;/span&gt;&lt;span class="n"&gt;dynamodb&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;DescribeTableInput&lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="n"&gt;TableName&lt;/span&gt;&lt;span class="o"&gt;:&lt;/span&gt; &lt;span class="n"&gt;aws&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;String&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;stream&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;tableName&lt;/span&gt;&lt;span class="p"&gt;)})&lt;/span&gt;
    &lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="n"&gt;err&lt;/span&gt; &lt;span class="o"&gt;!=&lt;/span&gt; &lt;span class="no"&gt;nil&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
        &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="s"&gt;""&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;err&lt;/span&gt;
    &lt;span class="p"&gt;}&lt;/span&gt;
    &lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="n"&gt;result&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Table&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;StreamSpecification&lt;/span&gt; &lt;span class="o"&gt;!=&lt;/span&gt; &lt;span class="no"&gt;nil&lt;/span&gt; &lt;span class="o"&gt;&amp;amp;&amp;amp;&lt;/span&gt; &lt;span class="o"&gt;*&lt;/span&gt;&lt;span class="n"&gt;result&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Table&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;StreamSpecification&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;StreamEnabled&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
        &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="o"&gt;*&lt;/span&gt;&lt;span class="n"&gt;result&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Table&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;LatestStreamArn&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="no"&gt;nil&lt;/span&gt;
    &lt;span class="p"&gt;}&lt;/span&gt;
    &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="s"&gt;""&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;fmt&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Errorf&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"streams not enabled for table %s"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;stream&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;tableName&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
&lt;span class="p"&gt;}&lt;/span&gt;

&lt;span class="k"&gt;func&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;stream&lt;/span&gt; &lt;span class="o"&gt;*&lt;/span&gt;&lt;span class="n"&gt;DynamoStream&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="n"&gt;FetchEvents&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="k"&gt;chan&lt;/span&gt; &lt;span class="kt"&gt;string&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="kt"&gt;error&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
    &lt;span class="n"&gt;streamArn&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;err&lt;/span&gt; &lt;span class="o"&gt;:=&lt;/span&gt; &lt;span class="n"&gt;stream&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;getStreamArn&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;
    &lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="n"&gt;err&lt;/span&gt; &lt;span class="o"&gt;!=&lt;/span&gt; &lt;span class="no"&gt;nil&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
        &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="no"&gt;nil&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;err&lt;/span&gt;
    &lt;span class="p"&gt;}&lt;/span&gt;
    &lt;span class="n"&gt;events&lt;/span&gt; &lt;span class="o"&gt;:=&lt;/span&gt; &lt;span class="nb"&gt;make&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="k"&gt;chan&lt;/span&gt; &lt;span class="kt"&gt;string&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
    &lt;span class="n"&gt;describeStreamInput&lt;/span&gt; &lt;span class="o"&gt;:=&lt;/span&gt; &lt;span class="o"&gt;&amp;amp;&lt;/span&gt;&lt;span class="n"&gt;dynamodbstreams&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;DescribeStreamInput&lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="n"&gt;StreamArn&lt;/span&gt;&lt;span class="o"&gt;:&lt;/span&gt; &lt;span class="n"&gt;aws&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;String&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;streamArn&lt;/span&gt;&lt;span class="p"&gt;)}&lt;/span&gt;
    &lt;span class="n"&gt;describeStreamOutput&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;err&lt;/span&gt; &lt;span class="o"&gt;:=&lt;/span&gt; &lt;span class="n"&gt;stream&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;dynamoStreamClient&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;DescribeStream&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;describeStreamInput&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
    &lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="n"&gt;err&lt;/span&gt; &lt;span class="o"&gt;!=&lt;/span&gt; &lt;span class="no"&gt;nil&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
        &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="no"&gt;nil&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;err&lt;/span&gt;
    &lt;span class="p"&gt;}&lt;/span&gt;
    &lt;span class="k"&gt;for&lt;/span&gt; &lt;span class="n"&gt;_&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;shard&lt;/span&gt; &lt;span class="o"&gt;:=&lt;/span&gt; &lt;span class="k"&gt;range&lt;/span&gt; &lt;span class="n"&gt;describeStreamOutput&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;StreamDescription&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Shards&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
        &lt;span class="k"&gt;go&lt;/span&gt; &lt;span class="n"&gt;stream&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;processShard&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="o"&gt;*&lt;/span&gt;&lt;span class="n"&gt;shard&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;ShardId&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;events&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;streamArn&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
    &lt;span class="p"&gt;}&lt;/span&gt;
    &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="n"&gt;events&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="no"&gt;nil&lt;/span&gt;
&lt;span class="p"&gt;}&lt;/span&gt;

&lt;span class="k"&gt;func&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;stream&lt;/span&gt; &lt;span class="o"&gt;*&lt;/span&gt;&lt;span class="n"&gt;DynamoStream&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="n"&gt;processShard&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;shardID&lt;/span&gt; &lt;span class="kt"&gt;string&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;events&lt;/span&gt; &lt;span class="k"&gt;chan&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;-&lt;/span&gt; &lt;span class="kt"&gt;string&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;streamArn&lt;/span&gt; &lt;span class="kt"&gt;string&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
    &lt;span class="n"&gt;shardIteratorInput&lt;/span&gt; &lt;span class="o"&gt;:=&lt;/span&gt; &lt;span class="o"&gt;&amp;amp;&lt;/span&gt;&lt;span class="n"&gt;dynamodbstreams&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;GetShardIteratorInput&lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;
        &lt;span class="n"&gt;StreamArn&lt;/span&gt;&lt;span class="o"&gt;:&lt;/span&gt;         &lt;span class="n"&gt;aws&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;String&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;streamArn&lt;/span&gt;&lt;span class="p"&gt;),&lt;/span&gt;
        &lt;span class="n"&gt;ShardId&lt;/span&gt;&lt;span class="o"&gt;:&lt;/span&gt;           &lt;span class="n"&gt;aws&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;String&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;shardID&lt;/span&gt;&lt;span class="p"&gt;),&lt;/span&gt;
        &lt;span class="n"&gt;ShardIteratorType&lt;/span&gt;&lt;span class="o"&gt;:&lt;/span&gt; &lt;span class="n"&gt;aws&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;String&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;dynamodbstreams&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;ShardIteratorTypeTrimHorizon&lt;/span&gt;&lt;span class="p"&gt;),&lt;/span&gt;
    &lt;span class="p"&gt;}&lt;/span&gt;

    &lt;span class="n"&gt;shardIteratorOutput&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;err&lt;/span&gt; &lt;span class="o"&gt;:=&lt;/span&gt; &lt;span class="n"&gt;stream&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;dynamoStreamClient&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;GetShardIterator&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;shardIteratorInput&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
    &lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="n"&gt;err&lt;/span&gt; &lt;span class="o"&gt;!=&lt;/span&gt; &lt;span class="no"&gt;nil&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
        &lt;span class="k"&gt;return&lt;/span&gt;
    &lt;span class="p"&gt;}&lt;/span&gt;

    &lt;span class="n"&gt;ShardIterator&lt;/span&gt; &lt;span class="o"&gt;:=&lt;/span&gt; &lt;span class="n"&gt;shardIteratorOutput&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;ShardIterator&lt;/span&gt;
    &lt;span class="n"&gt;backoff&lt;/span&gt; &lt;span class="o"&gt;:=&lt;/span&gt; &lt;span class="n"&gt;time&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Second&lt;/span&gt;

    &lt;span class="k"&gt;for&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
        &lt;span class="n"&gt;getRecordsInput&lt;/span&gt; &lt;span class="o"&gt;:=&lt;/span&gt; &lt;span class="o"&gt;&amp;amp;&lt;/span&gt;&lt;span class="n"&gt;dynamodbstreams&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;GetRecordsInput&lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="n"&gt;ShardIterator&lt;/span&gt;&lt;span class="o"&gt;:&lt;/span&gt; &lt;span class="n"&gt;ShardIterator&lt;/span&gt;&lt;span class="p"&gt;}&lt;/span&gt;
        &lt;span class="n"&gt;records&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;err&lt;/span&gt; &lt;span class="o"&gt;:=&lt;/span&gt; &lt;span class="n"&gt;stream&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;dynamoStreamClient&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;GetRecords&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;getRecordsInput&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
        &lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="n"&gt;err&lt;/span&gt; &lt;span class="o"&gt;!=&lt;/span&gt; &lt;span class="no"&gt;nil&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
            &lt;span class="k"&gt;continue&lt;/span&gt;
        &lt;span class="p"&gt;}&lt;/span&gt;

        &lt;span class="k"&gt;for&lt;/span&gt; &lt;span class="n"&gt;_&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;record&lt;/span&gt; &lt;span class="o"&gt;:=&lt;/span&gt; &lt;span class="k"&gt;range&lt;/span&gt; &lt;span class="n"&gt;records&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Records&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
            &lt;span class="n"&gt;id&lt;/span&gt; &lt;span class="o"&gt;:=&lt;/span&gt; &lt;span class="n"&gt;record&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Dynamodb&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;NewImage&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="s"&gt;"id"&lt;/span&gt;&lt;span class="p"&gt;]&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;S&lt;/span&gt;
            &lt;span class="n"&gt;status&lt;/span&gt; &lt;span class="o"&gt;:=&lt;/span&gt; &lt;span class="n"&gt;record&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Dynamodb&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;NewImage&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="s"&gt;"status"&lt;/span&gt;&lt;span class="p"&gt;]&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;S&lt;/span&gt;
            &lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="o"&gt;*&lt;/span&gt;&lt;span class="n"&gt;record&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;EventName&lt;/span&gt; &lt;span class="o"&gt;==&lt;/span&gt; &lt;span class="s"&gt;"INSERT"&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
                &lt;span class="n"&gt;backoff&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;time&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Second&lt;/span&gt;
                &lt;span class="n"&gt;events&lt;/span&gt; &lt;span class="o"&gt;&amp;lt;-&lt;/span&gt; &lt;span class="o"&gt;*&lt;/span&gt;&lt;span class="n"&gt;id&lt;/span&gt;
            &lt;span class="p"&gt;}&lt;/span&gt; &lt;span class="k"&gt;else&lt;/span&gt; &lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="o"&gt;*&lt;/span&gt;&lt;span class="n"&gt;record&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;EventName&lt;/span&gt; &lt;span class="o"&gt;==&lt;/span&gt; &lt;span class="s"&gt;"MODIFY"&lt;/span&gt; &lt;span class="o"&gt;&amp;amp;&amp;amp;&lt;/span&gt; &lt;span class="o"&gt;*&lt;/span&gt;&lt;span class="n"&gt;status&lt;/span&gt; &lt;span class="o"&gt;==&lt;/span&gt; &lt;span class="s"&gt;"ERROR"&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
                &lt;span class="n"&gt;backoff&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;time&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Second&lt;/span&gt;
                &lt;span class="k"&gt;go&lt;/span&gt; &lt;span class="k"&gt;func&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;id&lt;/span&gt; &lt;span class="kt"&gt;string&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
                    &lt;span class="n"&gt;time&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Sleep&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="m"&gt;5&lt;/span&gt; &lt;span class="o"&gt;*&lt;/span&gt; &lt;span class="n"&gt;time&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Second&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
                    &lt;span class="n"&gt;events&lt;/span&gt; &lt;span class="o"&gt;&amp;lt;-&lt;/span&gt; &lt;span class="n"&gt;id&lt;/span&gt;
                &lt;span class="p"&gt;}(&lt;/span&gt;&lt;span class="o"&gt;*&lt;/span&gt;&lt;span class="n"&gt;id&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
            &lt;span class="p"&gt;}&lt;/span&gt;
        &lt;span class="p"&gt;}&lt;/span&gt;
        &lt;span class="n"&gt;ShardIterator&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;records&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;NextShardIterator&lt;/span&gt;
        &lt;span class="n"&gt;time&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Sleep&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;backoff&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;

        &lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="n"&gt;backoff&lt;/span&gt; &lt;span class="o"&gt;&amp;lt;&lt;/span&gt; &lt;span class="m"&gt;30&lt;/span&gt;&lt;span class="o"&gt;*&lt;/span&gt;&lt;span class="n"&gt;time&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Second&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
            &lt;span class="n"&gt;backoff&lt;/span&gt; &lt;span class="o"&gt;*=&lt;/span&gt; &lt;span class="m"&gt;2&lt;/span&gt;
        &lt;span class="p"&gt;}&lt;/span&gt; &lt;span class="k"&gt;else&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
            &lt;span class="n"&gt;backoff&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="m"&gt;30&lt;/span&gt; &lt;span class="o"&gt;*&lt;/span&gt; &lt;span class="n"&gt;time&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Second&lt;/span&gt;
        &lt;span class="p"&gt;}&lt;/span&gt;
    &lt;span class="p"&gt;}&lt;/span&gt;
&lt;span class="p"&gt;}&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Analisando o código, primeiramente recebemos o nome da tabela do DynamoDB e, a partir disso, obtemos o ARN do stream. Para isso, temos que habilitar o uso do stream dentro do DynamoDB. A partir daí, temos basicamente um loop infinito que fica obtendo novos registros que podem existir no DynamoDB Stream. Podemos percorrer esses registros obtendo informações sobre o registro em si.&lt;/p&gt;

&lt;p&gt;No nosso caso, temos o ID do registro e o status. Além disso, podemos obter a informação sobre o que aconteceu com o registro, tendo as opções de Insert, Modify e Remove. No nosso caso, sempre que um novo registro for inserido, devemos pegar o ID desse registro e adicioná-lo ao nosso canal. E caso o registro tenha sido modificado para o status de Error, também devemos pegar o ID desse registro e inseri-lo no nosso canal.&lt;/p&gt;

&lt;p&gt;Um detalhe é que, sempre que um registro for atualizado para o status de Error, neste nosso evento, eu optei por esperar 5 segundos fixos antes de uma nova tentativa, mas podemos evoluir para estratégias de retentativas mais elaboradas.&lt;/p&gt;

&lt;p&gt;Mas para também implementar uma regra de busca, como estamos em um loop infinito, fazer essa consulta sem um delay pode fazer com que a própria AWS nos bloqueie por questões de rate limit. Então optei por utilizar uma estratégia de back-off, onde inicialmente temos um delay de um segundo, e a cada iteração que não encontra nenhum dado no stream, multiplicamos esse delay por dois.&lt;/p&gt;

&lt;p&gt;Na primeira iteração o delay será de um segundo, depois dois, quatro, oito, dezesseis, trinta e dois. Esse é o tempo que será esperado em segundos. Assim que esse valor ultrapassar trinta segundos, então mantemos um back-off fixo de trinta segundos. Portanto, vamos esperar exponencialmente, com um tempo máximo de trinta segundos.&lt;/p&gt;

&lt;h2&gt;
  
  
  MongoDB Streams
&lt;/h2&gt;

&lt;p&gt;Agora, para o MongoDB, podemos utilizar o método Watch vindo da Collection para sermos notificados sempre que algo acontecer. Esse "algo" nós definimos como filtro do Watch através de uma Pipeline do MongoDB. No nosso caso, vamos tratar de duas Pipelines.&lt;/p&gt;

&lt;p&gt;A primeira Pipeline é onde o tipo da operação seja Insert. Então, sempre que houver a inserção de um novo registro nessa Collection, vamos ser notificados. A segunda Pipeline é onde o tipo de operação seja Update e o status do registro que foi atualizado seja Error. Isso indica que o registro foi atualizado para Error, e então vamos ser notificados também. Seguimos a mesma regra que temos na implementação do DynamoDB, esperando 5 segundos no caso de Error antes de mandar para o processamento.&lt;/p&gt;

&lt;p&gt;Além disso, ao contrário do DynamoDB Streams, o MongoDB Streams não reprocessa todos os eventos que já aconteceram, apenas novos eventos ou novas operações. Isso significa que dados que estavam esperando para serem processados no banco antes da aplicação subir não serão levados em consideração pelo Stream.&lt;/p&gt;

&lt;p&gt;Portanto, assim que a aplicação subir, vamos fazer uma consulta na tabela buscando pelos registros que não foram processados ainda. Esses registros serão então adicionados ao canal.&lt;/p&gt;

&lt;p&gt;Com isso, temos o código a seguir:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight go"&gt;&lt;code&gt;&lt;span class="k"&gt;type&lt;/span&gt; &lt;span class="n"&gt;MongoStream&lt;/span&gt; &lt;span class="k"&gt;struct&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
    &lt;span class="n"&gt;collection&lt;/span&gt; &lt;span class="o"&gt;*&lt;/span&gt;&lt;span class="n"&gt;mongo&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Collection&lt;/span&gt;
&lt;span class="p"&gt;}&lt;/span&gt;

&lt;span class="k"&gt;func&lt;/span&gt; &lt;span class="n"&gt;NewMongoStream&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;collection&lt;/span&gt; &lt;span class="o"&gt;*&lt;/span&gt;&lt;span class="n"&gt;mongo&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Collection&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="o"&gt;*&lt;/span&gt;&lt;span class="n"&gt;MongoStream&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
    &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="o"&gt;&amp;amp;&lt;/span&gt;&lt;span class="n"&gt;MongoStream&lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="n"&gt;collection&lt;/span&gt;&lt;span class="o"&gt;:&lt;/span&gt; &lt;span class="n"&gt;collection&lt;/span&gt;&lt;span class="p"&gt;}&lt;/span&gt;
&lt;span class="p"&gt;}&lt;/span&gt;

&lt;span class="k"&gt;func&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;stream&lt;/span&gt; &lt;span class="o"&gt;*&lt;/span&gt;&lt;span class="n"&gt;MongoStream&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="n"&gt;FetchEvents&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="k"&gt;chan&lt;/span&gt; &lt;span class="kt"&gt;string&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="kt"&gt;error&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
    &lt;span class="n"&gt;ch&lt;/span&gt; &lt;span class="o"&gt;:=&lt;/span&gt; &lt;span class="nb"&gt;make&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="k"&gt;chan&lt;/span&gt; &lt;span class="kt"&gt;string&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
    &lt;span class="k"&gt;go&lt;/span&gt; &lt;span class="n"&gt;stream&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;consumeExistingEvents&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;ch&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
    &lt;span class="k"&gt;go&lt;/span&gt; &lt;span class="n"&gt;stream&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;consumeErrorEvents&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;ch&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
    &lt;span class="k"&gt;go&lt;/span&gt; &lt;span class="n"&gt;stream&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;consumeNewEvents&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;ch&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
    &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="n"&gt;ch&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="no"&gt;nil&lt;/span&gt;
&lt;span class="p"&gt;}&lt;/span&gt;

&lt;span class="k"&gt;func&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;stream&lt;/span&gt; &lt;span class="o"&gt;*&lt;/span&gt;&lt;span class="n"&gt;MongoStream&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="n"&gt;consumeExistingEvents&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;ch&lt;/span&gt; &lt;span class="k"&gt;chan&lt;/span&gt; &lt;span class="kt"&gt;string&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
    &lt;span class="n"&gt;cursor&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;err&lt;/span&gt; &lt;span class="o"&gt;:=&lt;/span&gt; &lt;span class="n"&gt;stream&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;collection&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Find&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;context&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;TODO&lt;/span&gt;&lt;span class="p"&gt;(),&lt;/span&gt; &lt;span class="n"&gt;bson&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;M&lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="s"&gt;"status"&lt;/span&gt;&lt;span class="o"&gt;:&lt;/span&gt; &lt;span class="n"&gt;bson&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;M&lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="s"&gt;"$ne"&lt;/span&gt;&lt;span class="o"&gt;:&lt;/span&gt; &lt;span class="s"&gt;"PROCESSED"&lt;/span&gt;&lt;span class="p"&gt;}})&lt;/span&gt;
    &lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="n"&gt;err&lt;/span&gt; &lt;span class="o"&gt;!=&lt;/span&gt; &lt;span class="no"&gt;nil&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
        &lt;span class="n"&gt;log&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Fatalf&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"Failed to find existing events: %v"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;err&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
    &lt;span class="p"&gt;}&lt;/span&gt;
    &lt;span class="k"&gt;defer&lt;/span&gt; &lt;span class="n"&gt;cursor&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Close&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;context&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;TODO&lt;/span&gt;&lt;span class="p"&gt;())&lt;/span&gt;

    &lt;span class="k"&gt;for&lt;/span&gt; &lt;span class="n"&gt;cursor&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Next&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;context&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;TODO&lt;/span&gt;&lt;span class="p"&gt;())&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
        &lt;span class="k"&gt;var&lt;/span&gt; &lt;span class="n"&gt;outbox&lt;/span&gt; &lt;span class="n"&gt;Outbox&lt;/span&gt;
        &lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="n"&gt;err&lt;/span&gt; &lt;span class="o"&gt;:=&lt;/span&gt; &lt;span class="n"&gt;cursor&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Decode&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="o"&gt;&amp;amp;&lt;/span&gt;&lt;span class="n"&gt;outbox&lt;/span&gt;&lt;span class="p"&gt;);&lt;/span&gt; &lt;span class="n"&gt;err&lt;/span&gt; &lt;span class="o"&gt;!=&lt;/span&gt; &lt;span class="no"&gt;nil&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
            &lt;span class="n"&gt;log&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Printf&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"Failed to decode existing outbox: %v"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;err&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
            &lt;span class="k"&gt;continue&lt;/span&gt;
        &lt;span class="p"&gt;}&lt;/span&gt;
        &lt;span class="n"&gt;ch&lt;/span&gt; &lt;span class="o"&gt;&amp;lt;-&lt;/span&gt; &lt;span class="n"&gt;outbox&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Id&lt;/span&gt;
    &lt;span class="p"&gt;}&lt;/span&gt;

    &lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="n"&gt;err&lt;/span&gt; &lt;span class="o"&gt;:=&lt;/span&gt; &lt;span class="n"&gt;cursor&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Err&lt;/span&gt;&lt;span class="p"&gt;();&lt;/span&gt; &lt;span class="n"&gt;err&lt;/span&gt; &lt;span class="o"&gt;!=&lt;/span&gt; &lt;span class="no"&gt;nil&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
        &lt;span class="n"&gt;log&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Printf&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"Cursor error: %v"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;err&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
    &lt;span class="p"&gt;}&lt;/span&gt;
&lt;span class="p"&gt;}&lt;/span&gt;

&lt;span class="k"&gt;func&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;stream&lt;/span&gt; &lt;span class="o"&gt;*&lt;/span&gt;&lt;span class="n"&gt;MongoStream&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="n"&gt;consumeNewEvents&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;ch&lt;/span&gt; &lt;span class="k"&gt;chan&lt;/span&gt; &lt;span class="kt"&gt;string&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
    &lt;span class="n"&gt;pipeline&lt;/span&gt; &lt;span class="o"&gt;:=&lt;/span&gt; &lt;span class="n"&gt;mongo&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Pipeline&lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="n"&gt;bson&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;D&lt;/span&gt;&lt;span class="p"&gt;{{&lt;/span&gt;&lt;span class="s"&gt;"$match"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;bson&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;D&lt;/span&gt;&lt;span class="p"&gt;{{&lt;/span&gt;&lt;span class="s"&gt;"operationType"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="s"&gt;"insert"&lt;/span&gt;&lt;span class="p"&gt;}}}}}&lt;/span&gt;
    &lt;span class="n"&gt;opts&lt;/span&gt; &lt;span class="o"&gt;:=&lt;/span&gt; &lt;span class="n"&gt;options&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;ChangeStream&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;SetFullDocument&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;options&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;UpdateLookup&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
    &lt;span class="n"&gt;changeStream&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;err&lt;/span&gt; &lt;span class="o"&gt;:=&lt;/span&gt; &lt;span class="n"&gt;stream&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;collection&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Watch&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;context&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;TODO&lt;/span&gt;&lt;span class="p"&gt;(),&lt;/span&gt; &lt;span class="n"&gt;pipeline&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;opts&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
    &lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="n"&gt;err&lt;/span&gt; &lt;span class="o"&gt;!=&lt;/span&gt; &lt;span class="no"&gt;nil&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
        &lt;span class="n"&gt;log&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Fatalf&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"Failed to start change stream: %v"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;err&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
    &lt;span class="p"&gt;}&lt;/span&gt;

    &lt;span class="k"&gt;defer&lt;/span&gt; &lt;span class="n"&gt;changeStream&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Close&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;context&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;TODO&lt;/span&gt;&lt;span class="p"&gt;())&lt;/span&gt;
    &lt;span class="k"&gt;defer&lt;/span&gt; &lt;span class="nb"&gt;close&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;ch&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;

    &lt;span class="k"&gt;for&lt;/span&gt; &lt;span class="n"&gt;changeStream&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Next&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;context&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;TODO&lt;/span&gt;&lt;span class="p"&gt;())&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
        &lt;span class="k"&gt;var&lt;/span&gt; &lt;span class="n"&gt;changeEvent&lt;/span&gt; &lt;span class="k"&gt;struct&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
            &lt;span class="n"&gt;DocumentKey&lt;/span&gt; &lt;span class="n"&gt;primitive&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;M&lt;/span&gt; &lt;span class="s"&gt;`bson:"documentKey,omitempty"`&lt;/span&gt;
        &lt;span class="p"&gt;}&lt;/span&gt;
        &lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="n"&gt;err&lt;/span&gt; &lt;span class="o"&gt;:=&lt;/span&gt; &lt;span class="n"&gt;changeStream&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Decode&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="o"&gt;&amp;amp;&lt;/span&gt;&lt;span class="n"&gt;changeEvent&lt;/span&gt;&lt;span class="p"&gt;);&lt;/span&gt; &lt;span class="n"&gt;err&lt;/span&gt; &lt;span class="o"&gt;!=&lt;/span&gt; &lt;span class="no"&gt;nil&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
            &lt;span class="n"&gt;log&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Printf&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"Failed to decode change stream document: %v"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;err&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
            &lt;span class="k"&gt;continue&lt;/span&gt;
        &lt;span class="p"&gt;}&lt;/span&gt;
        &lt;span class="n"&gt;ch&lt;/span&gt; &lt;span class="o"&gt;&amp;lt;-&lt;/span&gt; &lt;span class="n"&gt;changeEvent&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;DocumentKey&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="s"&gt;"_id"&lt;/span&gt;&lt;span class="p"&gt;]&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="kt"&gt;string&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
    &lt;span class="p"&gt;}&lt;/span&gt;

    &lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="n"&gt;err&lt;/span&gt; &lt;span class="o"&gt;:=&lt;/span&gt; &lt;span class="n"&gt;changeStream&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Err&lt;/span&gt;&lt;span class="p"&gt;();&lt;/span&gt; &lt;span class="n"&gt;err&lt;/span&gt; &lt;span class="o"&gt;!=&lt;/span&gt; &lt;span class="no"&gt;nil&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
        &lt;span class="n"&gt;log&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Printf&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"Change stream error: %v"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;err&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
    &lt;span class="p"&gt;}&lt;/span&gt;
&lt;span class="p"&gt;}&lt;/span&gt;

&lt;span class="k"&gt;func&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;stream&lt;/span&gt; &lt;span class="o"&gt;*&lt;/span&gt;&lt;span class="n"&gt;MongoStream&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="n"&gt;consumeErrorEvents&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;ch&lt;/span&gt; &lt;span class="k"&gt;chan&lt;/span&gt; &lt;span class="kt"&gt;string&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
    &lt;span class="n"&gt;pipeline&lt;/span&gt; &lt;span class="o"&gt;:=&lt;/span&gt; &lt;span class="n"&gt;mongo&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Pipeline&lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="n"&gt;bson&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;D&lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;
        &lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="s"&gt;"$match"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;bson&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;D&lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;
            &lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="s"&gt;"operationType"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="s"&gt;"update"&lt;/span&gt;&lt;span class="p"&gt;},&lt;/span&gt;
            &lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="s"&gt;"fullDocument.status"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="s"&gt;"ERROR"&lt;/span&gt;&lt;span class="p"&gt;},&lt;/span&gt;
        &lt;span class="p"&gt;}},&lt;/span&gt;
    &lt;span class="p"&gt;}}&lt;/span&gt;
    &lt;span class="n"&gt;opts&lt;/span&gt; &lt;span class="o"&gt;:=&lt;/span&gt; &lt;span class="n"&gt;options&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;ChangeStream&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;SetFullDocument&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;options&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;UpdateLookup&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
    &lt;span class="n"&gt;changeStream&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;err&lt;/span&gt; &lt;span class="o"&gt;:=&lt;/span&gt; &lt;span class="n"&gt;stream&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;collection&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Watch&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;context&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;TODO&lt;/span&gt;&lt;span class="p"&gt;(),&lt;/span&gt; &lt;span class="n"&gt;pipeline&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;opts&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
    &lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="n"&gt;err&lt;/span&gt; &lt;span class="o"&gt;!=&lt;/span&gt; &lt;span class="no"&gt;nil&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
        &lt;span class="n"&gt;log&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Fatalf&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"Failed to start change stream: %v"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;err&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
    &lt;span class="p"&gt;}&lt;/span&gt;

    &lt;span class="k"&gt;defer&lt;/span&gt; &lt;span class="n"&gt;changeStream&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Close&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;context&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;TODO&lt;/span&gt;&lt;span class="p"&gt;())&lt;/span&gt;
    &lt;span class="k"&gt;defer&lt;/span&gt; &lt;span class="nb"&gt;close&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;ch&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;

    &lt;span class="k"&gt;for&lt;/span&gt; &lt;span class="n"&gt;changeStream&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Next&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;context&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;TODO&lt;/span&gt;&lt;span class="p"&gt;())&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
        &lt;span class="k"&gt;var&lt;/span&gt; &lt;span class="n"&gt;changeEvent&lt;/span&gt; &lt;span class="k"&gt;struct&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
            &lt;span class="n"&gt;DocumentKey&lt;/span&gt; &lt;span class="n"&gt;primitive&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;M&lt;/span&gt; &lt;span class="s"&gt;`bson:"documentKey,omitempty"`&lt;/span&gt;
        &lt;span class="p"&gt;}&lt;/span&gt;
        &lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="n"&gt;err&lt;/span&gt; &lt;span class="o"&gt;:=&lt;/span&gt; &lt;span class="n"&gt;changeStream&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Decode&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="o"&gt;&amp;amp;&lt;/span&gt;&lt;span class="n"&gt;changeEvent&lt;/span&gt;&lt;span class="p"&gt;);&lt;/span&gt; &lt;span class="n"&gt;err&lt;/span&gt; &lt;span class="o"&gt;!=&lt;/span&gt; &lt;span class="no"&gt;nil&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
            &lt;span class="n"&gt;log&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Printf&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"Failed to decode change stream document: %v"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;err&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
            &lt;span class="k"&gt;continue&lt;/span&gt;
        &lt;span class="p"&gt;}&lt;/span&gt;
        &lt;span class="k"&gt;go&lt;/span&gt; &lt;span class="k"&gt;func&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
            &lt;span class="n"&gt;time&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Sleep&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;time&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Second&lt;/span&gt; &lt;span class="o"&gt;*&lt;/span&gt; &lt;span class="m"&gt;5&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
            &lt;span class="n"&gt;ch&lt;/span&gt; &lt;span class="o"&gt;&amp;lt;-&lt;/span&gt; &lt;span class="n"&gt;changeEvent&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;DocumentKey&lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="s"&gt;"_id"&lt;/span&gt;&lt;span class="p"&gt;]&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="kt"&gt;string&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
        &lt;span class="p"&gt;}()&lt;/span&gt;
    &lt;span class="p"&gt;}&lt;/span&gt;

    &lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="n"&gt;err&lt;/span&gt; &lt;span class="o"&gt;:=&lt;/span&gt; &lt;span class="n"&gt;changeStream&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Err&lt;/span&gt;&lt;span class="p"&gt;();&lt;/span&gt; &lt;span class="n"&gt;err&lt;/span&gt; &lt;span class="o"&gt;!=&lt;/span&gt; &lt;span class="no"&gt;nil&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
        &lt;span class="n"&gt;log&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Printf&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"Change stream error: %v"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;err&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
    &lt;span class="p"&gt;}&lt;/span&gt;
&lt;span class="p"&gt;}&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;E por fim, para fecharmos o ciclo, temos a implementação para o envio do evento tanto para o Kafka quanto para o RabbitMQ.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;KafkaEventEmitter&lt;/strong&gt;&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight go"&gt;&lt;code&gt;&lt;span class="k"&gt;type&lt;/span&gt; &lt;span class="n"&gt;KafkaEventEmitter&lt;/span&gt; &lt;span class="k"&gt;struct&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
    &lt;span class="n"&gt;writer&lt;/span&gt; &lt;span class="o"&gt;*&lt;/span&gt;&lt;span class="n"&gt;kafka&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Writer&lt;/span&gt;
&lt;span class="p"&gt;}&lt;/span&gt;

&lt;span class="k"&gt;func&lt;/span&gt; &lt;span class="n"&gt;NewKafkaEventEmitter&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;brokers&lt;/span&gt; &lt;span class="p"&gt;[]&lt;/span&gt;&lt;span class="kt"&gt;string&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;topic&lt;/span&gt; &lt;span class="kt"&gt;string&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="o"&gt;*&lt;/span&gt;&lt;span class="n"&gt;KafkaEventEmitter&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
    &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="o"&gt;&amp;amp;&lt;/span&gt;&lt;span class="n"&gt;KafkaEventEmitter&lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;
        &lt;span class="n"&gt;writer&lt;/span&gt;&lt;span class="o"&gt;:&lt;/span&gt; &lt;span class="o"&gt;&amp;amp;&lt;/span&gt;&lt;span class="n"&gt;kafka&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Writer&lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;
            &lt;span class="n"&gt;Addr&lt;/span&gt;&lt;span class="o"&gt;:&lt;/span&gt;     &lt;span class="n"&gt;kafka&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;TCP&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;brokers&lt;/span&gt;&lt;span class="o"&gt;...&lt;/span&gt;&lt;span class="p"&gt;),&lt;/span&gt;
            &lt;span class="n"&gt;Topic&lt;/span&gt;&lt;span class="o"&gt;:&lt;/span&gt;    &lt;span class="n"&gt;topic&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
            &lt;span class="n"&gt;Balancer&lt;/span&gt;&lt;span class="o"&gt;:&lt;/span&gt; &lt;span class="o"&gt;&amp;amp;&lt;/span&gt;&lt;span class="n"&gt;kafka&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;LeastBytes&lt;/span&gt;&lt;span class="p"&gt;{},&lt;/span&gt;
        &lt;span class="p"&gt;},&lt;/span&gt;
    &lt;span class="p"&gt;}&lt;/span&gt;
&lt;span class="p"&gt;}&lt;/span&gt;

&lt;span class="k"&gt;func&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;k&lt;/span&gt; &lt;span class="o"&gt;*&lt;/span&gt;&lt;span class="n"&gt;KafkaEventEmitter&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="n"&gt;Emit&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;event&lt;/span&gt; &lt;span class="o"&gt;*&lt;/span&gt;&lt;span class="n"&gt;Event&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="kt"&gt;error&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
    &lt;span class="n"&gt;eventBytes&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;err&lt;/span&gt; &lt;span class="o"&gt;:=&lt;/span&gt; &lt;span class="n"&gt;json&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Marshal&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;event&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
    &lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="n"&gt;err&lt;/span&gt; &lt;span class="o"&gt;!=&lt;/span&gt; &lt;span class="no"&gt;nil&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
        &lt;span class="n"&gt;slog&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Error&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"Error on emit event"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="s"&gt;"event"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;event&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="s"&gt;"error"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;err&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
        &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="n"&gt;err&lt;/span&gt;
    &lt;span class="p"&gt;}&lt;/span&gt;
    &lt;span class="n"&gt;message&lt;/span&gt; &lt;span class="o"&gt;:=&lt;/span&gt; &lt;span class="n"&gt;kafka&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Message&lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;
        &lt;span class="n"&gt;Value&lt;/span&gt;&lt;span class="o"&gt;:&lt;/span&gt; &lt;span class="n"&gt;eventBytes&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
        &lt;span class="n"&gt;Topic&lt;/span&gt;&lt;span class="o"&gt;:&lt;/span&gt; &lt;span class="n"&gt;event&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Name&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
        &lt;span class="n"&gt;Key&lt;/span&gt;&lt;span class="o"&gt;:&lt;/span&gt;   &lt;span class="p"&gt;[]&lt;/span&gt;&lt;span class="kt"&gt;byte&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;event&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;ID&lt;/span&gt;&lt;span class="p"&gt;),&lt;/span&gt;
    &lt;span class="p"&gt;}&lt;/span&gt;
    &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="n"&gt;k&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;writer&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;WriteMessages&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;context&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Background&lt;/span&gt;&lt;span class="p"&gt;(),&lt;/span&gt; &lt;span class="n"&gt;message&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
&lt;span class="p"&gt;}&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;&lt;strong&gt;RabbitMqEventEmitter&lt;/strong&gt;&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight go"&gt;&lt;code&gt;&lt;span class="k"&gt;type&lt;/span&gt; &lt;span class="n"&gt;RabbitMqEventEmitter&lt;/span&gt; &lt;span class="k"&gt;struct&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
    &lt;span class="n"&gt;connection&lt;/span&gt;      &lt;span class="o"&gt;*&lt;/span&gt;&lt;span class="n"&gt;amqp&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Connection&lt;/span&gt;
    &lt;span class="n"&gt;producerChannel&lt;/span&gt; &lt;span class="o"&gt;*&lt;/span&gt;&lt;span class="n"&gt;amqp&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Channel&lt;/span&gt;
&lt;span class="p"&gt;}&lt;/span&gt;

&lt;span class="k"&gt;func&lt;/span&gt; &lt;span class="n"&gt;NewRabbitMqEventEmitter&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;server&lt;/span&gt; &lt;span class="kt"&gt;string&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="n"&gt;EventEmitter&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
    &lt;span class="n"&gt;connection&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;err&lt;/span&gt; &lt;span class="o"&gt;:=&lt;/span&gt; &lt;span class="n"&gt;amqp&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Dial&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;server&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
    &lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="n"&gt;err&lt;/span&gt; &lt;span class="o"&gt;!=&lt;/span&gt; &lt;span class="no"&gt;nil&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
        &lt;span class="nb"&gt;panic&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;err&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
    &lt;span class="p"&gt;}&lt;/span&gt;
    &lt;span class="n"&gt;producerChannel&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;err&lt;/span&gt; &lt;span class="o"&gt;:=&lt;/span&gt; &lt;span class="n"&gt;connection&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Channel&lt;/span&gt;&lt;span class="p"&gt;()&lt;/span&gt;
    &lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="n"&gt;err&lt;/span&gt; &lt;span class="o"&gt;!=&lt;/span&gt; &lt;span class="no"&gt;nil&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
        &lt;span class="nb"&gt;panic&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;err&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
    &lt;span class="p"&gt;}&lt;/span&gt;
    &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="o"&gt;&amp;amp;&lt;/span&gt;&lt;span class="n"&gt;RabbitMqEventEmitter&lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;
        &lt;span class="n"&gt;connection&lt;/span&gt;&lt;span class="o"&gt;:&lt;/span&gt;      &lt;span class="n"&gt;connection&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
        &lt;span class="n"&gt;producerChannel&lt;/span&gt;&lt;span class="o"&gt;:&lt;/span&gt; &lt;span class="n"&gt;producerChannel&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
    &lt;span class="p"&gt;}&lt;/span&gt;
&lt;span class="p"&gt;}&lt;/span&gt;

&lt;span class="k"&gt;func&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;e&lt;/span&gt; &lt;span class="o"&gt;*&lt;/span&gt;&lt;span class="n"&gt;RabbitMqEventEmitter&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="n"&gt;Emit&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;event&lt;/span&gt; &lt;span class="o"&gt;*&lt;/span&gt;&lt;span class="n"&gt;Event&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="kt"&gt;error&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
    &lt;span class="n"&gt;eventBytes&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;err&lt;/span&gt; &lt;span class="o"&gt;:=&lt;/span&gt; &lt;span class="n"&gt;json&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Marshal&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;event&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
    &lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="n"&gt;err&lt;/span&gt; &lt;span class="o"&gt;!=&lt;/span&gt; &lt;span class="no"&gt;nil&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
        &lt;span class="n"&gt;slog&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Error&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"Error on emit event"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="s"&gt;"event"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;event&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="s"&gt;"error"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;err&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
        &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="n"&gt;err&lt;/span&gt;
    &lt;span class="p"&gt;}&lt;/span&gt;
    &lt;span class="n"&gt;err&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;e&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;producerChannel&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Publish&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;
        &lt;span class="s"&gt;"amq.direct"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
        &lt;span class="n"&gt;event&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Name&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
        &lt;span class="no"&gt;false&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
        &lt;span class="no"&gt;false&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
        &lt;span class="n"&gt;amqp&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Publishing&lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="n"&gt;ContentType&lt;/span&gt;&lt;span class="o"&gt;:&lt;/span&gt; &lt;span class="s"&gt;"text/plain"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;Body&lt;/span&gt;&lt;span class="o"&gt;:&lt;/span&gt; &lt;span class="n"&gt;eventBytes&lt;/span&gt;&lt;span class="p"&gt;},&lt;/span&gt;
    &lt;span class="p"&gt;)&lt;/span&gt;
    &lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="n"&gt;err&lt;/span&gt; &lt;span class="o"&gt;!=&lt;/span&gt; &lt;span class="no"&gt;nil&lt;/span&gt; &lt;span class="p"&gt;{&lt;/span&gt;
        &lt;span class="n"&gt;slog&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="n"&gt;Error&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"Error on publish event"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="s"&gt;"event"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;event&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="s"&gt;"error"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;err&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
        &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="n"&gt;err&lt;/span&gt;
    &lt;span class="p"&gt;}&lt;/span&gt;
    &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="no"&gt;nil&lt;/span&gt;
&lt;span class="p"&gt;}&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;E com isso, finalizamos a aplicação que irá fazer o processamento dos eventos que foram inseridos na nossa tabela. Em caso de falha na publicação no serviço de mensageria, o evento permanece com status de erro para ser processado posteriormente. Podemos ainda definir estratégias para um número limite de tentativas de publicação do evento e, caso esse número seja atingido, emitir notificações via e-mail ou algum alerta em um sistema de monitoramento para que uma equipe veja o motivo do erro.&lt;/p&gt;

&lt;p&gt;Pensando em uma grande empresa, pode ser desenvolvido um SDK interno para abstrair a publicação do evento, que no nosso caso é o envio do evento para uma tabela, para que todos os micro-serviços não precisem conhecer os detalhes do DynamoDB, MongoDB ou do lugar onde realmente está sendo salvo.&lt;/p&gt;

&lt;h2&gt;
  
  
  Evoluções
&lt;/h2&gt;

&lt;p&gt;Uma evolução interessante a ser feita é no próprio registro do evento, tendo também dados referentes ao serviço de mensageria específico. Por exemplo, se estamos utilizando um tópico no Kafka, a informação do nome do tópico poderia estar no evento. Ou, se estamos utilizando o RabbitMQ, no evento poderíamos ter o nome da Exchange e a routing key, que seriam utilizadas para o envio da mensagem. Se o intuito da mensagem é ser publicada no SQS, por exemplo, podemos ter o ARN ou qualquer informação que identifique a fila SQS onde a mensagem deve ser publicada.&lt;/p&gt;

&lt;p&gt;Neste artigo abordamos isso de forma bem simples, onde o processador identifica qual o destino da mensagem, mas poderíamos evoluir para que essas informações estivessem no registro que foi salvo no banco de dados também.&lt;/p&gt;

&lt;h2&gt;
  
  
  Conclusão
&lt;/h2&gt;

&lt;p&gt;O uso do padrão Transactional Outbox traz uma segurança muito maior para a resiliência das transações. Com o uso deste padrão, temos a opção de manter um registro de todos os eventos que ocorreram, o que pode ser extremamente útil para fins de auditoria.&lt;/p&gt;

&lt;p&gt;A capacidade de armazenar de maneira consistente os eventos e garantir que eles sejam processados corretamente pelos sistemas de mensageria contribui para a robustez do sistema. Além disso, esse padrão permite a recuperação e reprocessamento de eventos em caso de falhas temporárias, aumentando a confiabilidade da aplicação.&lt;/p&gt;

&lt;p&gt;A escolha entre DynamoDB, MongoDB ou qualquer outro serviço para implementar esse padrão pode depender de vários fatores, incluindo a familiaridade da equipe com a tecnologia, requisitos de escalabilidade e o ecossistema ao redor. Ambas as soluções oferecem mecanismos de streaming que podem ser utilizados para garantir a consistência de dados e facilitar a construção de sistemas resilientes.&lt;/p&gt;

&lt;p&gt;Espero que este artigo tenha fornecido uma visão clara e detalhada sobre como implementar o padrão Transactional Outbox, juntamente com exemplos de código e considerações práticas para diferentes cenários de uso.&lt;/p&gt;

&lt;p&gt;&lt;a href="https://github.com/ederfmatos/transactional-outbox" rel="noopener noreferrer"&gt;Repositório do Github&lt;/a&gt;&lt;/p&gt;

</description>
      <category>microservices</category>
      <category>dynamodb</category>
      <category>mongodb</category>
      <category>transactionaloutbox</category>
    </item>
  </channel>
</rss>
