DEV Community

Cover image for Alguns conceitos de Kafka e como criar producer e consumer Kafka usando Docker
Ratkovski
Ratkovski

Posted on • Edited on

Alguns conceitos de Kafka e como criar producer e consumer Kafka usando Docker

Meu Primeiro Post...

O que é o Apache Kafka?

O Apache Kafka é uma plataforma distribuída de transmissão de dados que é capaz de publicar, subscrever, armazenar e processar fluxos de registro em tempo real.

https://kafka.apache.org/

Para que serve?

É uma plataforma desenvolvida para processar fluxos de dados vindos de diferentes fontes e entregar para os mais diversos clientes.

Geralmente mais usado quando este fluxo de dados é muito alto e precisa ser processado em tempo real e os dados precisam ser distribuídos para diferentes aplicações.

Quem usa?
EMPRESAS QUE USAM KAFKA
Netflix
Spotify
Uber
LinkedIn
Twitter
PayPal
AirBnB
Cisco
Goldman Sachs
SalesForce

Conhecendo alguns termos do Apache Kafka:

Messages:
Mensagem é o principal recurso do Kafka. Mensagem é toda informação que trafega sob o Apache Kafka seja uma palavra, número, frase, array. O Kafka permite definir Schemas para mensagens, como por exemplo utilizando o Avro. Este recurso pode auxiliar impedindo que mensagens contendo conteúdos inválidos sejam trafegadas no tópico.
Uma mensagem pode também ser composta por uma chave (key/value), que é utilizada para sharding e compactação dentro do Kafka, assim em um ambiente distribuído, é garantido a ordem das mensagens uma vez que mensagens com a mesma chaves são direcionadas para uma única partição do Kafka e elas são adicionadas sequencialmente ao final de um arquivo de log de uma partição e numeradas por offsets exclusivos. São também persistidas em disco e replicadas no cluster para evitar a perda de dados. As mensagens também possuem um cache de páginas em memória para melhorar a performance de leituras dos dados e ficam disponíveis nas partições até serem excluídas.

Topic:

  • Tópicos é como se fosse um canal por onde as mensagens percorrem
  • Um tópico se assemelha a uma tabela no banco de dados, porém sem restrições.
  • Por padrão o tempo de retenção de mensagem no tópico é de 7 dias. retention.ms
  • Você pode ter quantos tópicos quiser (até o momento que estudei sobre este assunto não possuía um limite específico porém a algumas Specifications que o site abaixo mostra melhor)
  • Is There a Limit on the Number of Topics in a Kafka Instance?
  • Um tópico pode ser definido por seu nome, o nome não tem restrições porém geralmente é nomeado conforme o fluxo dos dados.
  • Tópicos são divididos em partições
  • Quando se cria um tópico precisa especificar a quantidade de partições que ele terá. (essa quantidade pode ser alterada posteriormente)

Como escolher a quantidade de partições por tópico??? O artigo no link abaixo pode te ajudar.
dev.to/kafkabr/kafka-quantas-particoes-por-topico-537c

Tópicos internos e tópicos de usuário

  • Tópicos internos são criados automaticamente pelo kafka:
    • __consumer_offsets
    • __transaction_state
  • Tópicos de usuários, aqueles criados pelo usuário via comando ou definido na aplicação
    • 'topic_test'

Partitions:

  • Cada partição é ordenada, a ordem dos dados é garantida apenas dentro da partição.
  • Partições iniciam do 0 (zero) e vão até um número qualquer.
  • Cada mensagem dentro da partição pega um id incremental, chamado de offset.
  • Uma vez que os dados foram gravados na partição eles não podem ser alterados o que garante o pilar da imutabilidade.

*Offsets: *

  • São as posições das mensagens em uma partição de um tópico no Apache Kafka.
  • São números incrementais adicionados em cada mensagem da partição, são infinitos e começam do zero.
  • Offsets só fazem sentido para a sua referida partição.

Brokers:

  • Um cluster Kafka é composto por múltiplos brokers(servidores).
  • O broker é identificado por um ID.
  • Ele poderá conter alguns tópicos e partições.
  • Um bom número para começar com os brokers é 3 , na área comercial certamente que esse número será bem maior.

