<?xml version="1.0" encoding="UTF-8"?>
<rss version="2.0" xmlns:atom="http://www.w3.org/2005/Atom" xmlns:dc="http://purl.org/dc/elements/1.1/">
  <channel>
    <title>DEV Community: Fabio José</title>
    <description>The latest articles on DEV Community by Fabio José (@fabiojose).</description>
    <link>https://dev.to/fabiojose</link>
    <image>
      <url>https://media2.dev.to/dynamic/image/width=90,height=90,fit=cover,gravity=auto,format=auto/https:%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Fuser%2Fprofile_image%2F49441%2F45b3991e-47d4-4c58-a857-76a451674f6b.jpeg</url>
      <title>DEV Community: Fabio José</title>
      <link>https://dev.to/fabiojose</link>
    </image>
    <atom:link rel="self" type="application/rss+xml" href="https://dev.to/feed/fabiojose"/>
    <language>en</language>
    <item>
      <title>Cotas de Cliente no Apache Kafka® 2.6.0</title>
      <dc:creator>Fabio José</dc:creator>
      <pubDate>Mon, 14 Sep 2020 08:53:39 +0000</pubDate>
      <link>https://dev.to/kafkabr/cotas-de-cliente-no-apache-kafka-2-6-0-o61</link>
      <guid>https://dev.to/kafkabr/cotas-de-cliente-no-apache-kafka-2-6-0-o61</guid>
      <description>&lt;h3&gt;
  
  
  Artigo publicado no Blog oficial Kafka BR
&lt;/h3&gt;

&lt;p&gt;Acesse: &lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;&lt;a href="https://blog.kafkabr.com/posts/kafka-260-client-quotas/"&gt;https://blog.kafkabr.com/posts/kafka-260-client-quotas/&lt;/a&gt;&lt;/li&gt;
&lt;/ul&gt;

</description>
      <category>kafka</category>
      <category>apachekafka</category>
      <category>kafkabr</category>
    </item>
    <item>
      <title>Transações no Apache Kafka®</title>
      <dc:creator>Fabio José</dc:creator>
      <pubDate>Mon, 14 Sep 2020 08:51:44 +0000</pubDate>
      <link>https://dev.to/kafkabr/transacoes-no-apache-kafka-1ik3</link>
      <guid>https://dev.to/kafkabr/transacoes-no-apache-kafka-1ik3</guid>
      <description>&lt;h2&gt;
  
  
  Artigo publicado no Blog oficial Kafka BR.
&lt;/h2&gt;

&lt;p&gt;Acesse:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;&lt;a href="https://blog.kafkabr.com/posts/transacoes"&gt;https://blog.kafkabr.com/posts/transacoes&lt;/a&gt;&lt;/li&gt;
&lt;/ul&gt;

</description>
      <category>kafka</category>
      <category>kafkabr</category>
      <category>java</category>
    </item>
    <item>
      <title>Kafka: produtores idempotentes</title>
      <dc:creator>Fabio José</dc:creator>
      <pubDate>Thu, 28 May 2020 01:25:58 +0000</pubDate>
      <link>https://dev.to/kafkabr/kafka-produtores-idempotentes-12ok</link>
      <guid>https://dev.to/kafkabr/kafka-produtores-idempotentes-12ok</guid>
      <description>&lt;p&gt;&lt;em&gt;"...&lt;a href="https://pt.m.wikipedia.org/wiki/Idempot%C3%AAncia"&gt;idempotência&lt;/a&gt; é a propriedade que algumas operações têm de poderem ser aplicadas várias vezes sem que o valor do resultado se altere após a aplicação inicial."&lt;/em&gt;&lt;/p&gt;




&lt;p&gt;Através de uma configuração simples no produtor Kafka, pode-se alcançar a idempotência na produção de eventos, eliminando completamente a possibilidade de duplicatas nos tópicos.&lt;/p&gt;




&lt;p&gt;Uma das grandes qualidades do Kafka é a possibilidade de configurar a confiabilidade no produtor, que permite ajustes finos para atender com maestria aos requisitos não-funcionais dos casos de uso.&lt;/p&gt;

&lt;p&gt;Uma dessas possibilidades é o produtor idempotente. Com ele existe a garantia de que não haverá duplicatas em nível de partição.&lt;/p&gt;

&lt;p&gt;Essa garantia entra em ação quando a seguinte propriedade é definida como &lt;code&gt;true&lt;/code&gt;:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight"&gt;&lt;pre class="highlight properties"&gt;&lt;code&gt;&lt;span class="py"&gt;enable.idempotence&lt;/span&gt;&lt;span class="p"&gt;=&lt;/span&gt;&lt;span class="s"&gt;true&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;&lt;/div&gt;



&lt;p&gt;Ao habilitar a idempotência no produtor, também será mandatório definir as seguintes configurações:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight"&gt;&lt;pre class="highlight properties"&gt;&lt;code&gt;&lt;span class="py"&gt;acks&lt;/span&gt;&lt;span class="p"&gt;=&lt;/span&gt;&lt;span class="s"&gt;all&lt;/span&gt;
&lt;span class="py"&gt;max.in.flight.requests.per.connection&lt;/span&gt;&lt;span class="p"&gt;=&lt;/span&gt;&lt;span class="s"&gt;5&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;&lt;/div&gt;



&lt;p&gt;Com este setup, cada requisição que envia lotes de eventos ao Kafka terá um identificador único.&lt;/p&gt;

&lt;blockquote&gt;
&lt;p&gt;Cada lote na requisição tem como destino uma partição e um tópico&lt;/p&gt;
&lt;/blockquote&gt;

&lt;p&gt;Ao chegar no kafka, esta requisição será processada e cada lote enviado ao líderes das respectivas partições. Ao terminar isso, o &lt;code&gt;ack&lt;/code&gt; é devolvido ao produtor.&lt;/p&gt;

&lt;p&gt;Mas esta é uma comunicação em rede. Então o atraso em receber o &lt;code&gt;ack&lt;/code&gt; ou qualquer outro problema poderá provocar a retentativa padrão dos produtores Kafka. Assim, ele enviará novamente a requisição identificada.&lt;/p&gt;

&lt;p&gt;O broker ao receber novamente àquela requisição processada com sucesso, apenas devolve o &lt;code&gt;ack&lt;/code&gt;, sem persistir novamente e assim evitando a duplicação.&lt;/p&gt;

&lt;p&gt;Sem o produtor idempotente, o broker receberia a requisição da retentiva como se fosse uma nova, persistindo novamente o lote de eventos, causando duplicatas.&lt;/p&gt;




&lt;p&gt;É isso pessoal!&lt;/p&gt;

&lt;p&gt;Até o próximo!! &lt;/p&gt;

</description>
      <category>kafka</category>
      <category>kafkabr</category>
      <category>java</category>
      <category>apachekafka</category>
    </item>
    <item>
      <title>Apagar eventos do kafka?</title>
      <dc:creator>Fabio José</dc:creator>
      <pubDate>Mon, 25 May 2020 16:23:28 +0000</pubDate>
      <link>https://dev.to/kafkabr/apagar-eventos-do-kafka-jd4</link>
      <guid>https://dev.to/kafkabr/apagar-eventos-do-kafka-jd4</guid>
      <description>&lt;p&gt;Você já deve ter-se perguntado se é possível apagar eventos do Kafka, não? Bem, isso realmente não é possível.&lt;/p&gt;

&lt;p&gt;Mas existe uma forma especial utilizada principalmente para atender o direito ao esquecimento, requerido pela LGPD em alguns países.&lt;/p&gt;

&lt;p&gt;⚠️ Atenção! ⚠️ Entenda todos os impactos antes de alterar qualquer tópico que você já tenha ai no seu cluster Kafka.&lt;/p&gt;




&lt;p&gt;Isso não se trata de um hack ou algo do tipo, até porque ele está descrito no livro &lt;a href="https://www.confluent.io/resources/kafka-the-definitive-guide"&gt;"Kafka: The Definitive Guide"&lt;/a&gt; 😊.&lt;/p&gt;




&lt;p&gt;Primeiro, seu tópico deverá ser compactado, ou seja, a configuração &lt;code&gt;cleanup.policy&lt;/code&gt; deve ser igual a &lt;code&gt;compact&lt;/code&gt;. Ele garantirá que somente o evento mais recente será mantido.&lt;/p&gt;

&lt;p&gt;Em um tópico compactado é mandatório produzir eventos com chave de partição, o atributo &lt;code&gt;key&lt;/code&gt;. É com base nele que o Kafka manterá o evento mais recente.&lt;/p&gt;

&lt;p&gt;Digamos que existe o tópico &lt;code&gt;clientes&lt;/code&gt; com os seguintes eventos na partição &lt;code&gt;3&lt;/code&gt;:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight"&gt;&lt;pre class="highlight plaintext"&gt;&lt;code&gt;CHAVE   VALOR DO EVENTO
C001    Cliente 1y
C002    Cliente 2+
C003    Cliente 3@
C001    Cliente 1π
&lt;/code&gt;&lt;/pre&gt;&lt;/div&gt;



&lt;p&gt;Veja que a chave &lt;code&gt;C001&lt;/code&gt; tem um evento mais recente, que será mantido após a compactação do tópico:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight"&gt;&lt;pre class="highlight plaintext"&gt;&lt;code&gt;CHAVE   VALOR DO EVENTO
C002    Cliente 2+
C003    Cliente 3@
C001    Cliente 1π
&lt;/code&gt;&lt;/pre&gt;&lt;/div&gt;



&lt;p&gt;Agora que você entendeu a compactação, para "apagar" um evento, o &lt;code&gt;C003&lt;/code&gt; por exemplo, bastará produzir um evento sem valor com a mesma chave:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight"&gt;&lt;pre class="highlight plaintext"&gt;&lt;code&gt;CHAVE   VALOR DO EVENTO
C001    Cliente 1y
C002    Cliente 2+
C003    Cliente 3@
C001    Cliente 1π
C003
&lt;/code&gt;&lt;/pre&gt;&lt;/div&gt;



&lt;p&gt;Então, após compactação do tópico, o cliente &lt;code&gt;C003&lt;/code&gt; deixará de existir:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight"&gt;&lt;pre class="highlight plaintext"&gt;&lt;code&gt;CHAVE   VALOR DO EVENTO
C001    Cliente 1y
C002    Cliente 2+
C001    Cliente 1π
C003
&lt;/code&gt;&lt;/pre&gt;&lt;/div&gt;



&lt;p&gt;Aqui foi um exemplo, mas a chave de partição poderia ser o CPF, CNPJ ou qualquer outra valor pertinente ao caso se uso.&lt;/p&gt;




&lt;p&gt;Simples, não? Mas não se engane, porque empregar chave de partição não uma regra geral. Então, não mude as configurações do seu tópico antes de entender as consequências.&lt;/p&gt;

&lt;p&gt;Se você já tem um tópico com eventos produzidos sem chave, a primeira coisa que acontecerá quando mudar a &lt;code&gt;cleanup.policy&lt;/code&gt; para &lt;code&gt;compact&lt;/code&gt; é que todos eles serão eliminados 😬. Mudar configurações de tópicos só com muita análise dos impactos.&lt;/p&gt;




&lt;p&gt;É isso. &lt;/p&gt;

&lt;p&gt;Obrigado pelo leitura e até o próximo!&lt;/p&gt;

