DEV Community

Cover image for Descomplicando o mundo da programação - Guia basico sobre funcionamento e utilização do kafka em .NET
Bruno Pizol Camargo
Bruno Pizol Camargo

Posted on

Descomplicando o mundo da programação - Guia basico sobre funcionamento e utilização do kafka em .NET

Introdução

O Apache Kafka é uma plataforma de streaming de dados distribuída, de código aberto, que fornece uma arquitetura para a transmissão eficiente, processamento em tempo real e armazenamento durável de grandes volumes de dados em um ambiente distribuído. Ele é projetado para lidar com fluxos de dados em tempo real, sendo amplamente utilizado para construir sistemas escaláveis e de alta performance em aplicações que demandam processamento de eventos em tempo real.

O Kafka é como um mega grupo de WhatsApp super organizado, mas em vez de mensagens de texto, é usado para compartilhar informações de todos os tipos! Imagina que você e seus amigos adoram falar sobre vários assuntos, como séries, jogos e curiosidades.
Agora, cada um desses assuntos é como um "canal" no Kafka. As pessoas que adoram séries ficam em um canal, os gamers em outro, e assim por diante. O Kafka ajuda a garantir que todo mundo receba as notícias certinhas, no momento certo, mesmo que muita gente esteja conversando ao mesmo tempo.
É como se fosse um assistente pessoal mega eficiente, mas para manter todos atualizados sobre as coisas legais que estão acontecendo, como os lançamentos das suas séries favoritas ou novidades nos jogos que você curte. É uma forma de organizar e compartilhar informações, mesmo que muita gente esteja participando da conversa! 🌐✨

Image description

O que é Mensageria?

Atualmente o Kafka é empregado principalmente no uso de microserviços como um gerenciador de mensageria, mas o que é mensageria?

A mensageria é um padrão arquitetural em que sistemas distribuídos se comunicam através do envio de mensagens assíncronas entre componentes. Essas mensagens podem conter informações, comandos ou eventos e são fundamentais para a construção de sistemas resilientes, escaláveis e desacoplados.

Image description

Por que Usar Mensageria?

  • Desacoplamento: Permite que componentes do sistema se comuniquem sem conhecimento direto uns dos outros, reduzindo o acoplamento.
  • Escalabilidade: Facilita a escalabilidade horizontal, permitindo a distribuição de processamento entre diversos serviços.
  • Resiliência: Mensagens assíncronas lidam melhor com falhas temporárias, garantindo que a comunicação seja retomada após a recuperação do serviço.

Desvantagens da Mensageria:

  • Complexidade: Introduz complexidade adicional na arquitetura.
  • Latência: Mensagens assíncronas podem adicionar latência à comunicação entre componentes.

Por que Usar o Kafka?

O Kafka é escolhido por muitas organizações devido às suas características distintas:

  • Escalabilidade: Kafka é altamente escalável, permitindo processar grandes volumes de dados em tempo real.
  • Durabilidade: Mensagens são armazenadas de forma durável e podem ser reprocessadas, garantindo confiabilidade.
  • Particionamento: Tópicos podem ser particionados, distribuindo o processamento entre vários consumidores.

