<?xml version="1.0" encoding="UTF-8"?>
<rss version="2.0" xmlns:atom="http://www.w3.org/2005/Atom" xmlns:dc="http://purl.org/dc/elements/1.1/">
  <channel>
    <title>DEV Community: Kafka BR</title>
    <description>The latest articles on DEV Community by Kafka BR (@kafkabr).</description>
    <link>https://dev.to/kafkabr</link>
    <image>
      <url>https://media2.dev.to/dynamic/image/width=90,height=90,fit=cover,gravity=auto,format=auto/https:%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Forganization%2Fprofile_image%2F2067%2F9ea29162-4d69-4692-b35b-dd86a00b6b5d.png</url>
      <title>DEV Community: Kafka BR</title>
      <link>https://dev.to/kafkabr</link>
    </image>
    <atom:link rel="self" type="application/rss+xml" href="https://dev.to/feed/kafkabr"/>
    <language>en</language>
    <item>
      <title>Cotas de Cliente no Apache Kafka® 2.6.0</title>
      <dc:creator>Fabio José</dc:creator>
      <pubDate>Mon, 14 Sep 2020 08:53:39 +0000</pubDate>
      <link>https://dev.to/kafkabr/cotas-de-cliente-no-apache-kafka-2-6-0-o61</link>
      <guid>https://dev.to/kafkabr/cotas-de-cliente-no-apache-kafka-2-6-0-o61</guid>
      <description>&lt;h3&gt;
  
  
  Artigo publicado no Blog oficial Kafka BR
&lt;/h3&gt;

&lt;p&gt;Acesse: &lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;&lt;a href="https://blog.kafkabr.com/posts/kafka-260-client-quotas/"&gt;https://blog.kafkabr.com/posts/kafka-260-client-quotas/&lt;/a&gt;&lt;/li&gt;
&lt;/ul&gt;

</description>
      <category>kafka</category>
      <category>apachekafka</category>
      <category>kafkabr</category>
    </item>
    <item>
      <title>Transações no Apache Kafka®</title>
      <dc:creator>Fabio José</dc:creator>
      <pubDate>Mon, 14 Sep 2020 08:51:44 +0000</pubDate>
      <link>https://dev.to/kafkabr/transacoes-no-apache-kafka-1ik3</link>
      <guid>https://dev.to/kafkabr/transacoes-no-apache-kafka-1ik3</guid>
      <description>&lt;h2&gt;
  
  
  Artigo publicado no Blog oficial Kafka BR.
&lt;/h2&gt;

&lt;p&gt;Acesse:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;&lt;a href="https://blog.kafkabr.com/posts/transacoes"&gt;https://blog.kafkabr.com/posts/transacoes&lt;/a&gt;&lt;/li&gt;
&lt;/ul&gt;

</description>
      <category>kafka</category>
      <category>kafkabr</category>
      <category>java</category>
    </item>
    <item>
      <title>Kafka: produtores idempotentes</title>
      <dc:creator>Fabio José</dc:creator>
      <pubDate>Thu, 28 May 2020 01:25:58 +0000</pubDate>
      <link>https://dev.to/kafkabr/kafka-produtores-idempotentes-12ok</link>
      <guid>https://dev.to/kafkabr/kafka-produtores-idempotentes-12ok</guid>
      <description>&lt;p&gt;&lt;em&gt;"...&lt;a href="https://pt.m.wikipedia.org/wiki/Idempot%C3%AAncia"&gt;idempotência&lt;/a&gt; é a propriedade que algumas operações têm de poderem ser aplicadas várias vezes sem que o valor do resultado se altere após a aplicação inicial."&lt;/em&gt;&lt;/p&gt;




&lt;p&gt;Através de uma configuração simples no produtor Kafka, pode-se alcançar a idempotência na produção de eventos, eliminando completamente a possibilidade de duplicatas nos tópicos.&lt;/p&gt;




&lt;p&gt;Uma das grandes qualidades do Kafka é a possibilidade de configurar a confiabilidade no produtor, que permite ajustes finos para atender com maestria aos requisitos não-funcionais dos casos de uso.&lt;/p&gt;

&lt;p&gt;Uma dessas possibilidades é o produtor idempotente. Com ele existe a garantia de que não haverá duplicatas em nível de partição.&lt;/p&gt;

&lt;p&gt;Essa garantia entra em ação quando a seguinte propriedade é definida como &lt;code&gt;true&lt;/code&gt;:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight"&gt;&lt;pre class="highlight properties"&gt;&lt;code&gt;&lt;span class="py"&gt;enable.idempotence&lt;/span&gt;&lt;span class="p"&gt;=&lt;/span&gt;&lt;span class="s"&gt;true&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;&lt;/div&gt;



&lt;p&gt;Ao habilitar a idempotência no produtor, também será mandatório definir as seguintes configurações:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight"&gt;&lt;pre class="highlight properties"&gt;&lt;code&gt;&lt;span class="py"&gt;acks&lt;/span&gt;&lt;span class="p"&gt;=&lt;/span&gt;&lt;span class="s"&gt;all&lt;/span&gt;
&lt;span class="py"&gt;max.in.flight.requests.per.connection&lt;/span&gt;&lt;span class="p"&gt;=&lt;/span&gt;&lt;span class="s"&gt;5&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;&lt;/div&gt;



&lt;p&gt;Com este setup, cada requisição que envia lotes de eventos ao Kafka terá um identificador único.&lt;/p&gt;

&lt;blockquote&gt;
&lt;p&gt;Cada lote na requisição tem como destino uma partição e um tópico&lt;/p&gt;
&lt;/blockquote&gt;

&lt;p&gt;Ao chegar no kafka, esta requisição será processada e cada lote enviado ao líderes das respectivas partições. Ao terminar isso, o &lt;code&gt;ack&lt;/code&gt; é devolvido ao produtor.&lt;/p&gt;

&lt;p&gt;Mas esta é uma comunicação em rede. Então o atraso em receber o &lt;code&gt;ack&lt;/code&gt; ou qualquer outro problema poderá provocar a retentativa padrão dos produtores Kafka. Assim, ele enviará novamente a requisição identificada.&lt;/p&gt;

&lt;p&gt;O broker ao receber novamente àquela requisição processada com sucesso, apenas devolve o &lt;code&gt;ack&lt;/code&gt;, sem persistir novamente e assim evitando a duplicação.&lt;/p&gt;

&lt;p&gt;Sem o produtor idempotente, o broker receberia a requisição da retentiva como se fosse uma nova, persistindo novamente o lote de eventos, causando duplicatas.&lt;/p&gt;




&lt;p&gt;É isso pessoal!&lt;/p&gt;

&lt;p&gt;Até o próximo!! &lt;/p&gt;

</description>
      <category>kafka</category>
      <category>kafkabr</category>
      <category>java</category>
      <category>apachekafka</category>
    </item>
    <item>
      <title>Apagar eventos do kafka?</title>
      <dc:creator>Fabio José</dc:creator>
      <pubDate>Mon, 25 May 2020 16:23:28 +0000</pubDate>
      <link>https://dev.to/kafkabr/apagar-eventos-do-kafka-jd4</link>
      <guid>https://dev.to/kafkabr/apagar-eventos-do-kafka-jd4</guid>
      <description>&lt;p&gt;Você já deve ter-se perguntado se é possível apagar eventos do Kafka, não? Bem, isso realmente não é possível.&lt;/p&gt;

&lt;p&gt;Mas existe uma forma especial utilizada principalmente para atender o direito ao esquecimento, requerido pela LGPD em alguns países.&lt;/p&gt;

&lt;p&gt;⚠️ Atenção! ⚠️ Entenda todos os impactos antes de alterar qualquer tópico que você já tenha ai no seu cluster Kafka.&lt;/p&gt;




&lt;p&gt;Isso não se trata de um hack ou algo do tipo, até porque ele está descrito no livro &lt;a href="https://www.confluent.io/resources/kafka-the-definitive-guide"&gt;"Kafka: The Definitive Guide"&lt;/a&gt; 😊.&lt;/p&gt;




&lt;p&gt;Primeiro, seu tópico deverá ser compactado, ou seja, a configuração &lt;code&gt;cleanup.policy&lt;/code&gt; deve ser igual a &lt;code&gt;compact&lt;/code&gt;. Ele garantirá que somente o evento mais recente será mantido.&lt;/p&gt;

&lt;p&gt;Em um tópico compactado é mandatório produzir eventos com chave de partição, o atributo &lt;code&gt;key&lt;/code&gt;. É com base nele que o Kafka manterá o evento mais recente.&lt;/p&gt;

&lt;p&gt;Digamos que existe o tópico &lt;code&gt;clientes&lt;/code&gt; com os seguintes eventos na partição &lt;code&gt;3&lt;/code&gt;:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight"&gt;&lt;pre class="highlight plaintext"&gt;&lt;code&gt;CHAVE   VALOR DO EVENTO
C001    Cliente 1y
C002    Cliente 2+
C003    Cliente 3@
C001    Cliente 1π
&lt;/code&gt;&lt;/pre&gt;&lt;/div&gt;



&lt;p&gt;Veja que a chave &lt;code&gt;C001&lt;/code&gt; tem um evento mais recente, que será mantido após a compactação do tópico:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight"&gt;&lt;pre class="highlight plaintext"&gt;&lt;code&gt;CHAVE   VALOR DO EVENTO
C002    Cliente 2+
C003    Cliente 3@
C001    Cliente 1π
&lt;/code&gt;&lt;/pre&gt;&lt;/div&gt;



&lt;p&gt;Agora que você entendeu a compactação, para "apagar" um evento, o &lt;code&gt;C003&lt;/code&gt; por exemplo, bastará produzir um evento sem valor com a mesma chave:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight"&gt;&lt;pre class="highlight plaintext"&gt;&lt;code&gt;CHAVE   VALOR DO EVENTO
C001    Cliente 1y
C002    Cliente 2+
C003    Cliente 3@
C001    Cliente 1π
C003
&lt;/code&gt;&lt;/pre&gt;&lt;/div&gt;



&lt;p&gt;Então, após compactação do tópico, o cliente &lt;code&gt;C003&lt;/code&gt; deixará de existir:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight"&gt;&lt;pre class="highlight plaintext"&gt;&lt;code&gt;CHAVE   VALOR DO EVENTO
C001    Cliente 1y
C002    Cliente 2+
C001    Cliente 1π
C003
&lt;/code&gt;&lt;/pre&gt;&lt;/div&gt;



&lt;p&gt;Aqui foi um exemplo, mas a chave de partição poderia ser o CPF, CNPJ ou qualquer outra valor pertinente ao caso se uso.&lt;/p&gt;




&lt;p&gt;Simples, não? Mas não se engane, porque empregar chave de partição não uma regra geral. Então, não mude as configurações do seu tópico antes de entender as consequências.&lt;/p&gt;

&lt;p&gt;Se você já tem um tópico com eventos produzidos sem chave, a primeira coisa que acontecerá quando mudar a &lt;code&gt;cleanup.policy&lt;/code&gt; para &lt;code&gt;compact&lt;/code&gt; é que todos eles serão eliminados 😬. Mudar configurações de tópicos só com muita análise dos impactos.&lt;/p&gt;




&lt;p&gt;É isso. &lt;/p&gt;

&lt;p&gt;Obrigado pelo leitura e até o próximo!&lt;/p&gt;

</description>
      <category>kafka</category>
      <category>apachekafka</category>
      <category>kafkabr</category>
    </item>
    <item>
      <title>Como implementar Dead-letter Topic com Spring Kafka</title>
      <dc:creator>Fabio José</dc:creator>
      <pubDate>Mon, 18 May 2020 11:47:35 +0000</pubDate>
      <link>https://dev.to/kafkabr/como-implementar-dead-letter-topic-com-spring-kafka-585e</link>
      <guid>https://dev.to/kafkabr/como-implementar-dead-letter-topic-com-spring-kafka-585e</guid>
      <description>&lt;p&gt;&lt;em&gt;Dead-letter Topic&lt;/em&gt;, &lt;a href="https://en.m.wikipedia.org/wiki/Dead_letter_queue" rel="noopener noreferrer"&gt;&lt;em&gt;Dead-letter Queue&lt;/em&gt;&lt;/a&gt; ou em bom e velho português: &lt;strong&gt;Tópicos de mensagens não-entregues&lt;/strong&gt;. São tópicos necessários em sistemas distribuídos onde a comunicação é assíncrona e através de brokers como o Kafka.&lt;/p&gt;

&lt;p&gt;Os dados que chegam nestes tópicos passaram por todas as tentativas possíveis para tratamento de erros e já não resta mais nada a ser feito, se não, a intervenção humana. Assim, &lt;strong&gt;não será qualquer erro&lt;/strong&gt; que levará mensagens ou eventos a serem publicados em um tópico &lt;em&gt;dead-letter&lt;/em&gt;.&lt;/p&gt;

&lt;blockquote&gt;
&lt;p&gt;No mundo Kafka um tópico dead-letter é destinado aos registros consumidos, que por algum erro irrecuperável não puderam ser processados com sucesso.&lt;/p&gt;
&lt;/blockquote&gt;




&lt;p&gt;Neste artigo será demonstrada uma abordagem para implementar DLT no Kafka, utilizando Java e Spring. Para os impacientes 🧐, estes são os fontes :&lt;/p&gt;


&lt;div class="ltag-github-readme-tag"&gt;
  &lt;div class="readme-overview"&gt;
    &lt;h2&gt;
      &lt;img src="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev.to%2Fassets%2Fgithub-logo-5a155e1f9a670af7944dd5e12375bc76ed542ea80224905ecaf878b9157cdefc.svg" alt="GitHub logo"&gt;
      &lt;a href="https://github.com/fabiojose" rel="noopener noreferrer"&gt;
        fabiojose
      &lt;/a&gt; / &lt;a href="https://github.com/fabiojose/spring-kafka-dlt-ex" rel="noopener noreferrer"&gt;
        spring-kafka-dlt-ex
      &lt;/a&gt;
    &lt;/h2&gt;
    &lt;h3&gt;
      Dead-letter Topic com Spring Kafka
    &lt;/h3&gt;
  &lt;/div&gt;
&lt;/div&gt;





&lt;h3&gt;
  
  
  Classificar Erros
&lt;/h3&gt;

&lt;p&gt;Antes de escrever qualquer linha de código é necessário classificar os erros e como serão tratados.&lt;/p&gt;

&lt;p&gt;Existem dois tipos de erros: &lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;recuperáveis&lt;/li&gt;
&lt;li&gt;não-recuperáveis&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;Os erros &lt;strong&gt;recuperáveis&lt;/strong&gt; ou que podem ser tratados, são aqueles onde  alguma abordagem será empregada para tentar finalizar o fluxo com sucesso.&lt;/p&gt;

