DEV Community

karan singh


Connecting to a Kafka Cluster Running on Kubernetes from Your Local Machine: CLI & Programmatic Access

Introduction

Why do you need this?

  • For local development, you want to connect to a remote Kafka cluster running on OpenShift that was deployed using the Strimzi Operator

Prerequisite

  • OpenShift Container Platform or OKD

  • Strimzi Operator deployed

Deploy Kafka Cluster

  • Create a YAML file with these contents (ephemeral storage, so suitable only for dev/test clusters)
    apiVersion: kafka.strimzi.io/v1beta2
    kind: Kafka
    metadata:
      name: my-cluster
      namespace: nestjs-testing
    spec:
      entityOperator:
        topicOperator: {}
        userOperator: {}
      kafka:
        config:
          inter.broker.protocol.version: "2.8"
          log.message.format.version: "2.8"
          offsets.topic.replication.factor: 3
          transaction.state.log.min.isr: 2
          transaction.state.log.replication.factor: 3
        listeners:
        - name: plain
          port: 9092
          tls: false
          type: internal
        - name: tls
          port: 9093
          tls: true
          type: internal
        - name: route
          port: 9094
          tls: true
          type: route
        replicas: 3
        storage:
          type: ephemeral
        version: 2.8.0
      zookeeper:
        replicas: 3
        storage:
          type: ephemeral
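Assuming the resource above is saved as kafka-cluster.yaml (the filename is arbitrary), a sketch of applying it and waiting for Strimzi to bring the cluster up:

```shell
# Create the namespace if it does not exist yet
oc new-project nestjs-testing

# Apply the Kafka custom resource
oc apply -f kafka-cluster.yaml

# Wait until Strimzi reports the cluster as Ready (brokers, ZooKeeper, operators)
oc wait kafka/my-cluster --for=condition=Ready --timeout=600s -n nestjs-testing
```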

Preparing to Connect

    # Extract the cluster CA certificate (run in the namespace where Kafka is deployed)
    oc get secret my-cluster-cluster-ca-cert -o jsonpath='{.data.ca\.crt}' | base64 -d > ca.crt

    # Import the CA certificate into a Java truststore (used by the Kafka CLI tools)
    keytool -import -trustcacerts -alias root -file ca.crt -keystore truststore.jks -storepass password -noprompt

    # This should create two files in the current directory

    ls -l *.crt *.jks

Grab Kafka Endpoint

KAFKA_ENDPOINT=$(oc get kafka my-cluster -o=jsonpath='{.status.listeners[?(@.type=="route")].bootstrapServers}{"\n"}')
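A quick sanity check that the variable was populated and that the route presents a certificate signed by the cluster CA (the hostname shown is illustrative):

```shell
echo $KAFKA_ENDPOINT
# e.g. my-cluster-kafka-route-bootstrap-nestjs-testing.apps.ocp.ceph-s3.com:443

# Optional: verify the TLS handshake against the extracted CA
openssl s_client -connect $KAFKA_ENDPOINT -CAfile ca.crt < /dev/null
```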

Connecting from CLI (Kafka Console Producer/Consumer)

  • Get Kafka Console Producer & Consumer script files
    wget https://dlcdn.apache.org/kafka/3.0.0/kafka_2.13-3.0.0.tgz ; tar -xvf kafka_2.13-3.0.0.tgz
  • Console Producer
    kafka_2.13-3.0.0/bin/kafka-console-producer.sh --bootstrap-server $KAFKA_ENDPOINT --producer-property security.protocol=SSL --producer-property ssl.truststore.password=password --producer-property ssl.truststore.location=truststore.jks --topic my-topic
  • Console Consumer
    kafka_2.13-3.0.0/bin/kafka-console-consumer.sh --bootstrap-server $KAFKA_ENDPOINT --topic my-topic --from-beginning  --consumer-property security.protocol=SSL --consumer-property ssl.truststore.password=password --consumer-property ssl.truststore.location=truststore.jks
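The commands above assume a topic named my-topic exists. Since the Topic Operator is deployed (the entityOperator section in the cluster spec), one way to create it is declaratively with a KafkaTopic resource; a sketch:

```yaml
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaTopic
metadata:
  name: my-topic
  namespace: nestjs-testing
  labels:
    strimzi.io/cluster: my-cluster
spec:
  partitions: 3
  replicas: 3
```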

Connecting from Python Client (running locally)
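
The client below uses the kafka-python package; bson.json_util ships with pymongo. Assuming pip is available:

```shell
pip install kafka-python pymongo
```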

from kafka import KafkaProducer, KafkaConsumer
import json
from bson import json_util

bootstrap_server = 'my-cluster-kafka-route-bootstrap-nestjs-testing.apps.ocp.ceph-s3.com:443'

print("Producing messages to Kafka topic ...")
producer = KafkaProducer(bootstrap_servers=bootstrap_server,
                         ssl_cafile='ca.crt',
                         security_protocol="SSL")

for i in range(10):
    message = {'value': i}
    producer.send('my-topic', json.dumps(message, default=json_util.default).encode('utf-8'))

# Block until all buffered messages have actually been delivered
producer.flush()

print("Consuming messages from Kafka topic ...")

consumer = KafkaConsumer('my-topic',
                         group_id='my-group',
                         bootstrap_servers=bootstrap_server,
                         ssl_cafile='ca.crt',
                         security_protocol="SSL",
                         auto_offset_reset='earliest',  # read from the beginning, like --from-beginning
                         consumer_timeout_ms=10000,
                         enable_auto_commit=True)

for message in consumer:
    # message value and key are raw bytes -- decode if necessary!
    # e.g., for unicode: `message.value.decode('utf-8')`
    print("%s:%d:%d: value=%s" % (message.topic, message.partition, message.offset, message.value))
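The messages travel as JSON-encoded bytes, so decoding on the consumer side is symmetric with the producer's encoding. A minimal round-trip you can run locally without a cluster:

```python
import json

# Encode exactly as the producer does (the value 7 is just an example)
message = {'value': 7}
raw = json.dumps(message).encode('utf-8')

# Decode as a consumer would with the raw message.value bytes
decoded = json.loads(raw.decode('utf-8'))
print(decoded['value'])  # → 7
```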

Output of Kafka Python Producer & Consumer example

This is how you can connect to a remote Kafka cluster from your local machine. It comes in handy when you develop locally and eventually deploy the same code to your OpenShift environment.