Fator de replicação de tópico :

  • Os tópicos deveriam ter um fator de replicação maior que 1 usualmente entre 2 e 3.
  • Sendo o 3 o padrão ouro e o 2 um pouco arriscado.
  • Para que serve este fator de replicação, pense da seguinte forma a partir do momento que possui o fator de replicação você possui uma cópia de todos os dados, caso um broker venha a sair fora do ar o segundo broker que tem a copia assume e o processo continua sem precisar ser interrompido .

Conceito de líder por partição:

  • Apenas um broker pode ser um líder para determinada partição.
  • Apenas esse líder pode receber e servir dados para esta determinada partição.
  • Os outros brokers serão apenas réplicas passivas que sincronizam os dados.
  • Cada partição terá um líder e múltiplos ISR(in-sync replica, ou seja réplica em sincronia).

Producers:

  • Produtores escrevem dados para os tópicos
  • Produtores sabem automaticamente para qual broker e partição devem escrever.
  • Quando os brokers falham os Producers se recuperam automaticamente.
  • Produtores podem escolher se querem receber o reconhecimento (acknowledgment) se os dados foram gravados.
  • Produtores podem escolher enviar uma chave (key) junto com a mensagem (a chave poder ser uma string, numero, etc...) caso a chave não for enviada por padrão dela é null e os dados são enviados em round robin(ou seja os dados são enviados em forma de rodizio ex: mgs1 para broker1, msg2 para o broker2, mgs3 para broker1, msg4 para o broker2 .....)
  • Quando os produtores definem uma chave (key) então todos os dados com aquela chave vão para uma mesma partição
  • Utiliza-se uma chave quando precisamos enviar dados de forma ordenada para um determinado campo

A três possibilidades de receber o acknowledgment:

  • acks = 0 Produtor não espera uma resposta de reconhecimento (Possibilidade de perda de dados)
  • acks = 1 Produtor vai espera uma resposta de reconhecimento do líder (Possibilidade de perda de dados é limitada)
  • acks = all O líder e as réplicas precisam responder que sim que receberam os dados (Não a perda de dados pois caso um broker tenha problemas as réplicas também receberam os dados e o fluxo pode seguir normalmente)

Consumers:

  • Leem os dados de um tópico(o tópico a ser consumido é identificado pelo nome).
  • Consumers sabem qual broker ler automaticamente .
  • Caso o broker falhe os consumers sabem como se recuperar Os dados são lidos em ordem dentro de cada partição.

Consumer Groups :

  • Os Consumers leem os dados em grupos de consumo.
  • Cada consumer dentro de um consumer group lê partições exclusivas.
  • Caso tenha mais consumers que partições, alguns consumers ficaram inativos .
  • Os consumers sabem se ordenar e para qual partição irá automaticamente devido ao coordenador de grupo e coordenador de consumidor, esta parte não é algo que se precise programar é um mecanismo já implementado no Kafka

Consumer offsets:

  • Kafka armazena os offsets(deslocamentos ou compensações) após serem lidos pelo consumer group
  • Os offsets são comprometidos ao vivo assim que são lidos pelos consumer groups e são armazenados em um tópico do Kafka chamado __consumer_offsets esse tópico é um tópico interno do Kafka
  • Quando um grupo de consumidores processar os dados recebidos do kafka estes deveriam ser commitados nos offsets, este procedimento acontece automaticamente , mas você pode programá-lo também.
  • Quando um consumidor falha ou morre a partir do momento que ele se recupera ele pode continuar de onde ele parou graças ao offset.

Delivery Semantics Consumer: (semânticas de entrega)

"O Apache Kafka suporta 3 semânticas de entrega de mensagens: no máximo uma vez, no mínimo uma vez e exatamente uma vez. Então, como você escolhe qual configuração é certa para você? A decisão geralmente se resume ao nível de integridade de dados exigido pelo seu caso de uso específico, mas outros fatores, como custo, sobrecarga de implementação e desempenho, também podem afetar qual semântica de entrega é sua melhor opção." Jacob Vasque

O Consumer escolhe quando será feita as compensações porém você tem a possibilidade de escolher, para isso tem 3 semantics delivery.