</description>
      <category>kafka</category>
      <category>apachekafka</category>
      <category>kafkabr</category>
    </item>
    <item>
      <title>Como implementar Dead-letter Topic com Spring Kafka</title>
      <dc:creator>Fabio José</dc:creator>
      <pubDate>Mon, 18 May 2020 11:47:35 +0000</pubDate>
      <link>https://dev.to/kafkabr/como-implementar-dead-letter-topic-com-spring-kafka-585e</link>
      <guid>https://dev.to/kafkabr/como-implementar-dead-letter-topic-com-spring-kafka-585e</guid>
      <description>&lt;p&gt;&lt;em&gt;Dead-letter Topic&lt;/em&gt;, &lt;a href="https://en.m.wikipedia.org/wiki/Dead_letter_queue" rel="noopener noreferrer"&gt;&lt;em&gt;Dead-letter Queue&lt;/em&gt;&lt;/a&gt; ou em bom e velho português: &lt;strong&gt;Tópicos de mensagens não-entregues&lt;/strong&gt;. São tópicos necessários em sistemas distribuídos onde a comunicação é assíncrona e através de brokers como o Kafka.&lt;/p&gt;

&lt;p&gt;Os dados que chegam nestes tópicos passaram por todas as tentativas possíveis para tratamento de erros e já não resta mais nada a ser feito, se não, a intervenção humana. Assim, &lt;strong&gt;não será qualquer erro&lt;/strong&gt; que levará mensagens ou eventos a serem publicados em um tópico &lt;em&gt;dead-letter&lt;/em&gt;.&lt;/p&gt;

&lt;blockquote&gt;
&lt;p&gt;No mundo Kafka um tópico dead-letter é destinado aos registros consumidos, que por algum erro irrecuperável não puderam ser processados com sucesso.&lt;/p&gt;
&lt;/blockquote&gt;




&lt;p&gt;Neste artigo será demonstrada uma abordagem para implementar DLT no Kafka, utilizando Java e Spring. Para os impacientes 🧐, estes são os fontes :&lt;/p&gt;


&lt;div class="ltag-github-readme-tag"&gt;
  &lt;div class="readme-overview"&gt;
    &lt;h2&gt;
      &lt;img src="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev.to%2Fassets%2Fgithub-logo-5a155e1f9a670af7944dd5e12375bc76ed542ea80224905ecaf878b9157cdefc.svg" alt="GitHub logo"&gt;
      &lt;a href="https://github.com/fabiojose" rel="noopener noreferrer"&gt;
        fabiojose
      &lt;/a&gt; / &lt;a href="https://github.com/fabiojose/spring-kafka-dlt-ex" rel="noopener noreferrer"&gt;
        spring-kafka-dlt-ex
      &lt;/a&gt;
    &lt;/h2&gt;
    &lt;h3&gt;
      Dead-letter Topic com Spring Kafka
    &lt;/h3&gt;
  &lt;/div&gt;
&lt;/div&gt;





&lt;h3&gt;
  
  
  Classificar Erros
&lt;/h3&gt;

&lt;p&gt;Antes de escrever qualquer linha de código é necessário classificar os erros e como serão tratados.&lt;/p&gt;

&lt;p&gt;Existem dois tipos de erros: &lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;recuperáveis&lt;/li&gt;
&lt;li&gt;não-recuperáveis&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;Os erros &lt;strong&gt;recuperáveis&lt;/strong&gt; ou que podem ser tratados, são aqueles onde  alguma abordagem será empregada para tentar finalizar o fluxo com sucesso.&lt;/p&gt;

&lt;p&gt;Por exemplo, no Java, os erros &lt;a href="https://docs.oracle.com/javase/8/docs/api/java/net/ConnectException.html" rel="noopener noreferrer"&gt;&lt;code&gt;ConnectException&lt;/code&gt;&lt;/a&gt; e &lt;a href="https://docs.oracle.com/javase/8/docs/api/java/net/UnknownHostException.html" rel="noopener noreferrer"&gt;&lt;code&gt;UnknownHostException&lt;/code&gt;&lt;/a&gt; podem ser recuperados através de retentativas, pois normalmente são causados por instabilidades momentâneas no serviço ou na rede.&lt;/p&gt;

&lt;p&gt;Já os &lt;strong&gt;não-recuperáveis&lt;/strong&gt; são aqueles que independente do que seja feito, não será possível finalizar o fluxo com sucesso. Logo, estes são candidatos a seguirem diretamente para o tópico dead-letter. E como exemplo, se você utiliza Avro, o erro &lt;a href="http://avro.apache.org/docs/1.9.2/api/java/index.html" rel="noopener noreferrer"&gt;&lt;code&gt;AvroMissingFieldException&lt;/code&gt;&lt;/a&gt; indica a ausência de um campo requerido, não havendo nada a fazer. Portanto será inútil a retentativa, por exemplo.&lt;/p&gt;




&lt;p&gt;&lt;strong&gt;E além dos erros técnicos, você também deverá classificar seus erros de negócio e escolher uma estratégia para tratá-los.&lt;/strong&gt;&lt;/p&gt;




&lt;p&gt;Bem, agora veremos como implementar DLT para problemas técnicos no Java. E é bem provável que você encontre equivalentes na sua linguagem ou framework.&lt;/p&gt;

&lt;h3&gt;
  
  
  Recuperáveis
&lt;/h3&gt;

&lt;p&gt;Aqui segue uma lista de erros técnicos recuperáveis que, em nome da simplicidade, serão tratados através de retentativas.&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;&lt;a href="https://docs.oracle.com/javase/8/docs/api/java/net/ConnectException.html" rel="noopener noreferrer"&gt;&lt;code&gt;ConnectException&lt;/code&gt;&lt;/a&gt;&lt;/li&gt;
&lt;li&gt;&lt;a href="https://docs.oracle.com/javase/8/docs/api/java/net/UnknownHostException.html" rel="noopener noreferrer"&gt;&lt;code&gt;UnknownHostException&lt;/code&gt;&lt;/a&gt;&lt;/li&gt;
&lt;li&gt;
&lt;code&gt;ProductNotFoundException&lt;/code&gt;: a título de exemplo, este erro de negócio foi definido como tratável. Porque um serviço chamado &lt;em&gt;Catalogo&lt;/em&gt;, que faz &lt;a href="https://martinfowler.com/eaaDev/EventSourcing.html" rel="noopener noreferrer"&gt;event-sourcing&lt;/a&gt; das mudanças emitidas pelo serviço &lt;em&gt;Produto&lt;/em&gt;, ainda não processou o evento com o produto em questão. Mas através da retentativa seu fluxo poderá finalizar com sucesso.&lt;/li&gt;
&lt;/ul&gt;

&lt;h3&gt;
  
  
  Não-recuperáveis
&lt;/h3&gt;

&lt;p&gt;Estes são alguns dos erros não-recuperáveis, ou seja, se passarem pelo mesmo processo de retentativas somente consumiriam recursos, sem chances de finalizar com sucesso.&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;&lt;a href="http://avro.apache.org/docs/1.9.2/api/java/index.html" rel="noopener noreferrer"&gt;&lt;code&gt;AvroMissingFieldException&lt;/code&gt;&lt;/a&gt;&lt;/li&gt;
&lt;li&gt;&lt;a href="https://docs.oracle.com/javase/8/docs/api/java/lang/NullPointerException.html" rel="noopener noreferrer"&gt;&lt;code&gt;NullPointerException&lt;/code&gt;&lt;/a&gt;&lt;/li&gt;
&lt;li&gt;&lt;a href="https://docs.oracle.com/javase/8/docs/api/java/lang/ClassCastException.html" rel="noopener noreferrer"&gt;&lt;code&gt;ClassCastException&lt;/code&gt;&lt;/a&gt;&lt;/li&gt;
&lt;li&gt;&lt;a href="https://kafka.apache.org/20/javadoc/index.html?org/apache/kafka/common/errors/RecordTooLargeException.html" rel="noopener noreferrer"&gt;&lt;code&gt;RecordTooLargeException&lt;/code&gt;&lt;/a&gt;&lt;/li&gt;
&lt;li&gt;
&lt;code&gt;StockNotAvailableException&lt;/code&gt;: como exemplo, este erro de negócio foi classificado como irrecuperável. Porque a falta de produtos no estoque não será resolvida com retentativas, por exemplo.&lt;/li&gt;
&lt;/ul&gt;

&lt;h3&gt;
  
  
  Implementação
&lt;/h3&gt;

&lt;blockquote&gt;
&lt;p&gt;Existe um &lt;a href="https://eng.uber.com/reliable-reprocessing/" rel="noopener noreferrer"&gt;ótimo artigo&lt;/a&gt; no blog de engenharia do Uber que descreve uma arquitetura para dead-letter. Vale a leitura!&lt;/p&gt;
&lt;/blockquote&gt;

&lt;p&gt;Esta implementação foi dividida em três partes:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;tópicos&lt;/li&gt;
&lt;li&gt;construção&lt;/li&gt;
&lt;li&gt;tratamento&lt;/li&gt;
&lt;/ul&gt;

&lt;h4&gt;
  
  
  Tópicos
&lt;/h4&gt;

&lt;p&gt;Para cada processo que terá tratamento &lt;em&gt;dead-letter&lt;/em&gt;, será criado um tópico com o sufixo &lt;code&gt;-dlt&lt;/code&gt;. Tome como exemplo um processo que realiza a reserva de estoque a partir das ordens de compra publicadas no tópico &lt;code&gt;ordem-compra&lt;/code&gt;. Então, ao invés de criar um tópico ordem-compra-dlq, será utilizado um nome relevante ao processo que está falhando:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;&lt;code&gt;reservar-estoque-dlt&lt;/code&gt;&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;E mais os tópicos para retentativas, que devem ser quantos forem necessários até finalmente o registro chegar ao tópico &lt;em&gt;dead-letter&lt;/em&gt;. Digamos que serão no máximo quatro retentativas além da inicial:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;&lt;code&gt;reservar-estoque-retry-1&lt;/code&gt;&lt;/li&gt;
&lt;li&gt;&lt;code&gt;reservar-estoque-retry-2&lt;/code&gt;&lt;/li&gt;
&lt;li&gt;&lt;code&gt;reservar-estoque-retry-3&lt;/code&gt;&lt;/li&gt;
&lt;li&gt;&lt;code&gt;reservar-estoque-retry-4&lt;/code&gt;&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;Os tópicos devem ter características que não interfiram no andamento das retentativas ou na publicação do tópico dead-letter. Um exemplo é a configuração &lt;code&gt;max.message.bytes&lt;/code&gt;, que pode acarretar um erro chamado &lt;code&gt;message too large&lt;/code&gt;. Portanto os tópicos deverão permitir mensagens maiores, caso contrário também não será possível utilizá-los porque são idênticos ao principal, inclusive com as mesmas limitações.&lt;/p&gt;

&lt;h4&gt;
  
  
  Construção
&lt;/h4&gt;

&lt;p&gt;A construção foi feita com Spring Kafka porque ele já vem com muitas utilidades para tratamento para &lt;em&gt;dead-letter&lt;/em&gt;, o que contribui muito com a produtividade se ela é o foco no seu projeto. Mas nada impede que você escreva a mesma solução com Clientes Kafka no Java ou na sua linguagem do seu projeto.&lt;/p&gt;

&lt;p&gt;Para utilizar os recursos destinados a dead-letter, é necessário customizar a fábrica de objetos encarregada de produzir &lt;em&gt;listeners&lt;/em&gt; Kafka no Spring.&lt;/p&gt;

&lt;p&gt;Assim, a configuração programática foi sub-dividida em três partes:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;
&lt;code&gt;resolver&lt;/code&gt;: responsável por determinar o tópico destino do registro que está no contexto do erro.&lt;/li&gt;
&lt;li&gt;
&lt;code&gt;errorHandler&lt;/code&gt;: manipulador do erro&lt;/li&gt;
&lt;li&gt;
&lt;code&gt;kafkaListernerContainerFactory&lt;/code&gt;: fabrica instâncias que são utilizadas nos métodos anotados com &lt;code&gt;@KafkaListener&lt;/code&gt;
&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;E para cada uma das sub-divisões existe uma implementação &lt;strong&gt;Main&lt;/strong&gt; e outra &lt;strong&gt;Retry&lt;/strong&gt;. Como é possível imaginar, uma cuida das configurações para o processamento principal outro para as retentativas.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Main resolver&lt;/strong&gt;, responsável por determinar qual o tópico destino com base no erro-raiz lançado ao processar o registro consumido do tópico &lt;code&gt;ordem-compra&lt;/code&gt;:&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight java"&gt;&lt;code&gt;

