3-broker Kafka cluster on k3s with KRaft, SASL/SCRAM, and external access via Traefik.
Before you start — get your Traefik IP:
kubectl get svc traefik -n kube-system
Copy any IP from the EXTERNAL-IP column. Replace every occurrence of
192.168.1.119 in the manifests and the Python client below with your actual IP.
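The replacement can also be scripted. What follows is a minimal sketch, assuming the manifests are saved under the file names used in this guide; `replace_ip` and `patch_files` are hypothetical helper names introduced here for illustration, and `10.0.0.50` in the usage note is a made-up address.

```python
from pathlib import Path

PLACEHOLDER_IP = "192.168.1.119"  # the example IP used throughout this guide


def replace_ip(text: str, new_ip: str, old_ip: str = PLACEHOLDER_IP) -> str:
    """Return text with every occurrence of old_ip swapped for new_ip."""
    return text.replace(old_ip, new_ip)


def patch_files(paths, new_ip: str) -> list:
    """Rewrite each existing file in place and return the names that changed."""
    changed = []
    for name in paths:
        path = Path(name)
        if not path.exists():
            continue  # skip manifests that have not been saved yet
        original = path.read_text()
        updated = replace_ip(original, new_ip)
        if updated != original:
            path.write_text(updated)
            changed.append(name)
    return changed
```

For example, `patch_files(["kafka-stateful-bootstrap.yaml", "kafka-stateful-final.yaml"], "10.0.0.50")` would rewrite both StatefulSet manifests and return the names of the files it modified.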
Stage 1 — Bootstrap
Apply all of these files in order. This stage starts the cluster with port 9094
open (no auth) so you can register SCRAM credentials.
1.1 namespace.yaml
apiVersion: v1
kind: Namespace
metadata:
  name: kafka
kubectl apply -f namespace.yaml
1.2 kafka-jaas.yaml
apiVersion: v1
kind: ConfigMap
metadata:
  name: kafka-jaas
  namespace: kafka
data:
  jaas.conf: |
    KafkaServer {
      org.apache.kafka.common.security.scram.ScramLoginModule required
      username="admin"
      password="supersecret";
    };
kubectl apply -f kafka-jaas.yaml
1.3 kafka-sasl.yaml
apiVersion: v1
kind: Secret
metadata:
  name: kafka-sasl
  namespace: kafka
type: Opaque
stringData:
  username: admin
  password: supersecret
kubectl apply -f kafka-sasl.yaml
1.4 traefik-config.yaml
Opens three TCP entrypoints on Traefik — one per broker.
apiVersion: helm.cattle.io/v1
kind: HelmChartConfig
metadata:
  name: traefik
  namespace: kube-system
spec:
  valuesContent: |-
    ports:
      kafka-0:
        port: 9092
        expose:
          default: true
        exposedPort: 9092
        protocol: TCP
      kafka-1:
        port: 9093
        expose:
          default: true
        exposedPort: 9093
        protocol: TCP
      kafka-2:
        port: 9094
        expose:
          default: true
        exposedPort: 9094
        protocol: TCP
kubectl apply -f traefik-config.yaml
kubectl rollout status deployment/traefik -n kube-system
Verify ports appeared:
kubectl get svc traefik -n kube-system
# PORT(S) should include 9092, 9093, 9094 alongside 80 and 443
1.5 kafka-traefik.yaml
Headless service for internal pod DNS, one ClusterIP service per broker
(Traefik v3 routes to port 9095 on each pod), and IngressRouteTCP rules.
apiVersion: v1
kind: Service
metadata:
  name: kafka-headless
  namespace: kafka
spec:
  clusterIP: None
  selector:
    app: kafka
  ports:
    - name: internal
      port: 9092
    - name: controller
      port: 9093
    - name: plaintext-bootstrap
      port: 9094
---
apiVersion: v1
kind: Service
metadata:
  name: kafka-0-external
  namespace: kafka
spec:
  type: ClusterIP
  selector:
    app: kafka
    statefulset.kubernetes.io/pod-name: kafka-0
  ports:
    - name: external
      port: 9092
      targetPort: 9095
---
apiVersion: v1
kind: Service
metadata:
  name: kafka-1-external
  namespace: kafka
spec:
  type: ClusterIP
  selector:
    app: kafka
    statefulset.kubernetes.io/pod-name: kafka-1
  ports:
    - name: external
      port: 9092
      targetPort: 9095
---
apiVersion: v1
kind: Service
metadata:
  name: kafka-2-external
  namespace: kafka
spec:
  type: ClusterIP
  selector:
    app: kafka
    statefulset.kubernetes.io/pod-name: kafka-2
  ports:
    - name: external
      port: 9092
      targetPort: 9095
---
apiVersion: traefik.io/v1alpha1
kind: IngressRouteTCP
metadata:
  name: kafka-0-tcp
  namespace: kafka
spec:
  entryPoints:
    - kafka-0
  routes:
    - match: HostSNI(`*`)
      services:
        - name: kafka-0-external
          port: 9092
---
apiVersion: traefik.io/v1alpha1
kind: IngressRouteTCP
metadata:
  name: kafka-1-tcp
  namespace: kafka
spec:
  entryPoints:
    - kafka-1
  routes:
    - match: HostSNI(`*`)
      services:
        - name: kafka-1-external
          port: 9092
---
apiVersion: traefik.io/v1alpha1
kind: IngressRouteTCP
metadata:
  name: kafka-2-tcp
  namespace: kafka
spec:
  entryPoints:
    - kafka-2
  routes:
    - match: HostSNI(`*`)
      services:
        - name: kafka-2-external
          port: 9092
kubectl apply -f kafka-traefik.yaml
1.6 kafka-stateful-bootstrap.yaml
⚠️ Replace 192.168.1.119 with your Traefik IP before applying.
Includes:
- EXTERNAL listener on port 9095: Traefik routes external traffic here
- PLAINTEXT listener on port 9094: temporary, no auth, used to register SCRAM credentials
- init container calculates the external port per broker: kafka-0 → 9092, kafka-1 → 9093, kafka-2 → 9094
apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: kafka
  namespace: kafka
spec:
  serviceName: kafka-headless
  replicas: 3
  selector:
    matchLabels:
      app: kafka
  template:
    metadata:
      labels:
        app: kafka
    spec:
      securityContext:
        fsGroup: 1001
      volumes:
        - name: kafka-config
          emptyDir: {}
        - name: kafka-jaas
          configMap:
            name: kafka-jaas
      initContainers:
        - name: init-node-id
          image: busybox:1.36
          command:
            - sh
            - -c
            - |
              ORDINAL=$(hostname | awk -F'-' '{print $NF}')
              echo "$ORDINAL" > /config/node-id
              EXTERNAL_PORT=$((9092 + ORDINAL))
              echo "$EXTERNAL_PORT" > /config/external-port
          volumeMounts:
            - name: kafka-config
              mountPath: /config
        - name: format-storage
          image: apache/kafka:4.2.0
          command:
            - sh
            - -c
            - |
              NODE_ID=$(cat /config/node-id)
              if [ ! -f "/data/meta.properties" ]; then
                echo "Formatting storage for node $NODE_ID..."
                echo "node.id=$NODE_ID" > /tmp/kraft.properties
                echo "process.roles=broker,controller" >> /tmp/kraft.properties
                echo "controller.quorum.voters=0@kafka-0.kafka-headless.kafka.svc.cluster.local:9093,1@kafka-1.kafka-headless.kafka.svc.cluster.local:9093,2@kafka-2.kafka-headless.kafka.svc.cluster.local:9093" >> /tmp/kraft.properties
                echo "listeners=PLAINTEXT://:9092,CONTROLLER://:9093" >> /tmp/kraft.properties
                echo "advertised.listeners=PLAINTEXT://localhost:9092" >> /tmp/kraft.properties
                echo "listener.security.protocol.map=PLAINTEXT:PLAINTEXT,CONTROLLER:PLAINTEXT" >> /tmp/kraft.properties
                echo "inter.broker.listener.name=PLAINTEXT" >> /tmp/kraft.properties
                echo "controller.listener.names=CONTROLLER" >> /tmp/kraft.properties
                echo "log.dirs=/data" >> /tmp/kraft.properties
                /opt/kafka/bin/kafka-storage.sh format \
                  --ignore-formatted \
                  --cluster-id q1Sh-9_ISia_zwGINzRvyQ \
                  --config /tmp/kraft.properties
              else
                echo "Already formatted, skipping."
              fi
          volumeMounts:
            - name: kafka-data
              mountPath: /data
            - name: kafka-config
              mountPath: /config
      containers:
        - name: kafka
          image: apache/kafka:4.2.0
          command:
            - sh
            - -c
            - |
              export KAFKA_NODE_ID=$(cat /config/node-id)
              export EXTERNAL_PORT=$(cat /config/external-port)
              export KAFKA_ADVERTISED_LISTENERS="INTERNAL://$(POD_NAME).kafka-headless.kafka.svc.cluster.local:9092,EXTERNAL://192.168.1.119:${EXTERNAL_PORT},PLAINTEXT://$(POD_NAME).kafka-headless.kafka.svc.cluster.local:9094"
              exec /etc/kafka/docker/run
          ports:
            - containerPort: 9092
            - containerPort: 9093
            - containerPort: 9094
            - containerPort: 9095
          env:
            - name: CLUSTER_ID
              value: "q1Sh-9_ISia_zwGINzRvyQ"
            - name: KAFKA_PROCESS_ROLES
              value: "broker,controller"
            - name: KAFKA_CONTROLLER_LISTENER_NAMES
              value: "CONTROLLER"
            - name: KAFKA_CONTROLLER_QUORUM_VOTERS
              value: "0@kafka-0.kafka-headless.kafka.svc.cluster.local:9093,1@kafka-1.kafka-headless.kafka.svc.cluster.local:9093,2@kafka-2.kafka-headless.kafka.svc.cluster.local:9093"
            - name: KAFKA_LISTENERS
              value: "INTERNAL://:9092,CONTROLLER://:9093,EXTERNAL://:9095,PLAINTEXT://:9094"
            - name: KAFKA_LISTENER_SECURITY_PROTOCOL_MAP
              value: "INTERNAL:SASL_PLAINTEXT,CONTROLLER:PLAINTEXT,EXTERNAL:SASL_PLAINTEXT,PLAINTEXT:PLAINTEXT"
            - name: KAFKA_INTER_BROKER_LISTENER_NAME
              value: "INTERNAL"
            - name: POD_NAME
              valueFrom:
                fieldRef:
                  fieldPath: metadata.name
            - name: KAFKA_SASL_ENABLED_MECHANISMS
              value: SCRAM-SHA-512
            - name: KAFKA_SASL_MECHANISM_INTER_BROKER_PROTOCOL
              value: SCRAM-SHA-512
            - name: KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR
              value: "3"
            - name: KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR
              value: "3"
            - name: KAFKA_TRANSACTION_STATE_LOG_MIN_ISR
              value: "2"
            - name: KAFKA_MIN_INSYNC_REPLICAS
              value: "2"
            - name: KAFKA_LOG_DIRS
              value: /data
            - name: KAFKA_OPTS
              value: "-Djava.security.auth.login.config=/opt/kafka/config/jaas/jaas.conf"
          volumeMounts:
            - name: kafka-data
              mountPath: /data
            - name: kafka-config
              mountPath: /config
            - name: kafka-jaas
              mountPath: /opt/kafka/config/jaas
          resources:
            requests:
              cpu: 500m
              memory: 1Gi
            limits:
              cpu: "2"
              memory: 4Gi
  volumeClaimTemplates:
    - metadata:
        name: kafka-data
      spec:
        accessModes: ["ReadWriteOnce"]
        resources:
          requests:
            storage: 10Gi
kubectl apply -f kafka-stateful-bootstrap.yaml
kubectl rollout status statefulset/kafka -n kafka
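The port arithmetic in the init-node-id container and the resulting advertised listeners can be sketched in Python to check what each broker will announce. This is only an illustration of the logic in the manifest above; `external_port` and `advertised_listeners` are hypothetical helper names, not part of Kafka or Kubernetes.

```python
HEADLESS_DOMAIN = "kafka-headless.kafka.svc.cluster.local"


def external_port(pod_name: str, base_port: int = 9092) -> int:
    """Mirror the init container: the ordinal is the text after the last '-'."""
    ordinal = int(pod_name.rsplit("-", 1)[-1])
    return base_port + ordinal


def advertised_listeners(pod_name: str, traefik_ip: str) -> str:
    """Build the bootstrap-stage KAFKA_ADVERTISED_LISTENERS value for one pod."""
    host = f"{pod_name}.{HEADLESS_DOMAIN}"
    return ",".join([
        f"INTERNAL://{host}:9092",
        f"EXTERNAL://{traefik_ip}:{external_port(pod_name)}",
        f"PLAINTEXT://{host}:9094",
    ])


for pod in ("kafka-0", "kafka-1", "kafka-2"):
    # Prints the pod name next to the Traefik port that fronts it.
    print(pod, external_port(pod))
```

Each broker advertises the same Traefik IP with a different port, which is why Traefik needs one TCP entrypoint per broker.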
1.7 Register SCRAM credentials
Port 9094 is open and unauthenticated. Use it to write the admin user into
Kafka's metadata store. This is a one-time operation.
kubectl exec -n kafka kafka-0 -- \
/opt/kafka/bin/kafka-configs.sh \
--bootstrap-server kafka-0.kafka-headless.kafka.svc.cluster.local:9094 \
--alter \
--add-config 'SCRAM-SHA-512=[password=supersecret]' \
--entity-type users \
--entity-name admin
Expected output:
Completed updating config for user admin.
To add more users (application service accounts), repeat the command with
different --entity-name and password values while port 9094 is still open.
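When several service accounts are needed, the commands can be generated rather than typed by hand. The sketch below only builds and prints the same `kafka-configs.sh` invocation shown above for each user; the account names and passwords are made-up placeholders, and `scram_command` is a hypothetical helper.

```python
import shlex

BOOTSTRAP = "kafka-0.kafka-headless.kafka.svc.cluster.local:9094"


def scram_command(user: str, password: str) -> list:
    """Return the kubectl argv that registers one SCRAM-SHA-512 user."""
    return [
        "kubectl", "exec", "-n", "kafka", "kafka-0", "--",
        "/opt/kafka/bin/kafka-configs.sh",
        "--bootstrap-server", BOOTSTRAP,
        "--alter",
        "--add-config", f"SCRAM-SHA-512=[password={password}]",
        "--entity-type", "users",
        "--entity-name", user,
    ]


# Hypothetical service accounts; replace with your own names and passwords.
USERS = {"orders-service": "change-me-1", "billing-service": "change-me-2"}

for name, secret in USERS.items():
    # shlex.join quotes each argument so the line can be pasted into a shell.
    print(shlex.join(scram_command(name, secret)))
```

To execute instead of print, the returned list can be passed to `subprocess.run(..., check=True)`. Passwords containing `]` or `,` may need extra care, since they sit inside the bracketed `--add-config` value.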
Stage 2 — Production (close port 9094)
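The unauthenticated PLAINTEXT listener on pod port 9094 is no longer needed and
is a security risk. Apply the final StatefulSet to remove it. No other files
change. Traefik's entrypoint on 9094 is unaffected: it forwards to kafka-2's
EXTERNAL listener on pod port 9095.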
2.1 kafka-stateful-final.yaml
⚠️ Replace 192.168.1.119 with your Traefik IP before applying.
Identical to the bootstrap version except:
- PLAINTEXT://:9094 removed from KAFKA_LISTENERS
- PLAINTEXT:PLAINTEXT removed from KAFKA_LISTENER_SECURITY_PROTOCOL_MAP
- PLAINTEXT entry removed from KAFKA_ADVERTISED_LISTENERS
- containerPort: 9094 removed
apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: kafka
  namespace: kafka
spec:
  serviceName: kafka-headless
  replicas: 3
  selector:
    matchLabels:
      app: kafka
  template:
    metadata:
      labels:
        app: kafka
    spec:
      securityContext:
        fsGroup: 1001
      volumes:
        - name: kafka-config
          emptyDir: {}
        - name: kafka-jaas
          configMap:
            name: kafka-jaas
      initContainers:
        - name: init-node-id
          image: busybox:1.36
          command:
            - sh
            - -c
            - |
              ORDINAL=$(hostname | awk -F'-' '{print $NF}')
              echo "$ORDINAL" > /config/node-id
              EXTERNAL_PORT=$((9092 + ORDINAL))
              echo "$EXTERNAL_PORT" > /config/external-port
          volumeMounts:
            - name: kafka-config
              mountPath: /config
        - name: format-storage
          image: apache/kafka:4.2.0
          command:
            - sh
            - -c
            - |
              NODE_ID=$(cat /config/node-id)
              if [ ! -f "/data/meta.properties" ]; then
                echo "Formatting storage for node $NODE_ID..."
                echo "node.id=$NODE_ID" > /tmp/kraft.properties
                echo "process.roles=broker,controller" >> /tmp/kraft.properties
                echo "controller.quorum.voters=0@kafka-0.kafka-headless.kafka.svc.cluster.local:9093,1@kafka-1.kafka-headless.kafka.svc.cluster.local:9093,2@kafka-2.kafka-headless.kafka.svc.cluster.local:9093" >> /tmp/kraft.properties
                echo "listeners=PLAINTEXT://:9092,CONTROLLER://:9093" >> /tmp/kraft.properties
                echo "advertised.listeners=PLAINTEXT://localhost:9092" >> /tmp/kraft.properties
                echo "listener.security.protocol.map=PLAINTEXT:PLAINTEXT,CONTROLLER:PLAINTEXT" >> /tmp/kraft.properties
                echo "inter.broker.listener.name=PLAINTEXT" >> /tmp/kraft.properties
                echo "controller.listener.names=CONTROLLER" >> /tmp/kraft.properties
                echo "log.dirs=/data" >> /tmp/kraft.properties
                /opt/kafka/bin/kafka-storage.sh format \
                  --ignore-formatted \
                  --cluster-id q1Sh-9_ISia_zwGINzRvyQ \
                  --config /tmp/kraft.properties
              else
                echo "Already formatted, skipping."
              fi
          volumeMounts:
            - name: kafka-data
              mountPath: /data
            - name: kafka-config
              mountPath: /config
      containers:
        - name: kafka
          image: apache/kafka:4.2.0
          command:
            - sh
            - -c
            - |
              export KAFKA_NODE_ID=$(cat /config/node-id)
              export EXTERNAL_PORT=$(cat /config/external-port)
              export KAFKA_ADVERTISED_LISTENERS="INTERNAL://$(POD_NAME).kafka-headless.kafka.svc.cluster.local:9092,EXTERNAL://192.168.1.119:${EXTERNAL_PORT}"
              exec /etc/kafka/docker/run
          ports:
            - containerPort: 9092
            - containerPort: 9093
            - containerPort: 9095
          env:
            - name: CLUSTER_ID
              value: "q1Sh-9_ISia_zwGINzRvyQ"
            - name: KAFKA_PROCESS_ROLES
              value: "broker,controller"
            - name: KAFKA_CONTROLLER_LISTENER_NAMES
              value: "CONTROLLER"
            - name: KAFKA_CONTROLLER_QUORUM_VOTERS
              value: "0@kafka-0.kafka-headless.kafka.svc.cluster.local:9093,1@kafka-1.kafka-headless.kafka.svc.cluster.local:9093,2@kafka-2.kafka-headless.kafka.svc.cluster.local:9093"
            - name: KAFKA_LISTENERS
              value: "INTERNAL://:9092,CONTROLLER://:9093,EXTERNAL://:9095"
            - name: KAFKA_LISTENER_SECURITY_PROTOCOL_MAP
              value: "INTERNAL:SASL_PLAINTEXT,CONTROLLER:PLAINTEXT,EXTERNAL:SASL_PLAINTEXT"
            - name: KAFKA_INTER_BROKER_LISTENER_NAME
              value: "INTERNAL"
            - name: POD_NAME
              valueFrom:
                fieldRef:
                  fieldPath: metadata.name
            - name: KAFKA_SASL_ENABLED_MECHANISMS
              value: SCRAM-SHA-512
            - name: KAFKA_SASL_MECHANISM_INTER_BROKER_PROTOCOL
              value: SCRAM-SHA-512
            - name: KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR
              value: "3"
            - name: KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR
              value: "3"
            - name: KAFKA_TRANSACTION_STATE_LOG_MIN_ISR
              value: "2"
            - name: KAFKA_MIN_INSYNC_REPLICAS
              value: "2"
            - name: KAFKA_LOG_DIRS
              value: /data
            - name: KAFKA_OPTS
              value: "-Djava.security.auth.login.config=/opt/kafka/config/jaas/jaas.conf"
          volumeMounts:
            - name: kafka-data
              mountPath: /data
            - name: kafka-config
              mountPath: /config
            - name: kafka-jaas
              mountPath: /opt/kafka/config/jaas
          resources:
            requests:
              cpu: 500m
              memory: 1Gi
            limits:
              cpu: "2"
              memory: 4Gi
  volumeClaimTemplates:
    - metadata:
        name: kafka-data
      spec:
        accessModes: ["ReadWriteOnce"]
        resources:
          requests:
            storage: 10Gi
kubectl apply -f kafka-stateful-final.yaml
kubectl rollout status statefulset/kafka -n kafka
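The difference between the bootstrap and final manifests amounts to dropping one named listener from two comma-separated settings. As a rough illustration (`drop_listener` is a hypothetical helper, not a Kafka tool), the final values can be derived from the bootstrap ones like this:

```python
def drop_listener(value: str, name: str) -> str:
    """Remove every comma-separated entry whose listener name equals `name`.

    Works for both "NAME://host:port" and "NAME:PROTOCOL" entries, because
    the listener name is always the text before the first ':'.
    """
    kept = [entry for entry in value.split(",") if entry.split(":", 1)[0] != name]
    return ",".join(kept)


BOOTSTRAP_LISTENERS = (
    "INTERNAL://:9092,CONTROLLER://:9093,EXTERNAL://:9095,PLAINTEXT://:9094"
)
BOOTSTRAP_PROTOCOL_MAP = (
    "INTERNAL:SASL_PLAINTEXT,CONTROLLER:PLAINTEXT,"
    "EXTERNAL:SASL_PLAINTEXT,PLAINTEXT:PLAINTEXT"
)

print(drop_listener(BOOTSTRAP_LISTENERS, "PLAINTEXT"))
print(drop_listener(BOOTSTRAP_PROTOCOL_MAP, "PLAINTEXT"))
```

Matching on the listener name rather than on the substring `PLAINTEXT` matters here: `CONTROLLER:PLAINTEXT` and the `SASL_PLAINTEXT` entries must survive.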
2.2 Verify
kubectl exec -n kafka kafka-0 -- \
/opt/kafka/bin/kafka-metadata-quorum.sh \
--bootstrap-controller kafka-0.kafka-headless.kafka.svc.cluster.local:9093 \
describe --status
Healthy output:
LeaderId: 0
MaxFollowerLag: 0
MaxFollowerLagTimeMs: 0
CurrentVoters: [{"id": 0, ...}, {"id": 1, ...}, {"id": 2, ...}]
CurrentObservers: []
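If you want to automate this check, the status text can be parsed with a few lines of Python. This is a sketch that assumes the output has the `Key: value` shape and the `{"id": N, ...}` voter format shown above; other Kafka versions may print voters differently, so treat `quorum_healthy` (a hypothetical helper) as a starting point.

```python
import re


def parse_status(output: str) -> dict:
    """Split each 'Key: value' line of the status output into a dict."""
    fields = {}
    for line in output.splitlines():
        key, sep, value = line.partition(":")
        if sep:
            fields[key.strip()] = value.strip()
    return fields


def quorum_healthy(output: str, expected_voters: int = 3, max_lag: int = 0) -> bool:
    """True when a leader exists, follower lag is within max_lag,
    and the expected number of distinct voter ids is listed."""
    fields = parse_status(output)
    try:
        leader = int(fields["LeaderId"])
        lag = int(fields["MaxFollowerLag"])
    except (KeyError, ValueError):
        return False
    voter_ids = set(re.findall(r'"id":\s*(\d+)', fields.get("CurrentVoters", "")))
    return leader >= 0 and lag <= max_lag and len(voter_ids) == expected_voters


# The sample output from this guide, with the elided fields left as-is.
SAMPLE = """LeaderId: 0
MaxFollowerLag: 0
MaxFollowerLagTimeMs: 0
CurrentVoters: [{"id": 0, ...}, {"id": 1, ...}, {"id": 2, ...}]
CurrentObservers: []"""

print(quorum_healthy(SAMPLE))
```

On a busy cluster a small, transient follower lag is plausible, so `max_lag` is left as a parameter rather than hard-coded.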
2.3 Python client
pip install kafka-python-ng
from kafka import KafkaProducer, KafkaConsumer, KafkaAdminClient
from kafka.admin import NewTopic
from kafka.errors import TopicAlreadyExistsError
import json, time

# Replace 192.168.1.119 with your Traefik IP
BOOTSTRAP_SERVERS = [
    "192.168.1.119:9092",  # kafka-0
    "192.168.1.119:9093",  # kafka-1
    "192.168.1.119:9094",  # kafka-2
]
SASL_CONFIG = {
    "security_protocol": "SASL_PLAINTEXT",
    "sasl_mechanism": "SCRAM-SHA-512",
    "sasl_plain_username": "admin",
    "sasl_plain_password": "supersecret",
}
TOPIC = "orders"


def create_topic(topic):
    admin = KafkaAdminClient(bootstrap_servers=BOOTSTRAP_SERVERS, **SASL_CONFIG)
    try:
        admin.create_topics([
            NewTopic(name=topic, num_partitions=3, replication_factor=3)
        ])
        print(f"Topic '{topic}' created.")
    except TopicAlreadyExistsError:
        print(f"Topic '{topic}' already exists.")
    finally:
        admin.close()


def produce(topic=TOPIC, count=10):
    producer = KafkaProducer(
        bootstrap_servers=BOOTSTRAP_SERVERS,
        **SASL_CONFIG,
        value_serializer=lambda v: json.dumps(v).encode(),
        key_serializer=lambda k: k.encode() if k else None,
        acks="all",
    )
    for i in range(count):
        meta = producer.send(
            topic,
            key=str(i),
            value={"id": i, "ts": time.time()}
        ).get(timeout=10)
        print(f" sent [{i}] partition={meta.partition} offset={meta.offset}")
    producer.flush()
    producer.close()


def consume(topic=TOPIC):
    consumer = KafkaConsumer(
        topic,
        bootstrap_servers=BOOTSTRAP_SERVERS,
        **SASL_CONFIG,
        value_deserializer=lambda v: json.loads(v.decode()),
        key_deserializer=lambda k: k.decode() if k else None,
        group_id="my-group",
        auto_offset_reset="earliest",
        consumer_timeout_ms=5000,
    )
    for msg in consumer:
        print(
            f" recv key={msg.key} "
            f"partition={msg.partition} "
            f"offset={msg.offset} "
            f"value={msg.value}"
        )
    consumer.close()


if __name__ == "__main__":
    create_topic(TOPIC)
    produce()
    consume()
Troubleshooting
Pods not starting
kubectl logs -n kafka kafka-0 -c format-storage
kubectl logs -n kafka kafka-0 -c init-node-id
SCRAM registration times out
Port 9094 is not reachable. Check that the headless service includes it:
kubectl get svc kafka-headless -n kafka
Python client connects but gets wrong broker addresses
The Traefik IP in the StatefulSet command block is wrong. Correct it in the manifest, re-apply the file, then restart the pods:
kubectl rollout restart statefulset/kafka -n kafka
Traefik ports not appearing
kubectl rollout restart deployment/traefik -n kube-system
kubectl get svc traefik -n kube-system
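To confirm from a client machine that the three Traefik entrypoints accept TCP connections, a plain socket probe is enough. This is a minimal sketch using only the standard library; `port_open` and `check_brokers` are hypothetical helpers. A successful connection shows that Traefik is listening, not that Kafka authentication or the advertised addresses are correct.

```python
import socket


def port_open(host: str, port: int, timeout: float = 3.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts, and unreachable hosts.
        return False


def check_brokers(host: str, ports=(9092, 9093, 9094), timeout: float = 3.0) -> dict:
    """Map each external broker port to whether it accepted a connection."""
    return {port: port_open(host, port, timeout) for port in ports}
```

Calling `check_brokers("192.168.1.119")` with your own Traefik IP returns a dict such as `{9092: True, 9093: True, 9094: False}`, which points at the entrypoint that is missing.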