DEV Community

Cover image for Como Construí um Pipeline de Dados em Go, Kafka e Elasticsearch com Docker Compose
Cláudio Filipe Lima Rapôso
Cláudio Filipe Lima Rapôso

Posted on

Como Construí um Pipeline de Dados em Go, Kafka e Elasticsearch com Docker Compose

Nos últimos dias eu precisava testar o Kafka Connect para um cenário de ingestão de dados e pensei: por que não montar um laboratório simples que já faça sentido para outros projetos?

Diagrama de Arquitetura

A ideia era clara:

  • Um producer em Go que consulta a API da Binance e envia mensagens para o Kafka.
  • Kafka Connect consumindo esse tópico e jogando as mensagens no Elasticsearch.
  • Kafka UI para inspecionar o que está rolando nos tópicos.

Tudo isso rodando em containers com Docker Compose, porque ninguém merece configurar isso manualmente.

Quer aprender como fazer? Bora lá!


O que vamos construir

Nosso fluxo de dados vai ser assim:

Diagrama de Sequência

E vamos adicionar o Kafka UI para facilitar a visualização.


Passo 1 – Estrutura do projeto

Crie uma pasta para o projeto:

mkdir kafka-connect-lab
cd kafka-connect-lab
Enter fullscreen mode Exit fullscreen mode

Estruture assim:

kafka-connect-lab/
├── cmd/worker/        # Código Go do producer
│   └── main.go
├── Dockerfile         # Dockerfile do worker
├── docker-compose.yml # Orquestração
└── go.mod             # Módulo Go
Enter fullscreen mode Exit fullscreen mode

Inicialize o módulo Go:

go mod init github.com/seuprojeto/kafka-connect-lab
go get github.com/segmentio/kafka-go
Enter fullscreen mode Exit fullscreen mode

Passo 2 – Escrevendo o Worker em Go

Vamos criar um producer que consulta o preço do Bitcoin na Binance e publica no Kafka.

O código completo:

Abra cmd/worker/main.go:

package main

import (
    "context"
    "encoding/json"
    "fmt"
    "log"
    "net/http"
    "os"
    "time"

    kafka "github.com/segmentio/kafka-go"
)
Enter fullscreen mode Exit fullscreen mode

👉 O que temos aqui?

  • Importamos pacotes básicos (log, os, time, etc.).
  • Importamos a lib kafka-go para conectar no Kafka.

type Ticker struct {
    Symbol string `json:"symbol"`
    Price  string `json:"price"`
}
Enter fullscreen mode Exit fullscreen mode

👉 Por que essa struct?
Ela representa o JSON que recebemos da API da Binance, com par de negociação e preço.


func fetchTicker(symbol string) (*Ticker, error) {
    url := "https://api.binance.com/api/v3/ticker/price?symbol=" + symbol
    resp, err := http.Get(url)
    if err != nil {
        return nil, err
    }
    defer resp.Body.Close()
    if resp.StatusCode != http.StatusOK {
        return nil, fmt.Errorf("erro Binance: %d", resp.StatusCode)
    }
    var t Ticker
    if err := json.NewDecoder(resp.Body).Decode(&t); err != nil {
        return nil, err
    }
    return &t, nil
}
Enter fullscreen mode Exit fullscreen mode

👉 O que essa função faz?

  • Monta a URL da Binance.
  • Faz uma requisição HTTP GET.
  • Decodifica a resposta JSON direto para a struct Ticker.
  • Retorna um ponteiro para Ticker.

Agora vem a função main():

