Azam Akram

Posted on • Originally published at solutiontoolkit.com

Building Kafka Producer-Consumer Using Go and Docker

In this blog, we'll walk through the process of building Kafka Producer and Consumer microservices using Go, integrated with Docker.

We will:

  1. Build a Go Kafka producer with an HTTP endpoint.
  2. Build a Go Kafka consumer with retry and dead-letter support.
  3. Set up Kafka using Docker (KRaft mode, no ZooKeeper required).
  4. Run the application end-to-end.

Introduction

Microservices architecture is a popular approach for building scalable and maintainable applications. By breaking down applications into smaller, independent services, developers can enhance flexibility and streamline deployment cycles. We will implement Kafka producer and consumer applications in Go and demonstrate how to run Kafka in a Docker container with Docker Compose to create a seamless microservices environment.

Go

Go is an open-source, statically typed, compiled language designed at Google for simplicity, reliability, and efficiency. It ships with a rich standard library, first-class concurrency primitives (goroutines and channels), and produces single, statically-linked binaries — making it an excellent fit for microservices and containerised workloads.

Apache Kafka

Kafka is a distributed streaming platform used to build real-time data pipelines and streaming applications. It allows producers to send messages to topics, which are then consumed by various consumers, making it ideal for event-driven architectures.

Docker

Docker is a platform that allows developers to automate the deployment of applications inside lightweight, portable containers. With Docker Compose, you can manage services like Kafka in isolated containers, making it easy to build and maintain microservices architectures.

Prerequisites

  • Go 1.22 or later. If you haven't installed Go yet, this guide walks you through the setup.
  • Docker
  • Docker Compose
  • Basic knowledge of Go and Kafka

Download Code from GitHub

Download the full code from this GitHub repository.

Project Overview

We will create two Go applications:

  1. Kafka Producer: Exposes a GET /send HTTP endpoint that publishes a message to a Kafka topic.
  2. Kafka Consumer: Connects to the topic, processes each message, and retries on failure with dead-letter support.
kafka-docker-go/
├── kafka-producer/
│   ├── go.mod
│   ├── main.go
│   ├── producer.go
│   ├── handler.go
│   └── model.go
├── kafka-consumer/
│   ├── go.mod
│   ├── main.go
│   ├── consumer.go
│   └── model.go
└── docker-compose.yml

Building and Running the Application

Docker Compose for Kafka Setup

Before we start building the Go producer and consumer, we first need a running Kafka instance locally. Instead of manually installing Kafka and managing all its dependencies, we'll use Docker Compose to spin up a ready-to-use environment in a single command.

We'll be using Apache Kafka running in KRaft mode (Kafka without ZooKeeper). Traditionally, Kafka relied on Apache ZooKeeper for cluster coordination. KRaft (Kafka Raft metadata mode) moves that coordination into Kafka itself; it has been production-ready since Kafka 3.3, and Kafka 4.0 removed ZooKeeper support entirely.

docker-compose.yml:

This configuration provisions a single Kafka broker that also acts as a controller (KRaft mode):

services:
  kafka:
    image: apache/kafka:latest
    container_name: kafka
    ports:
      - "9092:9092"
    environment:
      KAFKA_NODE_ID: 1
      KAFKA_PROCESS_ROLES: 'broker,controller'
      KAFKA_CONTROLLER_QUORUM_VOTERS: '1@localhost:9093'
      KAFKA_LISTENERS: 'PLAINTEXT://:9092,CONTROLLER://:9093'
      KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://localhost:9092'
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: 'CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT'
      KAFKA_CONTROLLER_LISTENER_NAMES: 'CONTROLLER'
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
  • KAFKA_PROCESS_ROLES: 'broker,controller'
    Instructs this single container to act simultaneously as both the metadata controller and the message broker, completely eliminating the need for ZooKeeper.

  • KAFKA_ADVERTISED_LISTENERS
    Defines the network address and port (localhost:9092) that external clients, such as your Go producers and consumers, will use to connect to the cluster.

  • KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
    Sets the replication factor for the internal offsets topic to 1. This is ideal for single-node local development, ensuring the cluster stays healthy without requiring peer nodes.

To launch your local Kafka environment in detached mode, run:

$ docker-compose up -d
[+] up 2/2
 ✔ Network golang-kafka-docker_default  Created                          0.0s
 ✔ Container kafka                      Started

Verify the container is running:

$ docker ps
CONTAINER ID   IMAGE                COMMAND                  CREATED         STATUS         PORTS                    NAMES
ae58e1252135   apache/kafka:latest  "/__cacert_entrypoin…"   3 minutes ago   Up 3 minutes   0.0.0.0:9092->9092/tcp   kafka

Now that Kafka is running, let's quickly verify it using the built-in console tools.

In a terminal, start a Kafka console consumer that reads all messages from the beginning of the topic:

The commands below were run in Git Bash on Windows, which automatically rewrites arguments starting with / into Windows paths, turning /opt/kafka/bin/... into C:/Program Files/Git/opt/kafka/bin/....
Starting the path with a double slash, //opt/kafka/bin/kafka-console-consumer.sh, avoids that conversion. On Linux or macOS, a single leading slash works.

docker exec -it kafka //opt/kafka/bin/kafka-console-consumer.sh \
  --bootstrap-server localhost:9092 \
  --topic demo-kafka-topic \
  --from-beginning

In a second terminal, start a producer to send a test message:

docker exec -it kafka //opt/kafka/bin/kafka-console-producer.sh \
  --bootstrap-server localhost:9092 \
  --topic demo-kafka-topic

Type a message and press Enter:

Hello from kafka Producer

The consumer terminal will immediately display:

Hello from kafka Producer

[Image: kafka-producer-consumer-manual-test]
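If you would rather check the broker from Go than from the console tools, a small readiness probe can help. The sketch below only assumes the localhost:9092 listener from docker-compose.yml; waitForTCP is a hypothetical helper name, and an open port shows that the container is listening, not that the broker has finished starting.

```go
package main

import (
	"fmt"
	"log"
	"net"
	"time"
)

// waitForTCP polls addr until a TCP connection succeeds or timeout elapses.
// It only proves the port is open, not that Kafka is fully initialised.
func waitForTCP(addr string, timeout time.Duration) error {
	deadline := time.Now().Add(timeout)
	for {
		conn, err := net.DialTimeout("tcp", addr, time.Second)
		if err == nil {
			conn.Close()
			return nil
		}
		if time.Now().After(deadline) {
			return fmt.Errorf("broker %s not reachable after %s: %w", addr, timeout, err)
		}
		time.Sleep(500 * time.Millisecond)
	}
}

func main() {
	// localhost:9092 is the advertised listener from docker-compose.yml.
	if err := waitForTCP("localhost:9092", 10*time.Second); err != nil {
		log.Fatal(err)
	}
	log.Println("Kafka port is accepting connections")
}
```

A probe like this could run at the start of either service so that a slow container start produces a clear error instead of a failed first write.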


Kafka Consumer Service (Go)

Now that our Kafka broker is running, let's build the consumer service that listens to messages from the topic and processes them in real time.

We'll use kafka-go, a lightweight, pure Go Kafka client that gives fine-grained control over offset commits.

go.mod

module github.com/azam-akram/kafka-docker-go/kafka-consumer

go 1.22

require github.com/segmentio/kafka-go v0.4.51

Run go mod tidy after creating this file to download the dependency.

Message Model (model.go)

The Message struct is the shared data contract between producer and consumer. It is marshalled to JSON by the producer and unmarshalled back by the consumer.

package main

type Message struct {
    UUID    string `json:"uuid"`
    From    string `json:"from"`
    To      string `json:"to"`
    Message string `json:"message"`
}

Kafka Consumer (consumer.go)

package main

import (
    "context"
    "encoding/json"
    "fmt"
    "log"
    "time"

    "github.com/segmentio/kafka-go"
)

const (
    maxRetries = 3
    retryDelay = time.Second
)

type KafkaConsumer struct {
    reader    *kafka.Reader
    dlqWriter *kafka.Writer
}

func NewKafkaConsumer(broker, topic, groupID string) *KafkaConsumer {
    dlqTopic := topic + "-dlt"
    return &KafkaConsumer{
        reader: kafka.NewReader(kafka.ReaderConfig{
            Brokers:     []string{broker},
            Topic:       topic,
            GroupID:     groupID,
            MinBytes:    10e3, // 10 KB; smaller batches may wait up to MaxWait before delivery
            MaxBytes:    10e6, // 10 MB
            StartOffset: kafka.FirstOffset,
        }),
        dlqWriter: &kafka.Writer{
            Addr:                   kafka.TCP(broker),
            Topic:                  dlqTopic,
            Balancer:               &kafka.LeastBytes{},
            AllowAutoTopicCreation: true,
        },
    }
}

// Run fetches messages in a loop until ctx is cancelled.
func (c *KafkaConsumer) Run(ctx context.Context) {
    for {
        m, err := c.reader.FetchMessage(ctx)
        if err != nil {
            if ctx.Err() != nil {
                return // clean shutdown on SIGTERM / SIGINT
            }
            log.Printf("ERROR fetch: %v", err)
            continue
        }

        if err := c.processWithRetry(m); err != nil {
            log.Printf("ERROR retries exhausted, routing to DLQ — uuid=%s", string(m.Key))
            c.sendToDLQ(ctx, m)
        }

        if err := c.reader.CommitMessages(ctx, m); err != nil {
            log.Printf("ERROR commit: %v", err)
        }
    }
}

func (c *KafkaConsumer) processWithRetry(m kafka.Message) error {
    var lastErr error
    for attempt := 1; attempt <= maxRetries; attempt++ {
        if err := process(m); err != nil {
            lastErr = err
            log.Printf("WARN attempt %d/%d failed: %v", attempt, maxRetries, err)
            if attempt < maxRetries {
                time.Sleep(retryDelay)
            }
            continue
        }
        return nil
    }
    return lastErr
}

func process(m kafka.Message) error {
    var msg Message
    if err := json.Unmarshal(m.Value, &msg); err != nil {
        return fmt.Errorf("unmarshal: %w", err)
    }

    log.Println("=================================")
    log.Println("Received message:")
    log.Printf("  UUID:    %s", msg.UUID)
    log.Printf("  From:    %s", msg.From)
    log.Printf("  To:      %s", msg.To)
    log.Printf("  Message: %s", msg.Message)
    log.Println("=================================")
    return nil
}

func (c *KafkaConsumer) sendToDLQ(ctx context.Context, m kafka.Message) {
    if err := c.dlqWriter.WriteMessages(ctx, kafka.Message{
        Key:   m.Key,
        Value: m.Value,
    }); err != nil {
        log.Printf("ERROR DLQ write: %v", err)
    }
}

func (c *KafkaConsumer) Close() {
    c.reader.Close()
    c.dlqWriter.Close()
}

Key design decisions explained:

  • kafka.NewReader with GroupID: registering a GroupID enables consumer-group semantics — Kafka assigns partitions to group members and tracks committed offsets per group. Multiple consumer instances sharing the same GroupID can scale horizontally across partitions.

  • FetchMessage vs ReadMessage: kafka-go provides two fetch APIs. With a GroupID set, ReadMessage commits the offset automatically as part of the read, which risks marking a message as processed even if your handler then fails or panics. FetchMessage leaves the offset uncommitted; we call CommitMessages only after the message has been fully processed (or routed to the DLQ). This gives us at-least-once delivery semantics, so a message can be delivered again after a crash and processing should tolerate duplicates.

  • StartOffset: kafka.FirstOffset: equivalent to auto-offset-reset: earliest in a Spring Boot config. On first start (no committed offset for the group), the consumer reads from the beginning of the topic.

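  • processWithRetry: wraps process in a simple retry loop with up to maxRetries attempts and a fixed retryDelay between them. If a transient error causes process to fail, the message is retried in place before being sent to the DLQ. In this example the only possible failure is a JSON decoding error, which will not succeed on retry; the loop pays off once process does real work such as database writes or HTTP calls.

  • Dead-letter topic (-dlt suffix): if all retry attempts are exhausted, the raw message bytes are forwarded to demo-kafka-topic-dlt. Failed messages are captured for inspection or manual reprocessing rather than silently dropped, the same pattern as @DltHandler in Spring Kafka. Note that sendToDLQ only logs a failed DLQ write and Run commits the offset regardless, so a message can still be lost if the DLQ write itself fails.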

Application Entry Point (main.go)

package main

import (
    "context"
    "log"
    "net/http"
    "os"
    "os/signal"
    "syscall"
)

func main() {
    broker := "localhost:9092"
    topic := "demo-kafka-topic"
    groupID := "demo-kafka-consumer"
    port := "5555"

    consumer := NewKafkaConsumer(broker, topic, groupID)
    defer consumer.Close()

    // Simple health endpoint
    go func() {
        mux := http.NewServeMux()
        mux.HandleFunc("/health", func(w http.ResponseWriter, r *http.Request) {
            w.Header().Set("Content-Type", "application/json")
            w.Write([]byte(`{"status":"up"}`))
        })
        log.Printf("Health endpoint on :%s", port)
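        // ListenAndServe only returns on failure (e.g. port already in use).
        if err := http.ListenAndServe(":"+port, mux); err != nil {
            log.Printf("ERROR health server: %v", err)
        }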
    }()

    ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM)
    defer cancel()

    log.Printf("Consumer started topic=%s group=%s", topic, groupID)
    consumer.Run(ctx)
    log.Println("Consumer stopped")
}
  • signal.NotifyContext: creates a context that is cancelled when the process receives SIGINT (Ctrl+C) or SIGTERM (e.g. from docker stop). When the context is cancelled, FetchMessage returns immediately, allowing the Run loop to exit cleanly.
  • Health goroutine: a minimal GET /health endpoint runs on port 5555 in a background goroutine, so the consumer can be probed by Docker health checks or an orchestrator without interfering with the main consume loop.

Kafka Producer Service (Go)

The producer service exposes a simple REST endpoint. When called, it builds a Message and publishes it to the Kafka topic.

go.mod

module github.com/azam-akram/kafka-docker-go/kafka-producer

go 1.22

require github.com/segmentio/kafka-go v0.4.47

Message Model (model.go)

The producer's Message model mirrors the consumer's exactly, ensuring the JSON wire format is compatible on both sides.

package main

type Message struct {
    UUID    string `json:"uuid"`
    From    string `json:"from"`
    To      string `json:"to"`
    Message string `json:"message"`
}

In a larger system you would extract this into a shared Go module to avoid duplication.
For this self-contained example, both services define their own copy.

Kafka Producer (producer.go)

package main

import (
    "context"
    "encoding/json"
    "fmt"
    "log"

    "github.com/segmentio/kafka-go"
)

type KafkaProducer struct {
    writer *kafka.Writer
}

func NewKafkaProducer(broker, topic string) *KafkaProducer {
    return &KafkaProducer{
        writer: &kafka.Writer{
            Addr:                   kafka.TCP(broker),
            Topic:                  topic,
            Balancer:               &kafka.LeastBytes{},
            RequiredAcks:           kafka.RequireOne,
            AllowAutoTopicCreation: true,
        },
    }
}

func (p *KafkaProducer) Send(ctx context.Context, msg Message) error {
    value, err := json.Marshal(msg)
    if err != nil {
        return fmt.Errorf("marshal: %w", err)
    }

    if err := p.writer.WriteMessages(ctx, kafka.Message{
        Key:   []byte(msg.UUID),
        Value: value,
    }); err != nil {
        return fmt.Errorf("write: %w", err)
    }

    log.Printf("Sent  uuid=%s topic=%s", msg.UUID, p.writer.Topic)
    return nil
}

func (p *KafkaProducer) Close() error {
    return p.writer.Close()
}

Key design decisions:

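  • RequiredAcks: kafka.RequireOne: the broker acknowledges the write once the partition leader has written the message to its log. This balances throughput and durability for a single-node development setup. In production, use kafka.RequireAll to wait for all in-sync replicas.
  • AllowAutoTopicCreation: true: if demo-kafka-topic does not exist yet, it is created automatically on first write, so there is no need to pre-create topics manually. This relies on the broker allowing automatic topic creation, and the very first write can fail transiently while the topic is being created, so a retry may be needed.
  • Message key set to UUID: the UUID travels with the message as its key and is what the consumer logs when routing to the DLQ. Note that the LeastBytes balancer configured above picks the partition by bytes written and does not look at the key. To make messages with the same key always land on the same partition, use a key-hashing balancer such as &kafka.Hash{} instead.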
  • fmt.Errorf("...: %w", err): wraps errors with context using the %w verb so callers can use errors.Is / errors.As for structured error handling.

HTTP Handler (handler.go)

package main

import (
    "context"
    "fmt"
    "log"
    "net/http"
    "time"

    "github.com/google/uuid"
)

type SendHandler struct {
    producer *KafkaProducer
}

func (h *SendHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
    msg := Message{
        UUID:    uuid.New().String(),
        From:    "Alice",
        To:      "Bob",
        Message: "Hello from Go producer",
    }

    ctx, cancel := context.WithTimeout(r.Context(), 5*time.Second)
    defer cancel()

    if err := h.producer.Send(ctx, msg); err != nil {
        log.Printf("ERROR send: %v", err)
        http.Error(w, "failed to send message", http.StatusInternalServerError)
        return
    }

    fmt.Fprintln(w, "Message sent!")
}
  • ServeHTTP: implementing the http.Handler interface directly (rather than a plain func) lets us inject the *KafkaProducer dependency cleanly and register the handler with mux.Handle("/send", &SendHandler{...}).
  • context.WithTimeout: the send is bound to a 5-second deadline. If the Kafka broker is unavailable, WriteMessages returns promptly with a deadline-exceeded error rather than hanging the HTTP request indefinitely.
  • uuid.New().String(): generates a random (version 4) RFC 4122 UUID via github.com/google/uuid.

Application Entry Point (main.go)

package main

import (
    "log"
    "net/http"
)

func main() {
    broker := "localhost:9092"
    topic := "demo-kafka-topic"
    port := "4444"

    producer := NewKafkaProducer(broker, topic)
    defer producer.Close()

    mux := http.NewServeMux()
    mux.Handle("/send", &SendHandler{producer: producer})
    mux.HandleFunc("/health", func(w http.ResponseWriter, r *http.Request) {
        w.Header().Set("Content-Type", "application/json")
        w.Write([]byte(`{"status":"up"}`))
    })

    log.Printf("Producer listening on :%s", port)
    log.Fatal(http.ListenAndServe(":"+port, mux))
}

The producer uses Go's built-in net/http server, so no third-party web framework is needed. A /health endpoint is registered alongside /send so that the service can be health-checked by Docker or an orchestrator. As registered here, both routes respond to any HTTP method, not only GET.


Running the Application End-to-End

Step 1 — Start Kafka

docker-compose up -d

Step 2 — Start the Consumer

From the kafka-consumer directory:

go mod tidy
go run .

The consumer starts on port 5555 and begins listening to demo-kafka-topic.

2026/05/29 10:00:00 Health endpoint on :5555
2026/05/29 10:00:00 Consumer started topic=demo-kafka-topic group=demo-kafka-consumer

Step 3 — Start the Producer

From the kafka-producer directory:

go mod tidy
go run .

The producer starts on port 4444.

2026/05/29 10:00:05 Producer listening on :4444

Step 4 — Send a Message

Trigger the producer's REST endpoint:

curl http://localhost:4444/send

You should see the response:

Message sent!

Step 5 — Observe the Consumer Logs

In the consumer terminal you will see:

=================================
Received message:
  UUID:    3f2504e0-4f89-11d3-9a0c-0305e82c3301
  From:    Alice
  To:      Bob
  Message: Hello from Go producer
=================================

And in the producer terminal, the send confirmation:

2026/05/29 10:00:10 Sent  uuid=3f2504e0-4f89-11d3-9a0c-0305e82c3301 topic=demo-kafka-topic

[Image: kafka-docker-go-e2e-test]

Step 6 — Check Health Endpoints

Both services expose a /health endpoint:

# Producer health
curl http://localhost:4444/health

# Consumer health
curl http://localhost:5555/health

Both return:

{"status":"up"}

Step 7 — Graceful Shutdown

Press Ctrl+C in the consumer terminal. The signal.NotifyContext cancels the context, FetchMessage returns, and the consumer exits cleanly:

2026/05/29 10:01:00 Consumer stopped

Summary

In this blog we built two Go microservices that communicate asynchronously through Apache Kafka running in Docker:

Component            Responsibility
docker-compose.yml   Runs a single-node Kafka broker in KRaft mode (no ZooKeeper)
KafkaProducer        Marshals a Message to JSON and publishes it to a topic
SendHandler          Exposes a GET /send HTTP endpoint to trigger the producer
KafkaConsumer        Unmarshals and processes messages with manual offset commits
processWithRetry     Retries failed messages up to 3 times with a fixed 1-second delay
sendToDLQ            Routes exhausted messages to a dead-letter topic (-dlt)

The key production-ready features we added beyond a minimal example:

  • Manual offset commit: FetchMessage + CommitMessages ensures an offset is committed only after a message has been processed or routed to the DLQ, giving at-least-once delivery semantics.
  • Retry with delay: transient failures are retried up to 3 times, one second apart, before escalating to the DLQ.
  • Dead-letter topic: messages that exhaust all retries are forwarded to demo-kafka-topic-dlt for inspection rather than silently dropped.
  • Graceful shutdown: signal.NotifyContext cancels the consume loop on SIGTERM/SIGINT, letting the consumer exit cleanly on process stop.
  • Health endpoints: both services expose GET /health for Docker and orchestrator health checks.
  • Environment-driven config: the listings above hardcode the broker, topic, and port in main.go for brevity; reading connection parameters (KAFKA_BROKER, KAFKA_TOPIC, etc.) from environment variables instead is what makes both services container-ready.
