Meu Primeiro Post...
O que é o Apache Kafka?
O Apache Kafka é uma plataforma distribuída de transmissão de dados que é capaz de publicar, subscrever, armazenar e processar fluxos de registro em tempo real.
Para que serve?
É uma plataforma desenvolvida para processar fluxos de dados vindos de diferentes fontes e entregar para os mais diversos clientes.
Geralmente mais usado quando este fluxo de dados é muito alto e precisa ser processado em tempo real e os dados precisam ser distribuídos para diferentes aplicações.
Quem usa?
EMPRESAS QUE USAM KAFKA
Netflix
Spotify
Uber
LinkedIn
Twitter
PayPal
AirBnB
Cisco
Goldman Sachs
SalesForce
Conhecendo alguns termos do Apache Kafka:
Messages:
Mensagem é o principal recurso do Kafka. Mensagem é toda informação que trafega sob o Apache Kafka seja uma palavra, número, frase, array. O Kafka permite definir Schemas para mensagens, como por exemplo utilizando o Avro. Este recurso pode auxiliar impedindo que mensagens contendo conteúdos inválidos sejam trafegadas no tópico.
Uma mensagem pode também ser composta por uma chave (key/value), que é utilizada para sharding e compactação dentro do Kafka, assim em um ambiente distribuído, é garantido a ordem das mensagens uma vez que mensagens com a mesma chaves são direcionadas para uma única partição do Kafka e elas são adicionadas sequencialmente ao final de um arquivo de log de uma partição e numeradas por offsets exclusivos. São também persistidas em disco e replicadas no cluster para evitar a perda de dados. As mensagens também possuem um cache de páginas em memória para melhorar a performance de leituras dos dados e ficam disponíveis nas partições até serem excluídas.
Topic:
- Tópicos é como se fosse um canal por onde as mensagens percorrem
- Um tópico se assemelha a uma tabela no banco de dados, porém sem restrições.
- Por padrão o tempo de retenção de mensagem no tópico é de 7 dias. retention.ms
- Você pode ter quantos tópicos quiser (até o momento que estudei sobre este assunto não possuía um limite específico porém a algumas Specifications que o site abaixo mostra melhor)
- Is There a Limit on the Number of Topics in a Kafka Instance?
- Um tópico pode ser definido por seu nome, o nome não tem restrições porém geralmente é nomeado conforme o fluxo dos dados.
- Tópicos são divididos em partições
- Quando se cria um tópico precisa especificar a quantidade de partições que ele terá. (essa quantidade pode ser alterada posteriormente)
Como escolher a quantidade de partições por tópico??? O artigo no link abaixo pode te ajudar.
dev.to/kafkabr/kafka-quantas-particoes-por-topico-537c
Tópicos internos e tópicos de usuário
- Tópicos internos são criados automaticamente pelo kafka:
- __consumer_offsets
- __transaction_state
- Tópicos de usuários, aqueles criados pelo usuário via comando ou
definido na aplicação
- 'topic_test'
Partitions:
- Cada partição é ordenada, a ordem dos dados é garantida apenas dentro da partição.
- Partições iniciam do 0 (zero) e vão até um número qualquer.
- Cada mensagem dentro da partição pega um id incremental, chamado de offset.
- Uma vez que os dados foram gravados na partição eles não podem ser alterados o que garante o pilar da imutabilidade.
*Offsets: *
- São as posições das mensagens em uma partição de um tópico no Apache Kafka.
- São números incrementais adicionados em cada mensagem da partição, são infinitos e começam do zero.
- Offsets só fazem sentido para a sua referida partição.
Brokers:
- Um cluster Kafka é composto por múltiplos brokers(servidores).
- O broker é identificado por um ID.
- Ele poderá conter alguns tópicos e partições.
- Um bom número para começar com os brokers é 3 , na área comercial certamente que esse número será bem maior.
Fator de replicação de tópico :
- Os tópicos deveriam ter um fator de replicação maior que 1 usualmente entre 2 e 3.
- Sendo o 3 o padrão ouro e o 2 um pouco arriscado.
- Para que serve este fator de replicação, pense da seguinte forma a partir do momento que possui o fator de replicação você possui uma cópia de todos os dados, caso um broker venha a sair fora do ar o segundo broker que tem a copia assume e o processo continua sem precisar ser interrompido .
Conceito de líder por partição:
- Apenas um broker pode ser um líder para determinada partição.
- Apenas esse líder pode receber e servir dados para esta determinada partição.
- Os outros brokers serão apenas réplicas passivas que sincronizam os dados.
- Cada partição terá um líder e múltiplos ISR(in-sync replica, ou seja réplica em sincronia).
Producers:
- Produtores escrevem dados para os tópicos
- Produtores sabem automaticamente para qual broker e partição devem escrever.
- Quando os brokers falham os Producers se recuperam automaticamente.
- Produtores podem escolher se querem receber o reconhecimento (acknowledgment) se os dados foram gravados.
- Produtores podem escolher enviar uma chave (key) junto com a mensagem (a chave poder ser uma string, numero, etc...) caso a chave não for enviada por padrão dela é null e os dados são enviados em round robin(ou seja os dados são enviados em forma de rodizio ex: mgs1 para broker1, msg2 para o broker2, mgs3 para broker1, msg4 para o broker2 .....)
- Quando os produtores definem uma chave (key) então todos os dados com aquela chave vão para uma mesma partição
- Utiliza-se uma chave quando precisamos enviar dados de forma ordenada para um determinado campo
A três possibilidades de receber o acknowledgment:
- acks = 0 Produtor não espera uma resposta de reconhecimento (Possibilidade de perda de dados)
- acks = 1 Produtor vai espera uma resposta de reconhecimento do líder (Possibilidade de perda de dados é limitada)
- acks = all O líder e as réplicas precisam responder que sim que receberam os dados (Não a perda de dados pois caso um broker tenha problemas as réplicas também receberam os dados e o fluxo pode seguir normalmente)
Consumers:
- Leem os dados de um tópico(o tópico a ser consumido é identificado pelo nome).
- Consumers sabem qual broker ler automaticamente .
- Caso o broker falhe os consumers sabem como se recuperar Os dados são lidos em ordem dentro de cada partição.
Consumer Groups :
- Os Consumers leem os dados em grupos de consumo.
- Cada consumer dentro de um consumer group lê partições exclusivas.
- Caso tenha mais consumers que partições, alguns consumers ficaram inativos .
- Os consumers sabem se ordenar e para qual partição irá automaticamente devido ao coordenador de grupo e coordenador de consumidor, esta parte não é algo que se precise programar é um mecanismo já implementado no Kafka
Consumer offsets:
- Kafka armazena os offsets(deslocamentos ou compensações) após serem lidos pelo consumer group
- Os offsets são comprometidos ao vivo assim que são lidos pelos consumer groups e são armazenados em um tópico do Kafka chamado __consumer_offsets esse tópico é um tópico interno do Kafka
- Quando um grupo de consumidores processar os dados recebidos do kafka estes deveriam ser commitados nos offsets, este procedimento acontece automaticamente , mas você pode programá-lo também.
- Quando um consumidor falha ou morre a partir do momento que ele se recupera ele pode continuar de onde ele parou graças ao offset.
Delivery Semantics Consumer: (semânticas de entrega)
"O Apache Kafka suporta 3 semânticas de entrega de mensagens: no máximo uma vez, no mínimo uma vez e exatamente uma vez. Então, como você escolhe qual configuração é certa para você? A decisão geralmente se resume ao nível de integridade de dados exigido pelo seu caso de uso específico, mas outros fatores, como custo, sobrecarga de implementação e desempenho, também podem afetar qual semântica de entrega é sua melhor opção." Jacob Vasque
O Consumer escolhe quando será feita as compensações porém você tem a possibilidade de escolher, para isso tem 3 semantics delivery.
At most once (no máximo uma vez)
- A mensagem é entregue apenas uma vez ou não é entregue.
- Offset são confirmados assim que a mensagem é recebida .
- Se o processamento falhar a mensagem será perdida e não será lida novamente.
- Não é uma boa opção para dados sensíveis como aplicações da área financeira devido que podem haver perdas.
At least once (pelo menos uma vez) geralmente o mais usado
- Offset são confirmados assim que a mensagem é processada .
- Se o processamento falhar, a mensagem será lida novamente.
- Este pode resultar em processamento duplicado de mensagens, para que isso não venha a ser um problema para nosso sistema precisamos ter certeza que nosso processamento é Idempotent.
Exactly once (exatamente uma vez)
- A mensagem será entregue somente uma vez .
- A mensagem não pode ser descartada e nem duplicada.
- É a única configuração que pode garantir que nenhum evento seja perdido ou duplicado.
- Tem maior custo de implantação e menor desempenho, porém é necessária em cenários críticos de negócio.
Kafka Broker Discovery
Todo Kafka broker é chamado de "bootstrap server"
Isso significa que você precisa se conectar a um único broker e estará conectado ao cluster inteiro.
Cada broker sabe de todos os outros brokers, tópicos e partições(metadata)
Zookeeper
Zookeeper mantém todos os brokers juntos e os gerencia, por esse motivo mantém uma lista deles.
Ajuda na eleição para líder das partições(ex: quando um broker falha, uma nova partição se torna o líder então ele ajuda com isso).
Zokeeper envia notificações para o kafka quando há alguma mudança.
Kafka não funciona sem o Zookeeper.
Zookeeper opera com número ímpar de servidores (3,5,7).
Zookeeper funciona no conceito líder e seguidores o líder trabalha com a escrita e os seguidores com a leitura.
Produtores e Consumidores escrevem para o kafka e o kafka gerencia os metadados no zookeeper.
Kafka Garantias
- Mensagens são anexadas nas partições na ordem em que são enviadas .
- Consumers lerão estas mensagens na ordem em que foram gravadas nas partições .
Kafka na prática exemplo simples de Producer e Consumer:
[Producer]-> github.com/Ratkovski/Poc-Java-Kafka-Producer
[Consumer]-> github.com/Ratkovski/Poc-Java-Kafka-Consumer
Ferramentas Utilizadas:
- Docker
- Intellij IDEA
- Postman
Vamos iniciar nosso projeto acessando o Spring Initializr
Após gerar o código base podemos adicionar um arquivo docker-compose.yml que nos fornecerá um ambiente para trabalharmos com o Kafka de forma mais amigável, para usar desta forma você precisa ter o Docker instalado.
version: '3'
services:
kafka-cluster:
image: lensesio/fast-data-dev:2.2
environment:
ADV_HOST: 127.0.0.1 # Change to 192.168.99.100 if using Docker Toolbox
RUNTESTS: 0 # Disable Running tests so the cluster starts faster
FORWARDLOGS: 0 # Disable running 5 file source connectors that bring application logs into Kafka topics
SAMPLEDATA: 0 # Do not create sea_vessel_position_reports, nyc_yellow_taxi_trip_data, reddit_posts topics with sample Avro records.
ports:
- 2181:2181 # Zookeeper
- 3030:3030 # Lenses UI
- 8081-8083:8081-8083 # REST Proxy, Schema Registry, Kafka Connect ports
- 9581-9585:9581-9585 # JMX Ports
- 9092:9092 # Kafka Broker
Agora vamos criar as configurações do kafka.
Para este exemplo vamos criar um exemplo bem simples contendo um domínio, controller e a configuração do kafka
@Getter
@Setter
@ToString
public class Post {
private String userId;
private String id;
private String title;
private String body;
}
[Configuração do Kafka Producer]
public class KafkaConfiguration {
@Value("${kafka.bootstrap.server}")
private String bootStrapServer;
@Value("${spring.kafka.producer.enable-idempotence}")
private String idempotence;
@Value("${spring.kafka.producer.acks-config}")
private String acks;
@Value("${spring.kafka.producer.retries-config}")
private String retries;
@Value("${spring.kafka.producer.schema-registry-url}")
private String schemaRegistryUrl;
@Value("${spring.kafka.producer.topic-name}")
private String topicName;
@Bean
public ProducerFactory<String, Post> producerFactory() {
Map<String, Object> config = new HashMap<>();
//create Producer Properties
config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootStrapServer);
config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
//create save Producer
config.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, idempotence);
config.put(ProducerConfig.ACKS_CONFIG, acks);
config.put(ProducerConfig.RETRIES_CONFIG, retries);
config.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG,schemaRegistryUrl);
return new DefaultKafkaProducerFactory(config);
}
@Bean
public KafkaTemplate<String, kafka.avro.Post> kafkaTemplate() {
return new KafkaTemplate(producerFactory());
}
@Bean
public NewTopic topic() {
return new NewTopic(topicName, 1, (short) 1);
}
}
[Controller]
@RestController
@RequestMapping("/posts/")
@RequiredArgsConstructor
public class PostController {
@Value("${spring.kafka.producer.topic-name}")
private String topicName;
private final PostService postService;
private final KafkaTemplate<String, Post> kafkaTemplate;
@GetMapping("{userId}")
private ResponseEntity<Post> post(@PathVariable String userId) {
try {
Post post = postService.consulta(userId);
kafkaTemplate.send(topicName, userId, post);
return ResponseEntity.ok().body(post);
} catch (RuntimeException e) {
e.printStackTrace();
return ResponseEntity.badRequest().build();
}
}
}
Agora algumas configurações do Consumer
Para base pode usar as mesmas dependências do producer
[Domínio]
@Getter
@Setter
@ToString
public class Post {
private String userId;
private String id;
private String title;
private String body;
public Post() {
}
public Post(String userId, String id, String title, String body) {
this.userId = userId;
this.id = id;
this.title = title;
this.body = body;
}
}
[Configuração do Kafka Consumer]
@Configuration
@EnableKafka
@RequiredArgsConstructor
public class KafkaConfiguration {
//Consumer properties
public ConsumerFactory<String, kafka.avro.Post> consumerFactory() {
Map<String, Object> config = new HashMap<>();
config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
config.put(KafkaAvroDeserializerConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://127.0.0.1:8081");
//config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"");//pode possuir 3 valores *earliest/latest/none
return new DefaultKafkaConsumerFactory<>(config);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, kafka.avro.Post> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, kafka.avro.Post> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
return factory;
}
}
[Consumer Listener ]
@Component
@RequiredArgsConstructor
public class PostListener {
@KafkaListener(topics = "test_topic", groupId = "groupTest",
properties = {"spring.json.value.default.type=br.com.academy.ratkovski.Poc.Java.Kafka.Consumer.domain.User"})
public void listener(ConsumerRecord<String, Object> consumerRecord) {
System.out.println("Message received in group" + consumerRecord.value());
}
}
Agora já podemos testar
1-Deixar as duas peças em runnig
2-No local que tiver o arquivo docker-compose.yml rodar o comando docker-compose up
3-No postman enviar um GET :http://localhost:9000/posts/3
Se tudo estiver correto na peça de consumer terá o log:
Com esta imagem do docker é possível acessar o dashboard do Lenses e ver algumas informações do kafka http://localhost:3030/
Espero que este post possa ajudar de alguma forma a todos que estão aprendendo.
Referências:
https://vepo.github.io/posts/o-que-e-o-apache-kafka
https://medium.com/@sdjemails/kafka-producer-delivery-semantics-be863c727d3f
https://kafka.apache.org/documentation/#semantics
https://keen.io/blog/demystifying-apache-kafka-message-delivery-semantics-at-most-once-at-least-once-exactly-once/
https://blog.kafkabr.com/posts/user-topics-internal-topics/
https://www.redhat.com/pt-br/topics/integration/what-is-apache-kafka
https://medium.com/@andy.bryant/processing-guarantees-in-kafka-12dd2e30be0e
https://medium.com/@gabrielqueiroz/o-que-%C3%A9-esse-tal-de-apache-kafka-a8f447cac028#:~:text=Uma%20mensagem%20pode%20tamb%C3%A9m%20ser,uma%20%C3%BAnica%20parti%C3%A7%C3%A3o%20do%20Kafka.
https://medium.com/@andy.bryant/processing-guarantees-in-kafka-12dd2e30be0e
https://dzone.com/articles/kafka-consumer-delivery-semantics#:~:text=In%20at%20least%20once%20delivery,preferred%20semantics%20out%20of%20all.
https://github.com/lensesio/fast-data-dev/tree/fdd/main
https://developer.confluent.io/quickstart/kafka-docker/
https://www.infoq.com/br/articles/apache-kafka-licoes/#:~:text=Messages%20(Mensagens),e%20numeradas%20por%20offsets%20exclusivos.





Top comments (1)
Parabéns, principalmente por ser o primeiro post!
Ficou excelente! Abrangeu o tema de maneira bem objetiva, irá ajudar tanto pessoas que não conhecem a tecnologia, quanto quem quer obter mais conhecimento!