&lt;p&gt;Por exemplo, no Java, os erros &lt;a href="https://docs.oracle.com/javase/8/docs/api/java/net/ConnectException.html" rel="noopener noreferrer"&gt;&lt;code&gt;ConnectException&lt;/code&gt;&lt;/a&gt; e &lt;a href="https://docs.oracle.com/javase/8/docs/api/java/net/UnknownHostException.html" rel="noopener noreferrer"&gt;&lt;code&gt;UnknownHostException&lt;/code&gt;&lt;/a&gt; podem ser recuperados através de retentativas, pois normalmente são causados por instabilidades momentâneas no serviço ou na rede.&lt;/p&gt;

&lt;p&gt;Já os &lt;strong&gt;não-recuperáveis&lt;/strong&gt; são aqueles que independente do que seja feito, não será possível finalizar o fluxo com sucesso. Logo, estes são candidatos a seguirem diretamente para o tópico dead-letter. E como exemplo, se você utiliza Avro, o erro &lt;a href="http://avro.apache.org/docs/1.9.2/api/java/index.html" rel="noopener noreferrer"&gt;&lt;code&gt;AvroMissingFieldException&lt;/code&gt;&lt;/a&gt; indica a ausência de um campo requerido, não havendo nada a fazer. Portanto será inútil a retentativa, por exemplo.&lt;/p&gt;




&lt;p&gt;&lt;strong&gt;E além dos erros técnicos, você também deverá classificar seus erros de negócio e escolher uma estratégia para tratá-los.&lt;/strong&gt;&lt;/p&gt;




&lt;p&gt;Bem, agora veremos como implementar DLT para problemas técnicos no Java. E é bem provável que você encontre equivalentes na sua linguagem ou framework.&lt;/p&gt;

&lt;h3&gt;
  
  
  Recuperáveis
&lt;/h3&gt;

&lt;p&gt;Aqui segue uma lista de erros técnicos recuperáveis que, em nome da simplicidade, serão tratados através de retentativas.&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;&lt;a href="https://docs.oracle.com/javase/8/docs/api/java/net/ConnectException.html" rel="noopener noreferrer"&gt;&lt;code&gt;ConnectException&lt;/code&gt;&lt;/a&gt;&lt;/li&gt;
&lt;li&gt;&lt;a href="https://docs.oracle.com/javase/8/docs/api/java/net/UnknownHostException.html" rel="noopener noreferrer"&gt;&lt;code&gt;UnknownHostException&lt;/code&gt;&lt;/a&gt;&lt;/li&gt;
&lt;li&gt;
&lt;code&gt;ProductNotFoundException&lt;/code&gt;: a título de exemplo, este erro de negócio foi definido como tratável. Porque um serviço chamado &lt;em&gt;Catalogo&lt;/em&gt;, que faz &lt;a href="https://martinfowler.com/eaaDev/EventSourcing.html" rel="noopener noreferrer"&gt;event-sourcing&lt;/a&gt; das mudanças emitidas pelo serviço &lt;em&gt;Produto&lt;/em&gt;, ainda não processou o evento com o produto em questão. Mas através da retentativa seu fluxo poderá finalizar com sucesso.&lt;/li&gt;
&lt;/ul&gt;

&lt;h3&gt;
  
  
  Não-recuperáveis
&lt;/h3&gt;

&lt;p&gt;Estes são alguns dos erros não-recuperáveis, ou seja, se passarem pelo mesmo processo de retentativas somente consumiriam recursos, sem chances de finalizar com sucesso.&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;&lt;a href="http://avro.apache.org/docs/1.9.2/api/java/index.html" rel="noopener noreferrer"&gt;&lt;code&gt;AvroMissingFieldException&lt;/code&gt;&lt;/a&gt;&lt;/li&gt;
&lt;li&gt;&lt;a href="https://docs.oracle.com/javase/8/docs/api/java/lang/NullPointerException.html" rel="noopener noreferrer"&gt;&lt;code&gt;NullPointerException&lt;/code&gt;&lt;/a&gt;&lt;/li&gt;
&lt;li&gt;&lt;a href="https://docs.oracle.com/javase/8/docs/api/java/lang/ClassCastException.html" rel="noopener noreferrer"&gt;&lt;code&gt;ClassCastException&lt;/code&gt;&lt;/a&gt;&lt;/li&gt;
&lt;li&gt;&lt;a href="https://kafka.apache.org/20/javadoc/index.html?org/apache/kafka/common/errors/RecordTooLargeException.html" rel="noopener noreferrer"&gt;&lt;code&gt;RecordTooLargeException&lt;/code&gt;&lt;/a&gt;&lt;/li&gt;
&lt;li&gt;
&lt;code&gt;StockNotAvailableException&lt;/code&gt;: como exemplo, este erro de negócio foi classificado como irrecuperável. Porque a falta de produtos no estoque não será resolvida com retentativas, por exemplo.&lt;/li&gt;
&lt;/ul&gt;

&lt;h3&gt;
  
  
  Implementação
&lt;/h3&gt;

&lt;blockquote&gt;
&lt;p&gt;Existe um &lt;a href="https://eng.uber.com/reliable-reprocessing/" rel="noopener noreferrer"&gt;ótimo artigo&lt;/a&gt; no blog de engenharia do Uber que descreve uma arquitetura para dead-letter. Vale a leitura!&lt;/p&gt;
&lt;/blockquote&gt;

&lt;p&gt;Esta implementação foi dividida em três partes:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;tópicos&lt;/li&gt;
&lt;li&gt;construção&lt;/li&gt;
&lt;li&gt;tratamento&lt;/li&gt;
&lt;/ul&gt;

&lt;h4&gt;
  
  
  Tópicos
&lt;/h4&gt;

&lt;p&gt;Para cada processo que terá tratamento &lt;em&gt;dead-letter&lt;/em&gt;, será criado um tópico com o sufixo &lt;code&gt;-dlt&lt;/code&gt;. Tome como exemplo um processo que realiza a reserva de estoque a partir das ordens de compra publicadas no tópico &lt;code&gt;ordem-compra&lt;/code&gt;. Então, ao invés de criar um tópico ordem-compra-dlq, será utilizado um nome relevante ao processo que está falhando:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;&lt;code&gt;reservar-estoque-dlt&lt;/code&gt;&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;E mais os tópicos para retentativas, que devem ser quantos forem necessários até finalmente o registro chegar ao tópico &lt;em&gt;dead-letter&lt;/em&gt;. Digamos que serão no máximo quatro retentativas além da inicial:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;&lt;code&gt;reservar-estoque-retry-1&lt;/code&gt;&lt;/li&gt;
&lt;li&gt;&lt;code&gt;reservar-estoque-retry-2&lt;/code&gt;&lt;/li&gt;
&lt;li&gt;&lt;code&gt;reservar-estoque-retry-3&lt;/code&gt;&lt;/li&gt;
&lt;li&gt;&lt;code&gt;reservar-estoque-retry-4&lt;/code&gt;&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;Os tópicos devem ter características que não interfiram no andamento das retentativas ou na publicação do tópico dead-letter. Um exemplo é a configuração &lt;code&gt;max.message.bytes&lt;/code&gt;, que pode acarretar um erro chamado &lt;code&gt;message too large&lt;/code&gt;. Portanto os tópicos deverão permitir mensagens maiores, caso contrário também não será possível utilizá-los porque são idênticos ao principal, inclusive com as mesmas limitações.&lt;/p&gt;

&lt;h4&gt;
  
  
  Construção
&lt;/h4&gt;

&lt;p&gt;A construção foi feita com Spring Kafka porque ele já vem com muitas utilidades para tratamento para &lt;em&gt;dead-letter&lt;/em&gt;, o que contribui muito com a produtividade se ela é o foco no seu projeto. Mas nada impede que você escreva a mesma solução com Clientes Kafka no Java ou na sua linguagem do seu projeto.&lt;/p&gt;

&lt;p&gt;Para utilizar os recursos destinados a dead-letter, é necessário customizar a fábrica de objetos encarregada de produzir &lt;em&gt;listeners&lt;/em&gt; Kafka no Spring.&lt;/p&gt;

&lt;p&gt;Assim, a configuração programática foi sub-dividida em três partes:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;
&lt;code&gt;resolver&lt;/code&gt;: responsável por determinar o tópico destino do registro que está no contexto do erro.&lt;/li&gt;
&lt;li&gt;
&lt;code&gt;errorHandler&lt;/code&gt;: manipulador do erro&lt;/li&gt;
&lt;li&gt;
&lt;code&gt;kafkaListernerContainerFactory&lt;/code&gt;: fabrica instâncias que são utilizadas nos métodos anotados com &lt;code&gt;@KafkaListener&lt;/code&gt;
&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;E para cada uma das sub-divisões existe uma implementação &lt;strong&gt;Main&lt;/strong&gt; e outra &lt;strong&gt;Retry&lt;/strong&gt;. Como é possível imaginar, uma cuida das configurações para o processamento principal outro para as retentativas.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Main resolver&lt;/strong&gt;, responsável por determinar qual o tópico destino com base no erro-raiz lançado ao processar o registro consumido do tópico &lt;code&gt;ordem-compra&lt;/code&gt;:&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight java"&gt;&lt;code&gt;

&lt;span class="nd"&gt;@Bean&lt;/span&gt;
&lt;span class="kd"&gt;public&lt;/span&gt; &lt;span class="nc"&gt;BiFunction&lt;/span&gt; &lt;span class="o"&gt;&amp;lt;&lt;/span&gt; &lt;span class="nc"&gt;ConsumerRecord&lt;/span&gt; &lt;span class="o"&gt;&amp;lt;&lt;/span&gt; &lt;span class="o"&gt;?&lt;/span&gt; &lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="o"&gt;?&lt;/span&gt; &lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="nc"&gt;Exception&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="nc"&gt;TopicPartition&lt;/span&gt; &lt;span class="o"&gt;&amp;gt;&lt;/span&gt;
 &lt;span class="n"&gt;mainResolver&lt;/span&gt;&lt;span class="o"&gt;()&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;

  &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="k"&gt;new&lt;/span&gt; &lt;span class="nc"&gt;BiFunction&lt;/span&gt; &lt;span class="o"&gt;&amp;lt;&lt;/span&gt; &lt;span class="nc"&gt;ConsumerRecord&lt;/span&gt; &lt;span class="o"&gt;&amp;lt;&lt;/span&gt; &lt;span class="o"&gt;?&lt;/span&gt; &lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="o"&gt;?&lt;/span&gt; &lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="nc"&gt;Exception&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="nc"&gt;TopicPartition&lt;/span&gt; &lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="o"&gt;()&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
   &lt;span class="nd"&gt;@Override&lt;/span&gt;
   &lt;span class="kd"&gt;public&lt;/span&gt; &lt;span class="nc"&gt;TopicPartition&lt;/span&gt; &lt;span class="nf"&gt;apply&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;ConsumerRecord&lt;/span&gt; &lt;span class="o"&gt;&amp;lt;&lt;/span&gt; &lt;span class="o"&gt;?&lt;/span&gt; &lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="o"&gt;?&lt;/span&gt; &lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="n"&gt;r&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="nc"&gt;Exception&lt;/span&gt; &lt;span class="n"&gt;e&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;

    &lt;span class="c1"&gt;// ####&lt;/span&gt;
    &lt;span class="c1"&gt;// Por padrão, quando é não-recuperável, segue diretamente p/ dead-letter&lt;/span&gt;
    &lt;span class="nc"&gt;TopicPartition&lt;/span&gt; &lt;span class="n"&gt;result&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt;
     &lt;span class="k"&gt;new&lt;/span&gt; &lt;span class="nf"&gt;TopicPartition&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;dltTopic&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt;
      &lt;span class="no"&gt;QUALQUER_PARTICAO&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;

    &lt;span class="c1"&gt;// ####&lt;/span&gt;
    &lt;span class="c1"&gt;// Trata-se de um erro recuperável?&lt;/span&gt;
    &lt;span class="kd"&gt;final&lt;/span&gt; &lt;span class="kt"&gt;boolean&lt;/span&gt; &lt;span class="n"&gt;recuperavel&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;isRecuperavel&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;e&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;
    &lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;recuperavel&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;

     &lt;span class="nc"&gt;Optional&lt;/span&gt; &lt;span class="o"&gt;&amp;lt;&lt;/span&gt; &lt;span class="nc"&gt;String&lt;/span&gt; &lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="n"&gt;origem&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt;
      &lt;span class="n"&gt;topicoOrigem&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;r&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;headers&lt;/span&gt;&lt;span class="o"&gt;())&lt;/span&gt;
      &lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;or&lt;/span&gt;&lt;span class="o"&gt;(()&lt;/span&gt; &lt;span class="o"&gt;-&amp;gt;&lt;/span&gt; &lt;span class="nc"&gt;Optional&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;of&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="no"&gt;NENHUM_CABECALHO&lt;/span&gt;&lt;span class="o"&gt;));&lt;/span&gt;

     &lt;span class="c1"&gt;// ####&lt;/span&gt;
     &lt;span class="c1"&gt;// Se origem for outro tópico, segue para o primeiro retry&lt;/span&gt;
     &lt;span class="nc"&gt;String&lt;/span&gt; &lt;span class="n"&gt;destino&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt;
      &lt;span class="n"&gt;origem&lt;/span&gt;
      &lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;filter&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;topico&lt;/span&gt; &lt;span class="o"&gt;-&amp;gt;&lt;/span&gt; &lt;span class="o"&gt;!&lt;/span&gt;&lt;span class="n"&gt;topico&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;matches&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;retryTopicsPattern&lt;/span&gt;&lt;span class="o"&gt;))&lt;/span&gt;
      &lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;map&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;t&lt;/span&gt; &lt;span class="o"&gt;-&amp;gt;&lt;/span&gt; &lt;span class="n"&gt;retryFirstTopic&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt;
      &lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;orElse&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;dltTopic&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;

     &lt;span class="n"&gt;result&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="k"&gt;new&lt;/span&gt; &lt;span class="nc"&gt;TopicPartition&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;destino&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="no"&gt;QUALQUER_PARTICAO&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;
    &lt;span class="o"&gt;}&lt;/span&gt;

    &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="n"&gt;result&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
   &lt;span class="o"&gt;}&lt;/span&gt;
  &lt;span class="o"&gt;};&lt;/span&gt;
 &lt;span class="o"&gt;}&lt;/span&gt;


&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;
&lt;p&gt;&lt;strong&gt;Main error handler&lt;/strong&gt;, que utiliza o &lt;strong&gt;Main resolver&lt;/strong&gt; e é responsável por definir duas configurações essenciais para tratamento dos erros:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;
&lt;a href="https://docs.spring.io/spring-kafka/api/org/springframework/kafka/listener/DeadLetterPublishingRecoverer.html" rel="noopener noreferrer"&gt;&lt;code&gt;DeadLetterPublishingRecoverer&lt;/code&gt;&lt;/a&gt;: inicia o fluxo DLT, que é delegado pelo SeekToCurrentErrorHandler caso as retentativas locais não resolvam o erro.&lt;/li&gt;
&lt;li&gt;
&lt;a href="https://docs.spring.io/spring-kafka/api/org/springframework/kafka/listener/SeekToCurrentErrorHandler.html" rel="noopener noreferrer"&gt;&lt;code&gt;SeekToCurrentErrorHandler&lt;/code&gt;&lt;/a&gt;: manipula qualquer erro que seja lançado no método anotado com &lt;code&gt;@KafkaListener&lt;/code&gt;. Naturalmente, se você capturá-los com &lt;code&gt;catch&lt;/code&gt; e não permitir que eles &lt;em&gt;subam na pilha&lt;/em&gt;, não será possível tratá-los.&lt;/li&gt;
&lt;/ul&gt;
&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight java"&gt;&lt;code&gt;