Tabela do Excel primeira linha de titulo diz semântica de entrega na segunda linha é dividida em 4 coluna a primeira linha da coluna esta vazia a segunda coluna diz at most once a terceira coluna diz at least once a quarta coluna diz exactly once<br>
na terceira linha na primeira coluna diz duplicada na segunda coluna diz não na terceira coluna diz sim na quarta coluna diz não<br>
quarta linha na primeira coluna diz perda de dados na segunda coluna diz sim na terceira coluna diz não na quarta coluna diz não <br>
quinta linha primeira coluna diz processamento segunda coluna diz  zero ou uma vez terceira coluna diz uma ou mais vezes quarta coluna exatamente uma vez

At most once (no máximo uma vez)

  • A mensagem é entregue apenas uma vez ou não é entregue.
  • Offset são confirmados assim que a mensagem é recebida .
  • Se o processamento falhar a mensagem será perdida e não será lida novamente.
  • Não é uma boa opção para dados sensíveis como aplicações da área financeira devido que podem haver perdas.

Imagem representando a semântica de entrega at most once do lado esquerdo tem um retângulo arredondado na cor roxa representando o producer e no lado direito tem um objeto cilíndrico cinza com um quadrado verde com escrita em preto dentro topicox abaixo de topicox diz Partição0 e abaixo de Partição0 diz líder tem uma seta simbolizando a mensagem saindo do retângulo roxo e apontando para o objeto cilíndrico abaixo desta seta tem duas frases a primeira é Não faz retentativa e a segunda é Envia e esquece

At least once (pelo menos uma vez) geralmente o mais usado

  • Offset são confirmados assim que a mensagem é processada .
  • Se o processamento falhar, a mensagem será lida novamente.
  • Este pode resultar em processamento duplicado de mensagens, para que isso não venha a ser um problema para nosso sistema precisamos ter certeza que nosso processamento é Idempotent.

Imagem representando a semântica de entrega at least once do lado esquerdo tem um retângulo arredondado na cor roxa representando o producer e no lado direito tem um objeto cilíndrico cinza com um quadrado verde com escrita em preto dentro topicox abaixo de topicox diz Partição0 e abaixo de Partição0 diz líder este tem 4 seta simbolizando a mensagem  a primeira saindo do retângulo roxo e apontando para o objeto cilíndrico abaixo desta seta tem a palavra envio a segunda seta sai do objeto cilíndrico em sentido ao retângulo roxo porém não chega até ele e possui um x vermelho na ponta da seta simbolizando falha a terceira seta sai do retângulo roxo em sentido ao objeto cilíndrico e abaixo da seta esta escrito retenta e a quarta seta sai do objeto cilindrico em sentido ao retangulo roxo com e chega até ele e abaixo da seta esta escrito acknowledgement

Exactly once (exatamente uma vez)

  • A mensagem será entregue somente uma vez .
  • A mensagem não pode ser descartada e nem duplicada.
  • É a única configuração que pode garantir que nenhum evento seja perdido ou duplicado.
  • Tem maior custo de implantação e menor desempenho, porém é necessária em cenários críticos de negócio.

Imagem representando a semântica de entrega exactly once do lado esquerdo tem um retângulo arredondado na cor roxa representando o producer e no lado direito tem um objeto cilíndrico cinza com um quadrado verde com escrita em preto dentro topicox abaixo de topicox diz Partição0 e abaixo de Partição0 diz líder e tem duas setas seta simbolizando a mensagem a primeira seta saindo do retângulo roxo e apontando para o objeto cilíndrico acima desta seta tem tem escrito envio a segunda seta saindo do objeto cilíndrico e apontando para o retângulo roxo e abaixo da seta esta ecrito acknowledgement

Kafka Broker Discovery
Todo Kafka broker é chamado de "bootstrap server"
Isso significa que você precisa se conectar a um único broker e estará conectado ao cluster inteiro.
Cada broker sabe de todos os outros brokers, tópicos e partições(metadata)

Zookeeper
Zookeeper mantém todos os brokers juntos e os gerencia, por esse motivo mantém uma lista deles.
Ajuda na eleição para líder das partições(ex: quando um broker falha, uma nova partição se torna o líder então ele ajuda com isso).
Zokeeper envia notificações para o kafka quando há alguma mudança.
Kafka não funciona sem o Zookeeper.
Zookeeper opera com número ímpar de servidores (3,5,7).
Zookeeper funciona no conceito líder e seguidores o líder trabalha com a escrita e os seguidores com a leitura.
Produtores e Consumidores escrevem para o kafka e o kafka gerencia os metadados no zookeeper.