&lt;span class="nd"&gt;@Bean&lt;/span&gt;
&lt;span class="kd"&gt;public&lt;/span&gt; &lt;span class="nc"&gt;BiFunction&lt;/span&gt; &lt;span class="o"&gt;&amp;lt;&lt;/span&gt; &lt;span class="nc"&gt;ConsumerRecord&lt;/span&gt; &lt;span class="o"&gt;&amp;lt;&lt;/span&gt; &lt;span class="o"&gt;?&lt;/span&gt; &lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="o"&gt;?&lt;/span&gt; &lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="nc"&gt;Exception&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="nc"&gt;TopicPartition&lt;/span&gt; &lt;span class="o"&gt;&amp;gt;&lt;/span&gt;
 &lt;span class="n"&gt;mainResolver&lt;/span&gt;&lt;span class="o"&gt;()&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;

  &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="k"&gt;new&lt;/span&gt; &lt;span class="nc"&gt;BiFunction&lt;/span&gt; &lt;span class="o"&gt;&amp;lt;&lt;/span&gt; &lt;span class="nc"&gt;ConsumerRecord&lt;/span&gt; &lt;span class="o"&gt;&amp;lt;&lt;/span&gt; &lt;span class="o"&gt;?&lt;/span&gt; &lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="o"&gt;?&lt;/span&gt; &lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="nc"&gt;Exception&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="nc"&gt;TopicPartition&lt;/span&gt; &lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="o"&gt;()&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
   &lt;span class="nd"&gt;@Override&lt;/span&gt;
   &lt;span class="kd"&gt;public&lt;/span&gt; &lt;span class="nc"&gt;TopicPartition&lt;/span&gt; &lt;span class="nf"&gt;apply&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;ConsumerRecord&lt;/span&gt; &lt;span class="o"&gt;&amp;lt;&lt;/span&gt; &lt;span class="o"&gt;?&lt;/span&gt; &lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="o"&gt;?&lt;/span&gt; &lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="n"&gt;r&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="nc"&gt;Exception&lt;/span&gt; &lt;span class="n"&gt;e&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;

    &lt;span class="c1"&gt;// ####&lt;/span&gt;
    &lt;span class="c1"&gt;// Por padrão, quando é não-recuperável, segue diretamente p/ dead-letter&lt;/span&gt;
    &lt;span class="nc"&gt;TopicPartition&lt;/span&gt; &lt;span class="n"&gt;result&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt;
     &lt;span class="k"&gt;new&lt;/span&gt; &lt;span class="nf"&gt;TopicPartition&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;dltTopic&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt;
      &lt;span class="no"&gt;QUALQUER_PARTICAO&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;

    &lt;span class="c1"&gt;// ####&lt;/span&gt;
    &lt;span class="c1"&gt;// Trata-se de um erro recuperável?&lt;/span&gt;
    &lt;span class="kd"&gt;final&lt;/span&gt; &lt;span class="kt"&gt;boolean&lt;/span&gt; &lt;span class="n"&gt;recuperavel&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;isRecuperavel&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;e&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;
    &lt;span class="k"&gt;if&lt;/span&gt; &lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;recuperavel&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;

     &lt;span class="nc"&gt;Optional&lt;/span&gt; &lt;span class="o"&gt;&amp;lt;&lt;/span&gt; &lt;span class="nc"&gt;String&lt;/span&gt; &lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="n"&gt;origem&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt;
      &lt;span class="n"&gt;topicoOrigem&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;r&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;headers&lt;/span&gt;&lt;span class="o"&gt;())&lt;/span&gt;
      &lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;or&lt;/span&gt;&lt;span class="o"&gt;(()&lt;/span&gt; &lt;span class="o"&gt;-&amp;gt;&lt;/span&gt; &lt;span class="nc"&gt;Optional&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;of&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="no"&gt;NENHUM_CABECALHO&lt;/span&gt;&lt;span class="o"&gt;));&lt;/span&gt;

     &lt;span class="c1"&gt;// ####&lt;/span&gt;
     &lt;span class="c1"&gt;// Se origem for outro tópico, segue para o primeiro retry&lt;/span&gt;
     &lt;span class="nc"&gt;String&lt;/span&gt; &lt;span class="n"&gt;destino&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt;
      &lt;span class="n"&gt;origem&lt;/span&gt;
      &lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;filter&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;topico&lt;/span&gt; &lt;span class="o"&gt;-&amp;gt;&lt;/span&gt; &lt;span class="o"&gt;!&lt;/span&gt;&lt;span class="n"&gt;topico&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;matches&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;retryTopicsPattern&lt;/span&gt;&lt;span class="o"&gt;))&lt;/span&gt;
      &lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;map&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;t&lt;/span&gt; &lt;span class="o"&gt;-&amp;gt;&lt;/span&gt; &lt;span class="n"&gt;retryFirstTopic&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt;
      &lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;orElse&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;dltTopic&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;

     &lt;span class="n"&gt;result&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="k"&gt;new&lt;/span&gt; &lt;span class="nc"&gt;TopicPartition&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;destino&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="no"&gt;QUALQUER_PARTICAO&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;
    &lt;span class="o"&gt;}&lt;/span&gt;

    &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="n"&gt;result&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
   &lt;span class="o"&gt;}&lt;/span&gt;
  &lt;span class="o"&gt;};&lt;/span&gt;
 &lt;span class="o"&gt;}&lt;/span&gt;


&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;
&lt;p&gt;&lt;strong&gt;Main error handler&lt;/strong&gt;, que utiliza o &lt;strong&gt;Main resolver&lt;/strong&gt; e é responsável por definir duas configurações essenciais para tratamento dos erros:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;
&lt;a href="https://docs.spring.io/spring-kafka/api/org/springframework/kafka/listener/DeadLetterPublishingRecoverer.html" rel="noopener noreferrer"&gt;&lt;code&gt;DeadLetterPublishingRecoverer&lt;/code&gt;&lt;/a&gt;: inicia o fluxo DLT, que é delegado pelo SeekToCurrentErrorHandler caso as retentativas locais não resolvam o erro.&lt;/li&gt;
&lt;li&gt;
&lt;a href="https://docs.spring.io/spring-kafka/api/org/springframework/kafka/listener/SeekToCurrentErrorHandler.html" rel="noopener noreferrer"&gt;&lt;code&gt;SeekToCurrentErrorHandler&lt;/code&gt;&lt;/a&gt;: manipula qualquer erro que seja lançado no método anotado com &lt;code&gt;@KafkaListener&lt;/code&gt;. Naturalmente, se você capturá-los com &lt;code&gt;catch&lt;/code&gt; e não permitir que eles &lt;em&gt;subam na pilha&lt;/em&gt;, não será possível tratá-los.&lt;/li&gt;
&lt;/ul&gt;
&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight java"&gt;&lt;code&gt;

&lt;span class="nd"&gt;@Bean&lt;/span&gt;
&lt;span class="kd"&gt;public&lt;/span&gt; &lt;span class="nc"&gt;SeekToCurrentErrorHandler&lt;/span&gt; &lt;span class="nf"&gt;mainErrorHandler&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;
 &lt;span class="nd"&gt;@Qualifier&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"mainResolver"&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt;
 &lt;span class="nc"&gt;BiFunction&lt;/span&gt; &lt;span class="o"&gt;&amp;lt;&lt;/span&gt; &lt;span class="nc"&gt;ConsumerRecord&lt;/span&gt; &lt;span class="o"&gt;&amp;lt;&lt;/span&gt; &lt;span class="o"&gt;?&lt;/span&gt; &lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="o"&gt;?&lt;/span&gt; &lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="nc"&gt;Exception&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="nc"&gt;TopicPartition&lt;/span&gt; &lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="n"&gt;resolver&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt;
 &lt;span class="nc"&gt;KafkaTemplate&lt;/span&gt; &lt;span class="o"&gt;&amp;lt;&lt;/span&gt; &lt;span class="o"&gt;?&lt;/span&gt; &lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="o"&gt;?&lt;/span&gt; &lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="n"&gt;template&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;

 &lt;span class="c1"&gt;// ####&lt;/span&gt;
 &lt;span class="c1"&gt;// Recuperação usando dead-letter &lt;/span&gt;
 &lt;span class="nc"&gt;DeadLetterPublishingRecoverer&lt;/span&gt; &lt;span class="n"&gt;recoverer&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt;
  &lt;span class="k"&gt;new&lt;/span&gt; &lt;span class="nf"&gt;DeadLetterPublishingRecoverer&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;template&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="n"&gt;resolver&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;

 &lt;span class="c1"&gt;// ####&lt;/span&gt;
 &lt;span class="c1"&gt;// Tentar 3x localmente antes de iniciar o fluxo dead-letter&lt;/span&gt;
 &lt;span class="nc"&gt;SeekToCurrentErrorHandler&lt;/span&gt; &lt;span class="n"&gt;handler&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt;
  &lt;span class="k"&gt;new&lt;/span&gt; &lt;span class="nf"&gt;SeekToCurrentErrorHandler&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;recoverer&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="no"&gt;RETENTAR_3X&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;

 &lt;span class="c1"&gt;// ####&lt;/span&gt;
 &lt;span class="c1"&gt;// Lista das exceções não-recuperáveis, para evitar o retry local&lt;/span&gt;
 &lt;span class="n"&gt;excecoes&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;getNaoRecuperavies&lt;/span&gt;&lt;span class="o"&gt;().&lt;/span&gt;&lt;span class="na"&gt;forEach&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;e&lt;/span&gt; &lt;span class="o"&gt;-&amp;gt;&lt;/span&gt;
  &lt;span class="n"&gt;handler&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;addNotRetryableException&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;e&lt;/span&gt;&lt;span class="o"&gt;));&lt;/span&gt;

 &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="n"&gt;handler&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
&lt;span class="o"&gt;}&lt;/span&gt;


&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;
&lt;p&gt;&lt;strong&gt;Main listener factory&lt;/strong&gt;, encarregado de fabricar os consumidores que processam registros do tópíco &lt;code&gt;ordem-compra&lt;/code&gt;.&lt;/p&gt;
&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight java"&gt;&lt;code&gt;