&lt;span class="nd"&gt;@Bean&lt;/span&gt;
&lt;span class="kd"&gt;public&lt;/span&gt; &lt;span class="nc"&gt;SeekToCurrentErrorHandler&lt;/span&gt; &lt;span class="nf"&gt;mainErrorHandler&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;
 &lt;span class="nd"&gt;@Qualifier&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"mainResolver"&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt;
 &lt;span class="nc"&gt;BiFunction&lt;/span&gt; &lt;span class="o"&gt;&amp;lt;&lt;/span&gt; &lt;span class="nc"&gt;ConsumerRecord&lt;/span&gt; &lt;span class="o"&gt;&amp;lt;&lt;/span&gt; &lt;span class="o"&gt;?&lt;/span&gt; &lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="o"&gt;?&lt;/span&gt; &lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="nc"&gt;Exception&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="nc"&gt;TopicPartition&lt;/span&gt; &lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="n"&gt;resolver&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt;
 &lt;span class="nc"&gt;KafkaTemplate&lt;/span&gt; &lt;span class="o"&gt;&amp;lt;&lt;/span&gt; &lt;span class="o"&gt;?&lt;/span&gt; &lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="o"&gt;?&lt;/span&gt; &lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="n"&gt;template&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;

 &lt;span class="c1"&gt;// ####&lt;/span&gt;
 &lt;span class="c1"&gt;// Recuperação usando dead-letter &lt;/span&gt;
 &lt;span class="nc"&gt;DeadLetterPublishingRecoverer&lt;/span&gt; &lt;span class="n"&gt;recoverer&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt;
  &lt;span class="k"&gt;new&lt;/span&gt; &lt;span class="nf"&gt;DeadLetterPublishingRecoverer&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;template&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="n"&gt;resolver&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;

 &lt;span class="c1"&gt;// ####&lt;/span&gt;
 &lt;span class="c1"&gt;// Tentar 3x localmente antes de iniciar o fluxo dead-letter&lt;/span&gt;
 &lt;span class="nc"&gt;SeekToCurrentErrorHandler&lt;/span&gt; &lt;span class="n"&gt;handler&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt;
  &lt;span class="k"&gt;new&lt;/span&gt; &lt;span class="nf"&gt;SeekToCurrentErrorHandler&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;recoverer&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="no"&gt;RETENTAR_3X&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;

 &lt;span class="c1"&gt;// ####&lt;/span&gt;
 &lt;span class="c1"&gt;// Lista das exceções não-recuperáveis, para evitar o retry local&lt;/span&gt;
 &lt;span class="n"&gt;excecoes&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;getNaoRecuperavies&lt;/span&gt;&lt;span class="o"&gt;().&lt;/span&gt;&lt;span class="na"&gt;forEach&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;e&lt;/span&gt; &lt;span class="o"&gt;-&amp;gt;&lt;/span&gt;
  &lt;span class="n"&gt;handler&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;addNotRetryableException&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;e&lt;/span&gt;&lt;span class="o"&gt;));&lt;/span&gt;

 &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="n"&gt;handler&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
&lt;span class="o"&gt;}&lt;/span&gt;


&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;
&lt;p&gt;&lt;strong&gt;Main listener factory&lt;/strong&gt;, encarregado de fabricar os consumidores que processam registros do tópíco &lt;code&gt;ordem-compra&lt;/code&gt;.&lt;/p&gt;
&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight java"&gt;&lt;code&gt;

&lt;span class="nd"&gt;@Bean&lt;/span&gt;
&lt;span class="kd"&gt;public&lt;/span&gt; &lt;span class="nc"&gt;KafkaListenerContainerFactory&lt;/span&gt; &lt;span class="o"&gt;&amp;lt;&lt;/span&gt; &lt;span class="nc"&gt;ConcurrentMessageListenerContainer&lt;/span&gt; &lt;span class="o"&gt;&amp;lt;&lt;/span&gt; &lt;span class="nc"&gt;String&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="nc"&gt;GenericRecord&lt;/span&gt; &lt;span class="o"&gt;&amp;gt;&amp;gt;&lt;/span&gt;
 &lt;span class="n"&gt;mainKafkaListenerContainerFactory&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;
  &lt;span class="nd"&gt;@Qualifier&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"mainErrorHandler"&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt; &lt;span class="nc"&gt;SeekToCurrentErrorHandler&lt;/span&gt; &lt;span class="n"&gt;errorHandler&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt;
  &lt;span class="nc"&gt;KafkaProperties&lt;/span&gt; &lt;span class="n"&gt;properties&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt;
  &lt;span class="nc"&gt;ConsumerFactory&lt;/span&gt; &lt;span class="o"&gt;&amp;lt;&lt;/span&gt; &lt;span class="nc"&gt;String&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="nc"&gt;GenericRecord&lt;/span&gt; &lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="n"&gt;factory&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;

  &lt;span class="nc"&gt;ConcurrentKafkaListenerContainerFactory&lt;/span&gt; &lt;span class="o"&gt;&amp;lt;&lt;/span&gt; &lt;span class="nc"&gt;String&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="nc"&gt;GenericRecord&lt;/span&gt; &lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="n"&gt;listener&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt;
   &lt;span class="k"&gt;new&lt;/span&gt; &lt;span class="nc"&gt;ConcurrentKafkaListenerContainerFactory&lt;/span&gt; &lt;span class="o"&gt;&amp;lt;&lt;/span&gt; &lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="o"&gt;();&lt;/span&gt;

  &lt;span class="n"&gt;listener&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;setConsumerFactory&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;factory&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;

  &lt;span class="c1"&gt;// ####&lt;/span&gt;
  &lt;span class="c1"&gt;// Utilizando o mainErrorHandler para tratar os erros&lt;/span&gt;
  &lt;span class="n"&gt;listener&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;setErrorHandler&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;errorHandler&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;

  &lt;span class="c1"&gt;// Falhar, caso os tópicos não existam?&lt;/span&gt;
  &lt;span class="n"&gt;listener&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;getContainerProperties&lt;/span&gt;&lt;span class="o"&gt;()&lt;/span&gt;
   &lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;setMissingTopicsFatal&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;missingTopicsFatal&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;

  &lt;span class="c1"&gt;// Commit do offset no registro, logo após processá-lo no listener&lt;/span&gt;
  &lt;span class="n"&gt;listener&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;getContainerProperties&lt;/span&gt;&lt;span class="o"&gt;().&lt;/span&gt;&lt;span class="na"&gt;setAckMode&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;AckMode&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;MANUAL&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;

  &lt;span class="c1"&gt;// Commits síncronos&lt;/span&gt;
  &lt;span class="n"&gt;listener&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;getContainerProperties&lt;/span&gt;&lt;span class="o"&gt;().&lt;/span&gt;&lt;span class="na"&gt;setSyncCommits&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;Boolean&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;TRUE&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;

  &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="n"&gt;listener&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
 &lt;span class="o"&gt;}&lt;/span&gt;


&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;
&lt;p&gt;Então, basta anotar o método como segue.&lt;/p&gt;

&lt;p&gt;Mas vale ressaltar que o offset sempre deverá ser confirmado, porque todo o fluxo que trata os erros será feito por retentativa e talvez culminando no tópico dead-letter. &lt;em&gt;Analise o seu caso-de-uso e entenda se esta abordagem também se aplicada.&lt;/em&gt;&lt;/p&gt;
&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight java"&gt;&lt;code&gt;

&lt;span class="nd"&gt;@KafkaListener&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;
 &lt;span class="n"&gt;id&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="s"&gt;"main-kafka-listener"&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt;
 &lt;span class="n"&gt;topics&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="s"&gt;"${app.kafka.consumer.topics}"&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt;
 &lt;span class="n"&gt;containerFactory&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="s"&gt;"mainKafkaListenerContainerFactory"&lt;/span&gt;
&lt;span class="o"&gt;)&lt;/span&gt;
&lt;span class="kd"&gt;public&lt;/span&gt; &lt;span class="kt"&gt;void&lt;/span&gt; &lt;span class="nf"&gt;consume&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nd"&gt;@Payload&lt;/span&gt; &lt;span class="nc"&gt;ConsumerRecord&lt;/span&gt; &lt;span class="o"&gt;&amp;lt;&lt;/span&gt; &lt;span class="nc"&gt;String&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="nc"&gt;GenericRecord&lt;/span&gt; &lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="n"&gt;record&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt;
 &lt;span class="nc"&gt;Acknowledgment&lt;/span&gt; &lt;span class="n"&gt;ack&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt; &lt;span class="kd"&gt;throws&lt;/span&gt; &lt;span class="nc"&gt;Exception&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;

 &lt;span class="k"&gt;try&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;

  &lt;span class="c1"&gt;// #### &lt;/span&gt;
  &lt;span class="c1"&gt;// Processar&lt;/span&gt;
  &lt;span class="n"&gt;process&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;record&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;

 &lt;span class="o"&gt;}&lt;/span&gt; &lt;span class="k"&gt;finally&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;

  &lt;span class="c1"&gt;// ####&lt;/span&gt;
  &lt;span class="c1"&gt;// Sempre confirmar o offset&lt;/span&gt;
  &lt;span class="n"&gt;ack&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;acknowledge&lt;/span&gt;&lt;span class="o"&gt;();&lt;/span&gt;
 &lt;span class="o"&gt;}&lt;/span&gt;

&lt;span class="o"&gt;}&lt;/span&gt;


&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;
&lt;p&gt;Já a configuração para o processamento das retentativas segue moldes similares, com algumas exceções:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;
&lt;code&gt;resolver&lt;/code&gt;: retorna qual o próximo tópico na sequência de retentativas ou se é o &lt;code&gt;reservar-estoque-dlt&lt;/code&gt;, caso já tenham passadas por todas.&lt;/li&gt;
&lt;li&gt;
&lt;code&gt;errorHandler&lt;/code&gt;: nenhuma retentiva local.&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;Este é o método anotado com &lt;code&gt;@KafkaListener&lt;/code&gt; que tratará os tópicos &lt;code&gt;reservar-estoque-retry&lt;/code&gt;:&lt;/p&gt;
&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight java"&gt;&lt;code&gt;

&lt;span class="nd"&gt;@KafkaListener&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;
 &lt;span class="n"&gt;id&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="s"&gt;"retry-kafka-listener"&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt;
 &lt;span class="n"&gt;topicPattern&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="s"&gt;"${app.kafka.dlt.retry.topics.pattern}"&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt;
 &lt;span class="n"&gt;containerFactory&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="s"&gt;"retryKafkaListenerContainerFactory"&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt;
 &lt;span class="n"&gt;properties&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
  &lt;span class="s"&gt;"fetch.min.bytes=${app.kafka.dlt.retry.min.bytes}"&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt;
  &lt;span class="s"&gt;"fetch.max.wait.ms=${app.kafka.dlt.retry.max.wait.ms}"&lt;/span&gt;
 &lt;span class="o"&gt;}&lt;/span&gt;
&lt;span class="o"&gt;)&lt;/span&gt;
&lt;span class="kd"&gt;public&lt;/span&gt; &lt;span class="kt"&gt;void&lt;/span&gt; &lt;span class="nf"&gt;retry&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nd"&gt;@Payload&lt;/span&gt; &lt;span class="nc"&gt;ConsumerRecord&lt;/span&gt; &lt;span class="o"&gt;&amp;lt;&lt;/span&gt; &lt;span class="nc"&gt;String&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="nc"&gt;GenericRecord&lt;/span&gt; &lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="n"&gt;record&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt;
 &lt;span class="nc"&gt;Acknowledgment&lt;/span&gt; &lt;span class="n"&gt;ack&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt; &lt;span class="kd"&gt;throws&lt;/span&gt; &lt;span class="nc"&gt;Exception&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;

 &lt;span class="k"&gt;try&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;

  &lt;span class="c1"&gt;// ####&lt;/span&gt;
  &lt;span class="c1"&gt;// Reprocessar&lt;/span&gt;
  &lt;span class="n"&gt;process&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;record&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;

 &lt;span class="o"&gt;}&lt;/span&gt; &lt;span class="k"&gt;finally&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
  &lt;span class="c1"&gt;// ####&lt;/span&gt;
  &lt;span class="c1"&gt;// Sempre confirmar o offset&lt;/span&gt;
  &lt;span class="n"&gt;ack&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;acknowledge&lt;/span&gt;&lt;span class="o"&gt;();&lt;/span&gt;
 &lt;span class="o"&gt;}&lt;/span&gt;

&lt;span class="o"&gt;}&lt;/span&gt;


&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;
&lt;ul&gt;
&lt;li&gt;
&lt;code&gt;topicPattern&lt;/code&gt;: consumir todos os tópicos &lt;code&gt;reservar-estoque-retry&lt;/code&gt;
&lt;/li&gt;
&lt;li&gt;
&lt;code&gt;fetch.min.bytes&lt;/code&gt; e &lt;code&gt;fetch.max.wait.ms&lt;/code&gt;: utilizados para provocar um certo atraso no consumo dos registros, visto que sem eles o consumo seria praticamente instantâneo.&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;Por fim, o &lt;code&gt;application.properties&lt;/code&gt; será assim: &lt;/p&gt;
&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight properties"&gt;&lt;code&gt;

&lt;span class="py"&gt;app.kafka.dlt.retry.topics&lt;/span&gt;&lt;span class="p"&gt;=&lt;/span&gt;&lt;span class="s"&gt;4&lt;/span&gt;
&lt;span class="py"&gt;app.kafka.dlt.retry.topics.pattern&lt;/span&gt;&lt;span class="p"&gt;=&lt;/span&gt;&lt;span class="s"&gt;reservar-estoque-retry-[0-9]+&lt;/span&gt;
&lt;span class="py"&gt;app.kafka.dlt.retry.topic.first&lt;/span&gt;&lt;span class="p"&gt;=&lt;/span&gt;&lt;span class="s"&gt;reservar-estoque-retry-1&lt;/span&gt;
&lt;span class="py"&gt;app.kafka.dlt.topic&lt;/span&gt;&lt;span class="p"&gt;=&lt;/span&gt;&lt;span class="s"&gt;reservar-estoque-dlt&lt;/span&gt;

