DEV Community

giveitatry

Kafka 4.2.0 on Kubernetes - Complete Setup Guide - Exposed to the Internet

A 3-broker Kafka cluster on k3s with KRaft, SASL/SCRAM, and external access via Traefik.

Before you start — get your Traefik IP:

kubectl get svc traefik -n kube-system

Copy one of the addresses in the EXTERNAL-IP column. Replace every occurrence of
192.168.1.119 in the YAMLs below with that IP.
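To sanity-check that the IP you copied really points at Traefik, you can probe its default HTTP/HTTPS entrypoints, which are open before any of the changes below. A minimal stdlib-only sketch (192.168.1.119 stands in for your Traefik IP):

```python
import socket

def check_port(host: str, port: int, timeout: float = 1.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False

if __name__ == "__main__":
    # Traefik serves 80/443 out of the box, so these should already answer.
    # Replace 192.168.1.119 with the EXTERNAL-IP you copied.
    for port in (80, 443):
        state = "open" if check_port("192.168.1.119", port) else "closed"
        print(f"192.168.1.119:{port} {state}")
```

The same helper is handy again after stage 1.4, once ports 9092-9094 are added.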


Stage 1 — Bootstrap

Apply all of these files in order. This stage starts the cluster with port 9094
open (no auth) so you can register SCRAM credentials.


1.1 namespace.yaml

apiVersion: v1
kind: Namespace
metadata:
  name: kafka
kubectl apply -f namespace.yaml

1.2 kafka-jaas.yaml

apiVersion: v1
kind: ConfigMap
metadata:
  name: kafka-jaas
  namespace: kafka
data:
  jaas.conf: |
    KafkaServer {
      org.apache.kafka.common.security.scram.ScramLoginModule required
      username="admin"
      password="supersecret";
    };
kubectl apply -f kafka-jaas.yaml

1.3 kafka-sasl.yaml

apiVersion: v1
kind: Secret
metadata:
  name: kafka-sasl
  namespace: kafka
type: Opaque
stringData:
  username: admin
  password: supersecret
kubectl apply -f kafka-sasl.yaml

1.4 traefik-config.yaml

Opens three TCP entrypoints on Traefik — one per broker.

apiVersion: helm.cattle.io/v1
kind: HelmChartConfig
metadata:
  name: traefik
  namespace: kube-system
spec:
  valuesContent: |-
    ports:
      kafka-0:
        port: 9092
        expose:
          default: true
        exposedPort: 9092
        protocol: TCP
      kafka-1:
        port: 9093
        expose:
          default: true
        exposedPort: 9093
        protocol: TCP
      kafka-2:
        port: 9094
        expose:
          default: true
        exposedPort: 9094
        protocol: TCP
kubectl apply -f traefik-config.yaml
kubectl rollout status deployment/traefik -n kube-system

Verify ports appeared:

kubectl get svc traefik -n kube-system
# PORT(S) should include 9092, 9093, 9094 alongside 80 and 443
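If you want to script this check, the Service object's JSON is easy to inspect. A small sketch; the sample string below mimics the relevant shape of `kubectl get svc traefik -n kube-system -o json` output:

```python
import json

def exposed_ports(svc_json: str) -> list[int]:
    """Port numbers a Service exposes, from `kubectl get svc ... -o json` output."""
    return sorted(p["port"] for p in json.loads(svc_json)["spec"]["ports"])

# Sample of the relevant part of the Service object:
sample = '{"spec": {"ports": [{"port": 80}, {"port": 443}, {"port": 9092}, {"port": 9093}, {"port": 9094}]}}'
missing = {9092, 9093, 9094} - set(exposed_ports(sample))
print("missing entrypoints:", sorted(missing))  # an empty list means all three are exposed
```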

1.5 kafka-traefik.yaml

Headless service for internal pod DNS, one ClusterIP service per broker
(Traefik v3 routes to port 9095 on each pod), and IngressRouteTCP rules.

apiVersion: v1
kind: Service
metadata:
  name: kafka-headless
  namespace: kafka
spec:
  clusterIP: None
  selector:
    app: kafka
  ports:
    - name: internal
      port: 9092
    - name: controller
      port: 9093
    - name: plaintext-bootstrap
      port: 9094

---
apiVersion: v1
kind: Service
metadata:
  name: kafka-0-external
  namespace: kafka
spec:
  type: ClusterIP
  selector:
    app: kafka
    statefulset.kubernetes.io/pod-name: kafka-0
  ports:
    - name: external
      port: 9092
      targetPort: 9095

---
apiVersion: v1
kind: Service
metadata:
  name: kafka-1-external
  namespace: kafka
spec:
  type: ClusterIP
  selector:
    app: kafka
    statefulset.kubernetes.io/pod-name: kafka-1
  ports:
    - name: external
      port: 9092
      targetPort: 9095

---
apiVersion: v1
kind: Service
metadata:
  name: kafka-2-external
  namespace: kafka
spec:
  type: ClusterIP
  selector:
    app: kafka
    statefulset.kubernetes.io/pod-name: kafka-2
  ports:
    - name: external
      port: 9092
      targetPort: 9095

---
apiVersion: traefik.io/v1alpha1
kind: IngressRouteTCP
metadata:
  name: kafka-0-tcp
  namespace: kafka
spec:
  entryPoints:
    - kafka-0
  routes:
    - match: HostSNI(`*`)
      services:
        - name: kafka-0-external
          port: 9092

---
apiVersion: traefik.io/v1alpha1
kind: IngressRouteTCP
metadata:
  name: kafka-1-tcp
  namespace: kafka
spec:
  entryPoints:
    - kafka-1
  routes:
    - match: HostSNI(`*`)
      services:
        - name: kafka-1-external
          port: 9092

---
apiVersion: traefik.io/v1alpha1
kind: IngressRouteTCP
metadata:
  name: kafka-2-tcp
  namespace: kafka
spec:
  entryPoints:
    - kafka-2
  routes:
    - match: HostSNI(`*`)
      services:
        - name: kafka-2-external
          port: 9092
kubectl apply -f kafka-traefik.yaml

1.6 kafka-stateful-bootstrap.yaml

⚠️ Replace 192.168.1.119 with your Traefik IP before applying.

Includes:

  • EXTERNAL listener on port 9095 — Traefik routes external traffic here
  • PLAINTEXT listener on port 9094 — temporary, no auth, used to register SCRAM credentials
  • init container calculates the external port per broker: kafka-0 → 9092, kafka-1 → 9093, kafka-2 → 9094
apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: kafka
  namespace: kafka
spec:
  serviceName: kafka-headless
  replicas: 3
  selector:
    matchLabels:
      app: kafka
  template:
    metadata:
      labels:
        app: kafka
    spec:
      securityContext:
        fsGroup: 1001
      volumes:
        - name: kafka-config
          emptyDir: {}
        - name: kafka-jaas
          configMap:
            name: kafka-jaas
      initContainers:
        - name: init-node-id
          image: busybox:1.36
          command:
            - sh
            - -c
            - |
              ORDINAL=$(hostname | awk -F'-' '{print $NF}')
              echo "$ORDINAL" > /config/node-id
              EXTERNAL_PORT=$((9092 + ORDINAL))
              echo "$EXTERNAL_PORT" > /config/external-port
          volumeMounts:
            - name: kafka-config
              mountPath: /config
        - name: format-storage
          image: apache/kafka:4.2.0
          command:
            - sh
            - -c
            - |
              NODE_ID=$(cat /config/node-id)
              if [ ! -f "/data/meta.properties" ]; then
                echo "Formatting storage for node $NODE_ID..."
                echo "node.id=$NODE_ID" > /tmp/kraft.properties
                echo "process.roles=broker,controller" >> /tmp/kraft.properties
                echo "controller.quorum.voters=0@kafka-0.kafka-headless.kafka.svc.cluster.local:9093,1@kafka-1.kafka-headless.kafka.svc.cluster.local:9093,2@kafka-2.kafka-headless.kafka.svc.cluster.local:9093" >> /tmp/kraft.properties
                echo "listeners=PLAINTEXT://:9092,CONTROLLER://:9093" >> /tmp/kraft.properties
                echo "advertised.listeners=PLAINTEXT://localhost:9092" >> /tmp/kraft.properties
                echo "listener.security.protocol.map=PLAINTEXT:PLAINTEXT,CONTROLLER:PLAINTEXT" >> /tmp/kraft.properties
                echo "inter.broker.listener.name=PLAINTEXT" >> /tmp/kraft.properties
                echo "controller.listener.names=CONTROLLER" >> /tmp/kraft.properties
                echo "log.dirs=/data" >> /tmp/kraft.properties
                /opt/kafka/bin/kafka-storage.sh format \
                  --ignore-formatted \
                  --cluster-id q1Sh-9_ISia_zwGINzRvyQ \
                  --config /tmp/kraft.properties
              else
                echo "Already formatted, skipping."
              fi
          volumeMounts:
            - name: kafka-data
              mountPath: /data
            - name: kafka-config
              mountPath: /config
      containers:
        - name: kafka
          image: apache/kafka:4.2.0
          command:
            - sh
            - -c
            - |
              export KAFKA_NODE_ID=$(cat /config/node-id)
              export EXTERNAL_PORT=$(cat /config/external-port)
              export KAFKA_ADVERTISED_LISTENERS="INTERNAL://$(POD_NAME).kafka-headless.kafka.svc.cluster.local:9092,EXTERNAL://192.168.1.119:${EXTERNAL_PORT},PLAINTEXT://$(POD_NAME).kafka-headless.kafka.svc.cluster.local:9094"
              exec /etc/kafka/docker/run
          ports:
            - containerPort: 9092
            - containerPort: 9093
            - containerPort: 9094
            - containerPort: 9095
          env:
            - name: CLUSTER_ID
              value: "q1Sh-9_ISia_zwGINzRvyQ"
            - name: KAFKA_PROCESS_ROLES
              value: "broker,controller"
            - name: KAFKA_CONTROLLER_LISTENER_NAMES
              value: "CONTROLLER"
            - name: KAFKA_CONTROLLER_QUORUM_VOTERS
              value: "0@kafka-0.kafka-headless.kafka.svc.cluster.local:9093,1@kafka-1.kafka-headless.kafka.svc.cluster.local:9093,2@kafka-2.kafka-headless.kafka.svc.cluster.local:9093"
            - name: KAFKA_LISTENERS
              value: "INTERNAL://:9092,CONTROLLER://:9093,EXTERNAL://:9095,PLAINTEXT://:9094"
            - name: KAFKA_LISTENER_SECURITY_PROTOCOL_MAP
              value: "INTERNAL:SASL_PLAINTEXT,CONTROLLER:PLAINTEXT,EXTERNAL:SASL_PLAINTEXT,PLAINTEXT:PLAINTEXT"
            - name: KAFKA_INTER_BROKER_LISTENER_NAME
              value: "INTERNAL"
            - name: POD_NAME
              valueFrom:
                fieldRef:
                  fieldPath: metadata.name
            - name: KAFKA_SASL_ENABLED_MECHANISMS
              value: SCRAM-SHA-512
            - name: KAFKA_SASL_MECHANISM_INTER_BROKER_PROTOCOL
              value: SCRAM-SHA-512
            - name: KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR
              value: "3"
            - name: KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR
              value: "3"
            - name: KAFKA_TRANSACTION_STATE_LOG_MIN_ISR
              value: "2"
            - name: KAFKA_MIN_INSYNC_REPLICAS
              value: "2"
            - name: KAFKA_LOG_DIRS
              value: /data
            - name: KAFKA_OPTS
              value: "-Djava.security.auth.login.config=/opt/kafka/config/jaas/jaas.conf"
          volumeMounts:
            - name: kafka-data
              mountPath: /data
            - name: kafka-config
              mountPath: /config
            - name: kafka-jaas
              mountPath: /opt/kafka/config/jaas
          resources:
            requests:
              cpu: 500m
              memory: 1Gi
            limits:
              cpu: "2"
              memory: 4Gi
  volumeClaimTemplates:
    - metadata:
        name: kafka-data
      spec:
        accessModes: ["ReadWriteOnce"]
        resources:
          requests:
            storage: 10Gi
kubectl apply -f kafka-stateful-bootstrap.yaml
kubectl rollout status statefulset/kafka -n kafka
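The ordinal-to-port arithmetic the init container performs can be mirrored in a few lines of Python, which makes the broker-to-entrypoint mapping easy to verify:

```python
def external_port(pod_name: str) -> int:
    """Mirror the init container: kafka-<ordinal> advertises 9092 + ordinal externally."""
    ordinal = int(pod_name.rsplit("-", 1)[1])
    return 9092 + ordinal

assert external_port("kafka-0") == 9092  # routed via the kafka-0 Traefik entrypoint
assert external_port("kafka-1") == 9093
assert external_port("kafka-2") == 9094
```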

1.7 Register SCRAM credentials

Port 9094 is open and unauthenticated. Use it to write the admin user into
Kafka's metadata store. This is a one-time operation.

kubectl exec -n kafka kafka-0 -- \
  /opt/kafka/bin/kafka-configs.sh \
  --bootstrap-server kafka-0.kafka-headless.kafka.svc.cluster.local:9094 \
  --alter \
  --add-config 'SCRAM-SHA-512=[password=supersecret]' \
  --entity-type users \
  --entity-name admin

Expected output:

Completed updating config for user admin.

To add more users (application service accounts), repeat the command with
different --entity-name and password values while port 9094 is still open.
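If you have several service accounts, a short loop keeps the registration repeatable. A sketch that builds the same kubectl exec command as above for each user; the user names (orders-svc, billing-svc) and passwords here are placeholders, not anything this guide requires:

```python
import subprocess

def build_scram_cmd(user: str, password: str) -> list[str]:
    """Build the same kubectl exec command as section 1.7 for an extra SCRAM user."""
    return [
        "kubectl", "exec", "-n", "kafka", "kafka-0", "--",
        "/opt/kafka/bin/kafka-configs.sh",
        "--bootstrap-server", "kafka-0.kafka-headless.kafka.svc.cluster.local:9094",
        "--alter",
        "--add-config", f"SCRAM-SHA-512=[password={password}]",
        "--entity-type", "users",
        "--entity-name", user,
    ]

# Usage (needs cluster access, and only works while port 9094 is still open):
# for user, pw in [("orders-svc", "changeme1"), ("billing-svc", "changeme2")]:
#     subprocess.run(build_scram_cmd(user, pw), check=True)
```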


Stage 2 — Production (close port 9094)

Port 9094 is now unnecessary and a security risk. Apply the final StatefulSet
to remove it. No other files change.


2.1 kafka-stateful-final.yaml

⚠️ Replace 192.168.1.119 with your Traefik IP before applying.

Identical to the bootstrap version except:

  • PLAINTEXT://:9094 removed from KAFKA_LISTENERS
  • PLAINTEXT:PLAINTEXT removed from KAFKA_LISTENER_SECURITY_PROTOCOL_MAP
  • PLAINTEXT entry removed from KAFKA_ADVERTISED_LISTENERS
  • containerPort: 9094 removed
apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: kafka
  namespace: kafka
spec:
  serviceName: kafka-headless
  replicas: 3
  selector:
    matchLabels:
      app: kafka
  template:
    metadata:
      labels:
        app: kafka
    spec:
      securityContext:
        fsGroup: 1001
      volumes:
        - name: kafka-config
          emptyDir: {}
        - name: kafka-jaas
          configMap:
            name: kafka-jaas
      initContainers:
        - name: init-node-id
          image: busybox:1.36
          command:
            - sh
            - -c
            - |
              ORDINAL=$(hostname | awk -F'-' '{print $NF}')
              echo "$ORDINAL" > /config/node-id
              EXTERNAL_PORT=$((9092 + ORDINAL))
              echo "$EXTERNAL_PORT" > /config/external-port
          volumeMounts:
            - name: kafka-config
              mountPath: /config
        - name: format-storage
          image: apache/kafka:4.2.0
          command:
            - sh
            - -c
            - |
              NODE_ID=$(cat /config/node-id)
              if [ ! -f "/data/meta.properties" ]; then
                echo "Formatting storage for node $NODE_ID..."
                echo "node.id=$NODE_ID" > /tmp/kraft.properties
                echo "process.roles=broker,controller" >> /tmp/kraft.properties
                echo "controller.quorum.voters=0@kafka-0.kafka-headless.kafka.svc.cluster.local:9093,1@kafka-1.kafka-headless.kafka.svc.cluster.local:9093,2@kafka-2.kafka-headless.kafka.svc.cluster.local:9093" >> /tmp/kraft.properties
                echo "listeners=PLAINTEXT://:9092,CONTROLLER://:9093" >> /tmp/kraft.properties
                echo "advertised.listeners=PLAINTEXT://localhost:9092" >> /tmp/kraft.properties
                echo "listener.security.protocol.map=PLAINTEXT:PLAINTEXT,CONTROLLER:PLAINTEXT" >> /tmp/kraft.properties
                echo "inter.broker.listener.name=PLAINTEXT" >> /tmp/kraft.properties
                echo "controller.listener.names=CONTROLLER" >> /tmp/kraft.properties
                echo "log.dirs=/data" >> /tmp/kraft.properties
                /opt/kafka/bin/kafka-storage.sh format \
                  --ignore-formatted \
                  --cluster-id q1Sh-9_ISia_zwGINzRvyQ \
                  --config /tmp/kraft.properties
              else
                echo "Already formatted, skipping."
              fi
          volumeMounts:
            - name: kafka-data
              mountPath: /data
            - name: kafka-config
              mountPath: /config
      containers:
        - name: kafka
          image: apache/kafka:4.2.0
          command:
            - sh
            - -c
            - |
              export KAFKA_NODE_ID=$(cat /config/node-id)
              export EXTERNAL_PORT=$(cat /config/external-port)
              export KAFKA_ADVERTISED_LISTENERS="INTERNAL://$(POD_NAME).kafka-headless.kafka.svc.cluster.local:9092,EXTERNAL://192.168.1.119:${EXTERNAL_PORT}"
              exec /etc/kafka/docker/run
          ports:
            - containerPort: 9092
            - containerPort: 9093
            - containerPort: 9095
          env:
            - name: CLUSTER_ID
              value: "q1Sh-9_ISia_zwGINzRvyQ"
            - name: KAFKA_PROCESS_ROLES
              value: "broker,controller"
            - name: KAFKA_CONTROLLER_LISTENER_NAMES
              value: "CONTROLLER"
            - name: KAFKA_CONTROLLER_QUORUM_VOTERS
              value: "0@kafka-0.kafka-headless.kafka.svc.cluster.local:9093,1@kafka-1.kafka-headless.kafka.svc.cluster.local:9093,2@kafka-2.kafka-headless.kafka.svc.cluster.local:9093"
            - name: KAFKA_LISTENERS
              value: "INTERNAL://:9092,CONTROLLER://:9093,EXTERNAL://:9095"
            - name: KAFKA_LISTENER_SECURITY_PROTOCOL_MAP
              value: "INTERNAL:SASL_PLAINTEXT,CONTROLLER:PLAINTEXT,EXTERNAL:SASL_PLAINTEXT"
            - name: KAFKA_INTER_BROKER_LISTENER_NAME
              value: "INTERNAL"
            - name: POD_NAME
              valueFrom:
                fieldRef:
                  fieldPath: metadata.name
            - name: KAFKA_SASL_ENABLED_MECHANISMS
              value: SCRAM-SHA-512
            - name: KAFKA_SASL_MECHANISM_INTER_BROKER_PROTOCOL
              value: SCRAM-SHA-512
            - name: KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR
              value: "3"
            - name: KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR
              value: "3"
            - name: KAFKA_TRANSACTION_STATE_LOG_MIN_ISR
              value: "2"
            - name: KAFKA_MIN_INSYNC_REPLICAS
              value: "2"
            - name: KAFKA_LOG_DIRS
              value: /data
            - name: KAFKA_OPTS
              value: "-Djava.security.auth.login.config=/opt/kafka/config/jaas/jaas.conf"
          volumeMounts:
            - name: kafka-data
              mountPath: /data
            - name: kafka-config
              mountPath: /config
            - name: kafka-jaas
              mountPath: /opt/kafka/config/jaas
          resources:
            requests:
              cpu: 500m
              memory: 1Gi
            limits:
              cpu: "2"
              memory: 4Gi
  volumeClaimTemplates:
    - metadata:
        name: kafka-data
      spec:
        accessModes: ["ReadWriteOnce"]
        resources:
          requests:
            storage: 10Gi
kubectl apply -f kafka-stateful-final.yaml
kubectl rollout status statefulset/kafka -n kafka

2.2 Verify

kubectl exec -n kafka kafka-0 -- \
  /opt/kafka/bin/kafka-metadata-quorum.sh \
  --bootstrap-controller kafka-0.kafka-headless.kafka.svc.cluster.local:9093 \
  describe --status

Healthy output:

LeaderId:             0
MaxFollowerLag:       0
MaxFollowerLagTimeMs: 0
CurrentVoters:        [{"id": 0, ...}, {"id": 1, ...}, {"id": 2, ...}]
CurrentObservers:     []
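If you monitor the cluster from a script, the describe --status output parses easily. A small helper, assuming the key: value layout shown above:

```python
def parse_quorum_status(output: str) -> dict[str, str]:
    """Parse `kafka-metadata-quorum.sh describe --status` output into key/value pairs."""
    status = {}
    for line in output.splitlines():
        key, sep, value = line.partition(":")
        if sep:  # skip lines with no colon
            status[key.strip()] = value.strip()
    return status

sample = "LeaderId:             0\nMaxFollowerLag:       0\nMaxFollowerLagTimeMs: 0"
status = parse_quorum_status(sample)
assert status["LeaderId"] == "0"
assert status["MaxFollowerLag"] == "0"
```

A non-zero MaxFollowerLag that keeps growing, or fewer than three CurrentVoters, are the signals worth alerting on.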

2.3 Python client

pip install kafka-python-ng
from kafka import KafkaProducer, KafkaConsumer, KafkaAdminClient
from kafka.admin import NewTopic
from kafka.errors import TopicAlreadyExistsError
import json, time

# Replace 192.168.1.119 with your Traefik IP
BOOTSTRAP_SERVERS = [
    "192.168.1.119:9092",  # kafka-0
    "192.168.1.119:9093",  # kafka-1
    "192.168.1.119:9094",  # kafka-2
]

SASL_CONFIG = {
    "security_protocol": "SASL_PLAINTEXT",
    "sasl_mechanism": "SCRAM-SHA-512",
    "sasl_plain_username": "admin",
    "sasl_plain_password": "supersecret",
}

TOPIC = "orders"


def create_topic(topic):
    admin = KafkaAdminClient(bootstrap_servers=BOOTSTRAP_SERVERS, **SASL_CONFIG)
    try:
        admin.create_topics([
            NewTopic(name=topic, num_partitions=3, replication_factor=3)
        ])
        print(f"Topic '{topic}' created.")
    except TopicAlreadyExistsError:
        print(f"Topic '{topic}' already exists.")
    finally:
        admin.close()


def produce(topic=TOPIC, count=10):
    producer = KafkaProducer(
        bootstrap_servers=BOOTSTRAP_SERVERS,
        **SASL_CONFIG,
        value_serializer=lambda v: json.dumps(v).encode(),
        key_serializer=lambda k: k.encode() if k else None,
        acks="all",
    )
    for i in range(count):
        meta = producer.send(
            topic,
            key=str(i),
            value={"id": i, "ts": time.time()}
        ).get(timeout=10)
        print(f"  sent [{i}] partition={meta.partition} offset={meta.offset}")
    producer.flush()
    producer.close()


def consume(topic=TOPIC):
    consumer = KafkaConsumer(
        topic,
        bootstrap_servers=BOOTSTRAP_SERVERS,
        **SASL_CONFIG,
        value_deserializer=lambda v: json.loads(v.decode()),
        key_deserializer=lambda k: k.decode() if k else None,
        group_id="my-group",
        auto_offset_reset="earliest",
        consumer_timeout_ms=5000,
    )
    for msg in consumer:
        print(
            f"  recv key={msg.key} "
            f"partition={msg.partition} "
            f"offset={msg.offset} "
            f"value={msg.value}"
        )
    consumer.close()


if __name__ == "__main__":
    create_topic(TOPIC)
    produce()
    consume()

Troubleshooting

Pods not starting

kubectl logs -n kafka kafka-0 -c format-storage
kubectl logs -n kafka kafka-0 -c init-node-id

SCRAM registration times out

Port 9094 is not reachable. Check that the headless service includes it:

kubectl get svc kafka-headless -n kafka

Python client connects but gets wrong broker addresses

The Traefik IP in the StatefulSet's command block is wrong. Fix it and restart the rollout:

kubectl rollout restart statefulset/kafka -n kafka
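Clients discover broker addresses from what the brokers advertise, so it helps to check exactly what a given KAFKA_ADVERTISED_LISTENERS value expands to. A small parser for the name://host:port format:

```python
def parse_listeners(advertised: str) -> dict[str, tuple[str, int]]:
    """Split an advertised.listeners string into {listener_name: (host, port)}."""
    result = {}
    for entry in advertised.split(","):
        name, _, hostport = entry.partition("://")
        host, _, port = hostport.rpartition(":")
        result[name] = (host, int(port))
    return result

# What kafka-0 advertises in the final StatefulSet:
adv = ("INTERNAL://kafka-0.kafka-headless.kafka.svc.cluster.local:9092,"
       "EXTERNAL://192.168.1.119:9092")
assert parse_listeners(adv)["EXTERNAL"] == ("192.168.1.119", 9092)
```

If the EXTERNAL entry here does not match the IP your clients can reach, that is the bug.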

Traefik ports not appearing

kubectl rollout restart deployment/traefik -n kube-system
kubectl get svc traefik -n kube-system