&lt;span class="nd"&gt;@Bean&lt;/span&gt;
&lt;span class="kd"&gt;public&lt;/span&gt; &lt;span class="nc"&gt;KafkaListenerContainerFactory&lt;/span&gt; &lt;span class="o"&gt;&amp;lt;&lt;/span&gt; &lt;span class="nc"&gt;ConcurrentMessageListenerContainer&lt;/span&gt; &lt;span class="o"&gt;&amp;lt;&lt;/span&gt; &lt;span class="nc"&gt;String&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="nc"&gt;GenericRecord&lt;/span&gt; &lt;span class="o"&gt;&amp;gt;&amp;gt;&lt;/span&gt;
 &lt;span class="n"&gt;mainKafkaListenerContainerFactory&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;
  &lt;span class="nd"&gt;@Qualifier&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="s"&gt;"mainErrorHandler"&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt; &lt;span class="nc"&gt;SeekToCurrentErrorHandler&lt;/span&gt; &lt;span class="n"&gt;errorHandler&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt;
  &lt;span class="nc"&gt;KafkaProperties&lt;/span&gt; &lt;span class="n"&gt;properties&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt;
  &lt;span class="nc"&gt;ConsumerFactory&lt;/span&gt; &lt;span class="o"&gt;&amp;lt;&lt;/span&gt; &lt;span class="nc"&gt;String&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="nc"&gt;GenericRecord&lt;/span&gt; &lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="n"&gt;factory&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;

  &lt;span class="nc"&gt;ConcurrentKafkaListenerContainerFactory&lt;/span&gt; &lt;span class="o"&gt;&amp;lt;&lt;/span&gt; &lt;span class="nc"&gt;String&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="nc"&gt;GenericRecord&lt;/span&gt; &lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="n"&gt;listener&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt;
   &lt;span class="k"&gt;new&lt;/span&gt; &lt;span class="nc"&gt;ConcurrentKafkaListenerContainerFactory&lt;/span&gt; &lt;span class="o"&gt;&amp;lt;&lt;/span&gt; &lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="o"&gt;();&lt;/span&gt;

  &lt;span class="n"&gt;listener&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;setConsumerFactory&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;factory&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;

  &lt;span class="c1"&gt;// ####&lt;/span&gt;
  &lt;span class="c1"&gt;// Utilizando o mainErrorHandler para tratar os erros&lt;/span&gt;
  &lt;span class="n"&gt;listener&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;setErrorHandler&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;errorHandler&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;

  &lt;span class="c1"&gt;// Falhar, caso os tópicos não existam?&lt;/span&gt;
  &lt;span class="n"&gt;listener&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;getContainerProperties&lt;/span&gt;&lt;span class="o"&gt;()&lt;/span&gt;
   &lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;setMissingTopicsFatal&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;missingTopicsFatal&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;

  &lt;span class="c1"&gt;// Commit do offset no registro, logo após processá-lo no listener&lt;/span&gt;
  &lt;span class="n"&gt;listener&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;getContainerProperties&lt;/span&gt;&lt;span class="o"&gt;().&lt;/span&gt;&lt;span class="na"&gt;setAckMode&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;AckMode&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;MANUAL&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;

  &lt;span class="c1"&gt;// Commits síncronos&lt;/span&gt;
  &lt;span class="n"&gt;listener&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;getContainerProperties&lt;/span&gt;&lt;span class="o"&gt;().&lt;/span&gt;&lt;span class="na"&gt;setSyncCommits&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;Boolean&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;TRUE&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;

  &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="n"&gt;listener&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
 &lt;span class="o"&gt;}&lt;/span&gt;


&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;
&lt;p&gt;Então, basta anotar o método como segue.&lt;/p&gt;

&lt;p&gt;Mas vale ressaltar que o offset sempre deverá ser confirmado, porque todo o fluxo que trata os erros será feito por retentativa e talvez culminando no tópico dead-letter. &lt;em&gt;Analise o seu caso-de-uso e entenda se esta abordagem também se aplicada.&lt;/em&gt;&lt;/p&gt;
&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight java"&gt;&lt;code&gt;

&lt;span class="nd"&gt;@KafkaListener&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;
 &lt;span class="n"&gt;id&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="s"&gt;"main-kafka-listener"&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt;
 &lt;span class="n"&gt;topics&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="s"&gt;"${app.kafka.consumer.topics}"&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt;
 &lt;span class="n"&gt;containerFactory&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="s"&gt;"mainKafkaListenerContainerFactory"&lt;/span&gt;
&lt;span class="o"&gt;)&lt;/span&gt;
&lt;span class="kd"&gt;public&lt;/span&gt; &lt;span class="kt"&gt;void&lt;/span&gt; &lt;span class="nf"&gt;consume&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nd"&gt;@Payload&lt;/span&gt; &lt;span class="nc"&gt;ConsumerRecord&lt;/span&gt; &lt;span class="o"&gt;&amp;lt;&lt;/span&gt; &lt;span class="nc"&gt;String&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="nc"&gt;GenericRecord&lt;/span&gt; &lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="n"&gt;record&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt;
 &lt;span class="nc"&gt;Acknowledgment&lt;/span&gt; &lt;span class="n"&gt;ack&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt; &lt;span class="kd"&gt;throws&lt;/span&gt; &lt;span class="nc"&gt;Exception&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;

 &lt;span class="k"&gt;try&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;

  &lt;span class="c1"&gt;// #### &lt;/span&gt;
  &lt;span class="c1"&gt;// Processar&lt;/span&gt;
  &lt;span class="n"&gt;process&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;record&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;

 &lt;span class="o"&gt;}&lt;/span&gt; &lt;span class="k"&gt;finally&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;

  &lt;span class="c1"&gt;// ####&lt;/span&gt;
  &lt;span class="c1"&gt;// Sempre confirmar o offset&lt;/span&gt;
  &lt;span class="n"&gt;ack&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;acknowledge&lt;/span&gt;&lt;span class="o"&gt;();&lt;/span&gt;
 &lt;span class="o"&gt;}&lt;/span&gt;

&lt;span class="o"&gt;}&lt;/span&gt;


&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;
&lt;p&gt;Já a configuração para o processamento das retentativas segue moldes similares, com algumas exceções:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;
&lt;code&gt;resolver&lt;/code&gt;: retorna qual o próximo tópico na sequência de retentativas ou se é o &lt;code&gt;reservar-estoque-dlt&lt;/code&gt;, caso já tenham passadas por todas.&lt;/li&gt;
&lt;li&gt;
&lt;code&gt;errorHandler&lt;/code&gt;: nenhuma retentiva local.&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;Este é o método anotado com &lt;code&gt;@KafkaListener&lt;/code&gt; que tratará os tópicos &lt;code&gt;reservar-estoque-retry&lt;/code&gt;:&lt;/p&gt;
&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight java"&gt;&lt;code&gt;

&lt;span class="nd"&gt;@KafkaListener&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;
 &lt;span class="n"&gt;id&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="s"&gt;"retry-kafka-listener"&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt;
 &lt;span class="n"&gt;topicPattern&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="s"&gt;"${app.kafka.dlt.retry.topics.pattern}"&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt;
 &lt;span class="n"&gt;containerFactory&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="s"&gt;"retryKafkaListenerContainerFactory"&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt;
 &lt;span class="n"&gt;properties&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
  &lt;span class="s"&gt;"fetch.min.bytes=${app.kafka.dlt.retry.min.bytes}"&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt;
  &lt;span class="s"&gt;"fetch.max.wait.ms=${app.kafka.dlt.retry.max.wait.ms}"&lt;/span&gt;
 &lt;span class="o"&gt;}&lt;/span&gt;
&lt;span class="o"&gt;)&lt;/span&gt;
&lt;span class="kd"&gt;public&lt;/span&gt; &lt;span class="kt"&gt;void&lt;/span&gt; &lt;span class="nf"&gt;retry&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nd"&gt;@Payload&lt;/span&gt; &lt;span class="nc"&gt;ConsumerRecord&lt;/span&gt; &lt;span class="o"&gt;&amp;lt;&lt;/span&gt; &lt;span class="nc"&gt;String&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="nc"&gt;GenericRecord&lt;/span&gt; &lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="n"&gt;record&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt;
 &lt;span class="nc"&gt;Acknowledgment&lt;/span&gt; &lt;span class="n"&gt;ack&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt; &lt;span class="kd"&gt;throws&lt;/span&gt; &lt;span class="nc"&gt;Exception&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;

 &lt;span class="k"&gt;try&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;

  &lt;span class="c1"&gt;// ####&lt;/span&gt;
  &lt;span class="c1"&gt;// Reprocessar&lt;/span&gt;
  &lt;span class="n"&gt;process&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;record&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;

 &lt;span class="o"&gt;}&lt;/span&gt; &lt;span class="k"&gt;finally&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
  &lt;span class="c1"&gt;// ####&lt;/span&gt;
  &lt;span class="c1"&gt;// Sempre confirmar o offset&lt;/span&gt;
  &lt;span class="n"&gt;ack&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;acknowledge&lt;/span&gt;&lt;span class="o"&gt;();&lt;/span&gt;
 &lt;span class="o"&gt;}&lt;/span&gt;

&lt;span class="o"&gt;}&lt;/span&gt;


&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;
&lt;ul&gt;
&lt;li&gt;
&lt;code&gt;topicPattern&lt;/code&gt;: consumir todos os tópicos &lt;code&gt;reservar-estoque-retry&lt;/code&gt;
&lt;/li&gt;
&lt;li&gt;
&lt;code&gt;fetch.min.bytes&lt;/code&gt; e &lt;code&gt;fetch.max.wait.ms&lt;/code&gt;: utilizados para provocar um certo atraso no consumo dos registros, visto que sem eles o consumo seria praticamente instantâneo.&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;Por fim, o &lt;code&gt;application.properties&lt;/code&gt; será assim: &lt;/p&gt;
&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight properties"&gt;&lt;code&gt;

&lt;span class="py"&gt;app.kafka.dlt.retry.topics&lt;/span&gt;&lt;span class="p"&gt;=&lt;/span&gt;&lt;span class="s"&gt;4&lt;/span&gt;
&lt;span class="py"&gt;app.kafka.dlt.retry.topics.pattern&lt;/span&gt;&lt;span class="p"&gt;=&lt;/span&gt;&lt;span class="s"&gt;reservar-estoque-retry-[0-9]+&lt;/span&gt;
&lt;span class="py"&gt;app.kafka.dlt.retry.topic.first&lt;/span&gt;&lt;span class="p"&gt;=&lt;/span&gt;&lt;span class="s"&gt;reservar-estoque-retry-1&lt;/span&gt;
&lt;span class="py"&gt;app.kafka.dlt.topic&lt;/span&gt;&lt;span class="p"&gt;=&lt;/span&gt;&lt;span class="s"&gt;reservar-estoque-dlt&lt;/span&gt;

&lt;span class="c"&gt;# Lista de exceções recuperáveis
&lt;/span&gt;&lt;span class="err"&gt;app.kafka.dlt.excecoes.recuperaveis[0]=java.net.ConnectException&lt;/span&gt;
&lt;span class="err"&gt;app.kafka.dlt.excecoes.recuperaveis[1]=java.net.UnknownHostException&lt;/span&gt;

&lt;span class="c"&gt;# Lista de exceções não-recuperáveis
&lt;/span&gt;&lt;span class="err"&gt;app.kafka.dlt.excecoes.naoRecuperaveis[0]=org.apache.avro.AvroMissingFieldException&lt;/span&gt;
&lt;span class="err"&gt;app.kafka.dlt.excecoes.naoRecuperaveis[1]=java.lang.NullPointerException&lt;/span&gt;

&lt;span class="c"&gt;# Provocar atraso no processmento de retentativa
&lt;/span&gt;&lt;span class="py"&gt;app.kafka.dlt.retry.max.wait.ms&lt;/span&gt;&lt;span class="p"&gt;=&lt;/span&gt;&lt;span class="s"&gt;20000&lt;/span&gt;
&lt;span class="py"&gt;app.kafka.dlt.retry.min.bytes&lt;/span&gt;&lt;span class="p"&gt;=&lt;/span&gt;&lt;span class="s"&gt;52428800&lt;/span&gt;


&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;
&lt;p&gt;Todo a implementação está disponível no Github, &lt;em&gt;clone it and have fun!&lt;/em&gt;&lt;/p&gt;


&lt;div class="ltag-github-readme-tag"&gt;
  &lt;div class="readme-overview"&gt;
    &lt;h2&gt;
      &lt;img src="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev.to%2Fassets%2Fgithub-logo-5a155e1f9a670af7944dd5e12375bc76ed542ea80224905ecaf878b9157cdefc.svg" alt="GitHub logo"&gt;
      &lt;a href="https://github.com/fabiojose" rel="noopener noreferrer"&gt;
        fabiojose
      &lt;/a&gt; / &lt;a href="https://github.com/fabiojose/spring-kafka-dlt-ex" rel="noopener noreferrer"&gt;
        spring-kafka-dlt-ex
      &lt;/a&gt;
    &lt;/h2&gt;
    &lt;h3&gt;
      Dead-letter Topic com Spring Kafka
    &lt;/h3&gt;
  &lt;/div&gt;
  &lt;div class="ltag-github-body"&gt;
    