&lt;span class="c"&gt;# Lista de exceções recuperáveis
&lt;/span&gt;&lt;span class="err"&gt;app.kafka.dlt.excecoes.recuperaveis[0]=java.net.ConnectException&lt;/span&gt;
&lt;span class="err"&gt;app.kafka.dlt.excecoes.recuperaveis[1]=java.net.UnknownHostException&lt;/span&gt;

&lt;span class="c"&gt;# Lista de exceções não-recuperáveis
&lt;/span&gt;&lt;span class="err"&gt;app.kafka.dlt.excecoes.naoRecuperaveis[0]=org.apache.avro.AvroMissingFieldException&lt;/span&gt;
&lt;span class="err"&gt;app.kafka.dlt.excecoes.naoRecuperaveis[1]=java.lang.NullPointerException&lt;/span&gt;

&lt;span class="c"&gt;# Provocar atraso no processmento de retentativa
&lt;/span&gt;&lt;span class="py"&gt;app.kafka.dlt.retry.max.wait.ms&lt;/span&gt;&lt;span class="p"&gt;=&lt;/span&gt;&lt;span class="s"&gt;20000&lt;/span&gt;
&lt;span class="py"&gt;app.kafka.dlt.retry.min.bytes&lt;/span&gt;&lt;span class="p"&gt;=&lt;/span&gt;&lt;span class="s"&gt;52428800&lt;/span&gt;


&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;
&lt;p&gt;Todo a implementação está disponível no Github, &lt;em&gt;clone it and have fun!&lt;/em&gt;&lt;/p&gt;


&lt;div class="ltag-github-readme-tag"&gt;
  &lt;div class="readme-overview"&gt;
    &lt;h2&gt;
      &lt;img src="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev.to%2Fassets%2Fgithub-logo-5a155e1f9a670af7944dd5e12375bc76ed542ea80224905ecaf878b9157cdefc.svg" alt="GitHub logo"&gt;
      &lt;a href="https://github.com/fabiojose" rel="noopener noreferrer"&gt;
        fabiojose
      &lt;/a&gt; / &lt;a href="https://github.com/fabiojose/spring-kafka-dlt-ex" rel="noopener noreferrer"&gt;
        spring-kafka-dlt-ex
      &lt;/a&gt;
    &lt;/h2&gt;
    &lt;h3&gt;
      Dead-letter Topic com Spring Kafka
    &lt;/h3&gt;
  &lt;/div&gt;
  &lt;div class="ltag-github-body"&gt;
    
&lt;div id="readme" class="md"&gt;
&lt;div class="markdown-heading"&gt;
&lt;h1 class="heading-element"&gt;spring-kafka-dlt-ex&lt;/h1&gt;
&lt;/div&gt;

&lt;p&gt;Dead-letter Topico com Spring Kafka e formato de dados Avro.&lt;/p&gt;

&lt;div class="markdown-heading"&gt;
&lt;h2 class="heading-element"&gt;Requerimentos&lt;/h2&gt;
&lt;/div&gt;

&lt;ul&gt;
&lt;li&gt;JDK 11&lt;/li&gt;
&lt;li&gt;Apache Maven 3.6+&lt;/li&gt;
&lt;li&gt;Docker 19+&lt;/li&gt;
&lt;li&gt;Acesso ao repositório &lt;a href="https://repo.maven.apache.org/maven2/" rel="nofollow noopener noreferrer"&gt;https://repo.maven.apache.org/maven2/&lt;/a&gt; ou uma
alternativa com acesso às dependências presentes no &lt;code&gt;pom.xml&lt;/code&gt;
&lt;/li&gt;
&lt;li&gt;Schema Registry&lt;/li&gt;
&lt;li&gt;Kafka&lt;/li&gt;
&lt;li&gt;Testcontainers&lt;/li&gt;
&lt;li&gt;Lombok&lt;/li&gt;
&lt;li&gt;Commons Lang3&lt;/li&gt;
&lt;/ul&gt;

&lt;div class="markdown-heading"&gt;
&lt;h2 class="heading-element"&gt;Configurações&lt;/h2&gt;
&lt;/div&gt;

&lt;p&gt;Não se preocupe, pois apesar de existirem atalhos pelas variávies
de ambiente, você pode utilizar tranquilamente aquilo que o Spring Boot
oferece. Então veja todos as propriedades no
&lt;a href="https://github.com/fabiojose/spring-kafka-dlt-ex./src/main/resources/application.properties" rel="noopener noreferrer"&gt;application.properties&lt;/a&gt;&lt;/p&gt;
&lt;p&gt;No caso do Kafka, utilizamos Spring Kafka, então você utilizar
o modo Spring para configurações.&lt;/p&gt;
&lt;div class="markdown-heading"&gt;
&lt;h3 class="heading-element"&gt;Variáveis de Ambiente&lt;/h3&gt;

&lt;/div&gt;
&lt;ul&gt;
&lt;li&gt;
&lt;p&gt;&lt;code&gt;APP_KAFKA_CONSUMER_TOPICS&lt;/code&gt;: tópicos para consumir, ou expressão.&lt;/p&gt;
&lt;/li&gt;
&lt;li&gt;
&lt;p&gt;&lt;code&gt;KAFKA_CLIENT_ID&lt;/code&gt;: nome do cliente Kafka, usado pelos brokers para logs e
métricas. Utilize um &lt;a href="https://medium.com/coding-skills/clean-code-101-meaningful-names-and-functions-bf450456d90c" rel="nofollow noopener noreferrer"&gt;nome clean&lt;/a&gt;, não genérico.&lt;/p&gt;
&lt;ul&gt;
&lt;li&gt;
&lt;code&gt;spring.kafka.producer.client-id&lt;/code&gt;, &lt;code&gt;spring.kafka.consumer.client-id&lt;/code&gt;
&lt;/li&gt;
&lt;/ul&gt;
&lt;/li&gt;
&lt;li&gt;
&lt;p&gt;&lt;code&gt;KAFKA_CONSUMER_GROUP&lt;/code&gt;: nome do grupo de consumo que esta aplicação pertence&lt;/p&gt;
&lt;ul&gt;
&lt;li&gt;&lt;code&gt;spring.kafka.consumer.group-id&lt;/code&gt;&lt;/li&gt;
&lt;/ul&gt;
&lt;/li&gt;
&lt;li&gt;
&lt;p&gt;&lt;code&gt;KAFKA_BOOTSTRAP_SERVERS&lt;/code&gt;: lista de brokers para o cluster Kafka&lt;/p&gt;
&lt;ul&gt;
&lt;li&gt;&lt;code&gt;spring.kafka.bootstrap-servers&lt;/code&gt;&lt;/li&gt;
&lt;/ul&gt;
&lt;/li&gt;
&lt;li&gt;
&lt;p&gt;&lt;code&gt;SCHEMA_REGISTRY_URL&lt;/code&gt;: url para o registro de esquemas Avro&lt;/p&gt;
&lt;ul&gt;
&lt;li&gt;&lt;code&gt;spring.kafka.properties.schema.registry.url&lt;/code&gt;&lt;/li&gt;
&lt;/ul&gt;
&lt;/li&gt;
&lt;li&gt;&lt;p&gt;…&lt;/p&gt;&lt;/li&gt;
&lt;/ul&gt;
&lt;/div&gt;
  &lt;/div&gt;
  &lt;div class="gh-btn-container"&gt;&lt;a class="gh-btn" href="https://github.com/fabiojose/spring-kafka-dlt-ex" rel="noopener noreferrer"&gt;View on GitHub&lt;/a&gt;&lt;/div&gt;
&lt;/div&gt;





&lt;p&gt;Quando o registro atinge o último tópico de retentativa, que neste caso é o &lt;code&gt;reservar-estoque-4&lt;/code&gt; e não seja processado com sucesso, finalmente ele será publicado no tópico &lt;em&gt;dead-letter&lt;/em&gt;, então a equipe deverá estar preparada para o tratamento adequado.&lt;/p&gt;

&lt;h4&gt;
  
  
  Tratamento
&lt;/h4&gt;

&lt;p&gt;Bem, neste momento o registro com problemas já desembarcou no tópico &lt;code&gt;reservar-estoque-dlt&lt;/code&gt; e bem antes disso acontecer um sistema preciso de monitoramento dos tópicos deveria ter alertado sobre o uso dos tópicos para retentativas, principalmente se os registros estão atingindo o &lt;code&gt;reservar-estoque-retry-4&lt;/code&gt;.&lt;/p&gt;

&lt;blockquote&gt;
&lt;p&gt;Veja &lt;a href="https://medium.com/@alvarobacelar/monitorando-um-cluster-kafka-com-ferramentas-open-source-a4032836dc79" rel="noopener noreferrer"&gt;neste artigo&lt;/a&gt; como monitorar seu cluster Kafka.&lt;/p&gt;
&lt;/blockquote&gt;

&lt;p&gt;A maneira mais prudente de tratamento, além do monitoramento e um ótimo sistema de rastreamento distribuído, será construir um processo em que para cada registro publicado no DLT, tickets e notificações ChatOps deveram ser enviados para a equipe responsável.&lt;/p&gt;

&lt;blockquote&gt;
&lt;p&gt;Também são necessárias ferramentas adequadas para, por exemplo, editar registros e colocá-los novamente no tópico original.&lt;/p&gt;
&lt;/blockquote&gt;




&lt;p&gt;Bem, é isso! Até o próximo.&lt;/p&gt;

</description>
      <category>kafka</category>
      <category>kafkabr</category>
      <category>java</category>
      <category>spring</category>
    </item>
    <item>
      <title>Kafka: consumindo registros com Spring</title>
      <dc:creator>Fabio José</dc:creator>
      <pubDate>Tue, 28 Apr 2020 11:48:36 +0000</pubDate>
      <link>https://dev.to/kafkabr/kafka-consumindo-registros-com-spring-40h8</link>
      <guid>https://dev.to/kafkabr/kafka-consumindo-registros-com-spring-40h8</guid>
      <description>&lt;p&gt;O consumo de dados do Kafka, dependendo do caso de uso, requer alguns cuidados no commit. Fazê-lo de forma correta é importante para garantir que nada seja perdido.&lt;/p&gt;

&lt;p&gt;Com Clientes Kafka, em especial a versão para Java, é muito simples e intuitivo, mas no Spring Kafka é necessária atenção com ajustes especiais.&lt;/p&gt;




&lt;p&gt;O foco neste artigo é mostrar como consumir dados com Spring Kafka, que é uma abstração sobre os &lt;a href="https://docs.confluent.io/current/clients/java.html#java-client" rel="noopener noreferrer"&gt;Clientes Kafka&lt;/a&gt; para Java. Fazendo isso com seguindo setup:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;
&lt;code&gt;enable.auto.commit=false&lt;/code&gt;. Que requer commit manual&lt;/li&gt;
&lt;li&gt;commit síncrono&lt;/li&gt;
&lt;/ul&gt;

&lt;blockquote&gt;
&lt;p&gt;Devido a semântica de entrega &lt;em&gt;at-least-once&lt;/em&gt;, um registro poderá ser consumido uma ou &lt;code&gt;n&lt;/code&gt; vezes, e este setup busca reduzir isso.&lt;/p&gt;
&lt;/blockquote&gt;




&lt;p&gt;Um consumidor típico no Spring Kafka é escrito assim:&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight java"&gt;&lt;code&gt;

&lt;span class="nd"&gt;@Component&lt;/span&gt;
&lt;span class="kd"&gt;public&lt;/span&gt; &lt;span class="kd"&gt;class&lt;/span&gt; &lt;span class="nc"&gt;SpringKafkaListener&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
  &lt;span class="nd"&gt;@KafkaListener&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;topics&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="s"&gt;"topico"&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt;
  &lt;span class="kd"&gt;public&lt;/span&gt; &lt;span class="kt"&gt;void&lt;/span&gt; &lt;span class="nf"&gt;consume&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;String&lt;/span&gt; &lt;span class="n"&gt;valor&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
    &lt;span class="c1"&gt;// Processar valor do registro&lt;/span&gt;
  &lt;span class="o"&gt;}&lt;/span&gt;
&lt;span class="o"&gt;}&lt;/span&gt;


&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;
&lt;p&gt;E criado com as seguintes configurações, feitas no &lt;code&gt;application.properties&lt;/code&gt;:&lt;/p&gt;
&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight properties"&gt;&lt;code&gt;

&lt;span class="py"&gt;spring.kafka.bootstrap-servers&lt;/span&gt;&lt;span class="p"&gt;=&lt;/span&gt;&lt;span class="s"&gt;configure-me_kafka-broker:9092&lt;/span&gt;
&lt;span class="py"&gt;spring.kafka.consumer.client-id&lt;/span&gt;&lt;span class="p"&gt;=&lt;/span&gt;&lt;span class="s"&gt;configure-me_client-id&lt;/span&gt;
&lt;span class="py"&gt;spring.kafka.consumer.group-id&lt;/span&gt;&lt;span class="p"&gt;=&lt;/span&gt;&lt;span class="s"&gt;configure-me_group-id&lt;/span&gt;
&lt;span class="py"&gt;spring.kafka.consumer.auto-offset-reset&lt;/span&gt;&lt;span class="p"&gt;=&lt;/span&gt;&lt;span class="s"&gt;earliest&lt;/span&gt;
&lt;span class="py"&gt;spring.kafka.consumer.key-deserializer&lt;/span&gt;&lt;span class="p"&gt;=&lt;/span&gt;&lt;span class="s"&gt;org.apache.kafka.common.serialization.StringDeserializer&lt;/span&gt;
&lt;span class="py"&gt;spring.kafka.consumer.value-deserializer&lt;/span&gt;&lt;span class="p"&gt;=&lt;/span&gt;&lt;span class="s"&gt;org.apache.kafka.common.serialization.StringDeserializer&lt;/span&gt;


&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;
&lt;p&gt;Felizmente o Spring Kafka não redefine valores, portanto a configuração padrão é mantida assim como definida na &lt;a href="https://kafka.apache.org/documentation/#consumerconfigs" rel="noopener noreferrer"&gt;documentação oficial&lt;/a&gt;, porém, nela o commit é automático e assíncrono. Contudo, no Spring Kafka, todas as configurações necessárias para commit manual e síncrono não estão disponíveis através de propriedades no &lt;code&gt;application.properties&lt;/code&gt;.&lt;/p&gt;
&lt;h2&gt;
  
  
  Resolvendo
&lt;/h2&gt;

&lt;p&gt;Spring Kafka é uma abstração, logo o &lt;em&gt;poll loop&lt;/em&gt; e commit são transparentes. E como pode-se ver no exemplo, um consumidor recebe apenas o registro e por padrão não tem acesso ao &lt;em&gt;Consumer&lt;/em&gt;.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Primeiro&lt;/strong&gt; é necessário revisar as configurações para desligar o commit automático.&lt;/p&gt;

&lt;p&gt;Nova configuração:&lt;/p&gt;
&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight properties"&gt;&lt;code&gt;