No contexto do Apache Kafka, os conceitos de partition, consumer, topic, broker e offset são fundamentais para entender como o Kafka opera.

  1. Topic (Tópico):

    • Definição: Um tópico no Kafka é uma categoria ou feed de mensagens. Cada mensagem publicada em um tópico é mantida por um período de tempo configurável, independentemente de os consumidores terem ou não lido essa mensagem.

      Pensa nos tópicos como canais de conversa. Cada tópico é como um grupo no WhatsApp, mas ao invés de textos, a galera troca mensagens.

      Os produtores (quem manda as mensagens) jogam suas mensagens nos tópicos, e os consumidores (quem recebe as mensagens) ficam de olho nos tópicos que interessam a eles.

  2. Partition (Partição):

    • Definição: Um tópico pode ser dividido em várias partições. As partições permitem que os dados sejam distribuídos e processados em paralelo.

      Imagina que um tópico é um bolo enorme, e as partições são pedaços desse bolo. Dividir em pedaços ajuda na hora de distribuir para todo mundo.

      Se o tópico é sobre séries, uma partição pode ser só de "Game of Thrones" e outra de "Stranger Things". Cada fã vai direto para a partição que mais gosta.

      As partições possibilitam a escala horizontal e permitem que várias instâncias de consumidores processem mensagens de um tópico simultaneamente.

  3. Consumer (Consumidor):

    • Definição: Um consumidor é uma aplicação ou processo que se inscreve em tópicos do Kafka para consumir mensagens. Cada consumidor mantém um registro do último offset que leu em cada partição.

      Os consumidores são os fãs que querem receber as mensagens. Eles escolhem em quais tópicos querem participar.

      Se você é um consumidor, pode se inscrever no tópico de "Game of Thrones" e receber todas as últimas novidades dessa série.

      Consumidores processam mensagens em tempo real ou de acordo com a lógica de processamento específica do aplicativo.

  4. Offset:

    • Definição: O offset é um identificador único associado a cada mensagem em uma partição. Ele representa a posição de um consumidor em uma partição específica.

      Sabe quando você para de assistir a um episódio no Netflix e quer continuar depois? O offset é tipo o número do episódio onde você parou. Se você já viu até o terceiro episódio de "Stranger Things", o offset vai ser 3. Então, na próxima vez que voltar, sabe exatamente por onde começar.

    O offset permite que os consumidores controlem o andamento do processamento, indicando qual mensagem eles já consumiram. Os consumidores podem reiniciar a partir de um offset específico em caso de falha ou para processar novamente mensagens.

  5. Broker:

    • O que é: O broker desempenha o papel central na organização do sistema. Funciona como o gestor principal do Kafka, encarregado de administrar tópicos, particionamento e monitoramento das mensagens. O broker é como o servidor do WhatsApp que garante que suas mensagens cheguem para todos no grupo. Ele também cuida para que, se um consumidor estiver offline, ele receba as mensagens quando voltar.

Fluxo de Dados Básico:

  • Um produtor publica mensagens em um tópico.
  • Cada tópico pode ter várias partições.
  • Consumidores se inscrevem em tópicos e podem processar mensagens de partições diferentes em paralelo.
  • Cada mensagem em uma partição tem um offset único.
  • Consumidores mantêm o controle do último offset consumido para cada partição.

Image description

fonte: https://developers.redhat.com/blog/2021/05/04/event-driven-apis-and-schema-governance-for-apache-kafka-get-ready-for-kafka-summit-europe-2021

Escalabilidade:

  • A divisão em partições permite a distribuição de carga e paralelismo.
  • Consumidores podem ser escalados horizontalmente para processar partições em paralelo.

Durabilidade e Retenção:

  • As mensagens em um tópico podem ser mantidas por um período configurável.
  • Os consumidores mantêm o controle do offset, permitindo que continuem de onde pararam, mesmo após reinicializações.

Rebalanceamento:

  • Quando novos consumidores são adicionados ou consumidores existentes são removidos, o Kafka realiza um rebalanceamento para redistribuir as partições entre os consumidores.
  • Isso garante um processamento equitativo e eficiente das mensagens.

Mão na massa

Agora que você já sabe todos esses conceitos fundamentais do Kafka para projetar sistemas escaláveis, duráveis e tolerantes a falhas para processamento de eventos em tempo real, vamos botar a mão na massa e fazer uma implementação básica.

Configuração do Ambiente

Vamos facilitar nossa vida e vamos utilizar uma imagem docker do kafka, para isso vamos configurar um docker-compose para nos ajudar:

version: '2'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:latest
    networks: 
      - broker-kafka
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  kafka:
    image: confluentinc/cp-kafka:latest
    networks: 
      - broker-kafka
    depends_on:
      - zookeeper
    ports:
      - 9092:9092
      - 9101:9101
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1

  kafdrop2:
    image: obsidiandynamics/kafdrop:latest
    networks: 
      - broker-kafka
    depends_on:
      - kafka
    ports:
      - 19000:9000
    environment:
      KAFKA_BROKERCONNECT: kafka:29092

  akhq:    
    image: tchiotludo/akhq
    environment:
      AKHQ_CONFIGURATION: |
        akhq:
          connections:
            localhost:
              properties:
                bootstrap.servers: "kafka:29092"
    ports:
      - 8082:8082          
  kafka-ui:
    image: provectuslabs/kafka-ui:latest
    ports:
      - 8080:8080
    environment:
      KAFKA_CLUSTERS_0_NAME: local
      KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: "kafka:29092"        
    networks:
      - broker-kafka
networks:
  broker-kafka:
    driver: bridge  

Enter fullscreen mode Exit fullscreen mode

Execute o comando abaixo para iniciar os serviços do Kafka e do ZooKeeper:

docker-compose up -d
Enter fullscreen mode Exit fullscreen mode

Criação de Tópico

Agora, você precisa criar um tópico para produzir e consumir mensagens. Substitua meu-topico pelo nome desejado:

docker exec -it kafka /opt/kafka_2.12-2.8.0/bin/kafka-topics.sh --create --topic meu-topico --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1
Enter fullscreen mode Exit fullscreen mode

Configuração no Startup.cs

using Microsoft.AspNetCore.Builder;
using Microsoft.AspNetCore.Hosting;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;

namespace Kafkadotnetexample
{
    public class Startup
    {
        public Startup(IConfiguration configuration)
        {
            Configuration = configuration;
        }

        public IConfiguration Configuration { get; }

        public void ConfigureServices(IServiceCollection services)
        {
            // Adiciona os serviços necessários
            services.AddSingleton<EventProducer>(); // Adiciona o produtor como serviço singleton
            services.AddHostedService<EventConsumer>(); // Adiciona o consumidor como serviço hospedado
        }

        public void Configure(IApplicationBuilder app, IWebHostEnvironment env)
        {
        }
    }
}

Enter fullscreen mode Exit fullscreen mode

Explicação:

  • Startup é uma classe que configura o ambiente da aplicação durante a inicialização.
  • ConfigureServices é chamado para configurar os serviços que serão usados pela aplicação. Aqui, registramos o produtor e o consumidor como serviços.
  • Configure é usado para configurar o pipeline de middleware da aplicação, mas está vazio neste exemplo.
  • Um HostedService é adequado quando você precisa de um serviço que seja executado continuamente em segundo plano, como é o caso de um consumidor de eventos Kafka que precisa estar sempre ativo para consumir mensagens em tempo real.
  • Ele é iniciado quando a aplicação é iniciada e é encerrado quando a aplicação é encerrada.
  • Se múltiplos consumidores precisam ser executados simultaneamente, cada um deles pode ser registrado como um HostedService separado.

Produção e consumo de mensagens:

Produtor de Eventos (C#)

using Confluent.Kafka;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.Logging;
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

public class MessageProducer
{
    private readonly IProducer<string, string> _producer;
    private readonly ILogger<MessageProducer> _logger;

    public EventProducer(IConfiguration configuration, ILogger<MessageProducer> logger)
    {
        _logger = logger;

        var kafkaConfig = new ProducerConfig();
        configuration.GetSection("KafkaProducer").Bind(kafkaConfig);

        _producer = new ProducerBuilder<string, string>(kafkaConfig).Build();
    }

    public async Task ProduceMessageAsync(string topic, string message)
    {
        var kafkaMessage = new Message<string, string>
        {
            Key = null,
            Value = message
        };

        var deliveryResult = await _producer.ProduceAsync(topic, kafkaMessage);
        _logger.LogInformation($"Mensagem entregue para: {deliveryResult.TopicPartitionOffset}");
    }

    public async Task ProduceMessagesInBatchAsync(string topic, List<string> messages)
    {
        var tasks = new List<Task<DeliveryResult<string, string>>>();

        foreach (var message in messages)
        {
            var kafkaMessage = new Message<string, string>
            {
                Key = null,
                Value = message
            };

            tasks.Add(_producer.ProduceAsync(topic, kafkaMessage));
        }

        await Task.WhenAll(tasks);


    }
}

Enter fullscreen mode Exit fullscreen mode

aqui podemos analisar o código da seguinte forma:

  • É feita a injeção de dependência no construtor injetando a interface de log ILogger, a interface do producer do confluent.kafka IProducer e as configurações do kafka com IConfiguration.
  • Há um método assíncrono ProduceMessageAsync que produz e envia as mensagens
  • há outro método assíncrono ProduceMessagesInBatchAsync que envia as mensagens em lote.

Consumidor de Eventos (C#)

using Confluent.Kafka;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using System;
using System.Threading;
using System.Threading.Tasks;

public class MessageConsumer : BackgroundService
{
    private readonly IConsumer<Ignore, string> _consumer;
    private readonly IProducer<string, string> _deadLetterProducer;
    private readonly ILogger<MessageConsumer> _logger;

    public EventConsumer(IConfiguration configuration, IProducer<string, string> deadLetterProducer, ILogger<MessageConsumer> logger)
    {
        _logger = logger;
        _deadLetterProducer = deadLetterProducer;

        var kafkaConfig = new ConsumerConfig();
        configuration.GetSection("KafkaConsumer").Bind(kafkaConfig);

        _consumer = new ConsumerBuilder<Ignore, string>(kafkaConfig).Build();
    }

    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        _consumer.Subscribe("meu-topico");

        try
        {
            while (!stoppingToken.IsCancellationRequested)
            {
                var consumeResult = _consumer.Consume(stoppingToken);

                if (ProcessAndCommitMessage(consumeResult))
                    continue;

                _logger.LogError($"Erro ao processar mensagem: {consumeResult.Value}");
                HandleDeadLetter(consumeResult);
            }
        }
        catch (ConsumeException e)
        {
            _logger.LogError($"Erro ao consumir: {e.Error.Reason}");
        }
    }

    private bool ProcessAndCommitMessage(ConsumeResult<Ignore, string> consumeResult)
    {
        var isSuccess = ProcessMessage(consumeResult.Value);

        if (isSuccess)
        {
            _logger.LogInformation($"Mensagem processada com sucesso: {consumeResult.Value}");
            _consumer.Commit(consumeResult);
        }

        return isSuccess;
    }

    private bool ProcessMessage(string message)
    {
        return message.Contains("result success", StringComparison.OrdinalIgnoreCase);
    }

    private void HandleDeadLetter(ConsumeResult<Ignore, string> consumeResult)
    {
        try
        {
            _deadLetterProducer.ProduceAsync("meu-topico-dead-letter", new Message<string, string> { Value = consumeResult.Value });
            _consumer.Commit(consumeResult); 
        }
        catch (Exception ex)
        {
            _logger.LogError($"Erro ao encaminhar para dead letter: {ex.Message}");
        }
    }
}

Enter fullscreen mode Exit fullscreen mode

Aqui podemos analisar o código da seguinte forma:

  • É feita a injeção de dependência no construtor injetando a parte da interface de log ILogger, a interface do consumer IConsumer e do deadletter producer IProducer do confluent.kafka e as configurações do kafka com IConfiguration.
  • A classe herda de BackgroundService que permite ela ser executada em segundo plano.
  • o método de consumo ExecuteAsync onde é verificado o valor da mensagem no método ProcessAndCommitMessage onde se o valor for success ele gera a confirmação da mensagem com o commit.
  • se o retorno do ProcessAndCommitMessage for falso ele encaminha a mensagem para o topico da deadletter e confirma a mensagem original para não ser reenviada.

Resumindo:

  1. Produtor de Eventos (MessageProducer):

    • O que faz? Envio de mensagens para o Kafka.
    • Como funciona? Configura o produtor Kafka e oferece métodos para enviar mensagens, tanto individualmente como em lotes.
  2. Consumidor de Eventos (MessageConsumer):

    • O que faz? Recebe mensagens do Kafka e processa-as.
    • Como funciona? Configura o consumidor Kafka, assina um tópico e, em um loop contínuo, consome mensagens. Se o processamento for bem-sucedido, confirma a mensagem. Em caso de falha, encaminha a mensagem para um tópico de dead letter.

Voce ainda pode criar uma classe program e testar a aplicação em um projeto console app:

using System;
using System.Threading;
using System.Threading.Tasks;
using Confluent.Kafka;

namespace KafkaConsoleApp
{
    class Program
    {
        static async Task Main(string[] args)
        {
            var cts = new CancellationTokenSource();

            var producerTask = Task.Run(() => ProduceMessagesAsync(cts.Token));
            var consumerTask = Task.Run(() => ConsumeMessagesAsync(cts.Token));

            Console.WriteLine("Pressione ENTER para encerrar...");
            Console.ReadLine();

            cts.Cancel();

            await Task.WhenAll(producerTask, consumerTask);
        }

        static async Task ProduceMessagesAsync(CancellationToken cancellationToken)
        {
            var config = new ProducerConfig { BootstrapServers = "localhost:9092" };
            using var producer = new ProducerBuilder<string, string>(config).Build();

            var topic = "meu-topico";

            var i = 0;
            while (!cancellationToken.IsCancellationRequested)
            {
                var message = $"Mensagem de produção {i++}";
                await producer.ProduceAsync(topic, new Message<string, string> { Value = message });
                Console.WriteLine($"Produzida mensagem: {message}");
                await Task.Delay(1000);
            }
        }

        static async Task ConsumeMessagesAsync(CancellationToken cancellationToken)
        {
            var config = new ConsumerConfig
            {
                BootstrapServers = "localhost:9092",
                GroupId = "test-group",
                AutoOffsetReset = AutoOffsetReset.Earliest
            };

            using var consumer = new ConsumerBuilder<Ignore, string>(config).Build();
            consumer.Subscribe("meu-topico");

            while (!cancellationToken.IsCancellationRequested)
            {
                try
                {
                    var consumeResult = consumer.Consume(cancellationToken);
                    Console.WriteLine($"Consumida mensagem: {consumeResult.Message.Value}");
                }
                catch (OperationCanceledException)
                {
                    // Ignore cancellation
                }
                catch (Exception ex)
                {
                    Console.WriteLine($"Erro ao consumir mensagem: {ex.Message}");
                }
            }
        }
    }
}
Enter fullscreen mode Exit fullscreen mode

Kafka UI

Geralmente o Kafka é somente uma aplicação em background rodando que você não consegue interagir diretamente, mas se você voltar no nosso docker-compose ali em cima vai perceber que a gente chamou o container do kafka-UI. Se você acessar o http://localhost:8080 no seu navegador você irá poder desfrutar dessa interface do kafka onde você pode verificar as mensagens que chegam, os consumidores, os grupos, e tudo mais, permitindo assim gerenciar visualmente seu kafka.

Conclusão

Nessa jornada pelo universo do Apache Kafka para .NET, mergulhamos fundo nos conceitos, desde o básico até colocar a mão na massa. O Kafka, esse maestro da comunicação assíncrona, nos mostrou como organizar o caos de eventos em tempo real de forma eficiente.

  • Considerações Vimos até então o básico do Kafka: entendemos sua logica de funcionamento e implementamos um produtor consumidor básico, mas não para por aqui. Provavelmente eu trarei uma segunda parte desse artigo focando mais no lado técnico com estratégias de otimizações que podem ser aplicadas dentro de um ambiente onde o kafka é usado, reduzindo riscos e aumentando consistência do produto.

Então, galera, esse é só o começo! Bora explorar mais e desbravar as possibilidades do Kafka. O palco tá pronto, e a plateia (ou os tópicos, no caso) estão esperando por mais! 🚀🎉

Top comments (0)