&lt;div id="readme" class="md"&gt;
&lt;div class="markdown-heading"&gt;
&lt;h1 class="heading-element"&gt;spring-kafka-dlt-ex&lt;/h1&gt;
&lt;/div&gt;

&lt;p&gt;Dead-letter Topico com Spring Kafka e formato de dados Avro.&lt;/p&gt;

&lt;div class="markdown-heading"&gt;
&lt;h2 class="heading-element"&gt;Requerimentos&lt;/h2&gt;
&lt;/div&gt;

&lt;ul&gt;
&lt;li&gt;JDK 11&lt;/li&gt;
&lt;li&gt;Apache Maven 3.6+&lt;/li&gt;
&lt;li&gt;Docker 19+&lt;/li&gt;
&lt;li&gt;Acesso ao repositório &lt;a href="https://repo.maven.apache.org/maven2/" rel="nofollow noopener noreferrer"&gt;https://repo.maven.apache.org/maven2/&lt;/a&gt; ou uma
alternativa com acesso às dependências presentes no &lt;code&gt;pom.xml&lt;/code&gt;
&lt;/li&gt;
&lt;li&gt;Schema Registry&lt;/li&gt;
&lt;li&gt;Kafka&lt;/li&gt;
&lt;li&gt;Testcontainers&lt;/li&gt;
&lt;li&gt;Lombok&lt;/li&gt;
&lt;li&gt;Commons Lang3&lt;/li&gt;
&lt;/ul&gt;

&lt;div class="markdown-heading"&gt;
&lt;h2 class="heading-element"&gt;Configurações&lt;/h2&gt;
&lt;/div&gt;

&lt;p&gt;Não se preocupe, pois apesar de existirem atalhos pelas variávies
de ambiente, você pode utilizar tranquilamente aquilo que o Spring Boot
oferece. Então veja todos as propriedades no
&lt;a href="https://github.com/fabiojose/spring-kafka-dlt-ex./src/main/resources/application.properties" rel="noopener noreferrer"&gt;application.properties&lt;/a&gt;&lt;/p&gt;
&lt;p&gt;No caso do Kafka, utilizamos Spring Kafka, então você utilizar
o modo Spring para configurações.&lt;/p&gt;
&lt;div class="markdown-heading"&gt;
&lt;h3 class="heading-element"&gt;Variáveis de Ambiente&lt;/h3&gt;

&lt;/div&gt;
&lt;ul&gt;
&lt;li&gt;
&lt;p&gt;&lt;code&gt;APP_KAFKA_CONSUMER_TOPICS&lt;/code&gt;: tópicos para consumir, ou expressão.&lt;/p&gt;
&lt;/li&gt;
&lt;li&gt;
&lt;p&gt;&lt;code&gt;KAFKA_CLIENT_ID&lt;/code&gt;: nome do cliente Kafka, usado pelos brokers para logs e
métricas. Utilize um &lt;a href="https://medium.com/coding-skills/clean-code-101-meaningful-names-and-functions-bf450456d90c" rel="nofollow noopener noreferrer"&gt;nome clean&lt;/a&gt;, não genérico.&lt;/p&gt;
&lt;ul&gt;
&lt;li&gt;
&lt;code&gt;spring.kafka.producer.client-id&lt;/code&gt;, &lt;code&gt;spring.kafka.consumer.client-id&lt;/code&gt;
&lt;/li&gt;
&lt;/ul&gt;
&lt;/li&gt;
&lt;li&gt;
&lt;p&gt;&lt;code&gt;KAFKA_CONSUMER_GROUP&lt;/code&gt;: nome do grupo de consumo que esta aplicação pertence&lt;/p&gt;
&lt;ul&gt;
&lt;li&gt;&lt;code&gt;spring.kafka.consumer.group-id&lt;/code&gt;&lt;/li&gt;
&lt;/ul&gt;
&lt;/li&gt;
&lt;li&gt;
&lt;p&gt;&lt;code&gt;KAFKA_BOOTSTRAP_SERVERS&lt;/code&gt;: lista de brokers para o cluster Kafka&lt;/p&gt;
&lt;ul&gt;
&lt;li&gt;&lt;code&gt;spring.kafka.bootstrap-servers&lt;/code&gt;&lt;/li&gt;
&lt;/ul&gt;
&lt;/li&gt;
&lt;li&gt;
&lt;p&gt;&lt;code&gt;SCHEMA_REGISTRY_URL&lt;/code&gt;: url para o registro de esquemas Avro&lt;/p&gt;
&lt;ul&gt;
&lt;li&gt;&lt;code&gt;spring.kafka.properties.schema.registry.url&lt;/code&gt;&lt;/li&gt;
&lt;/ul&gt;
&lt;/li&gt;
&lt;li&gt;&lt;p&gt;…&lt;/p&gt;&lt;/li&gt;
&lt;/ul&gt;
&lt;/div&gt;
  &lt;/div&gt;
  &lt;div class="gh-btn-container"&gt;&lt;a class="gh-btn" href="https://github.com/fabiojose/spring-kafka-dlt-ex" rel="noopener noreferrer"&gt;View on GitHub&lt;/a&gt;&lt;/div&gt;
&lt;/div&gt;





&lt;p&gt;Quando o registro atinge o último tópico de retentativa, que neste caso é o &lt;code&gt;reservar-estoque-4&lt;/code&gt; e não seja processado com sucesso, finalmente ele será publicado no tópico &lt;em&gt;dead-letter&lt;/em&gt;, então a equipe deverá estar preparada para o tratamento adequado.&lt;/p&gt;

&lt;h4&gt;
  
  
  Tratamento
&lt;/h4&gt;

&lt;p&gt;Bem, neste momento o registro com problemas já desembarcou no tópico &lt;code&gt;reservar-estoque-dlt&lt;/code&gt; e bem antes disso acontecer um sistema preciso de monitoramento dos tópicos deveria ter alertado sobre o uso dos tópicos para retentativas, principalmente se os registros estão atingindo o &lt;code&gt;reservar-estoque-retry-4&lt;/code&gt;.&lt;/p&gt;

&lt;blockquote&gt;
&lt;p&gt;Veja &lt;a href="https://medium.com/@alvarobacelar/monitorando-um-cluster-kafka-com-ferramentas-open-source-a4032836dc79" rel="noopener noreferrer"&gt;neste artigo&lt;/a&gt; como monitorar seu cluster Kafka.&lt;/p&gt;
&lt;/blockquote&gt;

&lt;p&gt;A maneira mais prudente de tratamento, além do monitoramento e um ótimo sistema de rastreamento distribuído, será construir um processo em que para cada registro publicado no DLT, tickets e notificações ChatOps deveram ser enviados para a equipe responsável.&lt;/p&gt;

&lt;blockquote&gt;
&lt;p&gt;Também são necessárias ferramentas adequadas para, por exemplo, editar registros e colocá-los novamente no tópico original.&lt;/p&gt;
&lt;/blockquote&gt;




&lt;p&gt;Bem, é isso! Até o próximo.&lt;/p&gt;

</description>
      <category>kafka</category>
      <category>kafkabr</category>
      <category>java</category>
      <category>spring</category>
    </item>
    <item>
      <title>Kafka: consumindo registros com Spring</title>
      <dc:creator>Fabio José</dc:creator>
      <pubDate>Tue, 28 Apr 2020 11:48:36 +0000</pubDate>
      <link>https://dev.to/kafkabr/kafka-consumindo-registros-com-spring-40h8</link>
      <guid>https://dev.to/kafkabr/kafka-consumindo-registros-com-spring-40h8</guid>
      <description>&lt;p&gt;O consumo de dados do Kafka, dependendo do caso de uso, requer alguns cuidados no commit. Fazê-lo de forma correta é importante para garantir que nada seja perdido.&lt;/p&gt;

&lt;p&gt;Com Clientes Kafka, em especial a versão para Java, é muito simples e intuitivo, mas no Spring Kafka é necessária atenção com ajustes especiais.&lt;/p&gt;




&lt;p&gt;O foco neste artigo é mostrar como consumir dados com Spring Kafka, que é uma abstração sobre os &lt;a href="https://docs.confluent.io/current/clients/java.html#java-client" rel="noopener noreferrer"&gt;Clientes Kafka&lt;/a&gt; para Java. Fazendo isso com seguindo setup:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;
&lt;code&gt;enable.auto.commit=false&lt;/code&gt;. Que requer commit manual&lt;/li&gt;
&lt;li&gt;commit síncrono&lt;/li&gt;
&lt;/ul&gt;

&lt;blockquote&gt;
&lt;p&gt;Devido a semântica de entrega &lt;em&gt;at-least-once&lt;/em&gt;, um registro poderá ser consumido uma ou &lt;code&gt;n&lt;/code&gt; vezes, e este setup busca reduzir isso.&lt;/p&gt;
&lt;/blockquote&gt;




&lt;p&gt;Um consumidor típico no Spring Kafka é escrito assim:&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight java"&gt;&lt;code&gt;

&lt;span class="nd"&gt;@Component&lt;/span&gt;
&lt;span class="kd"&gt;public&lt;/span&gt; &lt;span class="kd"&gt;class&lt;/span&gt; &lt;span class="nc"&gt;SpringKafkaListener&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
  &lt;span class="nd"&gt;@KafkaListener&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;topics&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="s"&gt;"topico"&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt;
  &lt;span class="kd"&gt;public&lt;/span&gt; &lt;span class="kt"&gt;void&lt;/span&gt; &lt;span class="nf"&gt;consume&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;String&lt;/span&gt; &lt;span class="n"&gt;valor&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
    &lt;span class="c1"&gt;// Processar valor do registro&lt;/span&gt;
  &lt;span class="o"&gt;}&lt;/span&gt;
&lt;span class="o"&gt;}&lt;/span&gt;


&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;
&lt;p&gt;E criado com as seguintes configurações, feitas no &lt;code&gt;application.properties&lt;/code&gt;:&lt;/p&gt;
&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight properties"&gt;&lt;code&gt;

&lt;span class="py"&gt;spring.kafka.bootstrap-servers&lt;/span&gt;&lt;span class="p"&gt;=&lt;/span&gt;&lt;span class="s"&gt;configure-me_kafka-broker:9092&lt;/span&gt;
&lt;span class="py"&gt;spring.kafka.consumer.client-id&lt;/span&gt;&lt;span class="p"&gt;=&lt;/span&gt;&lt;span class="s"&gt;configure-me_client-id&lt;/span&gt;
&lt;span class="py"&gt;spring.kafka.consumer.group-id&lt;/span&gt;&lt;span class="p"&gt;=&lt;/span&gt;&lt;span class="s"&gt;configure-me_group-id&lt;/span&gt;
&lt;span class="py"&gt;spring.kafka.consumer.auto-offset-reset&lt;/span&gt;&lt;span class="p"&gt;=&lt;/span&gt;&lt;span class="s"&gt;earliest&lt;/span&gt;
&lt;span class="py"&gt;spring.kafka.consumer.key-deserializer&lt;/span&gt;&lt;span class="p"&gt;=&lt;/span&gt;&lt;span class="s"&gt;org.apache.kafka.common.serialization.StringDeserializer&lt;/span&gt;
&lt;span class="py"&gt;spring.kafka.consumer.value-deserializer&lt;/span&gt;&lt;span class="p"&gt;=&lt;/span&gt;&lt;span class="s"&gt;org.apache.kafka.common.serialization.StringDeserializer&lt;/span&gt;