&lt;span class="c"&gt;# Nada de novo aqui
&lt;/span&gt;&lt;span class="py"&gt;spring.kafka.bootstrap-servers&lt;/span&gt;&lt;span class="p"&gt;=&lt;/span&gt;&lt;span class="s"&gt;configure-me_kafka-broker:9092&lt;/span&gt;
&lt;span class="py"&gt;spring.kafka.consumer.client-id&lt;/span&gt;&lt;span class="p"&gt;=&lt;/span&gt;&lt;span class="s"&gt;configure-me_client-id&lt;/span&gt;
&lt;span class="py"&gt;spring.kafka.consumer.group-id&lt;/span&gt;&lt;span class="p"&gt;=&lt;/span&gt;&lt;span class="s"&gt;configure-me_group-id&lt;/span&gt;
&lt;span class="py"&gt;spring.kafka.consumer.auto-offset-reset&lt;/span&gt;&lt;span class="p"&gt;=&lt;/span&gt;&lt;span class="s"&gt;earliest&lt;/span&gt;
&lt;span class="py"&gt;spring.kafka.consumer.key-deserializer&lt;/span&gt;&lt;span class="p"&gt;=&lt;/span&gt;&lt;span class="s"&gt;org.apache.kafka.common.serialization.StringDeserializer&lt;/span&gt;
&lt;span class="py"&gt;spring.kafka.consumer.value-deserializer&lt;/span&gt;&lt;span class="p"&gt;=&lt;/span&gt;&lt;span class="s"&gt;org.apache.kafka.common.serialization.StringDeserializer&lt;/span&gt;

&lt;span class="c"&gt;# Desliga o commit automático no Cliente Kafka
&lt;/span&gt;&lt;span class="py"&gt;spring.kafka.consumer.enable-auto-commit&lt;/span&gt;&lt;span class="p"&gt;=&lt;/span&gt;&lt;span class="s"&gt;false&lt;/span&gt;


&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;
&lt;blockquote&gt;
&lt;p&gt;Spring tem sua própria notação para a maioria das configurações presentes no Kafka Consumer, que são traduzidas em tempo de execução para o nome correto.&lt;/p&gt;
&lt;/blockquote&gt;

&lt;p&gt;Agora que o commit automático foi desligado, são necessários alguns ajustes programáticos feitos ao customizar as fábricas de objetos:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;ackMode para &lt;code&gt;MANUAL&lt;/code&gt;
&lt;/li&gt;
&lt;li&gt;syncCommits como &lt;code&gt;true&lt;/code&gt;
&lt;/li&gt;
&lt;/ul&gt;
&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight java"&gt;&lt;code&gt;

&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;org.springframework.beans.factory.annotation.Autowired&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;org.springframework.boot.autoconfigure.kafka.KafkaProperties&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;org.springframework.context.annotation.Bean&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;org.springframework.context.annotation.Configuration&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;org.springframework.kafka.annotation.EnableKafka&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;org.springframework.kafka.config.KafkaListenerContainerFactory&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;org.springframework.kafka.core.ConsumerFactory&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;org.springframework.kafka.core.DefaultKafkaConsumerFactory&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;org.springframework.kafka.listener.ConcurrentMessageListenerContainer&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;org.springframework.kafka.listener.ContainerProperties.AckMode&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;

&lt;span class="nd"&gt;@EnableKafka&lt;/span&gt;
&lt;span class="nd"&gt;@Configuration&lt;/span&gt;
&lt;span class="kd"&gt;public&lt;/span&gt; &lt;span class="kd"&gt;class&lt;/span&gt; &lt;span class="nc"&gt;KafkaConfig&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;

  &lt;span class="nd"&gt;@Autowired&lt;/span&gt;
  &lt;span class="nc"&gt;KafkaProperties&lt;/span&gt; &lt;span class="n"&gt;properties&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;

  &lt;span class="nd"&gt;@Bean&lt;/span&gt;
  &lt;span class="kd"&gt;public&lt;/span&gt; &lt;span class="nc"&gt;ConsumerFactory&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="nc"&gt;String&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="nc"&gt;String&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="nf"&gt;consumerFactory&lt;/span&gt;&lt;span class="o"&gt;()&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
      &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="k"&gt;new&lt;/span&gt; &lt;span class="nc"&gt;DefaultKafkaConsumerFactory&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&amp;gt;(&lt;/span&gt;
              &lt;span class="n"&gt;properties&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;buildConsumerProperties&lt;/span&gt;&lt;span class="o"&gt;());&lt;/span&gt;
  &lt;span class="o"&gt;}&lt;/span&gt;

  &lt;span class="nd"&gt;@Bean&lt;/span&gt;
  &lt;span class="kd"&gt;public&lt;/span&gt; &lt;span class="nc"&gt;KafkaListenerContainerFactory&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="nc"&gt;ConcurrentMessageListenerContainer&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="nc"&gt;String&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="nc"&gt;String&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&amp;gt;&lt;/span&gt;
      &lt;span class="nf"&gt;kafkaListenerContainerFactory&lt;/span&gt;&lt;span class="o"&gt;()&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;

      &lt;span class="nc"&gt;ConcurrentKafkaListenerContainerFactory&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="nc"&gt;String&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="nc"&gt;String&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="n"&gt;listener&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; 
            &lt;span class="k"&gt;new&lt;/span&gt; &lt;span class="nc"&gt;ConcurrentKafkaListenerContainerFactory&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&amp;gt;();&lt;/span&gt;

      &lt;span class="n"&gt;listener&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;setConsumerFactory&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;consumerFactory&lt;/span&gt;&lt;span class="o"&gt;());&lt;/span&gt;

      &lt;span class="c1"&gt;// Não falhar, caso ainda não existam os tópicos para consumo&lt;/span&gt;
      &lt;span class="n"&gt;listener&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;getContainerProperties&lt;/span&gt;&lt;span class="o"&gt;()&lt;/span&gt;
          &lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;setMissingTopicsFatal&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="kc"&gt;false&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;

      &lt;span class="c1"&gt;// ### AQUI&lt;/span&gt;
      &lt;span class="c1"&gt;// Commit manual do offset&lt;/span&gt;
      &lt;span class="n"&gt;listener&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;getContainerProperties&lt;/span&gt;&lt;span class="o"&gt;().&lt;/span&gt;&lt;span class="na"&gt;setAckMode&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;AckMode&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;MANUAL&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;

      &lt;span class="c1"&gt;// ### AQUI&lt;/span&gt;
      &lt;span class="c1"&gt;// Commits síncronos&lt;/span&gt;
      &lt;span class="n"&gt;listener&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;getContainerProperties&lt;/span&gt;&lt;span class="o"&gt;().&lt;/span&gt;&lt;span class="na"&gt;setSyncCommits&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;Boolean&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;TRUE&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;

      &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="n"&gt;listener&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
    &lt;span class="o"&gt;}&lt;/span&gt;
&lt;span class="o"&gt;}&lt;/span&gt;


&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;
&lt;p&gt;Então o consumidor com Spring Kafka terá esta aparência:&lt;/p&gt;
&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight java"&gt;&lt;code&gt;

&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;org.apache.kafka.clients.consumer.ConsumerRecord&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;org.springframework.beans.factory.annotation.Autowired&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;org.springframework.kafka.annotation.KafkaListener&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;org.springframework.kafka.support.Acknowledgment&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;org.springframework.messaging.handler.annotation.Payload&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;org.springframework.stereotype.Component&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;

&lt;span class="nd"&gt;@Component&lt;/span&gt;
&lt;span class="kd"&gt;public&lt;/span&gt; &lt;span class="kd"&gt;class&lt;/span&gt; &lt;span class="nc"&gt;SpringKafkaListener&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;

  &lt;span class="nd"&gt;@KafkaListener&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;topics&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="s"&gt;"topico"&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt;
  &lt;span class="kd"&gt;public&lt;/span&gt; &lt;span class="kt"&gt;void&lt;/span&gt; &lt;span class="nf"&gt;consume&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nd"&gt;@Payload&lt;/span&gt; &lt;span class="nc"&gt;String&lt;/span&gt; &lt;span class="n"&gt;valor&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="nc"&gt;Acknowledgment&lt;/span&gt; &lt;span class="n"&gt;ack&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;

    &lt;span class="c1"&gt;//TODO Processar registro&lt;/span&gt;
    &lt;span class="c1"&gt;// . . . &lt;/span&gt;

    &lt;span class="c1"&gt;// Commmit manual, que também será síncrono&lt;/span&gt;
    &lt;span class="n"&gt;ack&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;acknowledge&lt;/span&gt;&lt;span class="o"&gt;();&lt;/span&gt;

  &lt;span class="o"&gt;}&lt;/span&gt;
&lt;span class="o"&gt;}&lt;/span&gt;


&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;
&lt;p&gt;Note que mesmo assim não existe acesso ao &lt;em&gt;Consumer&lt;/em&gt;, ao invez disso, o Spring injeta uma instância de &lt;code&gt;Acknowledgment&lt;/code&gt; que faz o commit quando tem seu método &lt;code&gt;acknowledge()&lt;/code&gt; executado.&lt;/p&gt;

&lt;p&gt;Também neste exemplo o offset é confirmado a cada registro processado. Isso é algo que degrada a taxa de transferência, mas reduz ainda mais as chances de consumos duplicados. Bem, mas cada caso é um caso 😊.&lt;/p&gt;

&lt;p&gt;O exemplo completo está disponível no Github:&lt;/p&gt;


&lt;div class="ltag-github-readme-tag"&gt;
  &lt;div class="readme-overview"&gt;
    &lt;h2&gt;
      &lt;img src="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev.to%2Fassets%2Fgithub-logo-5a155e1f9a670af7944dd5e12375bc76ed542ea80224905ecaf878b9157cdefc.svg" alt="GitHub logo"&gt;
      &lt;a href="https://github.com/fabiojose" rel="noopener noreferrer"&gt;
        fabiojose
      &lt;/a&gt; / &lt;a href="https://github.com/fabiojose/skc-ex" rel="noopener noreferrer"&gt;
        skc-ex
      &lt;/a&gt;
    &lt;/h2&gt;
    &lt;h3&gt;
      Spring Kafka Consumer
    &lt;/h3&gt;
  &lt;/div&gt;
  &lt;div class="ltag-github-body"&gt;
    
&lt;div id="readme" class="md"&gt;
&lt;div class="markdown-heading"&gt;
&lt;h1 class="heading-element"&gt;Spring Kafka Consumer Example&lt;/h1&gt;
&lt;/div&gt;

&lt;p&gt;Exemplo de consumer com Spring Kafka&lt;/p&gt;

&lt;div class="markdown-heading"&gt;
&lt;h2 class="heading-element"&gt;Requerimentos&lt;/h2&gt;
&lt;/div&gt;

&lt;ul&gt;
&lt;li&gt;JDK 1.8&lt;/li&gt;
&lt;li&gt;Acesso ao repositório &lt;a href="https://repo.maven.apache.org/maven2/" rel="nofollow noopener noreferrer"&gt;https://repo.maven.apache.org/maven2/&lt;/a&gt; ou uma
alternativa com acesso às dependências presentes no &lt;code&gt;pom.xml&lt;/code&gt;
&lt;/li&gt;
&lt;/ul&gt;

&lt;div class="markdown-heading"&gt;
&lt;h2 class="heading-element"&gt;Build &amp;amp; Run&lt;/h2&gt;
&lt;/div&gt;

&lt;div class="markdown-heading"&gt;
&lt;h3 class="heading-element"&gt;Maven&lt;/h3&gt;

&lt;/div&gt;

&lt;p&gt;Para montar o fatjar, execute o comando:&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Linux&lt;/strong&gt;&lt;/p&gt;

&lt;div class="highlight highlight-source-shell notranslate position-relative overflow-auto js-code-highlight"&gt;
&lt;pre&gt;./mvnw clean package&lt;/pre&gt;

&lt;/div&gt;

&lt;p&gt;&lt;strong&gt;Windows&lt;/strong&gt;&lt;/p&gt;

&lt;div class="highlight highlight-source-powershell notranslate position-relative overflow-auto js-code-highlight"&gt;
&lt;pre&gt;.\&lt;span class="pl-c1"&gt;mvnw.cmd&lt;/span&gt; &lt;span class="pl-k"&gt;clean&lt;/span&gt; package&lt;/pre&gt;

&lt;/div&gt;

&lt;p&gt;Para executar:&lt;/p&gt;

&lt;blockquote&gt;
&lt;p&gt;Você pode utilizar o &lt;a href="https://github.com/fabiojose/skc-ex./docker-compose.yaml" rel="noopener noreferrer"&gt;&lt;code&gt;docker-compose.yaml&lt;/code&gt;&lt;/a&gt; para
subir um Kafka em sua máquina&lt;/p&gt;
&lt;/blockquote&gt;

&lt;div class="highlight highlight-source-shell notranslate position-relative overflow-auto js-code-highlight"&gt;
&lt;pre&gt;java \
  -Dspring.kafka.bootstrap-servers=&lt;span class="pl-s"&gt;&lt;span class="pl-pds"&gt;'&lt;/span&gt;localhost:9092&lt;span class="pl-pds"&gt;'&lt;/span&gt;&lt;/span&gt; \
  -Dspring.kafka.consumer.client-id=&lt;span class="pl-s"&gt;&lt;span class="pl-pds"&gt;'&lt;/span&gt;spring-kafka-ex&lt;span class="pl-pds"&gt;'&lt;/span&gt;&lt;/span&gt; \
  -Dspring.kafka.consumer.group-id=&lt;span class="pl-s"&gt;&lt;span class="pl-pds"&gt;'&lt;/span&gt;meu-grupo&lt;span class="pl-pds"&gt;'&lt;/span&gt;&lt;/span&gt; \
  -jar target/app-spring-boot.jar&lt;/pre&gt;

&lt;/div&gt;
&lt;div class="markdown-heading"&gt;
&lt;h3 class="heading-element"&gt;Docker&lt;/h3&gt;

&lt;/div&gt;

&lt;p&gt;A definição &lt;a href="https://github.com/fabiojose/skc-ex./Dockerfile" rel="noopener noreferrer"&gt;Dockerfile&lt;/a&gt; desta aplicação emprega
&lt;a href="https://docs.docker.com/develop/develop-images/multistage-build/" rel="nofollow noopener noreferrer"&gt;multi-stage builds&lt;/a&gt;.
Isso significa que nela acontece o build da aplicação e a criação da imagem.&lt;/p&gt;
&lt;p&gt;Se for necessário somente a criar a imagem, pode-se utilizar a definição
&lt;a href="https://github.com/fabiojose/skc-ex./Dockerfile-image" rel="noopener noreferrer"&gt;Dockerfile-image&lt;/a&gt;. Mas antes é necessário montar
o fatjar através do maven.&lt;/p&gt;
&lt;p&gt;Para build do fatjar e montar a imagem, execute o comando:&lt;/p&gt;
&lt;div class="highlight highlight-source-shell notranslate position-relative overflow-auto js-code-highlight"&gt;
&lt;pre&gt;docker build &lt;span class="pl-c1"&gt;.&lt;/span&gt; -t sk-consumer-ex:1.0&lt;/pre&gt;

