Introdução
O Apache Kafka é uma plataforma de streaming de dados distribuída, de código aberto, que fornece uma arquitetura para a transmissão eficiente, processamento em tempo real e armazenamento durável de grandes volumes de dados em um ambiente distribuído. Ele é projetado para lidar com fluxos de dados em tempo real, sendo amplamente utilizado para construir sistemas escaláveis e de alta performance em aplicações que demandam processamento de eventos em tempo real.
O Kafka é como um mega grupo de WhatsApp super organizado, mas em vez de mensagens de texto, é usado para compartilhar informações de todos os tipos! Imagina que você e seus amigos adoram falar sobre vários assuntos, como séries, jogos e curiosidades.
Agora, cada um desses assuntos é como um "canal" no Kafka. As pessoas que adoram séries ficam em um canal, os gamers em outro, e assim por diante. O Kafka ajuda a garantir que todo mundo receba as notícias certinhas, no momento certo, mesmo que muita gente esteja conversando ao mesmo tempo.
É como se fosse um assistente pessoal mega eficiente, mas para manter todos atualizados sobre as coisas legais que estão acontecendo, como os lançamentos das suas séries favoritas ou novidades nos jogos que você curte. É uma forma de organizar e compartilhar informações, mesmo que muita gente esteja participando da conversa! 🌐✨
O que é Mensageria?
Atualmente o Kafka é empregado principalmente no uso de microserviços como um gerenciador de mensageria, mas o que é mensageria?
A mensageria é um padrão arquitetural em que sistemas distribuídos se comunicam através do envio de mensagens assíncronas entre componentes. Essas mensagens podem conter informações, comandos ou eventos e são fundamentais para a construção de sistemas resilientes, escaláveis e desacoplados.
Por que Usar Mensageria?
- Desacoplamento: Permite que componentes do sistema se comuniquem sem conhecimento direto uns dos outros, reduzindo o acoplamento.
- Escalabilidade: Facilita a escalabilidade horizontal, permitindo a distribuição de processamento entre diversos serviços.
- Resiliência: Mensagens assíncronas lidam melhor com falhas temporárias, garantindo que a comunicação seja retomada após a recuperação do serviço.
Desvantagens da Mensageria:
- Complexidade: Introduz complexidade adicional na arquitetura.
- Latência: Mensagens assíncronas podem adicionar latência à comunicação entre componentes.
Por que Usar o Kafka?
O Kafka é escolhido por muitas organizações devido às suas características distintas:
- Escalabilidade: Kafka é altamente escalável, permitindo processar grandes volumes de dados em tempo real.
- Durabilidade: Mensagens são armazenadas de forma durável e podem ser reprocessadas, garantindo confiabilidade.
- Particionamento: Tópicos podem ser particionados, distribuindo o processamento entre vários consumidores.
No contexto do Apache Kafka, os conceitos de partition, consumer, topic, broker e offset são fundamentais para entender como o Kafka opera.
-
Topic (Tópico):
-
Definição: Um tópico no Kafka é uma categoria ou feed de mensagens. Cada mensagem publicada em um tópico é mantida por um período de tempo configurável, independentemente de os consumidores terem ou não lido essa mensagem.
Pensa nos tópicos como canais de conversa. Cada tópico é como um grupo no WhatsApp, mas ao invés de textos, a galera troca mensagens.
Os produtores (quem manda as mensagens) jogam suas mensagens nos tópicos, e os consumidores (quem recebe as mensagens) ficam de olho nos tópicos que interessam a eles.
-
-
Partition (Partição):
-
Definição: Um tópico pode ser dividido em várias partições. As partições permitem que os dados sejam distribuídos e processados em paralelo.
Imagina que um tópico é um bolo enorme, e as partições são pedaços desse bolo. Dividir em pedaços ajuda na hora de distribuir para todo mundo.
Se o tópico é sobre séries, uma partição pode ser só de "Game of Thrones" e outra de "Stranger Things". Cada fã vai direto para a partição que mais gosta.
As partições possibilitam a escala horizontal e permitem que várias instâncias de consumidores processem mensagens de um tópico simultaneamente.
-
-
Consumer (Consumidor):
-
Definição: Um consumidor é uma aplicação ou processo que se inscreve em tópicos do Kafka para consumir mensagens. Cada consumidor mantém um registro do último offset que leu em cada partição.
Os consumidores são os fãs que querem receber as mensagens. Eles escolhem em quais tópicos querem participar.
Se você é um consumidor, pode se inscrever no tópico de "Game of Thrones" e receber todas as últimas novidades dessa série.
Consumidores processam mensagens em tempo real ou de acordo com a lógica de processamento específica do aplicativo.
-
-
Offset:
-
Definição: O offset é um identificador único associado a cada mensagem em uma partição. Ele representa a posição de um consumidor em uma partição específica.
Sabe quando você para de assistir a um episódio no Netflix e quer continuar depois? O offset é tipo o número do episódio onde você parou. Se você já viu até o terceiro episódio de "Stranger Things", o offset vai ser 3. Então, na próxima vez que voltar, sabe exatamente por onde começar.
O offset permite que os consumidores controlem o andamento do processamento, indicando qual mensagem eles já consumiram. Os consumidores podem reiniciar a partir de um offset específico em caso de falha ou para processar novamente mensagens.
-
-
Broker:
- O que é: O broker desempenha o papel central na organização do sistema. Funciona como o gestor principal do Kafka, encarregado de administrar tópicos, particionamento e monitoramento das mensagens. O broker é como o servidor do WhatsApp que garante que suas mensagens cheguem para todos no grupo. Ele também cuida para que, se um consumidor estiver offline, ele receba as mensagens quando voltar.
Fluxo de Dados Básico:
- Um produtor publica mensagens em um tópico.
- Cada tópico pode ter várias partições.
- Consumidores se inscrevem em tópicos e podem processar mensagens de partições diferentes em paralelo.
- Cada mensagem em uma partição tem um offset único.
- Consumidores mantêm o controle do último offset consumido para cada partição.
Escalabilidade:
- A divisão em partições permite a distribuição de carga e paralelismo.
- Consumidores podem ser escalados horizontalmente para processar partições em paralelo.
Durabilidade e Retenção:
- As mensagens em um tópico podem ser mantidas por um período configurável.
- Os consumidores mantêm o controle do offset, permitindo que continuem de onde pararam, mesmo após reinicializações.
Rebalanceamento:
- Quando novos consumidores são adicionados ou consumidores existentes são removidos, o Kafka realiza um rebalanceamento para redistribuir as partições entre os consumidores.
- Isso garante um processamento equitativo e eficiente das mensagens.
Mão na massa
Agora que você já sabe todos esses conceitos fundamentais do Kafka para projetar sistemas escaláveis, duráveis e tolerantes a falhas para processamento de eventos em tempo real, vamos botar a mão na massa e fazer uma implementação básica.
Configuração do Ambiente
Vamos facilitar nossa vida e vamos utilizar uma imagem docker do kafka, para isso vamos configurar um docker-compose para nos ajudar:
version: '2'
services:
zookeeper:
image: confluentinc/cp-zookeeper:latest
networks:
- broker-kafka
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
kafka:
image: confluentinc/cp-kafka:latest
networks:
- broker-kafka
depends_on:
- zookeeper
ports:
- 9092:9092
- 9101:9101
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
kafdrop2:
image: obsidiandynamics/kafdrop:latest
networks:
- broker-kafka
depends_on:
- kafka
ports:
- 19000:9000
environment:
KAFKA_BROKERCONNECT: kafka:29092
akhq:
image: tchiotludo/akhq
environment:
AKHQ_CONFIGURATION: |
akhq:
connections:
localhost:
properties:
bootstrap.servers: "kafka:29092"
ports:
- 8082:8082
kafka-ui:
image: provectuslabs/kafka-ui:latest
ports:
- 8080:8080
environment:
KAFKA_CLUSTERS_0_NAME: local
KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: "kafka:29092"
networks:
- broker-kafka
networks:
broker-kafka:
driver: bridge
Execute o comando abaixo para iniciar os serviços do Kafka e do ZooKeeper:
docker-compose up -d
Criação de Tópico
Agora, você precisa criar um tópico para produzir e consumir mensagens. Substitua meu-topico
pelo nome desejado:
docker exec -it kafka /opt/kafka_2.12-2.8.0/bin/kafka-topics.sh --create --topic meu-topico --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1
Configuração no Startup.cs
using Microsoft.AspNetCore.Builder;
using Microsoft.AspNetCore.Hosting;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
namespace Kafkadotnetexample
{
public class Startup
{
public Startup(IConfiguration configuration)
{
Configuration = configuration;
}
public IConfiguration Configuration { get; }
public void ConfigureServices(IServiceCollection services)
{
// Adiciona os serviços necessários
services.AddSingleton<EventProducer>(); // Adiciona o produtor como serviço singleton
services.AddHostedService<EventConsumer>(); // Adiciona o consumidor como serviço hospedado
}
public void Configure(IApplicationBuilder app, IWebHostEnvironment env)
{
}
}
}
Explicação:
-
Startup
é uma classe que configura o ambiente da aplicação durante a inicialização. -
ConfigureServices
é chamado para configurar os serviços que serão usados pela aplicação. Aqui, registramos o produtor e o consumidor como serviços. -
Configure
é usado para configurar o pipeline de middleware da aplicação, mas está vazio neste exemplo. - Um
HostedService
é adequado quando você precisa de um serviço que seja executado continuamente em segundo plano, como é o caso de um consumidor de eventos Kafka que precisa estar sempre ativo para consumir mensagens em tempo real. - Ele é iniciado quando a aplicação é iniciada e é encerrado quando a aplicação é encerrada.
- Se múltiplos consumidores precisam ser executados simultaneamente, cada um deles pode ser registrado como um
HostedService
separado.
Produção e consumo de mensagens:
Produtor de Eventos (C#)
using Confluent.Kafka;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.Logging;
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
public class MessageProducer
{
private readonly IProducer<string, string> _producer;
private readonly ILogger<MessageProducer> _logger;
public EventProducer(IConfiguration configuration, ILogger<MessageProducer> logger)
{
_logger = logger;
var kafkaConfig = new ProducerConfig();
configuration.GetSection("KafkaProducer").Bind(kafkaConfig);
_producer = new ProducerBuilder<string, string>(kafkaConfig).Build();
}
public async Task ProduceMessageAsync(string topic, string message)
{
var kafkaMessage = new Message<string, string>
{
Key = null,
Value = message
};
var deliveryResult = await _producer.ProduceAsync(topic, kafkaMessage);
_logger.LogInformation($"Mensagem entregue para: {deliveryResult.TopicPartitionOffset}");
}
public async Task ProduceMessagesInBatchAsync(string topic, List<string> messages)
{
var tasks = new List<Task<DeliveryResult<string, string>>>();
foreach (var message in messages)
{
var kafkaMessage = new Message<string, string>
{
Key = null,
Value = message
};
tasks.Add(_producer.ProduceAsync(topic, kafkaMessage));
}
await Task.WhenAll(tasks);
}
}
aqui podemos analisar o código da seguinte forma:
- É feita a injeção de dependência no construtor injetando a interface de log
ILogger
, a interface do producer do confluent.kafkaIProducer
e as configurações do kafka comIConfiguration
. - Há um método assíncrono
ProduceMessageAsync
que produz e envia as mensagens - há outro método assíncrono
ProduceMessagesInBatchAsync
que envia as mensagens em lote.
Consumidor de Eventos (C#)
using Confluent.Kafka;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using System;
using System.Threading;
using System.Threading.Tasks;
public class MessageConsumer : BackgroundService
{
private readonly IConsumer<Ignore, string> _consumer;
private readonly IProducer<string, string> _deadLetterProducer;
private readonly ILogger<MessageConsumer> _logger;
public EventConsumer(IConfiguration configuration, IProducer<string, string> deadLetterProducer, ILogger<MessageConsumer> logger)
{
_logger = logger;
_deadLetterProducer = deadLetterProducer;
var kafkaConfig = new ConsumerConfig();
configuration.GetSection("KafkaConsumer").Bind(kafkaConfig);
_consumer = new ConsumerBuilder<Ignore, string>(kafkaConfig).Build();
}
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
_consumer.Subscribe("meu-topico");
try
{
while (!stoppingToken.IsCancellationRequested)
{
var consumeResult = _consumer.Consume(stoppingToken);
if (ProcessAndCommitMessage(consumeResult))
continue;
_logger.LogError($"Erro ao processar mensagem: {consumeResult.Value}");
HandleDeadLetter(consumeResult);
}
}
catch (ConsumeException e)
{
_logger.LogError($"Erro ao consumir: {e.Error.Reason}");
}
}
private bool ProcessAndCommitMessage(ConsumeResult<Ignore, string> consumeResult)
{
var isSuccess = ProcessMessage(consumeResult.Value);
if (isSuccess)
{
_logger.LogInformation($"Mensagem processada com sucesso: {consumeResult.Value}");
_consumer.Commit(consumeResult);
}
return isSuccess;
}
private bool ProcessMessage(string message)
{
return message.Contains("result success", StringComparison.OrdinalIgnoreCase);
}
private void HandleDeadLetter(ConsumeResult<Ignore, string> consumeResult)
{
try
{
_deadLetterProducer.ProduceAsync("meu-topico-dead-letter", new Message<string, string> { Value = consumeResult.Value });
_consumer.Commit(consumeResult);
}
catch (Exception ex)
{
_logger.LogError($"Erro ao encaminhar para dead letter: {ex.Message}");
}
}
}
Aqui podemos analisar o código da seguinte forma:
- É feita a injeção de dependência no construtor injetando a parte da interface de log
ILogger
, a interface do consumerIConsumer
e do deadletter producerIProducer
do confluent.kafka e as configurações do kafka comIConfiguration
. - A classe herda de
BackgroundService
que permite ela ser executada em segundo plano. - o método de consumo
ExecuteAsync
onde é verificado o valor da mensagem no métodoProcessAndCommitMessage
onde se o valor for success ele gera a confirmação da mensagem com o commit. - se o retorno do
ProcessAndCommitMessage
for falso ele encaminha a mensagem para o topico da deadletter e confirma a mensagem original para não ser reenviada.
Resumindo:
-
Produtor de Eventos (
MessageProducer
):- O que faz? Envio de mensagens para o Kafka.
- Como funciona? Configura o produtor Kafka e oferece métodos para enviar mensagens, tanto individualmente como em lotes.
-
Consumidor de Eventos (
MessageConsumer
):- O que faz? Recebe mensagens do Kafka e processa-as.
- Como funciona? Configura o consumidor Kafka, assina um tópico e, em um loop contínuo, consome mensagens. Se o processamento for bem-sucedido, confirma a mensagem. Em caso de falha, encaminha a mensagem para um tópico de dead letter.
Voce ainda pode criar uma classe program e testar a aplicação em um projeto console app:
using System;
using System.Threading;
using System.Threading.Tasks;
using Confluent.Kafka;
namespace KafkaConsoleApp
{
class Program
{
static async Task Main(string[] args)
{
var cts = new CancellationTokenSource();
var producerTask = Task.Run(() => ProduceMessagesAsync(cts.Token));
var consumerTask = Task.Run(() => ConsumeMessagesAsync(cts.Token));
Console.WriteLine("Pressione ENTER para encerrar...");
Console.ReadLine();
cts.Cancel();
await Task.WhenAll(producerTask, consumerTask);
}
static async Task ProduceMessagesAsync(CancellationToken cancellationToken)
{
var config = new ProducerConfig { BootstrapServers = "localhost:9092" };
using var producer = new ProducerBuilder<string, string>(config).Build();
var topic = "meu-topico";
var i = 0;
while (!cancellationToken.IsCancellationRequested)
{
var message = $"Mensagem de produção {i++}";
await producer.ProduceAsync(topic, new Message<string, string> { Value = message });
Console.WriteLine($"Produzida mensagem: {message}");
await Task.Delay(1000);
}
}
static async Task ConsumeMessagesAsync(CancellationToken cancellationToken)
{
var config = new ConsumerConfig
{
BootstrapServers = "localhost:9092",
GroupId = "test-group",
AutoOffsetReset = AutoOffsetReset.Earliest
};
using var consumer = new ConsumerBuilder<Ignore, string>(config).Build();
consumer.Subscribe("meu-topico");
while (!cancellationToken.IsCancellationRequested)
{
try
{
var consumeResult = consumer.Consume(cancellationToken);
Console.WriteLine($"Consumida mensagem: {consumeResult.Message.Value}");
}
catch (OperationCanceledException)
{
// Ignore cancellation
}
catch (Exception ex)
{
Console.WriteLine($"Erro ao consumir mensagem: {ex.Message}");
}
}
}
}
}
Kafka UI
Geralmente o Kafka é somente uma aplicação em background rodando que você não consegue interagir diretamente, mas se você voltar no nosso docker-compose ali em cima vai perceber que a gente chamou o container do kafka-UI. Se você acessar o http://localhost:8080 no seu navegador você irá poder desfrutar dessa interface do kafka onde você pode verificar as mensagens que chegam, os consumidores, os grupos, e tudo mais, permitindo assim gerenciar visualmente seu kafka.
Conclusão
Nessa jornada pelo universo do Apache Kafka para .NET, mergulhamos fundo nos conceitos, desde o básico até colocar a mão na massa. O Kafka, esse maestro da comunicação assíncrona, nos mostrou como organizar o caos de eventos em tempo real de forma eficiente.
- Considerações Vimos até então o básico do Kafka: entendemos sua logica de funcionamento e implementamos um produtor consumidor básico, mas não para por aqui. Provavelmente eu trarei uma segunda parte desse artigo focando mais no lado técnico com estratégias de otimizações que podem ser aplicadas dentro de um ambiente onde o kafka é usado, reduzindo riscos e aumentando consistência do produto.
Então, galera, esse é só o começo! Bora explorar mais e desbravar as possibilidades do Kafka. O palco tá pronto, e a plateia (ou os tópicos, no caso) estão esperando por mais! 🚀🎉
Top comments (0)