&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;
&lt;p&gt;Felizmente o Spring Kafka não redefine valores, portanto a configuração padrão é mantida assim como definida na &lt;a href="https://kafka.apache.org/documentation/#consumerconfigs" rel="noopener noreferrer"&gt;documentação oficial&lt;/a&gt;, porém, nela o commit é automático e assíncrono. Contudo, no Spring Kafka, todas as configurações necessárias para commit manual e síncrono não estão disponíveis através de propriedades no &lt;code&gt;application.properties&lt;/code&gt;.&lt;/p&gt;
&lt;h2&gt;
  
  
  Resolvendo
&lt;/h2&gt;

&lt;p&gt;Spring Kafka é uma abstração, logo o &lt;em&gt;poll loop&lt;/em&gt; e commit são transparentes. E como pode-se ver no exemplo, um consumidor recebe apenas o registro e por padrão não tem acesso ao &lt;em&gt;Consumer&lt;/em&gt;.&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Primeiro&lt;/strong&gt; é necessário revisar as configurações para desligar o commit automático.&lt;/p&gt;

&lt;p&gt;Nova configuração:&lt;/p&gt;
&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight properties"&gt;&lt;code&gt;

&lt;span class="c"&gt;# Nada de novo aqui
&lt;/span&gt;&lt;span class="py"&gt;spring.kafka.bootstrap-servers&lt;/span&gt;&lt;span class="p"&gt;=&lt;/span&gt;&lt;span class="s"&gt;configure-me_kafka-broker:9092&lt;/span&gt;
&lt;span class="py"&gt;spring.kafka.consumer.client-id&lt;/span&gt;&lt;span class="p"&gt;=&lt;/span&gt;&lt;span class="s"&gt;configure-me_client-id&lt;/span&gt;
&lt;span class="py"&gt;spring.kafka.consumer.group-id&lt;/span&gt;&lt;span class="p"&gt;=&lt;/span&gt;&lt;span class="s"&gt;configure-me_group-id&lt;/span&gt;
&lt;span class="py"&gt;spring.kafka.consumer.auto-offset-reset&lt;/span&gt;&lt;span class="p"&gt;=&lt;/span&gt;&lt;span class="s"&gt;earliest&lt;/span&gt;
&lt;span class="py"&gt;spring.kafka.consumer.key-deserializer&lt;/span&gt;&lt;span class="p"&gt;=&lt;/span&gt;&lt;span class="s"&gt;org.apache.kafka.common.serialization.StringDeserializer&lt;/span&gt;
&lt;span class="py"&gt;spring.kafka.consumer.value-deserializer&lt;/span&gt;&lt;span class="p"&gt;=&lt;/span&gt;&lt;span class="s"&gt;org.apache.kafka.common.serialization.StringDeserializer&lt;/span&gt;

&lt;span class="c"&gt;# Desliga o commit automático no Cliente Kafka
&lt;/span&gt;&lt;span class="py"&gt;spring.kafka.consumer.enable-auto-commit&lt;/span&gt;&lt;span class="p"&gt;=&lt;/span&gt;&lt;span class="s"&gt;false&lt;/span&gt;


&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;
&lt;blockquote&gt;
&lt;p&gt;Spring tem sua própria notação para a maioria das configurações presentes no Kafka Consumer, que são traduzidas em tempo de execução para o nome correto.&lt;/p&gt;
&lt;/blockquote&gt;

&lt;p&gt;Agora que o commit automático foi desligado, são necessários alguns ajustes programáticos feitos ao customizar as fábricas de objetos:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;ackMode para &lt;code&gt;MANUAL&lt;/code&gt;
&lt;/li&gt;
&lt;li&gt;syncCommits como &lt;code&gt;true&lt;/code&gt;
&lt;/li&gt;
&lt;/ul&gt;
&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight java"&gt;&lt;code&gt;

&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;org.springframework.beans.factory.annotation.Autowired&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;org.springframework.boot.autoconfigure.kafka.KafkaProperties&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;org.springframework.context.annotation.Bean&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;org.springframework.context.annotation.Configuration&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;org.springframework.kafka.annotation.EnableKafka&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;org.springframework.kafka.config.KafkaListenerContainerFactory&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;org.springframework.kafka.core.ConsumerFactory&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;org.springframework.kafka.core.DefaultKafkaConsumerFactory&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;org.springframework.kafka.listener.ConcurrentMessageListenerContainer&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;org.springframework.kafka.listener.ContainerProperties.AckMode&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;

&lt;span class="nd"&gt;@EnableKafka&lt;/span&gt;
&lt;span class="nd"&gt;@Configuration&lt;/span&gt;
&lt;span class="kd"&gt;public&lt;/span&gt; &lt;span class="kd"&gt;class&lt;/span&gt; &lt;span class="nc"&gt;KafkaConfig&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;

  &lt;span class="nd"&gt;@Autowired&lt;/span&gt;
  &lt;span class="nc"&gt;KafkaProperties&lt;/span&gt; &lt;span class="n"&gt;properties&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;

  &lt;span class="nd"&gt;@Bean&lt;/span&gt;
  &lt;span class="kd"&gt;public&lt;/span&gt; &lt;span class="nc"&gt;ConsumerFactory&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="nc"&gt;String&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="nc"&gt;String&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="nf"&gt;consumerFactory&lt;/span&gt;&lt;span class="o"&gt;()&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
      &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="k"&gt;new&lt;/span&gt; &lt;span class="nc"&gt;DefaultKafkaConsumerFactory&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&amp;gt;(&lt;/span&gt;
              &lt;span class="n"&gt;properties&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;buildConsumerProperties&lt;/span&gt;&lt;span class="o"&gt;());&lt;/span&gt;
  &lt;span class="o"&gt;}&lt;/span&gt;

  &lt;span class="nd"&gt;@Bean&lt;/span&gt;
  &lt;span class="kd"&gt;public&lt;/span&gt; &lt;span class="nc"&gt;KafkaListenerContainerFactory&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="nc"&gt;ConcurrentMessageListenerContainer&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="nc"&gt;String&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="nc"&gt;String&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&amp;gt;&lt;/span&gt;
      &lt;span class="nf"&gt;kafkaListenerContainerFactory&lt;/span&gt;&lt;span class="o"&gt;()&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;

      &lt;span class="nc"&gt;ConcurrentKafkaListenerContainerFactory&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&lt;/span&gt;&lt;span class="nc"&gt;String&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="nc"&gt;String&lt;/span&gt;&lt;span class="o"&gt;&amp;gt;&lt;/span&gt; &lt;span class="n"&gt;listener&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; 
            &lt;span class="k"&gt;new&lt;/span&gt; &lt;span class="nc"&gt;ConcurrentKafkaListenerContainerFactory&lt;/span&gt;&lt;span class="o"&gt;&amp;lt;&amp;gt;();&lt;/span&gt;

      &lt;span class="n"&gt;listener&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;setConsumerFactory&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;consumerFactory&lt;/span&gt;&lt;span class="o"&gt;());&lt;/span&gt;

      &lt;span class="c1"&gt;// Não falhar, caso ainda não existam os tópicos para consumo&lt;/span&gt;
      &lt;span class="n"&gt;listener&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;getContainerProperties&lt;/span&gt;&lt;span class="o"&gt;()&lt;/span&gt;
          &lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;setMissingTopicsFatal&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="kc"&gt;false&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;

      &lt;span class="c1"&gt;// ### AQUI&lt;/span&gt;
      &lt;span class="c1"&gt;// Commit manual do offset&lt;/span&gt;
      &lt;span class="n"&gt;listener&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;getContainerProperties&lt;/span&gt;&lt;span class="o"&gt;().&lt;/span&gt;&lt;span class="na"&gt;setAckMode&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;AckMode&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;MANUAL&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;

      &lt;span class="c1"&gt;// ### AQUI&lt;/span&gt;
      &lt;span class="c1"&gt;// Commits síncronos&lt;/span&gt;
      &lt;span class="n"&gt;listener&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;getContainerProperties&lt;/span&gt;&lt;span class="o"&gt;().&lt;/span&gt;&lt;span class="na"&gt;setSyncCommits&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nc"&gt;Boolean&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;TRUE&lt;/span&gt;&lt;span class="o"&gt;);&lt;/span&gt;

      &lt;span class="k"&gt;return&lt;/span&gt; &lt;span class="n"&gt;listener&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
    &lt;span class="o"&gt;}&lt;/span&gt;
&lt;span class="o"&gt;}&lt;/span&gt;


&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;
&lt;p&gt;Então o consumidor com Spring Kafka terá esta aparência:&lt;/p&gt;
&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight java"&gt;&lt;code&gt;

&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;org.apache.kafka.clients.consumer.ConsumerRecord&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;org.springframework.beans.factory.annotation.Autowired&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;org.springframework.kafka.annotation.KafkaListener&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;org.springframework.kafka.support.Acknowledgment&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;org.springframework.messaging.handler.annotation.Payload&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;
&lt;span class="kn"&gt;import&lt;/span&gt; &lt;span class="nn"&gt;org.springframework.stereotype.Component&lt;/span&gt;&lt;span class="o"&gt;;&lt;/span&gt;

&lt;span class="nd"&gt;@Component&lt;/span&gt;
&lt;span class="kd"&gt;public&lt;/span&gt; &lt;span class="kd"&gt;class&lt;/span&gt; &lt;span class="nc"&gt;SpringKafkaListener&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;

  &lt;span class="nd"&gt;@KafkaListener&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="n"&gt;topics&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="s"&gt;"topico"&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt;
  &lt;span class="kd"&gt;public&lt;/span&gt; &lt;span class="kt"&gt;void&lt;/span&gt; &lt;span class="nf"&gt;consume&lt;/span&gt;&lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nd"&gt;@Payload&lt;/span&gt; &lt;span class="nc"&gt;String&lt;/span&gt; &lt;span class="n"&gt;valor&lt;/span&gt;&lt;span class="o"&gt;,&lt;/span&gt; &lt;span class="nc"&gt;Acknowledgment&lt;/span&gt; &lt;span class="n"&gt;ack&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;

    &lt;span class="c1"&gt;//TODO Processar registro&lt;/span&gt;
    &lt;span class="c1"&gt;// . . . &lt;/span&gt;

    &lt;span class="c1"&gt;// Commmit manual, que também será síncrono&lt;/span&gt;
    &lt;span class="n"&gt;ack&lt;/span&gt;&lt;span class="o"&gt;.&lt;/span&gt;&lt;span class="na"&gt;acknowledge&lt;/span&gt;&lt;span class="o"&gt;();&lt;/span&gt;

  &lt;span class="o"&gt;}&lt;/span&gt;
&lt;span class="o"&gt;}&lt;/span&gt;


&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;
&lt;p&gt;Note que mesmo assim não existe acesso ao &lt;em&gt;Consumer&lt;/em&gt;, ao invez disso, o Spring injeta uma instância de &lt;code&gt;Acknowledgment&lt;/code&gt; que faz o commit quando tem seu método &lt;code&gt;acknowledge()&lt;/code&gt; executado.&lt;/p&gt;

&lt;p&gt;Também neste exemplo o offset é confirmado a cada registro processado. Isso é algo que degrada a taxa de transferência, mas reduz ainda mais as chances de consumos duplicados. Bem, mas cada caso é um caso 😊.&lt;/p&gt;

&lt;p&gt;O exemplo completo está disponível no Github:&lt;/p&gt;


&lt;div class="ltag-github-readme-tag"&gt;
  &lt;div class="readme-overview"&gt;
    &lt;h2&gt;
      &lt;img src="https://media.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev.to%2Fassets%2Fgithub-logo-5a155e1f9a670af7944dd5e12375bc76ed542ea80224905ecaf878b9157cdefc.svg" alt="GitHub logo"&gt;
      &lt;a href="https://github.com/fabiojose" rel="noopener noreferrer"&gt;
        fabiojose
      &lt;/a&gt; / &lt;a href="https://github.com/fabiojose/skc-ex" rel="noopener noreferrer"&gt;
        skc-ex
      &lt;/a&gt;
    &lt;/h2&gt;
    &lt;h3&gt;
      Spring Kafka Consumer
    &lt;/h3&gt;
  &lt;/div&gt;
  &lt;div class="ltag-github-body"&gt;
    