Kafka Garantias

  • Mensagens são anexadas nas partições na ordem em que são enviadas .
  • Consumers lerão estas mensagens na ordem em que foram gravadas nas partições .

Kafka na prática exemplo simples de Producer e Consumer:
[Producer]-> github.com/Ratkovski/Poc-Java-Kafka-Producer
[Consumer]-> github.com/Ratkovski/Poc-Java-Kafka-Consumer

Ferramentas Utilizadas:

  • Docker
  • Intellij IDEA
  • Postman

Vamos iniciar nosso projeto acessando o Spring Initializr

Dashboard do SpringBoot

Após gerar o código base podemos adicionar um arquivo docker-compose.yml que nos fornecerá um ambiente para trabalharmos com o Kafka de forma mais amigável, para usar desta forma você precisa ter o Docker instalado.

version: '3'

services:
  kafka-cluster:
    image: lensesio/fast-data-dev:2.2
    environment:
      ADV_HOST: 127.0.0.1         # Change to 192.168.99.100 if using Docker Toolbox
      RUNTESTS: 0                 # Disable Running tests so the cluster starts faster
      FORWARDLOGS: 0              # Disable running 5 file source connectors that bring application logs into Kafka topics
      SAMPLEDATA: 0               # Do not create sea_vessel_position_reports, nyc_yellow_taxi_trip_data, reddit_posts topics with sample Avro records.
    ports:
      - 2181:2181                 # Zookeeper
      - 3030:3030                 # Lenses UI
      - 8081-8083:8081-8083       # REST Proxy, Schema Registry, Kafka Connect ports
      - 9581-9585:9581-9585       # JMX Ports
      - 9092:9092                 # Kafka Broker
Enter fullscreen mode Exit fullscreen mode

Agora vamos criar as configurações do kafka.
Para este exemplo vamos criar um exemplo bem simples contendo um domínio, controller e a configuração do kafka

@Getter
@Setter
@ToString
public class Post {

 private String userId;
 private String id;
 private String title;
 private String body;

}
Enter fullscreen mode Exit fullscreen mode

[Configuração do Kafka Producer]

public class KafkaConfiguration {


 @Value("${kafka.bootstrap.server}")
 private String bootStrapServer;
 @Value("${spring.kafka.producer.enable-idempotence}")
 private String idempotence;
 @Value("${spring.kafka.producer.acks-config}")
 private String acks;
 @Value("${spring.kafka.producer.retries-config}")
 private String retries;
 @Value("${spring.kafka.producer.schema-registry-url}")
 private String schemaRegistryUrl;
 @Value("${spring.kafka.producer.topic-name}")
 private String topicName;


 @Bean
 public ProducerFactory<String, Post> producerFactory() {
   Map<String, Object> config = new HashMap<>();

   //create Producer Properties
   config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootStrapServer);
   config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
   config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);


   //create save Producer
   config.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, idempotence);
   config.put(ProducerConfig.ACKS_CONFIG, acks);
   config.put(ProducerConfig.RETRIES_CONFIG, retries);
   config.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG,schemaRegistryUrl);

   return new DefaultKafkaProducerFactory(config);
 }

 @Bean
 public KafkaTemplate<String, kafka.avro.Post> kafkaTemplate() {
   return new KafkaTemplate(producerFactory());
 }

 @Bean
 public NewTopic topic() {
   return new NewTopic(topicName, 1, (short) 1);
 }
}
Enter fullscreen mode Exit fullscreen mode

[Controller]

@RestController
@RequestMapping("/posts/")
@RequiredArgsConstructor
public class PostController {

 @Value("${spring.kafka.producer.topic-name}")
 private String topicName;

 private final PostService postService;

 private final KafkaTemplate<String, Post> kafkaTemplate;

 @GetMapping("{userId}")
 private ResponseEntity<Post> post(@PathVariable String userId) {
   try {
     Post post = postService.consulta(userId);
     kafkaTemplate.send(topicName, userId, post);
     return ResponseEntity.ok().body(post);
   } catch (RuntimeException e) {
     e.printStackTrace();
     return ResponseEntity.badRequest().build();
   }
 }
}

Enter fullscreen mode Exit fullscreen mode

Agora algumas configurações do Consumer
Para base pode usar as mesmas dependências do producer

