Nos últimos dias eu precisava testar o Kafka Connect para um cenário de ingestão de dados e pensei: por que não montar um laboratório simples que já faça sentido para outros projetos?
A ideia era clara:
- Um producer em Go que consulta a API da Binance e envia mensagens para o Kafka.
- Kafka Connect consumindo esse tópico e jogando as mensagens no Elasticsearch.
- Kafka UI para inspecionar o que está rolando nos tópicos.
Tudo isso rodando em containers com Docker Compose, porque ninguém merece configurar isso manualmente.
Quer aprender como fazer? Bora lá!
O que vamos construir
Nosso fluxo de dados vai ser assim:
E vamos adicionar o Kafka UI para facilitar a visualização.
Passo 1 – Estrutura do projeto
Crie uma pasta para o projeto:
mkdir kafka-connect-lab
cd kafka-connect-lab
Estruture assim:
kafka-connect-lab/
├── cmd/worker/ # Código Go do producer
│ └── main.go
├── Dockerfile # Dockerfile do worker
├── docker-compose.yml # Orquestração
└── go.mod # Módulo Go
Inicialize o módulo Go:
go mod init github.com/seuprojeto/kafka-connect-lab
go get github.com/segmentio/kafka-go
Passo 2 – Escrevendo o Worker em Go
Vamos criar um producer que consulta o preço do Bitcoin na Binance e publica no Kafka.
O código completo:
Abra cmd/worker/main.go
:
package main
import (
"context"
"encoding/json"
"fmt"
"log"
"net/http"
"os"
"time"
kafka "github.com/segmentio/kafka-go"
)
👉 O que temos aqui?
- Importamos pacotes básicos (
log
,os
,time
, etc.). - Importamos a lib
kafka-go
para conectar no Kafka.
type Ticker struct {
Symbol string `json:"symbol"`
Price string `json:"price"`
}
👉 Por que essa struct?
Ela representa o JSON que recebemos da API da Binance, com par de negociação e preço.
func fetchTicker(symbol string) (*Ticker, error) {
url := "https://api.binance.com/api/v3/ticker/price?symbol=" + symbol
resp, err := http.Get(url)
if err != nil {
return nil, err
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
return nil, fmt.Errorf("erro Binance: %d", resp.StatusCode)
}
var t Ticker
if err := json.NewDecoder(resp.Body).Decode(&t); err != nil {
return nil, err
}
return &t, nil
}
👉 O que essa função faz?
- Monta a URL da Binance.
- Faz uma requisição HTTP GET.
- Decodifica a resposta JSON direto para a struct
Ticker
. - Retorna um ponteiro para
Ticker
.
Agora vem a função main()
:
func main() {
broker := os.Getenv("KAFKA_BROKER")
topic := os.Getenv("KAFKA_TOPIC")
symbol := os.Getenv("BINANCE_SYMBOL")
if symbol == "" {
symbol = "BTCUSDT"
}
interval := os.Getenv("FETCH_INTERVAL")
if interval == "" {
interval = "5s"
}
d, err := time.ParseDuration(interval)
if err != nil {
log.Fatalf("Intervalo inválido: %v", err)
}
👉 O que estamos fazendo?
- Pegando as configs via variáveis de ambiente.
- Se não passar nada, usamos defaults (
BTCUSDT
e intervalo de5s
).
writer := kafka.NewWriter(kafka.WriterConfig{
Brokers: []string{broker},
Topic: topic,
Balancer: &kafka.LeastBytes{},
})
defer writer.Close()
log.Printf("Worker iniciado. Broker=%s, Tópico=%s, Ativo=%s", broker, topic, symbol)
👉 Aqui inicializamos o Producer:
-
NewWriter
cria um writer para enviar mensagens ao Kafka. - Usamos o balanceador
LeastBytes
para distribuir mensagens (bom mesmo em clusters).
ticker := time.NewTicker(d)
defer ticker.Stop()
for range ticker.C {
data, err := fetchTicker(symbol)
if err != nil {
log.Println("Erro ao buscar ticker:", err)
continue
}
msg, _ := json.Marshal(data)
err = writer.WriteMessages(context.Background(), kafka.Message{Value: msg})
if err != nil {
log.Println("Erro ao publicar:", err)
} else {
log.Printf("Mensagem publicada: %s = %s", data.Symbol, data.Price)
}
}
}
👉 O loop principal:
- Executa a cada
interval
segundos. - Busca o preço na Binance.
- Serializa para JSON.
- Publica no Kafka.
Resultado: A cada 5 segundos, temos o preço do BTC publicado no tópico.
Passo 3 – Empacotando com Docker
Crie Dockerfile
na raiz:
# build
FROM golang:1.20-alpine AS builder
WORKDIR /app
COPY go.mod go.sum ./
RUN go mod download
COPY cmd/worker ./cmd/worker
RUN go build -o worker ./cmd/worker
# runtime
FROM alpine:latest
WORKDIR /app
COPY --from=builder /app/worker ./
RUN apk add --no-cache ca-certificates
ENTRYPOINT ["./worker"]
👉 Por que multi-stage?
- A primeira imagem compila o binário.
- A segunda só carrega o executável final, leve e rápida.
Passo 4 – Orquestrando tudo com Docker Compose
Crie docker-compose.yml
:
services:
zookeeper:
image: confluentinc/cp-zookeeper:7.7.4
environment:
ZOOKEEPER_CLIENT_PORT: 2181
kafka:
image: confluentinc/cp-kafka:7.7.4
depends_on: [zookeeper]
ports: ["9092:9092"]
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
elasticsearch:
image: docker.elastic.co/elasticsearch/elasticsearch:7.17.28
environment:
- discovery.type=single-node
- ES_JAVA_OPTS=-Xms512m -Xmx512m
ports: ["9200:9200"]
kafka-connect:
image: confluentinc/cp-kafka-connect:7.7.4
depends_on: [kafka]
ports: ["8083:8083"]
environment:
CONNECT_BOOTSTRAP_SERVERS: kafka:9092
CONNECT_GROUP_ID: connect-cluster
CONNECT_CONFIG_STORAGE_TOPIC: connect-config
CONNECT_OFFSET_STORAGE_TOPIC: connect-offsets
CONNECT_STATUS_STORAGE_TOPIC: connect-status
CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
CONNECT_KEY_CONVERTER: org.apache.kafka.connect.json.JsonConverter
CONNECT_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
CONNECT_KEY_CONVERTER_SCHEMAS_ENABLE: 'false'
CONNECT_VALUE_CONVERTER_SCHEMAS_ENABLE: 'false'
CONNECT_PLUGIN_PATH: /usr/share/java,/usr/share/confluent-hub-components
command:
- bash -c
- |
confluent-hub install --no-prompt confluentinc/kafka-connect-elasticsearch:13.0.0
/etc/confluent/docker/run
worker:
build: .
depends_on: [kafka]
environment:
KAFKA_BROKER: kafka:9092
KAFKA_TOPIC: binance-ticker
BINANCE_SYMBOL: BTCUSDT
FETCH_INTERVAL: 5s
kafka-ui:
image: provectuslabs/kafka-ui:v0.7.2
depends_on: [kafka, kafka-connect]
ports: ["8080:8080"]
environment:
KAFKA_CLUSTERS_0_NAME: local
KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:9092
KAFKA_CLUSTERS_0_KAFKACONNECT_0_NAME: connect
KAFKA_CLUSTERS_0_KAFKACONNECT_0_ADDRESS: http://kafka-connect:8083
👉 O que temos aqui?
- Kafka + Zookeeper.
- Elasticsearch (para receber os dados).
- Kafka Connect (já baixando o conector para ES).
- Worker em Go.
- Kafka UI para facilitar o debug.
Passo 5 – Subindo tudo
docker-compose up --build
Se o Elasticsearch reclamar de memória:
sudo sysctl -w vm.max_map_count=262144
Passo 6 – Criando o tópico
docker-compose exec kafka kafka-topics --create \
--topic binance-ticker \
--bootstrap-server kafka:9092 \
--replication-factor 1 \
--partitions 1
Passo 7 – Configurando o conector
Agora vem a estrela do show: Kafka Connect.
curl -X POST -H "Content-Type: application/json" \
--data '{
"name": "es-sink",
"config": {
"connector.class":"io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
"tasks.max":"1",
"topics":"binance-ticker",
"connection.url":"http://elasticsearch:9200",
"type.name":"_doc",
"key.ignore":"true",
"schema.ignore":"true"
}
}' http://localhost:8083/connectors
Quer saber se ele subiu?
curl http://localhost:8083/connectors/es-sink/status
Passo 8 – Testando tudo
- Vá no Kafka UI: http://localhost:8080 → veja o tópico
binance-ticker
recebendo mensagens. - Consulte no Elasticsearch:
curl http://localhost:9200/binance-ticker/_search?pretty
Saída esperada:
Pronto. Kafka Connect funcionando!
Conclusão
Neste artigo, construímos um laboratório completo para testar o Kafka Connect:
- Criamos um worker em Go que consome dados da API da Binance e publica no Kafka.
- Subimos um ambiente com Kafka, Kafka Connect, Elasticsearch e Kafka UI usando Docker Compose.
- Configuramos um conector Elasticsearch no Kafka Connect para consumir mensagens e indexá-las automaticamente.
- Validamos o pipeline via Kafka UI e consultas REST no Elasticsearch.
Esse ambiente é um ótimo ponto de partida para testar outros conectores, experimentar transformações e entender o fluxo de dados em arquiteturas orientadas a eventos.
Como próximos passos, você pode:
- Adicionar Kibana para visualizações.
- Criar um cluster Kafka com múltiplos brokers.
- Habilitar TLS e autenticação, aproximando o setup de produção.
Referências
Apache Software Foundation. (2024). Kafka Connect documentation. https://kafka.apache.org/documentation/#connect
Confluent, Inc. (2024). Confluent Hub. https://www.confluent.io/hub/
Elastic. (2024). Elasticsearch REST API documentation. https://www.elastic.co/guide/en/elasticsearch/reference/current/docs.html
LuizTools. (2023). Introdução ao Kafka e como usar com Node.js. https://luiztools.com.br/kafka-com-nodejs
Segment.io. (2024). kafka-go: Apache Kafka client for Go. https://github.com/segmentio/kafka-go
Leitura complementar
- LuizTools – Introdução ao Kafka e como usar com Node.js
- Confluent – Building Data Pipelines with Kafka Connect
- Elastic – Ingestão de dados com Kafka + Elasticsearch
- Go by Example – Trabalhando com JSON em Go
💡Curtiu?
Se quiser trocar ideia sobre IA, cloud e arquitetura, me segue nas redes:
Publico conteúdos técnicos direto do campo de batalha. E quando descubro uma ferramenta que economiza tempo e resolve bem, como essa, você fica sabendo também.
Top comments (0)