func main() {
    broker := os.Getenv("KAFKA_BROKER")
    topic := os.Getenv("KAFKA_TOPIC")
    symbol := os.Getenv("BINANCE_SYMBOL")
    if symbol == "" {
        symbol = "BTCUSDT"
    }
    interval := os.Getenv("FETCH_INTERVAL")
    if interval == "" {
        interval = "5s"
    }
    d, err := time.ParseDuration(interval)
    if err != nil {
        log.Fatalf("Intervalo inválido: %v", err)
    }
Enter fullscreen mode Exit fullscreen mode

👉 O que estamos fazendo?

  • Pegando as configs via variáveis de ambiente.
  • Se não passar nada, usamos defaults (BTCUSDT e intervalo de 5s).

    writer := kafka.NewWriter(kafka.WriterConfig{
        Brokers:  []string{broker},
        Topic:    topic,
        Balancer: &kafka.LeastBytes{},
    })
    defer writer.Close()

    log.Printf("Worker iniciado. Broker=%s, Tópico=%s, Ativo=%s", broker, topic, symbol)
Enter fullscreen mode Exit fullscreen mode

👉 Aqui inicializamos o Producer:

  • NewWriter cria um writer para enviar mensagens ao Kafka.
  • Usamos o balanceador LeastBytes para distribuir mensagens (bom mesmo em clusters).

    ticker := time.NewTicker(d)
    defer ticker.Stop()

    for range ticker.C {
        data, err := fetchTicker(symbol)
        if err != nil {
            log.Println("Erro ao buscar ticker:", err)
            continue
        }
        msg, _ := json.Marshal(data)
        err = writer.WriteMessages(context.Background(), kafka.Message{Value: msg})
        if err != nil {
            log.Println("Erro ao publicar:", err)
        } else {
            log.Printf("Mensagem publicada: %s = %s", data.Symbol, data.Price)
        }
    }
}
Enter fullscreen mode Exit fullscreen mode

👉 O loop principal:

  • Executa a cada interval segundos.
  • Busca o preço na Binance.
  • Serializa para JSON.
  • Publica no Kafka.

Resultado: A cada 5 segundos, temos o preço do BTC publicado no tópico.

Tópico


Passo 3 – Empacotando com Docker

Crie Dockerfile na raiz:

# build
FROM golang:1.20-alpine AS builder
WORKDIR /app
COPY go.mod go.sum ./
RUN go mod download
COPY cmd/worker ./cmd/worker
RUN go build -o worker ./cmd/worker

# runtime
FROM alpine:latest
WORKDIR /app
COPY --from=builder /app/worker ./
RUN apk add --no-cache ca-certificates
ENTRYPOINT ["./worker"]
Enter fullscreen mode Exit fullscreen mode

👉 Por que multi-stage?

  • A primeira imagem compila o binário.
  • A segunda só carrega o executável final, leve e rápida.

Passo 4 – Orquestrando tudo com Docker Compose

Crie docker-compose.yml:

services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.7.4
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181

  kafka:
    image: confluentinc/cp-kafka:7.7.4
    depends_on: [zookeeper]
    ports: ["9092:9092"]
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1

  elasticsearch:
    image: docker.elastic.co/elasticsearch/elasticsearch:7.17.28
    environment:
      - discovery.type=single-node
      - ES_JAVA_OPTS=-Xms512m -Xmx512m
    ports: ["9200:9200"]

  kafka-connect:
    image: confluentinc/cp-kafka-connect:7.7.4
    depends_on: [kafka]
    ports: ["8083:8083"]
    environment:
      CONNECT_BOOTSTRAP_SERVERS: kafka:9092
      CONNECT_GROUP_ID: connect-cluster
      CONNECT_CONFIG_STORAGE_TOPIC: connect-config
      CONNECT_OFFSET_STORAGE_TOPIC: connect-offsets
      CONNECT_STATUS_STORAGE_TOPIC: connect-status
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_KEY_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      CONNECT_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      CONNECT_KEY_CONVERTER_SCHEMAS_ENABLE: 'false'
      CONNECT_VALUE_CONVERTER_SCHEMAS_ENABLE: 'false'
      CONNECT_PLUGIN_PATH: /usr/share/java,/usr/share/confluent-hub-components
    command:
      - bash -c
      - |
        confluent-hub install --no-prompt confluentinc/kafka-connect-elasticsearch:13.0.0
        /etc/confluent/docker/run

  worker:
    build: .
    depends_on: [kafka]
    environment:
      KAFKA_BROKER: kafka:9092
      KAFKA_TOPIC: binance-ticker
      BINANCE_SYMBOL: BTCUSDT
      FETCH_INTERVAL: 5s

  kafka-ui:
    image: provectuslabs/kafka-ui:v0.7.2
    depends_on: [kafka, kafka-connect]
    ports: ["8080:8080"]
    environment:
      KAFKA_CLUSTERS_0_NAME: local
      KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:9092
      KAFKA_CLUSTERS_0_KAFKACONNECT_0_NAME: connect
      KAFKA_CLUSTERS_0_KAFKACONNECT_0_ADDRESS: http://kafka-connect:8083