[Domínio]

@Getter
@Setter
@ToString
public class Post {

   private String userId;
   private String id;
   private String title;
   private String body;

   public Post() {
   }

   public Post(String userId, String id, String title, String body) {
       this.userId = userId;
       this.id = id;
       this.title = title;
       this.body = body;
   }
}
Enter fullscreen mode Exit fullscreen mode

[Configuração do Kafka Consumer]


@Configuration
@EnableKafka
@RequiredArgsConstructor
public class KafkaConfiguration {


   //Consumer properties
   public ConsumerFactory<String, kafka.avro.Post> consumerFactory() {
       Map<String, Object> config = new HashMap<>();
       config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
       config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
       config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
       config.put(KafkaAvroDeserializerConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://127.0.0.1:8081");
       //config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"");//pode possuir 3 valores *earliest/latest/none
       return new DefaultKafkaConsumerFactory<>(config);
   }


   @Bean
   public ConcurrentKafkaListenerContainerFactory<String, kafka.avro.Post> kafkaListenerContainerFactory() {
       ConcurrentKafkaListenerContainerFactory<String, kafka.avro.Post> factory = new ConcurrentKafkaListenerContainerFactory<>();
       factory.setConsumerFactory(consumerFactory());
       return factory;
   }
}
Enter fullscreen mode Exit fullscreen mode

[Consumer Listener ]

@Component
@RequiredArgsConstructor
public class PostListener {
   @KafkaListener(topics = "test_topic", groupId = "groupTest",
           properties = {"spring.json.value.default.type=br.com.academy.ratkovski.Poc.Java.Kafka.Consumer.domain.User"})
   public void listener(ConsumerRecord<String, Object> consumerRecord) {
       System.out.println("Message received in group" + consumerRecord.value());

   }
}


Enter fullscreen mode Exit fullscreen mode

Agora já podemos testar

1-Deixar as duas peças em runnig
2-No local que tiver o arquivo docker-compose.yml rodar o comando docker-compose up
3-No postman enviar um GET :http://localhost:9000/posts/3

Se tudo estiver correto na peça de consumer terá o log:

Log da aplicação indicando que o consumo da mensagem teve sucesso

Com esta imagem do docker é possível acessar o dashboard do Lenses e ver algumas informações do kafka http://localhost:3030/

Dashboard do lenses com gráficos e algumas informações dos serviços que estão em uso como schemas, conectors e brokers topicos

Espero que este post possa ajudar de alguma forma a todos que estão aprendendo.

Referências:
https://vepo.github.io/posts/o-que-e-o-apache-kafka
https://medium.com/@sdjemails/kafka-producer-delivery-semantics-be863c727d3f
https://kafka.apache.org/documentation/#semantics
https://keen.io/blog/demystifying-apache-kafka-message-delivery-semantics-at-most-once-at-least-once-exactly-once/
https://blog.kafkabr.com/posts/user-topics-internal-topics/
https://www.redhat.com/pt-br/topics/integration/what-is-apache-kafka
https://medium.com/@andy.bryant/processing-guarantees-in-kafka-12dd2e30be0e
https://medium.com/@gabrielqueiroz/o-que-%C3%A9-esse-tal-de-apache-kafka-a8f447cac028#:~:text=Uma%20mensagem%20pode%20tamb%C3%A9m%20ser,uma%20%C3%BAnica%20parti%C3%A7%C3%A3o%20do%20Kafka.
https://medium.com/@andy.bryant/processing-guarantees-in-kafka-12dd2e30be0e
https://dzone.com/articles/kafka-consumer-delivery-semantics#:~:text=In%20at%20least%20once%20delivery,preferred%20semantics%20out%20of%20all.
https://github.com/lensesio/fast-data-dev/tree/fdd/main
https://developer.confluent.io/quickstart/kafka-docker/
https://www.infoq.com/br/articles/apache-kafka-licoes/#:~:text=Messages%20(Mensagens),e%20numeradas%20por%20offsets%20exclusivos.

Top comments (1)

Collapse
 
ggmoura profile image
Gleidson Guimarães Moura

Parabéns, principalmente por ser o primeiro post!

Ficou excelente! Abrangeu o tema de maneira bem objetiva, irá ajudar tanto pessoas que não conhecem a tecnologia, quanto quem quer obter mais conhecimento!