&lt;/div&gt;
&lt;p&gt;Para montar apenas a imagem (antes…&lt;/p&gt;
&lt;/div&gt;
  &lt;/div&gt;
  &lt;div class="gh-btn-container"&gt;&lt;a class="gh-btn" href="https://github.com/fabiojose/skc-ex" rel="noopener noreferrer"&gt;View on GitHub&lt;/a&gt;&lt;/div&gt;
&lt;/div&gt;


&lt;p&gt;Photo by Paweł Czerwiński on Unsplash&lt;/p&gt;

</description>
      <category>kafka</category>
      <category>kafkabr</category>
      <category>spring</category>
      <category>java</category>
    </item>
    <item>
      <title>Kafka: compactação e compressão</title>
      <dc:creator>Fabio José</dc:creator>
      <pubDate>Mon, 20 Apr 2020 14:58:36 +0000</pubDate>
      <link>https://dev.to/kafkabr/kafka-compactacao-e-compressao-1o2i</link>
      <guid>https://dev.to/kafkabr/kafka-compactacao-e-compressao-1o2i</guid>
      <description>&lt;p&gt;Compactar e comprimir tem algo em comum:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;reduzir o espaço ocupado pelos dados&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;Seja removendo partes desnecessárias através da compactação ou tornado-as menores com algoritmos de compressão.&lt;/p&gt;

&lt;p&gt;No Kafka podemos utilizar ambas.&lt;/p&gt;




&lt;h2&gt;
  
  
  Compactação
&lt;/h2&gt;

&lt;p&gt;A compactação no Kafka pode ser ativada através de uma configuração feita nos tópicos:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;&lt;code&gt;cleanup.policy=compact&lt;/code&gt;&lt;/li&gt;
&lt;/ul&gt;

&lt;blockquote&gt;
&lt;p&gt;Por padrão ela é definida como &lt;code&gt;delete&lt;/code&gt;. Veja outros detalhes na &lt;a href="https://kafka.apache.org/documentation/#topicconfigs"&gt;documentação oficial&lt;/a&gt;.&lt;/p&gt;
&lt;/blockquote&gt;

&lt;p&gt;Quando a política de limpeza for igual a &lt;code&gt;compact&lt;/code&gt;, periodicamente os tópicos são varridos em busca de registros repetidos. E para encontrá-los, o Kafka utiliza a &lt;code&gt;key&lt;/code&gt; presente em cada um deles.&lt;/p&gt;

&lt;blockquote&gt;
&lt;p&gt;Registros sem chave, são imediatamente eliminados.&lt;/p&gt;
&lt;/blockquote&gt;

&lt;p&gt;Acompanhe este exemplo, onde os registros estão dispotos dos mais antigos para os mais novos;&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight"&gt;&lt;pre class="highlight plaintext"&gt;&lt;code&gt;Key  Value
c1   Graziela Maciel, MS
c3   Carlos Chagas, MG
c5   Bertha Lutz, SP
c1   Graziela Maciel, RJ
c3   Carlos Chagas, RJ
c5   Bertha Luttz, SP
c7   Suzana Herculano, RJ
c5   Bertha Lutz, RJ
&lt;/code&gt;&lt;/pre&gt;&lt;/div&gt;



&lt;p&gt;Estes são os registros mantidos após o processo de compactação:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight"&gt;&lt;pre class="highlight plaintext"&gt;&lt;code&gt;Key  Value
c1   Graziela Maciel, RJ
c3   Carlos Chagas, RJ
c7   Suzana Herculano, RJ
c5   Bertha Lutz, RJ
&lt;/code&gt;&lt;/pre&gt;&lt;/div&gt;



&lt;p&gt;É assim que funciona a compactação no Kafka, somente o registro mais recente é mantido. Todos os outros mais antigos e com a mesma &lt;code&gt;key&lt;/code&gt; são eliminados.&lt;/p&gt;

&lt;p&gt;Por exemplo, existem três registros com a chave &lt;code&gt;c5&lt;/code&gt; e o mais recente é &lt;code&gt;Bertha Lutz, RJ&lt;/code&gt;, que foi mantido pela compactação. Note que haviam 8 registros, restando apenas 4.&lt;/p&gt;

&lt;h2&gt;
  
  
  Compressão
&lt;/h2&gt;

&lt;p&gt;A compressão pode ser feita através de uma configuração nos tópicos ou nos produtores.&lt;/p&gt;

&lt;p&gt;No &lt;strong&gt;tópico&lt;/strong&gt; a compressão é ativada através da configuração &lt;code&gt;compression.type&lt;/code&gt;, que por padrão está definida como &lt;code&gt;producer&lt;/code&gt;. Isso significa que a compressão empregada pelos produtores será mantida e repassada aos consumidores, mesmo que não exista compressão alguma.&lt;/p&gt;

&lt;p&gt;Os valores possíveis para &lt;code&gt;compression.type&lt;/code&gt; são:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;
&lt;code&gt;uncompressed&lt;/code&gt;: mesmo que os produtores enviem dados comprimidos, eles serão persistidos no seu tamanho original, ou seja, sem compressão.&lt;/li&gt;
&lt;li&gt;
&lt;a href="https://facebook.github.io/zstd/"&gt;&lt;code&gt;zstd&lt;/code&gt;&lt;/a&gt;: algoritmo de compressão criado pelo Facebook.&lt;/li&gt;
&lt;li&gt;
&lt;a href="https://github.com/lz4/lz4"&gt;&lt;code&gt;lz4&lt;/code&gt;&lt;/a&gt;: a compressão mais recomendada pelos especialistas.&lt;/li&gt;
&lt;li&gt;
&lt;a href="https://github.com/google/snappy"&gt;&lt;code&gt;snappy&lt;/code&gt;&lt;/a&gt;: algoritmo criado pelo Google.&lt;/li&gt;
&lt;li&gt;
&lt;code&gt;gzip&lt;/code&gt;: ótimos níveis de compressão, usando com mais intensidade a cpu.&lt;/li&gt;
&lt;li&gt;
&lt;code&gt;producer&lt;/code&gt;: mantém a compressão do produtor, se existir.&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;E no &lt;strong&gt;produtor&lt;/strong&gt; também existe a configuração &lt;code&gt;compression.type&lt;/code&gt;, que possuí os mesmos valores, com exceção de &lt;code&gt;uncompressed&lt;/code&gt; e &lt;code&gt;producer&lt;/code&gt;. Adicionalmente, existe a opção &lt;code&gt;none&lt;/code&gt; para não aplicar compressão, que também é o valor padrão.&lt;/p&gt;




&lt;p&gt;Comprimir dados no produtor é interessante porque reduz o uso da banda de rede e de espaço em disco e cpu nos brokers.&lt;/p&gt;

&lt;p&gt;E a compressão traz bons resultados quando são produzidos lotes de dados, pois ela não é aplicada em nível de registro. Ou seja, tanto no produtor, quando no Kafka, a compressão não é em cima de cada mensagem ou evento.&lt;/p&gt;




&lt;p&gt;Já nos consumidores, nenhuma configuração especial é necessária, porque através do protocolo Kafka ele verifica se há compressão e emprega o mesmo algoritmo.&lt;/p&gt;

</description>
      <category>kafka</category>
      <category>kafkabr</category>
      <category>confluent</category>
    </item>
    <item>
      <title>Kafka: Garantia de Ordem</title>
      <dc:creator>Fabio José</dc:creator>
      <pubDate>Sat, 11 Apr 2020 11:09:24 +0000</pubDate>
      <link>https://dev.to/kafkabr/garantia-de-ordem-6b3</link>
      <guid>https://dev.to/kafkabr/garantia-de-ordem-6b3</guid>
      <description>&lt;p&gt;Existem casos de uso onde a manutenção da ordem em que os dados foram produzidos é um requisito. Há outros em que isso não importa.&lt;/p&gt;




&lt;p&gt;&lt;a href="https://martinfowler.com/eaaDev/EventSourcing.html"&gt;Event Sourcing&lt;/a&gt; é um exemplo onde a manutenção da ordem importa, caso contrário o estado final seria inconsistente.&lt;/p&gt;




&lt;p&gt;Kafka, através das configurações corretas, pode nos ajudar em qualquer um deles.&lt;/p&gt;

&lt;h2&gt;
  
  
  Como é a ordenação no Kafka
&lt;/h2&gt;

&lt;p&gt;A ordem é garantida em nível de partição, ou seja, tópicos com várias partições não possuem ordenação global dos dados.&lt;/p&gt;

&lt;p&gt;Bem, dado um lote de registros produzidos pelo produtor &lt;code&gt;p1&lt;/code&gt; que tem como destino a partição &lt;code&gt;4&lt;/code&gt; no tópico &lt;code&gt;t1&lt;/code&gt;, haverá garantia de que a ordem produzida será mantida pelo Kafka e repassada aos consumidores.&lt;/p&gt;

&lt;p&gt;Desse modo se houver um produtor &lt;code&gt;p2&lt;/code&gt; também produzindo lotes de registros no mesmo destino de &lt;code&gt;p1&lt;/code&gt;, nenhuma verificação de ordem será realizada entre os lotes destes produtores, prevalecendo àquele que primeiro chegar.&lt;/p&gt;

&lt;p&gt;Mas existem situações e configurações que podem interferir e mudar a ordenação dos registros criados por um determinado produtor &lt;code&gt;p&lt;/code&gt;.&lt;/p&gt;

&lt;h2&gt;
  
  
  O que pode mudar a ordenação
&lt;/h2&gt;

&lt;p&gt;Há uma dezena de configurações que podem ser ajustadas para determinar como será o comportamento do nosso produtor. E muitas já possuem valores padrão que atendem alguns casos de uso, como:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;retries: 2&lt;sup&gt;16&lt;/sup&gt;

&lt;ul&gt;
&lt;li&gt;número de retentativas&lt;/li&gt;
&lt;/ul&gt;


&lt;/li&gt;
&lt;li&gt;max.in.flight.requests.per.connection: 5 

&lt;ul&gt;
&lt;li&gt;requisições que ainda não receberam ack, ou seja, estão em voo.&lt;/li&gt;
&lt;/ul&gt;


&lt;/li&gt;
&lt;li&gt;acks: 1

&lt;ul&gt;
&lt;li&gt;reconhecimento de recebimento e não&lt;/li&gt;
&lt;/ul&gt;


&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;O valor padrão de retentativas, definido por &lt;code&gt;retries&lt;/code&gt;, é bem grande e elas são realizadas para todos os problemas relativamente recuperáveis, como uma falha na rede.&lt;/p&gt;

&lt;p&gt;E o número de requisições em voo é igual a &lt;code&gt;5&lt;/code&gt;. E isso poderá causar a troca de ordem.&lt;/p&gt;

&lt;p&gt;Digamos que se produz o primeiro lote de registros, &lt;code&gt;l1&lt;/code&gt;, que fica em voo aguardando reconhecimento. Em seguida o mesmo produtor envia o segundo lote, &lt;code&gt;l2&lt;/code&gt;. Pela ordenação, &lt;code&gt;l1&lt;/code&gt; contém registros que estão antes daqueles em &lt;code&gt;l2&lt;/code&gt;.&lt;/p&gt;

&lt;p&gt;Essa é a ordem criada pelo produtor:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight"&gt;&lt;pre class="highlight plaintext"&gt;&lt;code&gt;l1-&amp;gt;1,2,3,4
l2-&amp;gt;5,6,7
&lt;/code&gt;&lt;/pre&gt;&lt;/div&gt;



&lt;p&gt;Espera-se que seja mantida:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight"&gt;&lt;pre class="highlight plaintext"&gt;&lt;code&gt;1,2,3,4,5,6,7
&lt;/code&gt;&lt;/pre&gt;&lt;/div&gt;



&lt;p&gt;Porém, se &lt;code&gt;l1&lt;/code&gt; for para retentativa,  &lt;code&gt;l2&lt;/code&gt; for efetivado e &lt;code&gt;l1&lt;/code&gt; efetivado em seguida, a ordenação muda, não prevalecendo àquela criada no produtor.&lt;/p&gt;

&lt;p&gt;Realidade como descrito acima:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight"&gt;&lt;pre class="highlight plaintext"&gt;&lt;code&gt;5,6,7,1,2,3,4
&lt;/code&gt;&lt;/pre&gt;&lt;/div&gt;



&lt;h2&gt;
  
  
  Configuração correta
&lt;/h2&gt;

&lt;p&gt;Felizmente é possível adequar as configurações do produtor e eliminar as chances da perda de ordem:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;
&lt;code&gt;max.in.flight.requests.per.connection&lt;/code&gt; deverá ser configurado para que apenas uma requisição fique em voo, ou seja, &lt;code&gt;1&lt;/code&gt;.&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;Isso certamente irá diminuir a taxa de transferência, não de maneira drástica. Mas agora a ordenação está garantida, mesmo em situações de retentativas.&lt;/p&gt;

</description>
      <category>kafka</category>
      <category>kafkabr</category>
      <category>confluent</category>
    </item>
    <item>
      <title>Monitorando Consumer Lag do Apache Kafka</title>
      <dc:creator>Álvaro Bacelar</dc:creator>
      <pubDate>Sun, 29 Mar 2020 20:07:05 +0000</pubDate>
      <link>https://dev.to/kafkabr/monitorando-consumer-lag-do-apache-kafka-2o1d</link>
      <guid>https://dev.to/kafkabr/monitorando-consumer-lag-do-apache-kafka-2o1d</guid>
      <description>&lt;p&gt;O Apache Kafka tem milhares de métricas que podem ser acessadas via interface JMX (&lt;a href="https://medium.com/@alvarobacelar/monitorando-um-cluster-kafka-com-ferramentas-open-source-a4032836dc79?source=friends_link&amp;amp;sk=2e2ae34d66935565a59932b80099dfc1" rel="noopener noreferrer"&gt;nesse artigo&lt;/a&gt; eu mostro como instrumentar tais métricas utilizando ferramentas Open Source). Contudo há uma métrica, não menos importante, que não está disponível via JMX: O Consumer Lag.&lt;/p&gt;

&lt;p&gt;Mensurada em número de mensagem, basicamente o Lag é a diferença entre a última mensagem produzida em uma partição especifica e a última mensagem processada (&lt;em&gt;committed&lt;/em&gt;) pelo consumidor.&lt;/p&gt;

&lt;p&gt;Segundo o livro &lt;a href="https://www.confluent.io/resources/kafka-the-definitive-guide/" rel="noopener noreferrer"&gt;&lt;em&gt;"kafka - The Definitive Guide"&lt;/em&gt;&lt;/a&gt; o método preferido para monitorar o Lag é através de uma aplicação externa que possa visualizar o estado de uma partição no broker, rastreando o offset mais recente da mensagem produzida e o último offset processado pelo consumidor, atraves do topico interno de controle: __consumer_offsets&lt;/p&gt;

&lt;p&gt;Levando isso em consideração, o LinkedIn (empresa responsável pela criação do Apache Kafka) desenvolveu (também) uma aplicação que realiza essa rastreabilidade dos offsets das partições dos brokers. Essa aplicação é o &lt;a href="https://github.com/linkedin/Burrow" rel="noopener noreferrer"&gt;Burrow&lt;/a&gt;.&lt;/p&gt;

&lt;p&gt;O Burrow é uma ferramenta que foi escrita na linguagem &lt;a href="https://golang.org/" rel="noopener noreferrer"&gt;Go&lt;/a&gt; e ela nos provê tanto informações de Consumer Lag quanto informações diversas do cluster e disponibiliza tais valores via REST API. Com o Burrow ainda é possível enviar notificações via email ou WebHook com um threshold definido.&lt;/p&gt;

&lt;p&gt;Como falei acima, o Burrow provê essas informações via REST API e nosso objetivo aqui é fazer essa informação chegar no Prometheus e consequentemente no Grafana.&lt;/p&gt;

&lt;p&gt;Então irei mostrar como instalar e configurar o Burrow juntamente com o &lt;a href="https://github.com/alvarobacelar/burrow_exporter" rel="noopener noreferrer"&gt;burrow_exporter&lt;/a&gt; (também escrito em Go) para monitorar o Consumer Lag de um cluster Apache Kafka e ter todas as informações do Lag, entre outras, no Prometheus.&lt;/p&gt;

&lt;blockquote&gt;
&lt;p&gt;Para realizar todos os testes desse artigo, eu realizei o setup de um cluster Apache Kafka com o Ansible. &lt;a href="https://medium.com/@alvarobacelar/setup-de-cluster-apache-kafka-com-ansible-df62c8b1017b?source=friends_link&amp;amp;sk=5839522ba2bea31a39d527d523592f97" rel="noopener noreferrer"&gt;Clicando aqui&lt;/a&gt; você verá um artigo que escrevi mostrando como realizar um setup de um cluster Apache Kafka e Zookeeper com o Ansible. &lt;/p&gt;
&lt;/blockquote&gt;

&lt;h3&gt;
  
  
  Instalando e Configurando o Prometheus
&lt;/h3&gt;

&lt;p&gt;Para instalar e configurar o Prometheus vamos seguir os seguintes passos: &lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Baixar a versão mais estável do &lt;a href="https://prometheus.io/download/" rel="noopener noreferrer"&gt;Prometheus&lt;/a&gt;
```
&lt;/li&gt;
&lt;/ul&gt;

&lt;h1&gt;
  
  
  wget &lt;a href="https://github.com/prometheus/prometheus/releases/download/v2.12.0/prometheus-2.12.0.linux-amd64.tar.gz" rel="noopener noreferrer"&gt;https://github.com/prometheus/prometheus/releases/download/v2.12.0/prometheus-2.12.0.linux-amd64.tar.gz&lt;/a&gt;
&lt;/h1&gt;



&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;- Descompactando no diretório /srv
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;

&lt;h1&gt;
  
  
  tar -zxvf prometheus-2.12.0.linux-amd64.tar.gz -C /srv/
&lt;/h1&gt;


&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;- Adicionar usuário de serviço do prometheus
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;

&lt;h1&gt;
  
  
  useradd -s /usr/sbin/nologin prometheus
&lt;/h1&gt;


&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;- Mudar o dono da pasta do prometheus
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;

&lt;h1&gt;
  
  
  chown prometheus:prometheus /srv/prometheus-2.12.0.linux-amd64/ -R
&lt;/h1&gt;


&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;- Adicionar o arquivo de service do prometheus
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;

&lt;h1&gt;
  
  
  vim /etc/systemd/system/prometheus.service
&lt;/h1&gt;


&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;


&lt;p&gt;[Unit]&lt;br&gt;
Description=Prometheus&lt;br&gt;
After=network-online.target&lt;br&gt;
[Service]&lt;br&gt;
Type=simple&lt;br&gt;
User=prometheus&lt;br&gt;
Group=prometheus&lt;br&gt;
ExecReload=/bin/kill -HUP $MAINPID&lt;br&gt;
ExecStart=/srv/prometheus-2.12.0.linux-amd64/prometheus --config.file=/srv/prometheus-2.12.0.linux-amd64/prometheus.yml --storage.tsdb.path=/srv/prometheus-2.12.0.linux-amd64/data --web.listen-address=0.0.0.0:9090&lt;br&gt;
LimitNOFILE=65000&lt;br&gt;
LockPersonality=true&lt;br&gt;
NoNewPrivileges=true&lt;br&gt;
MemoryDenyWriteExecute=true&lt;br&gt;
PrivateDevices=true&lt;br&gt;
PrivateTmp=true&lt;br&gt;
ProtectHome=true&lt;br&gt;
RemoveIPC=true&lt;br&gt;
RestrictSUIDSGID=true&lt;br&gt;
ProtectSystem=full&lt;br&gt;
SyslogIdentifier=prometheus&lt;br&gt;
Restart=always&lt;br&gt;
[Install]&lt;br&gt;
WantedBy=multi-user.targe&lt;/p&gt;


&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;- Executar o daemon-reload e iniciar o serviço
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;

&lt;h1&gt;
  
  
  systemctl daemon-reload
&lt;/h1&gt;
&lt;h1&gt;
  
  
  systemctl start prometheus
&lt;/h1&gt;


&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;
Se tudo tiver dado certo o serviço subiu na porta 9090 e podemos acessar nessa porta: 
http://127.0.0.1:9090

### Instalando o Grafana

A instalação do Grafana é mais fácil, para instala-lo basta acessar o seguinte link https://grafana.com/grafana/download e seguir os passos descrito no link de acordo com seu S.O. 

Depois de instalado suba o serviço
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;

&lt;h1&gt;
  
  
  systemctl start grafana-server
&lt;/h1&gt;


&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;
O Grafana por padrão sobe na porta 3000, então vamos acessa-lo:
http://127.0.0.1:3000

O usuário e senha default do Grafana é admin, no seu primeiro acesso é solicitado para trocar.

Após acessar o Grafana, você deve adicionar o datasource do Prometheus.

### Instalando e configurando o Burrow

O primeiro passo para instalar o Burrow é realizar o download do código fonte, para isso vá ao repositório oficial do Burrow no link abaixo:

&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;


&lt;p&gt;$ git clone &lt;a href="https://github.com/linkedin/Burrow.git" rel="noopener noreferrer"&gt;https://github.com/linkedin/Burrow.git&lt;/a&gt;&lt;/p&gt;


&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;
Com o código fonte do Burrow, vamos entrar no diretório e *buildar* a imagem Docker: 
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;


&lt;p&gt;$ cd ${PWD}/Burrow&lt;br&gt;
$ docker build -t burrow-api .&lt;/p&gt;


&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;


&amp;gt;Nesse post irei focar na instalação do Burrow via [Docker](https://www.docker.com/). Para aqueles que preferirem executar o Burrow sem o uso do Docker, é só seguir os passos que estão descritos no [README.rd](https://github.com/linkedin/Burrow#build-and-install) do repositório do Burrow.

Com a imagem do Burrow *buildada* vamos agora realizar o mesmo procedimento com a imagem do burrow_exporter. Baixe o código fonte no link abaixo: 

&lt;div class="ltag-github-readme-tag"&gt;
  &lt;div class="readme-overview"&gt;
    &lt;h2&gt;
      &lt;img src="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev.to%2Fassets%2Fgithub-logo-5a155e1f9a670af7944dd5e12375bc76ed542ea80224905ecaf878b9157cdefc.svg" alt="GitHub logo"&gt;
      &lt;a href="https://github.com/alvarobacelar" rel="noopener noreferrer"&gt;
        alvarobacelar
      &lt;/a&gt; / &lt;a href="https://github.com/alvarobacelar/burrow_exporter" rel="noopener noreferrer"&gt;
        burrow_exporter
      &lt;/a&gt;
    &lt;/h2&gt;
    &lt;h3&gt;
      A Prometheus Exporter for gathering Kafka consumer group info from Burrow
    &lt;/h3&gt;
  &lt;/div&gt;
&lt;/div&gt;

&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;git clone &lt;a href="https://github.com/alvarobacelar/burrow_exporter.git" rel="noopener noreferrer"&gt;https://github.com/alvarobacelar/burrow_exporter.git&lt;/a&gt;&lt;/p&gt;



&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;
&amp;gt; O repositório acima é um fork do projeto [burrow_exporter](https://github.com/shamil/burrow_exporter) criado por Shamil. Eu solicitei um [Pull Request](https://github.com/shamil/burrow_exporter/pull/1) há um tempo para adicionar alguns recursos extras. Mas o dono do repositório nunca respondeu, então vamos seguir usando o projeto que *forkei*, pois ele retorna um valor a mais que a API do Burrow nos disponibiliza (iremos ver isso logo mais na frente).

Entrando na pasta do do burrow_exporter, que acabamos de baixar, vamos *buildar* a imagem Docker:

&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;$ docker build -t burrow-exporter . &lt;/p&gt;



&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;
Agora que temos as imagens do Burrow e burrow_exporter criadas, vamos 
criar o arquivo docker-compose.yml para subirmos as duas aplicações posteriormente. O arquivo deve conter o seguinte conteúdo:
```yaml


version: "2"
services:
  burrow:
    image: burrow-api
    volumes:
      - ${PWD}/config:/etc/burrow/
    container_name: burrow_api

  burrow_exporter:
    image: burrow-exporter
    container_name: burrow_exporter
    ports:
      - 8090:8237
    depends_on:
      - burrow
    command: --burrow.address http://burrow:8000


&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;

&lt;p&gt;Antes de subir os containers precisamos adicionar os servidores do kafka e zookeeper no arquivo de configuração do Burrow. Dentro do diretório da API do Burrow vamos editar o arquivo &lt;em&gt;burrow.toml&lt;/em&gt;&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;

&lt;span class="nv"&gt;$ &lt;/span&gt;vim &lt;span class="k"&gt;${&lt;/span&gt;&lt;span class="nv"&gt;PWD&lt;/span&gt;&lt;span class="k"&gt;}&lt;/span&gt;/Burrow/config/burrow.toml


&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;

&lt;p&gt;No arquivo vamos alterar os endereços dos brokers, Zookeeper e outros parâmetros:&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight toml"&gt;&lt;code&gt;

&lt;span class="nn"&gt;[general]&lt;/span&gt;
&lt;span class="py"&gt;pidfile&lt;/span&gt;&lt;span class="p"&gt;=&lt;/span&gt;&lt;span class="s"&gt;"burrow.pid"&lt;/span&gt;
&lt;span class="py"&gt;stdout-logfile&lt;/span&gt;&lt;span class="p"&gt;=&lt;/span&gt;&lt;span class="s"&gt;"burrow.out"&lt;/span&gt;
&lt;span class="py"&gt;access-control-allow-origin&lt;/span&gt;&lt;span class="p"&gt;=&lt;/span&gt;&lt;span class="s"&gt;"mysite.example.com"&lt;/span&gt;

&lt;span class="nn"&gt;[logging]&lt;/span&gt;
&lt;span class="py"&gt;filename&lt;/span&gt;&lt;span class="p"&gt;=&lt;/span&gt;&lt;span class="s"&gt;"logs/burrow.log"&lt;/span&gt;
&lt;span class="py"&gt;level&lt;/span&gt;&lt;span class="p"&gt;=&lt;/span&gt;&lt;span class="s"&gt;"info"&lt;/span&gt;
&lt;span class="py"&gt;maxsize&lt;/span&gt;&lt;span class="p"&gt;=&lt;/span&gt;&lt;span class="mi"&gt;100&lt;/span&gt;
&lt;span class="py"&gt;maxbackups&lt;/span&gt;&lt;span class="p"&gt;=&lt;/span&gt;&lt;span class="mi"&gt;30&lt;/span&gt;
&lt;span class="py"&gt;maxage&lt;/span&gt;&lt;span class="p"&gt;=&lt;/span&gt;&lt;span class="mi"&gt;10&lt;/span&gt;
&lt;span class="py"&gt;use-localtime&lt;/span&gt;&lt;span class="p"&gt;=&lt;/span&gt;&lt;span class="kc"&gt;false&lt;/span&gt;
&lt;span class="py"&gt;use-compression&lt;/span&gt;&lt;span class="p"&gt;=&lt;/span&gt;&lt;span class="kc"&gt;true&lt;/span&gt;

&lt;span class="nn"&gt;[zookeeper]&lt;/span&gt;
&lt;span class="py"&gt;servers&lt;/span&gt;&lt;span class="p"&gt;=[&lt;/span&gt; &lt;span class="s"&gt;"zkhost01.example.com:2181"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="s"&gt;"zkhost02.example.com:2181"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="s"&gt;"zkhost03.example.com:2181"&lt;/span&gt; &lt;span class="p"&gt;]&lt;/span&gt; &lt;span class="c"&gt;# altere para os edereços de seus zookeepers&lt;/span&gt;
&lt;span class="py"&gt;timeout&lt;/span&gt;&lt;span class="p"&gt;=&lt;/span&gt;&lt;span class="mi"&gt;6&lt;/span&gt;
&lt;span class="py"&gt;root-path&lt;/span&gt;&lt;span class="p"&gt;=&lt;/span&gt;&lt;span class="s"&gt;"/burrow"&lt;/span&gt;

&lt;span class="c"&gt;############################&lt;/span&gt;
&lt;span class="c"&gt;####### CLUSTER zoom #######&lt;/span&gt;
&lt;span class="c"&gt;############################&lt;/span&gt;
&lt;span class="c"&gt;# altere o nome do profile do cliente caso queira&lt;/span&gt;
&lt;span class="nn"&gt;[client-profile.post]&lt;/span&gt;
&lt;span class="py"&gt;client-id&lt;/span&gt;&lt;span class="p"&gt;=&lt;/span&gt;&lt;span class="s"&gt;"burrow-monitor"&lt;/span&gt;
&lt;span class="py"&gt;kafka-version&lt;/span&gt;&lt;span class="p"&gt;=&lt;/span&gt;&lt;span class="s"&gt;"2.3.0"&lt;/span&gt; &lt;span class="c"&gt;# altere para a versão 2.3.0 do Kafka&lt;/span&gt;

&lt;span class="c"&gt;# Dê um nome para o seu cluster [cluster.nome]&lt;/span&gt;
&lt;span class="nn"&gt;[cluster.zoom]&lt;/span&gt;
&lt;span class="py"&gt;class-name&lt;/span&gt;&lt;span class="p"&gt;=&lt;/span&gt;&lt;span class="s"&gt;"kafka"&lt;/span&gt;
&lt;span class="py"&gt;servers&lt;/span&gt;&lt;span class="p"&gt;=[&lt;/span&gt; &lt;span class="s"&gt;"kafka01.example.com:10251"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="s"&gt;"kafka02.example.com:10251"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="s"&gt;"kafka03.example.com:10251"&lt;/span&gt; &lt;span class="p"&gt;]&lt;/span&gt; &lt;span class="c"&gt;# altere para os endereços dos seus brokers&lt;/span&gt;
&lt;span class="py"&gt;client-profile&lt;/span&gt;&lt;span class="p"&gt;=&lt;/span&gt;&lt;span class="s"&gt;"post"&lt;/span&gt; &lt;span class="c"&gt;# coloque aqui o nome do client-profile definido acima &lt;/span&gt;
&lt;span class="py"&gt;topic-refresh&lt;/span&gt;&lt;span class="p"&gt;=&lt;/span&gt;&lt;span class="mi"&gt;120&lt;/span&gt;
&lt;span class="py"&gt;offset-refresh&lt;/span&gt;&lt;span class="p"&gt;=&lt;/span&gt;&lt;span class="mi"&gt;30&lt;/span&gt;

&lt;span class="c"&gt;# Dê um nome para o seu cluster [cluster.nome]&lt;/span&gt;
&lt;span class="nn"&gt;[consumer.zoom]&lt;/span&gt;
&lt;span class="py"&gt;class-name&lt;/span&gt;&lt;span class="p"&gt;=&lt;/span&gt;&lt;span class="s"&gt;"kafka"&lt;/span&gt;
&lt;span class="py"&gt;cluster&lt;/span&gt;&lt;span class="p"&gt;=&lt;/span&gt;&lt;span class="s"&gt;"zoom"&lt;/span&gt; &lt;span class="c"&gt;# coloque aqui o nome do cluster definido&lt;/span&gt;
&lt;span class="py"&gt;servers&lt;/span&gt;&lt;span class="p"&gt;=[&lt;/span&gt; &lt;span class="s"&gt;"kafka01.example.com:10251"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="s"&gt;"kafka02.example.com:10251"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="s"&gt;"kafka03.example.com:10251"&lt;/span&gt; &lt;span class="p"&gt;]&lt;/span&gt; &lt;span class="c"&gt;# altere para os endereços dos seus brokers&lt;/span&gt;
&lt;span class="py"&gt;client-profile&lt;/span&gt;&lt;span class="p"&gt;=&lt;/span&gt;&lt;span class="s"&gt;"post"&lt;/span&gt; &lt;span class="c"&gt;# coloque aqui o nome do client-profile definido &lt;/span&gt;
&lt;span class="py"&gt;group-blacklist&lt;/span&gt;&lt;span class="p"&gt;=&lt;/span&gt;&lt;span class="s"&gt;"^(console-consumer-|python-kafka-consumer-|quick-).*$"&lt;/span&gt; &lt;span class="c"&gt;# coloque aqui os nomes dos consumer groups que não quer que apareça&lt;/span&gt;
&lt;span class="py"&gt;group-whitelist&lt;/span&gt;&lt;span class="p"&gt;=&lt;/span&gt;&lt;span class="s"&gt;""&lt;/span&gt;
&lt;span class="c"&gt;################################&lt;/span&gt;
&lt;span class="c"&gt;####### FIM CLUSTER zoom #######&lt;/span&gt;
&lt;span class="c"&gt;################################&lt;/span&gt;
&lt;span class="c"&gt;# Repita isso para quantos clusters tiver&lt;/span&gt;

&lt;span class="nn"&gt;[httpserver.default]&lt;/span&gt;
&lt;span class="py"&gt;address&lt;/span&gt;&lt;span class="p"&gt;=&lt;/span&gt;&lt;span class="s"&gt;":8000"&lt;/span&gt;

&lt;span class="nn"&gt;[storage.default]&lt;/span&gt;
&lt;span class="py"&gt;class-name&lt;/span&gt;&lt;span class="p"&gt;=&lt;/span&gt;&lt;span class="s"&gt;"inmemory"&lt;/span&gt;
&lt;span class="py"&gt;workers&lt;/span&gt;&lt;span class="p"&gt;=&lt;/span&gt;&lt;span class="mi"&gt;20&lt;/span&gt;
&lt;span class="py"&gt;intervals&lt;/span&gt;&lt;span class="p"&gt;=&lt;/span&gt;&lt;span class="mi"&gt;15&lt;/span&gt;
&lt;span class="py"&gt;expire-group&lt;/span&gt;&lt;span class="p"&gt;=&lt;/span&gt;&lt;span class="mi"&gt;604800&lt;/span&gt;
&lt;span class="py"&gt;min-distance&lt;/span&gt;&lt;span class="p"&gt;=&lt;/span&gt;&lt;span class="mi"&gt;1&lt;/span&gt;

&lt;span class="c"&gt;## Vamos deixar comentado por enquanto&lt;/span&gt;
&lt;span class="c"&gt;# [notifier.default]&lt;/span&gt;
&lt;span class="c"&gt;# class-name="http"&lt;/span&gt;
&lt;span class="c"&gt;# url-open="http://someservice.example.com:1467/v1/event"&lt;/span&gt;
&lt;span class="c"&gt;# interval=60&lt;/span&gt;
&lt;span class="c"&gt;# timeout=5&lt;/span&gt;
&lt;span class="c"&gt;# keepalive=30&lt;/span&gt;
&lt;span class="c"&gt;# extras={ api_key="REDACTED", app="burrow", tier="STG", fabric="mydc" }&lt;/span&gt;
&lt;span class="c"&gt;# template-open="conf/default-http-post.tmpl"&lt;/span&gt;
&lt;span class="c"&gt;# template-close="conf/default-http-delete.tmpl"&lt;/span&gt;
&lt;span class="c"&gt;# method-close="DELETE"&lt;/span&gt;
&lt;span class="c"&gt;# send-close=true&lt;/span&gt;
&lt;span class="c"&gt;# threshold=1&lt;/span&gt;


&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;

&lt;p&gt;Com o arquivo docker-compose.yml criado e o arquivo de configuração burrow.toml alterado com os endereços de seus clusters, podemos subir as duas aplicações: &lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;

&lt;span class="nv"&gt;$ &lt;/span&gt;docker-compose &lt;span class="nt"&gt;-f&lt;/span&gt; docker-compose up &lt;span class="nt"&gt;-d&lt;/span&gt;


&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;

&lt;p&gt;Agora que temos os dois containers rodando, vamos configurar o Prometheus para que este capture as métricas.&lt;/p&gt;

&lt;p&gt;No arquivo de configuração do Prometheus (vim /srv/prometheus-2.12.0.linux-amd64/prometheus.yml), você deve adicionar as seguintes linhas:&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight yaml"&gt;&lt;code&gt;

&lt;span class="pi"&gt;-&lt;/span&gt; &lt;span class="na"&gt;job_name&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s1"&gt;'&lt;/span&gt;&lt;span class="s"&gt;burrow'&lt;/span&gt;
    &lt;span class="na"&gt;static_configs&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt;
    &lt;span class="pi"&gt;-&lt;/span&gt; &lt;span class="na"&gt;targets&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="pi"&gt;[&lt;/span&gt;&lt;span class="s1"&gt;'&lt;/span&gt;&lt;span class="s"&gt;127.0.0.1:8090'&lt;/span&gt;&lt;span class="pi"&gt;]&lt;/span&gt;


&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;

&lt;blockquote&gt;
&lt;p&gt;O IP &lt;em&gt;127.0.0.1&lt;/em&gt; deve ser substituído pelo o IP do servidor que está executando o container que subimos acima.&lt;/p&gt;
&lt;/blockquote&gt;

&lt;p&gt;Para o Prometheus reconhecer tais configurações precisamos realizar um reload no serviço:&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;

# systemctl reload prometheus


&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;

&lt;p&gt;Agora vamos importar o dashboard no Grafana. O link abaixo é o dash específico para essas métricas:&lt;br&gt;
&lt;a href="https://grafana.com/grafana/dashboards/11963" rel="noopener noreferrer"&gt;https://grafana.com/grafana/dashboards/11963&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;Quando tiver importando o dash acima e tudo tiver ocorrido bem você verá algo parecido com imagem abaixo:&lt;br&gt;
&lt;a href="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fi%2F9ssukt9dwyzfwqfq1wsm.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fi%2F9ssukt9dwyzfwqfq1wsm.png" alt="Alt Text"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;blockquote&gt;
&lt;p&gt;Lembram que eu havia comentado acima que iriamos utilizar o repositório que eu &lt;em&gt;forkei&lt;/em&gt;? Pois bem, se você analisar o dash que importamos vai ver que tem um campo na tabela chamado &lt;em&gt;Consumer client&lt;/em&gt;. Foi esse campo que adicionei no PR que abri e o dono do repositório nunca aceitou. Com essa informação sabemos qual é o IP do servidor que está consumindo naquela especifica partição.&lt;/p&gt;
&lt;/blockquote&gt;

&lt;h2&gt;
  
  
  Conclusão
&lt;/h2&gt;

&lt;p&gt;O sucesso para que o seu cluster Apache Kafka (ou qualquer outra aplicação) esteja 100% disponível e seja 100% confiável é ter uma stack madura de monitoração. Como mostrado nesse artigo, não é necessário gastar uma &lt;em&gt;bala&lt;/em&gt; com licenças em softwares de monitoração achando que tal software vai fazer mágica para você. Com ferramentas Open Source somos capaz de tirar o melhor que cada uma tem a oferecer e montar uma stack muito madura de monitoração e alertas. &lt;/p&gt;

</description>
      <category>kafka</category>
      <category>monitoring</category>
    </item>
    <item>
      <title>Kafka: quantas partições por tópico?</title>
      <dc:creator>Fabio José</dc:creator>
      <pubDate>Sun, 01 Mar 2020 19:19:22 +0000</pubDate>
      <link>https://dev.to/kafkabr/kafka-quantas-particoes-por-topico-537c</link>
      <guid>https://dev.to/kafkabr/kafka-quantas-particoes-por-topico-537c</guid>
      <description>&lt;p&gt;Umas das grandes dúvidas ao utilizar o Kafka é saber quantas partições são necessárias ao criar novos tópicos. Bem, não existe uma fórmula geral, o que temos é uma aproximação detalhada neste excelente artigo &lt;a href="https://www.confluent.io/blog/how-choose-number-topics-partitions-kafka-cluster/"&gt;How to choose the number of topics/partitions in a Kafka cluster?&lt;/a&gt;, escrito por Jun Rao. Outro artigo muito relevante é o &lt;a href="https://engineering.linkedin.com/kafka/benchmarking-apache-kafka-2-million-writes-second-three-cheap-machines"&gt;Benchmarking Apache Kafka: 2 Million Writes Per Second&lt;/a&gt;, escrito por Jay Kreps. Nele são revelados resultados importantes sobre a taxa de transferência para &lt;em&gt;producers&lt;/em&gt; e &lt;em&gt;consumers&lt;/em&gt;.&lt;/p&gt;

&lt;p&gt;Então, com base no artigo de Jun Rao, temos a fórmula aproximada para determinar o número de partições:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight"&gt;&lt;pre class="highlight plaintext"&gt;&lt;code&gt;  MAX(t/p, t/c)
&lt;/code&gt;&lt;/pre&gt;&lt;/div&gt;



&lt;p&gt;Onde:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;
&lt;code&gt;t&lt;/code&gt;: taxa de transferência desejada&lt;/li&gt;
&lt;li&gt;
&lt;code&gt;p&lt;/code&gt;: taxa de transferência do &lt;em&gt;producer&lt;/em&gt;
&lt;/li&gt;
&lt;li&gt;
&lt;code&gt;c&lt;/code&gt;: taxa de transferência do &lt;em&gt;consumer&lt;/em&gt;
&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;Como o artigo sugere, o valor para a taxa de transferência do &lt;em&gt;consumer&lt;/em&gt; depende de como ele processa os registros e por esse motivo devemos realizar nossas próprias medições ao invés de utilizar o valor-base descrito no artigo de Jay Kreps. Já para o valor referente ao &lt;em&gt;producer&lt;/em&gt;, podemos tomar como base àquele revelado pelo artigo.&lt;/p&gt;

&lt;blockquote&gt;
&lt;p&gt;Uma dica é você realizar todas as medições, assim você também entenderá como é a sua infra kafka.&lt;/p&gt;
&lt;/blockquote&gt;

&lt;h2&gt;
  
  
  Aplicando a fórmula
&lt;/h2&gt;

&lt;p&gt;Primeiro temos de definir qual é a unidade da nossa taxa de transferência, que poderia ser MB/s ou Mensagens/s. Mas as mensagens tem tamanhos variados e utilizá-las nas medições talvez não conduza a resultados realistas, então, &lt;code&gt;MB/s&lt;/code&gt; é uma boa unidade de medida para aplicação da fórmula.&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Unidade: &lt;code&gt;MB/s&lt;/code&gt; (megabytes por segundo)&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;A medição é realizada empregando um &lt;em&gt;producer&lt;/em&gt; e um tópico com apenas uma partição. Então, digamos que o resultado para nosso &lt;code&gt;p&lt;/code&gt;, foi:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight"&gt;&lt;pre class="highlight plaintext"&gt;&lt;code&gt;89 MB/s
&lt;/code&gt;&lt;/pre&gt;&lt;/div&gt;



&lt;p&gt;A medição para taxa de transferência do &lt;em&gt;consumer&lt;/em&gt; é similar, ou seja, apenas um tópico com uma única partição. Então, digamos que o valor de &lt;code&gt;c&lt;/code&gt; é:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight"&gt;&lt;pre class="highlight plaintext"&gt;&lt;code&gt;75 MB/s
&lt;/code&gt;&lt;/pre&gt;&lt;/div&gt;



&lt;p&gt;Imagine que nosso alvo com relação a taxa de transferência seja &lt;code&gt;450 MB/s&lt;/code&gt;. Aplicando a fórmula de aproximação temos:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight"&gt;&lt;pre class="highlight plaintext"&gt;&lt;code&gt;p = 89
c = 75
t = 450

  MAX(450/89, 450/75)

  MAX(5.1, 6) = 6
&lt;/code&gt;&lt;/pre&gt;&lt;/div&gt;



&lt;p&gt;Portanto, o número de partições é &lt;code&gt;6&lt;/code&gt;.&lt;/p&gt;

&lt;blockquote&gt;
&lt;p&gt;Notadamente, não trata-se de uma fórmula para qualquer caso de uso, porém, agora temos um ponto de partida e não somente números mágicos.&lt;/p&gt;
&lt;/blockquote&gt;

&lt;p&gt;Realize testes e coloque nos comentários quais foram os resultados, suas observações são valiósas.&lt;/p&gt;

&lt;p&gt;Até o próximo artigo!&lt;/p&gt;

</description>
      <category>kafka</category>
      <category>confluent</category>
      <category>brasil</category>
      <category>kafkabr</category>
    </item>
  </channel>
</rss>