&lt;div id="readme" class="md"&gt;
&lt;div class="markdown-heading"&gt;
&lt;h1 class="heading-element"&gt;Spring Kafka Consumer Example&lt;/h1&gt;
&lt;/div&gt;

&lt;p&gt;Exemplo de consumer com Spring Kafka&lt;/p&gt;

&lt;div class="markdown-heading"&gt;
&lt;h2 class="heading-element"&gt;Requerimentos&lt;/h2&gt;
&lt;/div&gt;

&lt;ul&gt;
&lt;li&gt;JDK 1.8&lt;/li&gt;
&lt;li&gt;Acesso ao repositório &lt;a href="https://repo.maven.apache.org/maven2/" rel="nofollow noopener noreferrer"&gt;https://repo.maven.apache.org/maven2/&lt;/a&gt; ou uma
alternativa com acesso às dependências presentes no &lt;code&gt;pom.xml&lt;/code&gt;
&lt;/li&gt;
&lt;/ul&gt;

&lt;div class="markdown-heading"&gt;
&lt;h2 class="heading-element"&gt;Build &amp;amp; Run&lt;/h2&gt;
&lt;/div&gt;

&lt;div class="markdown-heading"&gt;
&lt;h3 class="heading-element"&gt;Maven&lt;/h3&gt;

&lt;/div&gt;

&lt;p&gt;Para montar o fatjar, execute o comando:&lt;/p&gt;

&lt;p&gt;&lt;strong&gt;Linux&lt;/strong&gt;&lt;/p&gt;

&lt;div class="highlight highlight-source-shell notranslate position-relative overflow-auto js-code-highlight"&gt;
&lt;pre&gt;./mvnw clean package&lt;/pre&gt;

&lt;/div&gt;

&lt;p&gt;&lt;strong&gt;Windows&lt;/strong&gt;&lt;/p&gt;

&lt;div class="highlight highlight-source-powershell notranslate position-relative overflow-auto js-code-highlight"&gt;
&lt;pre&gt;.\&lt;span class="pl-c1"&gt;mvnw.cmd&lt;/span&gt; &lt;span class="pl-k"&gt;clean&lt;/span&gt; package&lt;/pre&gt;

&lt;/div&gt;

&lt;p&gt;Para executar:&lt;/p&gt;

&lt;blockquote&gt;
&lt;p&gt;Você pode utilizar o &lt;a href="https://github.com/fabiojose/skc-ex./docker-compose.yaml" rel="noopener noreferrer"&gt;&lt;code&gt;docker-compose.yaml&lt;/code&gt;&lt;/a&gt; para
subir um Kafka em sua máquina&lt;/p&gt;
&lt;/blockquote&gt;

&lt;div class="highlight highlight-source-shell notranslate position-relative overflow-auto js-code-highlight"&gt;
&lt;pre&gt;java \
  -Dspring.kafka.bootstrap-servers=&lt;span class="pl-s"&gt;&lt;span class="pl-pds"&gt;'&lt;/span&gt;localhost:9092&lt;span class="pl-pds"&gt;'&lt;/span&gt;&lt;/span&gt; \
  -Dspring.kafka.consumer.client-id=&lt;span class="pl-s"&gt;&lt;span class="pl-pds"&gt;'&lt;/span&gt;spring-kafka-ex&lt;span class="pl-pds"&gt;'&lt;/span&gt;&lt;/span&gt; \
  -Dspring.kafka.consumer.group-id=&lt;span class="pl-s"&gt;&lt;span class="pl-pds"&gt;'&lt;/span&gt;meu-grupo&lt;span class="pl-pds"&gt;'&lt;/span&gt;&lt;/span&gt; \
  -jar target/app-spring-boot.jar&lt;/pre&gt;

&lt;/div&gt;
&lt;div class="markdown-heading"&gt;
&lt;h3 class="heading-element"&gt;Docker&lt;/h3&gt;

&lt;/div&gt;

&lt;p&gt;A definição &lt;a href="https://github.com/fabiojose/skc-ex./Dockerfile" rel="noopener noreferrer"&gt;Dockerfile&lt;/a&gt; desta aplicação emprega
&lt;a href="https://docs.docker.com/develop/develop-images/multistage-build/" rel="nofollow noopener noreferrer"&gt;multi-stage builds&lt;/a&gt;.
Isso significa que nela acontece o build da aplicação e a criação da imagem.&lt;/p&gt;
&lt;p&gt;Se for necessário somente a criar a imagem, pode-se utilizar a definição
&lt;a href="https://github.com/fabiojose/skc-ex./Dockerfile-image" rel="noopener noreferrer"&gt;Dockerfile-image&lt;/a&gt;. Mas antes é necessário montar
o fatjar através do maven.&lt;/p&gt;
&lt;p&gt;Para build do fatjar e montar a imagem, execute o comando:&lt;/p&gt;
&lt;div class="highlight highlight-source-shell notranslate position-relative overflow-auto js-code-highlight"&gt;
&lt;pre&gt;docker build &lt;span class="pl-c1"&gt;.&lt;/span&gt; -t sk-consumer-ex:1.0&lt;/pre&gt;

