DEV Community

Kaike Castro
Kaike Castro

Posted on

Criando um cluster do Kafka com Docker Compose e desenvolvendo um Consumer e Producer em Golang

Introdução:

O Apache Kafka é uma plataforma de streaming distribuída que permite o envio e a recepção de fluxos de eventos em tempo real. No contexto do Kafka, um Producer é responsável por enviar mensagens para um ou mais tópicos, enquanto um Consumer é responsável por receber essas mensagens e processá-las.

Neste tutorial, vamos explorar como configurar um cluster do Kafka usando Docker Compose e desenvolver um Consumer e um Producer usando a linguagem de programação Go (Golang). Essa combinação é ideal para construir aplicações escaláveis e de alto desempenho que podem se beneficiar do poder e da flexibilidade do Kafka.

Pré-requisitos:

Antes de começar, certifique-se de ter os seguintes itens instalados em sua máquina:

Configurando o Kafka com Docker Compose:

Comece criando um arquivo chamado docker-compose.yml no diretório de sua escolha e adicione o seguinte conteúdo:

version: '3'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:latest
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  kafka:
    image: confluentinc/cp-kafka:latest
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:9092
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092,PLAINTEXT_HOST://0.0.0.0:9092
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0

Enter fullscreen mode Exit fullscreen mode

Neste arquivo, estamos configurando dois serviços: o ZooKeeper, que é necessário para o funcionamento do Kafka, e o Kafka em si. A porta 2181 é usada para se conectar ao ZooKeeper e a porta 9092 para se conectar ao Kafka. Note que definimos a propriedade KAFKA_ADVERTISED_LISTENERS para que o Kafka seja acessível tanto dentro do contêiner (PLAINTEXT://kafka:9092) quanto fora (PLAINTEXT_HOST://localhost:9092).

Para iniciar o cluster do Kafka, execute o seguinte comando na raiz do seu projeto:

docker-compose up -d
Enter fullscreen mode Exit fullscreen mode

O papel do ZooKeeper no Kafka

O ZooKeeper é um serviço centralizado usado pelo Kafka para coordenar e gerenciar os nós do cluster. Ele é responsável por eleger um líder para cada partição do tópico, manter informações de configuração e status dos nós, além de auxiliar na detecção de falhas e no processo de failover.

No nosso exemplo, configuramos o ZooKeeper como um serviço separado no Docker Compose. O Kafka depende do ZooKeeper para registrar e manter as informações do cluster, garantindo que todos os nós estejam sincronizados.

Configurando o ambiente Go

Agora que o Kafka está em execução, vamos configurar nosso ambiente Go para desenvolver o Consumer e o Producer. Abra um terminal e crie um novo diretório para o projeto. Dentro dele, inicialize um módulo Go executando o seguinte comando:

go mod init kafka-tutorial
Enter fullscreen mode Exit fullscreen mode

Isso criará um arquivo go.mod para gerenciar as dependências do projeto.

Vamos usar a lib Sarama
A lib Sarama é uma biblioteca cliente para interagir com o Apache Kafka em Go. Ela fornece uma API fácil de usar para criar produtores e consumidores de mensagens Kafka, bem como administrar tópicos, partições e offsets.

Para instalá-la, execute o seguinte comando:

go get github.com/Shopify/sarama
Enter fullscreen mode Exit fullscreen mode

Criando o Consumer e o Producer:

Agora vamos criar os arquivos consumer.go e producer.go para implementar o Consumer e o Producer, respectivamente.

No consumer.go, adicione o seguinte código:

package main

import (
    "fmt"
    "log"

    "github.com/Shopify/sarama"
)

func main() {
    consumer, err := sarama.NewConsumer([]string{"localhost:9092"}, nil)
    if err != nil {
        log.Fatalln("Failed to start consumer:", err)
    }
    defer func() {
        if err := consumer.Close(); err != nil {
            log.Fatalln("Failed to close consumer:", err)
        }
    }()

    partitionConsumer, err := consumer.ConsumePartition("test-topic", 0, sarama.OffsetOldest)
    if err != nil {
        log.Fatalln("Failed to start partition consumer:", err)
    }
    defer func() {
        if err := partitionConsumer.Close(); err != nil {
            log.Fatalln("Failed to close partition consumer:", err)
        }
    }()

    for message := range partitionConsumer.Messages() {
        fmt.Printf("Received message: Key = %s, Value = %s\n", string(message.Key), string(message.Value))
    }
}
Enter fullscreen mode Exit fullscreen mode

No producer.go, adicione o seguinte código:

package main

import (
    "log"

    "github.com/Shopify/sarama"
)

func main() {
    producer, err := sarama.NewSyncProducer([]string{"localhost:9092"}, nil)
    if err != nil {
        log.Fatalln("Failed to start producer:", err)
    }
    defer func() {
        if err := producer.Close(); err != nil {
            log.Fatalln("Failed to close producer:", err)
        }
    }()

    message := &sarama.ProducerMessage{
        Topic: "test-topic",
        Value: sarama.StringEncoder("Hello, Kafka!"),
    }

    partition, offset, err := producer.SendMessage(message)
    if err != nil {
        log.Fatalln("Failed to send message:", err)
    }

    log.Printf("Message sent! Partition = %d, Offset = %d\n", partition, offset)
}
Enter fullscreen mode Exit fullscreen mode

No código acima, estamos no Consumer se conectando ao Kafka e consumindo mensagens do tópico "test-topic" a partir do offset mais antigo. No Producer, estamos enviando uma mensagem com o valor "Hello, Kafka!" para o tópico "test-topic".

Executando o Consumer e o Producer

Para executar o Producer, abra um terminal, navegue até o diretório do projeto e execute o seguinte comando:

go run producer.go
Enter fullscreen mode Exit fullscreen mode

Para executar o Consumer, abra outro terminal, navegue até o diretório do projeto e execute o seguinte comando:

go run consumer.go
Enter fullscreen mode Exit fullscreen mode

Agora você verá o Consumer recebendo a mensagem enviada pelo Producer.

Conclusão:

Neste tutorial, aprendemos como configurar um cluster do Kafka usando Docker Compose e como desenvolver um Consumer e um Producer usando a linguagem Go. O Kafka é uma poderosa plataforma de streaming que pode ser integrada a várias aplicações para processar eventos em tempo real. Com essa combinação, você está pronto para construir aplicações escaláveis

Top comments (2)

Collapse
 
matfigueiredo profile image
Matheus Figueiredo

Parabéns pelo post! Excelente tutorial.

Surgiu uma dúvida para alguns casos de usos: Além do offset mais antigo, é possível definir um offset específico para o Consumer começar a consumir as mensagens?

Collapse
 
kaike_castro profile image
Kaike Castro • Edited

Existe sim na lib do sarama tem como você pegar o offset mais recente sarama.OffsetNewest ou passar o offset na mão

offset := int64(42)
partitionConsumer, err := consumer.ConsumePartition(topic, partition, offset)
Enter fullscreen mode Exit fullscreen mode

Porém vai se você optar por passar um offset específico, lembre-se de que será responsabilidade do consumidor controlar e gerenciar o progresso dos offsets. Você precisará acompanhar manualmente o offset atualizado e garantir que ele seja salvo corretamente para continuar a partir desse ponto em futuras leituras.