In this blog, we'll walk through the process of building Kafka Producer and Consumer microservices using Go, integrated with Docker.
We will,
- Build a Go Kafka producer with an HTTP endpoint.
- Build a Go Kafka consumer with retry and dead-letter support.
- Set up Kafka using Docker (KRaft mode — no Zookeeper required).
- Run the application end-to-end.
Introduction
Microservices architecture is a popular approach for building scalable and maintainable applications. By breaking down applications into smaller, independent services, developers can enhance flexibility and streamline deployment cycles. We will implement Kafka producer and consumer applications in Go and demonstrate how to run Kafka in a Docker container with Docker Compose to create a seamless microservices environment.
Go
Go is an open-source, statically typed, compiled language designed at Google for simplicity, reliability, and efficiency. It ships with a rich standard library, first-class concurrency primitives (goroutines and channels), and produces single, statically-linked binaries — making it an excellent fit for microservices and containerised workloads.
Apache Kafka
Kafka is a distributed streaming platform used to build real-time data pipelines and streaming applications. It allows producers to send messages to topics, which are then consumed by various consumers, making it ideal for event-driven architectures.
Docker
Docker is a platform that allows developers to automate the deployment of applications inside lightweight, portable containers. With Docker Compose, you can manage services like Kafka in isolated containers, making it easy to build and maintain microservices architectures.
Prerequisites
- Go 1.22 or later. If you haven't installed Go yet, this guide walks you through the setup.
- Docker
- Docker Compose
- Basic knowledge of Go and Kafka
Download Code from Github
Download full code from this github repository.
Project Overview
We will create two Go applications:
-
Kafka Producer: Exposes a
GET /sendHTTP endpoint that publishes a message to a Kafka topic. - Kafka Consumer: Connects to the topic, processes each message, and retries on failure with dead-letter support.
kafka-docker-go/
├── kafka-producer/
│ ├── go.mod
│ ├── main.go
│ ├── producer.go
│ ├── handler.go
│ └── model.go
├── kafka-consumer/
│ ├── go.mod
│ ├── main.go
│ ├── consumer.go
│ └── model.go
└── docker-compose.yml
Building and Running the Application
Docker Compose for Kafka Setup
Before we start building the Go producer and consumer, we first need a running Kafka instance locally. Instead of manually installing Kafka and managing all its dependencies, we'll use Docker Compose to spin up a ready-to-use environment in a single command.
We'll be using Apache Kafka running in KRaft mode (Kafka without ZooKeeper). Traditionally, Kafka relied on Apache ZooKeeper for cluster coordination, but modern Kafka versions (3.3+) have introduced KRaft (Kafka Raft metadata mode), which simplifies the architecture by removing ZooKeeper entirely.
docker-compose.yml:
This configuration provisions a single Kafka broker that also acts as a controller (KRaft mode):
services:
kafka:
image: apache/kafka:latest
container_name: kafka
ports:
- "9092:9092"
environment:
KAFKA_NODE_ID: 1
KAFKA_PROCESS_ROLES: 'broker,controller'
KAFKA_CONTROLLER_QUORUM_VOTERS: '1@localhost:9093'
KAFKA_LISTENERS: 'PLAINTEXT://:9092,CONTROLLER://:9093'
KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://localhost:9092'
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: 'CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT'
KAFKA_CONTROLLER_LISTENER_NAMES: 'CONTROLLER'
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_PROCESS_ROLES: 'broker,controller'
Instructs this single container to act simultaneously as both the metadata controller and the message broker, completely eliminating the need for ZooKeeper.KAFKA_ADVERTISED_LISTENERS
Defines the network address and port (localhost:9092) that external clients, such as your Go producers and consumers, will use to connect to the cluster.KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
Sets the replication factor for the internal offsets topic to 1. This is ideal for single-node local development, ensuring the cluster stays healthy without requiring peer nodes.
To launch your local Kafka environment in detached mode, run:
$ docker-compose up -d
[+] up 2/2
✔ Network golang-kafka-docker_default Created 0.0s
✔ Container kafka Started
Verify the container is running:
$ docker ps
CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
ae58e1252135 apache/kafka:latest "/__cacert_entrypoin…" 3 minutes ago Up 3 minutes 0.0.0.0:9092->9092/tcp kafka
Now that Kafka is running, let's quickly verify it using the built-in console tools.
In a terminal, start a Kafka console consumer that reads all messages from the beginning of the topic:
I wrote following command in git Bash. Git Bash on Windows automatically converts paths starting with / to Windows paths, turning /opt/kafka/bin/... into C:/Program Files/Git/opt/kafka/bin/....
SO I wrote path starting with double slashes, //opt/kafka/bin/kafka-console-consumer.sh
docker exec -it kafka //opt/kafka/bin/kafka-console-consumer.sh \
--bootstrap-server localhost:9092 \
--topic demo-kafka-topic \
--from-beginning
In a second terminal, start a producer to send a test message:
docker exec -it kafka //opt/kafka/bin/kafka-console-producer.sh \
--bootstrap-server localhost:9092 \
--topic demo-kafka-topic
Type a message and press Enter:
Hello from kafka Producer
The consumer terminal will immediately display:
Hello from kafka Producer
src="/static/images/golang-kafka-producer-consumer-with-docker/kafka-producer-consumer-manual-test.webp"
alt="kafka-producer-consumer-manual-test"
className="mx-auto my-6 h-5/6 w-5/6 rounded-lg shadow-lg"
/>
Kafka Consumer Service (Go)
Now that our Kafka broker is running, let's build the consumer service that listens to messages from the topic and processes them in real time.
We'll use kafka-go — a pure Go Kafka client that is a light-weight dependency, and gives fine-grained control over offset commits.
go.mod
module github.com/azam-akram/kafka-docker-go/kafka-consumer
go 1.22
require github.com/segmentio/kafka-go v0.4.51
Run go mod tidy after creating this file to download the dependency.
Message Model (model.go)
The Message struct is the shared data contract between producer and consumer. It is marshalled to JSON by the producer and unmarshalled back by the consumer.
package main
type Message struct {
UUID string `json:"uuid"`
From string `json:"from"`
To string `json:"to"`
Message string `json:"message"`
}
Kafka Consumer (consumer.go)
package main
import (
"context"
"encoding/json"
"fmt"
"log"
"time"
"github.com/segmentio/kafka-go"
)
const (
maxRetries = 3
retryDelay = time.Second
)
type KafkaConsumer struct {
reader *kafka.Reader
dlqWriter *kafka.Writer
}
func NewKafkaConsumer(broker, topic, groupID string) *KafkaConsumer {
dlqTopic := topic + "-dlt"
return &KafkaConsumer{
reader: kafka.NewReader(kafka.ReaderConfig{
Brokers: []string{broker},
Topic: topic,
GroupID: groupID,
MinBytes: 10e3,
MaxBytes: 10e6,
StartOffset: kafka.FirstOffset,
}),
dlqWriter: &kafka.Writer{
Addr: kafka.TCP(broker),
Topic: dlqTopic,
Balancer: &kafka.LeastBytes{},
AllowAutoTopicCreation: true,
},
}
}
// Run fetches messages in a loop until ctx is cancelled.
func (c *KafkaConsumer) Run(ctx context.Context) {
for {
m, err := c.reader.FetchMessage(ctx)
if err != nil {
if ctx.Err() != nil {
return // clean shutdown on SIGTERM / SIGINT
}
log.Printf("ERROR fetch: %v", err)
continue
}
if err := c.processWithRetry(m); err != nil {
log.Printf("ERROR retries exhausted, routing to DLQ — uuid=%s", string(m.Key))
c.sendToDLQ(ctx, m)
}
if err := c.reader.CommitMessages(ctx, m); err != nil {
log.Printf("ERROR commit: %v", err)
}
}
}
func (c *KafkaConsumer) processWithRetry(m kafka.Message) error {
var lastErr error
for attempt := 1; attempt <= maxRetries; attempt++ {
if err := process(m); err != nil {
lastErr = err
log.Printf("WARN attempt %d/%d failed: %v", attempt, maxRetries, err)
if attempt < maxRetries {
time.Sleep(retryDelay)
}
continue
}
return nil
}
return lastErr
}
func process(m kafka.Message) error {
var msg Message
if err := json.Unmarshal(m.Value, &msg); err != nil {
return fmt.Errorf("unmarshal: %w", err)
}
log.Println("=================================")
log.Println("Received message:")
log.Printf(" UUID: %s", msg.UUID)
log.Printf(" From: %s", msg.From)
log.Printf(" To: %s", msg.To)
log.Printf(" Message: %s", msg.Message)
log.Println("=================================")
return nil
}
func (c *KafkaConsumer) sendToDLQ(ctx context.Context, m kafka.Message) {
if err := c.dlqWriter.WriteMessages(ctx, kafka.Message{
Key: m.Key,
Value: m.Value,
}); err != nil {
log.Printf("ERROR DLQ write: %v", err)
}
}
func (c *KafkaConsumer) Close() {
c.reader.Close()
c.dlqWriter.Close()
}
Key design decisions explained:
kafka.NewReaderwithGroupID: registering aGroupIDenables consumer-group semantics — Kafka assigns partitions to group members and tracks committed offsets per group. Multiple consumer instances sharing the sameGroupIDcan scale horizontally across partitions.FetchMessagevsReadMessage:kafka-goprovides two fetch APIs.ReadMessageautomatically commits the offset after each fetch, which risks marking a message as processed even if your handler panics.FetchMessageleaves the offset uncommitted; we callCommitMessagesonly after the message has been fully processed (or routed to the DLQ). This gives us at-least-once delivery semantics.StartOffset: kafka.FirstOffset: equivalent toauto-offset-reset: earliestin a Spring Boot config. On first start (no committed offset for the group), the consumer reads from the beginning of the topic.processWithRetry: wrapsprocessin a simple retry loop — up tomaxRetriesattempts with aretryDelaybetween each. If a transient error causesprocessto fail, the message is retried in place before being sent to the DLQ.Dead-letter topic (
-dltsuffix): if all retry attempts are exhausted, the raw message bytes are forwarded todemo-kafka-topic-dlt. Failed messages are captured for inspection or manual reprocessing rather than silently dropped — the same pattern as@DltHandlerin Spring Kafka.
Application Entry Point (main.go)
package main
import (
"context"
"log"
"net/http"
"os"
"os/signal"
"syscall"
)
func main() {
broker := "localhost:9092"
topic := "demo-kafka-topic"
groupID := "demo-kafka-consumer"
port := "5555"
consumer := NewKafkaConsumer(broker, topic, groupID)
defer consumer.Close()
// Simple health endpoint
go func() {
mux := http.NewServeMux()
mux.HandleFunc("/health", func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json")
w.Write([]byte(`{"status":"up"}`))
})
log.Printf("Health endpoint on :%s", port)
http.ListenAndServe(":"+port, mux)
}()
ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM)
defer cancel()
log.Printf("Consumer started topic=%s group=%s", topic, groupID)
consumer.Run(ctx)
log.Println("Consumer stopped")
}
-
signal.NotifyContext: creates a context that is cancelled when the process receivesSIGINT(Ctrl+C) orSIGTERM(e.g. fromdocker stop). When the context is cancelled,FetchMessagereturns immediately, allowing theRunloop to exit cleanly. -
Health goroutine: a minimal
GET /healthendpoint runs on port 5555 in a background goroutine, so the consumer can be probed by Docker health checks or an orchestrator without interfering with the main consume loop.
Kafka Producer Service (Go)
The producer service exposes a simple REST endpoint. When called, it builds a Message and publishes it to the Kafka topic.
go.mod
module github.com/azam-akram/kafka-docker-go/kafka-producer
go 1.22
require github.com/segmentio/kafka-go v0.4.47
Message Model (model.go)
The producer's Message model mirrors the consumer's exactly, ensuring the JSON wire format is compatible on both sides.
package main
type Message struct {
UUID string `json:"uuid"`
From string `json:"from"`
To string `json:"to"`
Message string `json:"message"`
}
In a larger system you would extract this into a shared Go module to avoid duplication.
For this self-contained example, both services define their own copy.
Kafka Producer (producer.go)
package main
import (
"context"
"encoding/json"
"fmt"
"log"
"github.com/segmentio/kafka-go"
)
type KafkaProducer struct {
writer *kafka.Writer
}
func NewKafkaProducer(broker, topic string) *KafkaProducer {
return &KafkaProducer{
writer: &kafka.Writer{
Addr: kafka.TCP(broker),
Topic: topic,
Balancer: &kafka.LeastBytes{},
RequiredAcks: kafka.RequireOne,
AllowAutoTopicCreation: true,
},
}
}
func (p *KafkaProducer) Send(ctx context.Context, msg Message) error {
value, err := json.Marshal(msg)
if err != nil {
return fmt.Errorf("marshal: %w", err)
}
if err := p.writer.WriteMessages(ctx, kafka.Message{
Key: []byte(msg.UUID),
Value: value,
}); err != nil {
return fmt.Errorf("write: %w", err)
}
log.Printf("Sent uuid=%s topic=%s", msg.UUID, p.writer.Topic)
return nil
}
func (p *KafkaProducer) Close() error {
return p.writer.Close()
}
Key design decisions:
-
RequiredAcks: kafka.RequireOne: the broker acknowledges the write once the leader partition has persisted the message. This balances throughput and durability for a single-node development setup. In production, usekafka.RequireAllto wait for all in-sync replicas. -
AllowAutoTopicCreation: true: ifdemo-kafka-topicdoes not exist yet, the broker creates it automatically on first write — no need to pre-create topics manually. - Message key set to UUID: using the UUID as the partition key ensures messages with the same UUID always land on the same partition. For random distribution across partitions, omit the key.
-
fmt.Errorf("...: %w", err): wraps errors with context using the%wverb so callers can useerrors.Is/errors.Asfor structured error handling.
HTTP Handler (handler.go)
package main
import (
"context"
"fmt"
"log"
"net/http"
"time"
"github.com/google/uuid"
)
type SendHandler struct {
producer *KafkaProducer
}
func (h *SendHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
msg := Message{
UUID: uuid.New().String(),
From: "Alice",
To: "Bob",
Message: "Hello from Go producer",
}
ctx, cancel := context.WithTimeout(r.Context(), 5*time.Second)
defer cancel()
if err := h.producer.Send(ctx, msg); err != nil {
log.Printf("ERROR send: %v", err)
http.Error(w, "failed to send message", http.StatusInternalServerError)
return
}
fmt.Fprintln(w, "Message sent!")
}
-
ServeHTTP: implementing thehttp.Handlerinterface directly (rather than a plainfunc) lets us inject the*KafkaProducerdependency cleanly and register the handler withmux.Handle("/send", &SendHandler{...}). -
context.WithTimeout: the send is bound to a 5-second deadline. If the Kafka broker is unavailable,WriteMessagesreturns promptly with a deadline-exceeded error rather than hanging the HTTP request indefinitely. -
newUUID: generates a RFC 4122 v4 UUID viagithub.com/google/uuid.
Application Entry Point (main.go)
package main
import (
"log"
"net/http"
)
func main() {
broker := "localhost:9092"
topic := "demo-kafka-topic"
port := "4444"
producer := NewKafkaProducer(broker, topic)
defer producer.Close()
mux := http.NewServeMux()
mux.Handle("/send", &SendHandler{producer: producer})
mux.HandleFunc("/health", func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json")
w.Write([]byte(`{"status":"up"}`))
})
log.Printf("Producer listening on :%s", port)
log.Fatal(http.ListenAndServe(":"+port, mux))
}
The producer uses Go's built-in net/http server, no third-party web framework needed. A GET /health endpoint is registered alongside /send so that the service can be health-checked by Docker or an orchestrator.
Running the Application End-to-End
Step 1 — Start Kafka
docker-compose up -d
Step 2 — Start the Consumer
From the kafka-consumer directory:
go mod tidy
go run .
The consumer starts on port 5555 and begins listening to demo-kafka-topic.
2026/05/29 10:00:00 Health endpoint on :5555
2026/05/29 10:00:00 Consumer started topic=demo-kafka-topic group=demo-kafka-consumer
Step 3 — Start the Producer
From the kafka-producer directory:
go mod tidy
go run .
The producer starts on port 4444.
2026/05/29 10:00:05 Producer listening on :4444
Step 4 — Send a Message
Trigger the producer's REST endpoint:
curl http://localhost:4444/send
You should see the response:
Message sent!
Step 5 — Observe the Consumer Logs
In the consumer terminal you will see:
=================================
Received message:
UUID: 3f2504e0-4f89-11d3-9a0c-0305e82c3301
From: Alice
To: Bob
Message: Hello from Go producer
=================================
And in the producer terminal, the send confirmation:
2026/05/29 10:00:10 Sent uuid=3f2504e0-4f89-11d3-9a0c-0305e82c3301 topic=demo-kafka-topic
src="/static/images/golang-kafka-producer-consumer-with-docker/kafka-docker-go-e2e-test.png"
alt="kafka-docker-go-e2e-test"
className="mx-auto my-6 h-5/6 w-5/6 rounded-lg shadow-lg"
/>
Step 6 — Check Health Endpoints
Both services expose a /health endpoint:
# Producer health
curl http://localhost:4444/health
# Consumer health
curl http://localhost:5555/health
Both return:
{"status":"up"}
Step 7 — Graceful Shutdown
Press Ctrl+C in the consumer terminal. The signal.NotifyContext cancels the context, FetchMessage returns, and the consumer exits cleanly:
2026/05/29 10:01:00 Consumer stopped
Summary
In this blog we built two Go microservices that communicate asynchronously through Apache Kafka running in Docker:
| Component | Responsibility |
|---|---|
docker-compose.yml |
Runs a single-node Kafka broker in KRaft mode (no ZooKeeper) |
KafkaProducer |
Marshals a Message to JSON and publishes it to a topic |
SendHandler |
Exposes a GET /send HTTP endpoint to trigger the producer |
KafkaConsumer |
Unmarshals and processes messages with manual offset commits |
processWithRetry |
Retries failed messages up to 3 times with a 1-second backoff |
sendToDLQ |
Routes exhausted messages to a dead-letter topic (-dlt) |
The key production-ready features we added beyond a minimal example:
-
Manual offset commit —
FetchMessage+CommitMessagesensures an offset is committed only after successful processing, giving at-least-once delivery semantics. - Retry with backoff — transient failures are retried up to 3 times before escalating to the DLQ.
-
Dead-letter topic — messages that exhaust all retries are forwarded to
demo-kafka-topic-dltfor inspection rather than silently dropped. -
Graceful shutdown —
signal.NotifyContextcancels the consume loop onSIGTERM/SIGINT, preventing data loss on process stop. -
Health endpoints — both services expose
GET /healthfor Docker and orchestrator health checks. -
Environment-driven config — all connection parameters (
KAFKA_BROKER,KAFKA_TOPIC, etc.) are read from environment variables, making both services container-ready out of the box.
Top comments (0)