&lt;/div&gt;
&lt;p&gt;Para montar apenas a imagem (antes…&lt;/p&gt;
&lt;/div&gt;
  &lt;/div&gt;
  &lt;div class="gh-btn-container"&gt;&lt;a class="gh-btn" href="https://github.com/fabiojose/skc-ex" rel="noopener noreferrer"&gt;View on GitHub&lt;/a&gt;&lt;/div&gt;
&lt;/div&gt;


&lt;p&gt;Photo by Paweł Czerwiński on Unsplash&lt;/p&gt;

</description>
      <category>kafka</category>
      <category>kafkabr</category>
      <category>spring</category>
      <category>java</category>
    </item>
    <item>
      <title>Kafka: compactação e compressão</title>
      <dc:creator>Fabio José</dc:creator>
      <pubDate>Mon, 20 Apr 2020 14:58:36 +0000</pubDate>
      <link>https://dev.to/kafkabr/kafka-compactacao-e-compressao-1o2i</link>
      <guid>https://dev.to/kafkabr/kafka-compactacao-e-compressao-1o2i</guid>
      <description>&lt;p&gt;Compactar e comprimir tem algo em comum:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;reduzir o espaço ocupado pelos dados&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;Seja removendo partes desnecessárias através da compactação ou tornado-as menores com algoritmos de compressão.&lt;/p&gt;

&lt;p&gt;No Kafka podemos utilizar ambas.&lt;/p&gt;




&lt;h2&gt;
  
  
  Compactação
&lt;/h2&gt;

&lt;p&gt;A compactação no Kafka pode ser ativada através de uma configuração feita nos tópicos:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;&lt;code&gt;cleanup.policy=compact&lt;/code&gt;&lt;/li&gt;
&lt;/ul&gt;

&lt;blockquote&gt;
&lt;p&gt;Por padrão ela é definida como &lt;code&gt;delete&lt;/code&gt;. Veja outros detalhes na &lt;a href="https://kafka.apache.org/documentation/#topicconfigs"&gt;documentação oficial&lt;/a&gt;.&lt;/p&gt;
&lt;/blockquote&gt;

&lt;p&gt;Quando a política de limpeza for igual a &lt;code&gt;compact&lt;/code&gt;, periodicamente os tópicos são varridos em busca de registros repetidos. E para encontrá-los, o Kafka utiliza a &lt;code&gt;key&lt;/code&gt; presente em cada um deles.&lt;/p&gt;

&lt;blockquote&gt;
&lt;p&gt;Registros sem chave, são imediatamente eliminados.&lt;/p&gt;
&lt;/blockquote&gt;

&lt;p&gt;Acompanhe este exemplo, onde os registros estão dispotos dos mais antigos para os mais novos;&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight"&gt;&lt;pre class="highlight plaintext"&gt;&lt;code&gt;Key  Value
c1   Graziela Maciel, MS
c3   Carlos Chagas, MG
c5   Bertha Lutz, SP
c1   Graziela Maciel, RJ
c3   Carlos Chagas, RJ
c5   Bertha Luttz, SP
c7   Suzana Herculano, RJ
c5   Bertha Lutz, RJ
&lt;/code&gt;&lt;/pre&gt;&lt;/div&gt;



&lt;p&gt;Estes são os registros mantidos após o processo de compactação:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight"&gt;&lt;pre class="highlight plaintext"&gt;&lt;code&gt;Key  Value
c1   Graziela Maciel, RJ
c3   Carlos Chagas, RJ
c7   Suzana Herculano, RJ
c5   Bertha Lutz, RJ
&lt;/code&gt;&lt;/pre&gt;&lt;/div&gt;



&lt;p&gt;É assim que funciona a compactação no Kafka, somente o registro mais recente é mantido. Todos os outros mais antigos e com a mesma &lt;code&gt;key&lt;/code&gt; são eliminados.&lt;/p&gt;

&lt;p&gt;Por exemplo, existem três registros com a chave &lt;code&gt;c5&lt;/code&gt; e o mais recente é &lt;code&gt;Bertha Lutz, RJ&lt;/code&gt;, que foi mantido pela compactação. Note que haviam 8 registros, restando apenas 4.&lt;/p&gt;

&lt;h2&gt;
  
  
  Compressão
&lt;/h2&gt;

&lt;p&gt;A compressão pode ser feita através de uma configuração nos tópicos ou nos produtores.&lt;/p&gt;

&lt;p&gt;No &lt;strong&gt;tópico&lt;/strong&gt; a compressão é ativada através da configuração &lt;code&gt;compression.type&lt;/code&gt;, que por padrão está definida como &lt;code&gt;producer&lt;/code&gt;. Isso significa que a compressão empregada pelos produtores será mantida e repassada aos consumidores, mesmo que não exista compressão alguma.&lt;/p&gt;

&lt;p&gt;Os valores possíveis para &lt;code&gt;compression.type&lt;/code&gt; são:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;
&lt;code&gt;uncompressed&lt;/code&gt;: mesmo que os produtores enviem dados comprimidos, eles serão persistidos no seu tamanho original, ou seja, sem compressão.&lt;/li&gt;
&lt;li&gt;
&lt;a href="https://facebook.github.io/zstd/"&gt;&lt;code&gt;zstd&lt;/code&gt;&lt;/a&gt;: algoritmo de compressão criado pelo Facebook.&lt;/li&gt;
&lt;li&gt;
&lt;a href="https://github.com/lz4/lz4"&gt;&lt;code&gt;lz4&lt;/code&gt;&lt;/a&gt;: a compressão mais recomendada pelos especialistas.&lt;/li&gt;
&lt;li&gt;
&lt;a href="https://github.com/google/snappy"&gt;&lt;code&gt;snappy&lt;/code&gt;&lt;/a&gt;: algoritmo criado pelo Google.&lt;/li&gt;
&lt;li&gt;
&lt;code&gt;gzip&lt;/code&gt;: ótimos níveis de compressão, usando com mais intensidade a cpu.&lt;/li&gt;
&lt;li&gt;
&lt;code&gt;producer&lt;/code&gt;: mantém a compressão do produtor, se existir.&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;E no &lt;strong&gt;produtor&lt;/strong&gt; também existe a configuração &lt;code&gt;compression.type&lt;/code&gt;, que possuí os mesmos valores, com exceção de &lt;code&gt;uncompressed&lt;/code&gt; e &lt;code&gt;producer&lt;/code&gt;. Adicionalmente, existe a opção &lt;code&gt;none&lt;/code&gt; para não aplicar compressão, que também é o valor padrão.&lt;/p&gt;




&lt;p&gt;Comprimir dados no produtor é interessante porque reduz o uso da banda de rede e de espaço em disco e cpu nos brokers.&lt;/p&gt;

&lt;p&gt;E a compressão traz bons resultados quando são produzidos lotes de dados, pois ela não é aplicada em nível de registro. Ou seja, tanto no produtor, quando no Kafka, a compressão não é em cima de cada mensagem ou evento.&lt;/p&gt;




&lt;p&gt;Já nos consumidores, nenhuma configuração especial é necessária, porque através do protocolo Kafka ele verifica se há compressão e emprega o mesmo algoritmo.&lt;/p&gt;

</description>
      <category>kafka</category>
      <category>kafkabr</category>
      <category>confluent</category>
    </item>
    <item>
      <title>Kafka: Garantia de Ordem</title>
      <dc:creator>Fabio José</dc:creator>
      <pubDate>Sat, 11 Apr 2020 11:09:24 +0000</pubDate>
      <link>https://dev.to/kafkabr/garantia-de-ordem-6b3</link>
      <guid>https://dev.to/kafkabr/garantia-de-ordem-6b3</guid>
      <description>&lt;p&gt;Existem casos de uso onde a manutenção da ordem em que os dados foram produzidos é um requisito. Há outros em que isso não importa.&lt;/p&gt;




&lt;p&gt;&lt;a href="https://martinfowler.com/eaaDev/EventSourcing.html"&gt;Event Sourcing&lt;/a&gt; é um exemplo onde a manutenção da ordem importa, caso contrário o estado final seria inconsistente.&lt;/p&gt;




&lt;p&gt;Kafka, através das configurações corretas, pode nos ajudar em qualquer um deles.&lt;/p&gt;

&lt;h2&gt;
  
  
  Como é a ordenação no Kafka
&lt;/h2&gt;

&lt;p&gt;A ordem é garantida em nível de partição, ou seja, tópicos com várias partições não possuem ordenação global dos dados.&lt;/p&gt;

&lt;p&gt;Bem, dado um lote de registros produzidos pelo produtor &lt;code&gt;p1&lt;/code&gt; que tem como destino a partição &lt;code&gt;4&lt;/code&gt; no tópico &lt;code&gt;t1&lt;/code&gt;, haverá garantia de que a ordem produzida será mantida pelo Kafka e repassada aos consumidores.&lt;/p&gt;

&lt;p&gt;Desse modo se houver um produtor &lt;code&gt;p2&lt;/code&gt; também produzindo lotes de registros no mesmo destino de &lt;code&gt;p1&lt;/code&gt;, nenhuma verificação de ordem será realizada entre os lotes destes produtores, prevalecendo àquele que primeiro chegar.&lt;/p&gt;

&lt;p&gt;Mas existem situações e configurações que podem interferir e mudar a ordenação dos registros criados por um determinado produtor &lt;code&gt;p&lt;/code&gt;.&lt;/p&gt;

&lt;h2&gt;
  
  
  O que pode mudar a ordenação
&lt;/h2&gt;

&lt;p&gt;Há uma dezena de configurações que podem ser ajustadas para determinar como será o comportamento do nosso produtor. E muitas já possuem valores padrão que atendem alguns casos de uso, como:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;retries: 2&lt;sup&gt;16&lt;/sup&gt;

&lt;ul&gt;
&lt;li&gt;número de retentativas&lt;/li&gt;
&lt;/ul&gt;


&lt;/li&gt;
&lt;li&gt;max.in.flight.requests.per.connection: 5 

&lt;ul&gt;
&lt;li&gt;requisições que ainda não receberam ack, ou seja, estão em voo.&lt;/li&gt;
&lt;/ul&gt;


&lt;/li&gt;
&lt;li&gt;acks: 1

&lt;ul&gt;
&lt;li&gt;reconhecimento de recebimento e não&lt;/li&gt;
&lt;/ul&gt;


&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;O valor padrão de retentativas, definido por &lt;code&gt;retries&lt;/code&gt;, é bem grande e elas são realizadas para todos os problemas relativamente recuperáveis, como uma falha na rede.&lt;/p&gt;

&lt;p&gt;E o número de requisições em voo é igual a &lt;code&gt;5&lt;/code&gt;. E isso poderá causar a troca de ordem.&lt;/p&gt;

&lt;p&gt;Digamos que se produz o primeiro lote de registros, &lt;code&gt;l1&lt;/code&gt;, que fica em voo aguardando reconhecimento. Em seguida o mesmo produtor envia o segundo lote, &lt;code&gt;l2&lt;/code&gt;. Pela ordenação, &lt;code&gt;l1&lt;/code&gt; contém registros que estão antes daqueles em &lt;code&gt;l2&lt;/code&gt;.&lt;/p&gt;

&lt;p&gt;Essa é a ordem criada pelo produtor:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight"&gt;&lt;pre class="highlight plaintext"&gt;&lt;code&gt;l1-&amp;gt;1,2,3,4
l2-&amp;gt;5,6,7
&lt;/code&gt;&lt;/pre&gt;&lt;/div&gt;



&lt;p&gt;Espera-se que seja mantida:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight"&gt;&lt;pre class="highlight plaintext"&gt;&lt;code&gt;1,2,3,4,5,6,7
&lt;/code&gt;&lt;/pre&gt;&lt;/div&gt;



&lt;p&gt;Porém, se &lt;code&gt;l1&lt;/code&gt; for para retentativa,  &lt;code&gt;l2&lt;/code&gt; for efetivado e &lt;code&gt;l1&lt;/code&gt; efetivado em seguida, a ordenação muda, não prevalecendo àquela criada no produtor.&lt;/p&gt;

&lt;p&gt;Realidade como descrito acima:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight"&gt;&lt;pre class="highlight plaintext"&gt;&lt;code&gt;5,6,7,1,2,3,4
&lt;/code&gt;&lt;/pre&gt;&lt;/div&gt;



&lt;h2&gt;
  
  
  Configuração correta
&lt;/h2&gt;

&lt;p&gt;Felizmente é possível adequar as configurações do produtor e eliminar as chances da perda de ordem:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;
&lt;code&gt;max.in.flight.requests.per.connection&lt;/code&gt; deverá ser configurado para que apenas uma requisição fique em voo, ou seja, &lt;code&gt;1&lt;/code&gt;.&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;Isso certamente irá diminuir a taxa de transferência, não de maneira drástica. Mas agora a ordenação está garantida, mesmo em situações de retentativas.&lt;/p&gt;

</description>
      <category>kafka</category>
      <category>kafkabr</category>
      <category>confluent</category>
    </item>
    <item>
      <title>Kafka: quantas partições por tópico?</title>
      <dc:creator>Fabio José</dc:creator>
      <pubDate>Sun, 01 Mar 2020 19:19:22 +0000</pubDate>
      <link>https://dev.to/kafkabr/kafka-quantas-particoes-por-topico-537c</link>
      <guid>https://dev.to/kafkabr/kafka-quantas-particoes-por-topico-537c</guid>
      <description>&lt;p&gt;Umas das grandes dúvidas ao utilizar o Kafka é saber quantas partições são necessárias ao criar novos tópicos. Bem, não existe uma fórmula geral, o que temos é uma aproximação detalhada neste excelente artigo &lt;a href="https://www.confluent.io/blog/how-choose-number-topics-partitions-kafka-cluster/"&gt;How to choose the number of topics/partitions in a Kafka cluster?&lt;/a&gt;, escrito por Jun Rao. Outro artigo muito relevante é o &lt;a href="https://engineering.linkedin.com/kafka/benchmarking-apache-kafka-2-million-writes-second-three-cheap-machines"&gt;Benchmarking Apache Kafka: 2 Million Writes Per Second&lt;/a&gt;, escrito por Jay Kreps. Nele são revelados resultados importantes sobre a taxa de transferência para &lt;em&gt;producers&lt;/em&gt; e &lt;em&gt;consumers&lt;/em&gt;.&lt;/p&gt;

&lt;p&gt;Então, com base no artigo de Jun Rao, temos a fórmula aproximada para determinar o número de partições:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight"&gt;&lt;pre class="highlight plaintext"&gt;&lt;code&gt;  MAX(t/p, t/c)
&lt;/code&gt;&lt;/pre&gt;&lt;/div&gt;



&lt;p&gt;Onde:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;
&lt;code&gt;t&lt;/code&gt;: taxa de transferência desejada&lt;/li&gt;
&lt;li&gt;
&lt;code&gt;p&lt;/code&gt;: taxa de transferência do &lt;em&gt;producer&lt;/em&gt;
&lt;/li&gt;
&lt;li&gt;
&lt;code&gt;c&lt;/code&gt;: taxa de transferência do &lt;em&gt;consumer&lt;/em&gt;
&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;Como o artigo sugere, o valor para a taxa de transferência do &lt;em&gt;consumer&lt;/em&gt; depende de como ele processa os registros e por esse motivo devemos realizar nossas próprias medições ao invés de utilizar o valor-base descrito no artigo de Jay Kreps. Já para o valor referente ao &lt;em&gt;producer&lt;/em&gt;, podemos tomar como base àquele revelado pelo artigo.&lt;/p&gt;

&lt;blockquote&gt;
&lt;p&gt;Uma dica é você realizar todas as medições, assim você também entenderá como é a sua infra kafka.&lt;/p&gt;
&lt;/blockquote&gt;

&lt;h2&gt;
  
  
  Aplicando a fórmula
&lt;/h2&gt;

&lt;p&gt;Primeiro temos de definir qual é a unidade da nossa taxa de transferência, que poderia ser MB/s ou Mensagens/s. Mas as mensagens tem tamanhos variados e utilizá-las nas medições talvez não conduza a resultados realistas, então, &lt;code&gt;MB/s&lt;/code&gt; é uma boa unidade de medida para aplicação da fórmula.&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Unidade: &lt;code&gt;MB/s&lt;/code&gt; (megabytes por segundo)&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;A medição é realizada empregando um &lt;em&gt;producer&lt;/em&gt; e um tópico com apenas uma partição. Então, digamos que o resultado para nosso &lt;code&gt;p&lt;/code&gt;, foi:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight"&gt;&lt;pre class="highlight plaintext"&gt;&lt;code&gt;89 MB/s
&lt;/code&gt;&lt;/pre&gt;&lt;/div&gt;



&lt;p&gt;A medição para taxa de transferência do &lt;em&gt;consumer&lt;/em&gt; é similar, ou seja, apenas um tópico com uma única partição. Então, digamos que o valor de &lt;code&gt;c&lt;/code&gt; é:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight"&gt;&lt;pre class="highlight plaintext"&gt;&lt;code&gt;75 MB/s
&lt;/code&gt;&lt;/pre&gt;&lt;/div&gt;



&lt;p&gt;Imagine que nosso alvo com relação a taxa de transferência seja &lt;code&gt;450 MB/s&lt;/code&gt;. Aplicando a fórmula de aproximação temos:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight"&gt;&lt;pre class="highlight plaintext"&gt;&lt;code&gt;p = 89
c = 75
t = 450

  MAX(450/89, 450/75)

  MAX(5.1, 6) = 6
&lt;/code&gt;&lt;/pre&gt;&lt;/div&gt;



&lt;p&gt;Portanto, o número de partições é &lt;code&gt;6&lt;/code&gt;.&lt;/p&gt;

&lt;blockquote&gt;
&lt;p&gt;Notadamente, não trata-se de uma fórmula para qualquer caso de uso, porém, agora temos um ponto de partida e não somente números mágicos.&lt;/p&gt;
&lt;/blockquote&gt;

&lt;p&gt;Realize testes e coloque nos comentários quais foram os resultados, suas observações são valiósas.&lt;/p&gt;

&lt;p&gt;Até o próximo artigo!&lt;/p&gt;

</description>
      <category>kafka</category>
      <category>confluent</category>
      <category>brasil</category>
      <category>kafkabr</category>
    </item>
  </channel>
</rss>