Enter fullscreen mode Exit fullscreen mode

👉 O que temos aqui?

  • Kafka + Zookeeper.
  • Elasticsearch (para receber os dados).
  • Kafka Connect (já baixando o conector para ES).
  • Worker em Go.
  • Kafka UI para facilitar o debug.

Passo 5 – Subindo tudo

docker-compose up --build
Enter fullscreen mode Exit fullscreen mode

Se o Elasticsearch reclamar de memória:

sudo sysctl -w vm.max_map_count=262144
Enter fullscreen mode Exit fullscreen mode

Passo 6 – Criando o tópico

docker-compose exec kafka kafka-topics --create \
  --topic binance-ticker \
  --bootstrap-server kafka:9092 \
  --replication-factor 1 \
  --partitions 1
Enter fullscreen mode Exit fullscreen mode

Passo 7 – Configurando o conector

Agora vem a estrela do show: Kafka Connect.

curl -X POST -H "Content-Type: application/json" \
  --data '{
    "name": "es-sink",
    "config": {
      "connector.class":"io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
      "tasks.max":"1",
      "topics":"binance-ticker",
      "connection.url":"http://elasticsearch:9200",
      "type.name":"_doc",
      "key.ignore":"true",
      "schema.ignore":"true"
    }
  }' http://localhost:8083/connectors
Enter fullscreen mode Exit fullscreen mode

Quer saber se ele subiu?

curl http://localhost:8083/connectors/es-sink/status
Enter fullscreen mode Exit fullscreen mode

Status

Passo 8 – Testando tudo

  • Vá no Kafka UI: http://localhost:8080 → veja o tópico binance-ticker recebendo mensagens.
  • Consulte no Elasticsearch:
  curl http://localhost:9200/binance-ticker/_search?pretty
Enter fullscreen mode Exit fullscreen mode

Saída esperada:

Saida

Pronto. Kafka Connect funcionando!


Conclusão

Neste artigo, construímos um laboratório completo para testar o Kafka Connect:

  • Criamos um worker em Go que consome dados da API da Binance e publica no Kafka.
  • Subimos um ambiente com Kafka, Kafka Connect, Elasticsearch e Kafka UI usando Docker Compose.
  • Configuramos um conector Elasticsearch no Kafka Connect para consumir mensagens e indexá-las automaticamente.
  • Validamos o pipeline via Kafka UI e consultas REST no Elasticsearch.

Esse ambiente é um ótimo ponto de partida para testar outros conectores, experimentar transformações e entender o fluxo de dados em arquiteturas orientadas a eventos.

Como próximos passos, você pode:

  • Adicionar Kibana para visualizações.
  • Criar um cluster Kafka com múltiplos brokers.
  • Habilitar TLS e autenticação, aproximando o setup de produção.

Referências

Apache Software Foundation. (2024). Kafka Connect documentation. https://kafka.apache.org/documentation/#connect

Confluent, Inc. (2024). Confluent Hub. https://www.confluent.io/hub/

Elastic. (2024). Elasticsearch REST API documentation. https://www.elastic.co/guide/en/elasticsearch/reference/current/docs.html

LuizTools. (2023). Introdução ao Kafka e como usar com Node.js. https://luiztools.com.br/kafka-com-nodejs

Segment.io. (2024). kafka-go: Apache Kafka client for Go. https://github.com/segmentio/kafka-go


Leitura complementar


💡Curtiu?

Se quiser trocar ideia sobre IA, cloud e arquitetura, me segue nas redes:

Publico conteúdos técnicos direto do campo de batalha. E quando descubro uma ferramenta que economiza tempo e resolve bem, como essa, você fica sabendo também.

Top comments (0)