Connecting to Kafka cluster using SSL with Python

Aditya Kanekar ・3 min read

This article explains how to write a producer and a consumer in Python for a Kafka cluster secured with SSL. I won't cover how to generate client certificates here; that is a topic for another article :).

Pre-Requisites

  • Kafka Cluster with SSL
  • Client certificate (KeyStore) in JKS format
  • Linux environment with keytool and openssl installed
  • Python 3.6

Step 1 - Converting JKS to PEM file

Why do I need this step?

Unlike Java clients, Python and C# clients use .pem files to connect to Kafka, so we have to convert the JKS files to PEM with the help of the keytool and openssl commands. If you are working on Windows 10, you can refer to my article on how to run WSL on Windows here.

To make your life easier, I have created a shell script that quickly converts JKS to PEM.
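To see the difference between the two formats for yourself, here is a minimal Python sketch that guesses whether some bytes come from a JKS keystore or a PEM file. It relies on two facts: a JKS file starts with the magic bytes 0xFEEDFEED, and a PEM file is text containing a "-----BEGIN ...-----" line. The function name detect_store_format is my own for illustration and is not part of any library.

```python
# First four bytes of every JKS keystore
JKS_MAGIC = bytes.fromhex("feedfeed")


def detect_store_format(data: bytes) -> str:
    """Roughly classify the given bytes as 'jks', 'pem', or 'unknown'."""
    if data.startswith(JKS_MAGIC):
        return "jks"
    # PEM files are text; openssl may write attribute lines before the BEGIN marker
    if b"-----BEGIN " in data:
        return "pem"
    return "unknown"


# Usage with a real file (the path is only an example):
# with open("kafka.client.keystore.jks", "rb") as f:
#     print(detect_store_format(f.read()))
```

This is only a rough check, not a full parser, but it is usually enough to confirm which kind of file you are holding before handing it to a Python client.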

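#!/bin/bash
# Usage: ./jkstopem.sh <source_path_to_jks> <keystore_file_name> <keystore_password> <alias> <output_folder>
srcFolder="$1"
keyStore="$srcFolder/$2"
password="$3"
alias="$4"
outputFolder="$5"

# Make sure the output folder exists before writing to it
mkdir -p "$outputFolder"

echo "$keyStore"
echo "Generating certificate.pem"
# Export the certificate stored under the alias in PEM format
keytool -exportcert -alias "$alias" -keystore "$keyStore" -rfc -file "$outputFolder/certificate.pem" -storepass "$password"

echo "Generating key.pem"
# Copy the entry into a PKCS12 store, then extract the private key unencrypted (-nodes)
keytool -v -importkeystore -srckeystore "$keyStore" -srcalias "$alias" -destkeystore "$outputFolder/cert_and_key.p12" -deststoretype PKCS12 -deststorepass "$password" -srcstorepass "$password"
openssl pkcs12 -in "$outputFolder/cert_and_key.p12" -nodes -nocerts -out "$outputFolder/key.pem" -passin "pass:$password"

echo "Generating CARoot.pem"
# Note: this exports the certificate under the same alias as above. If your CA root
# is stored under a different alias, use that alias here instead.
keytool -exportcert -alias "$alias" -keystore "$keyStore" -rfc -file "$outputFolder/CARoot.pem" -storepass "$password"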

The script generates the following files from the keystore file:

  • key.pem
  • certificate.pem
  • CARoot.pem
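Once the script has run, it can be worth checking that each file holds what its name suggests. The sketch below lists the labels of the PEM blocks found in a piece of text, so you can confirm that key.pem contains a private key and the other two files contain certificates. The helper name pem_block_labels is hypothetical, and the exact label in key.pem (for example PRIVATE KEY or RSA PRIVATE KEY) depends on your openssl version and key type.

```python
import re

# Matches lines such as "-----BEGIN CERTIFICATE-----" and captures the label
_PEM_BEGIN = re.compile(r"^-----BEGIN ([A-Z0-9 ]+)-----\s*$", re.MULTILINE)


def pem_block_labels(text: str) -> list:
    """Return the label of every PEM block in the text, in order of appearance."""
    return _PEM_BEGIN.findall(text)


# Usage with the generated files (paths are examples):
# for name in ("key.pem", "certificate.pem", "CARoot.pem"):
#     with open(name) as f:
#         print(name, pem_block_labels(f.read()))
```

An empty list for any of the three files is a sign that the conversion step for that file did not work.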

How to run this script?

Save the script in a file, e.g. jkstopem.sh, and give it execute permission as shown below:

chmod +x jkstopem.sh

To generate the PEM files, run the shell script as shown in the example below:

./jkstopem.sh <source_path_to_jks> <keystore_file_name> <keystore_password> <alias> <output_folder>

How do I find the alias?

If you do not know which alias your certificate has, run the following command in the folder that contains the keystore file:

keytool -list -v -keystore kafka.client.keystore.jks

You will be prompted for a password. Enter the keystore password and the contents of the keystore file will be listed, including the "Alias name" of each entry.
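If the keystore has many entries, picking the alias out by eye gets tedious. Here is a small sketch that pulls every "Alias name:" line out of the keytool output. The function names extract_aliases and list_aliases are my own, and the parsing assumes keytool prints English labels, which may not hold on a machine with a different locale.

```python
import re
import subprocess


def extract_aliases(keytool_output: str) -> list:
    """Return the value of every 'Alias name:' line in `keytool -list -v` output."""
    return re.findall(r"^Alias name:\s*(.+?)\s*$", keytool_output, flags=re.MULTILINE)


def list_aliases(keystore_path: str, password: str) -> list:
    """Run keytool (must be on PATH) and return the aliases it reports."""
    result = subprocess.run(
        ["keytool", "-list", "-v", "-keystore", keystore_path, "-storepass", password],
        stdout=subprocess.PIPE,
        universal_newlines=True,  # decode output as text; works on Python 3.6
        check=True,
    )
    return extract_aliases(result.stdout)


# Usage (file name and password are the examples used in this article):
# print(list_aliases("kafka.client.keystore.jks", "welcome123"))
```

Passing the password with -storepass avoids the interactive prompt, at the cost of the password being visible in the process list, so use it with care on shared machines.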

Following is an example of running the shell script:

./jkstopem.sh ~/client-cert kafka.client.keystore.jks welcome123 client-alias ~/client-cert/pem

You should now see the following files in the output folder:

  • key.pem
  • certificate.pem
  • CARoot.pem

Now that we have all the PEM files, let's get cracking.

Step 2 - Writing Kafka Producer in Python

We will use the 'kafka-python' package to connect to Kafka. You can install it using pip:

pip install kafka-python

Now, let's write our producer.

# Producer.py
from kafka import KafkaProducer

# Broker list and the PEM files generated in Step 1
kafkaBrokers = 'kafka1.xyz.com:443,kafka2.xyz.com:443,kafka3.xyz.com:443'
caRootLocation = 'CARoot.pem'
certLocation = 'certificate.pem'
keyLocation = 'key.pem'
topic = 'test-topic'
password = 'welcome123'

producer = KafkaProducer(bootstrap_servers=kafkaBrokers,
                         security_protocol='SSL',
                         ssl_check_hostname=True,
                         ssl_cafile=caRootLocation,
                         ssl_certfile=certLocation,
                         ssl_keyfile=keyLocation,
                         ssl_password=password)

# Let the producer pick the partition
producer.send(topic, b'Hello Kafka!')

# Send to a particular partition
producer.send(topic, b'Hello Kafka!', partition=0)

# Block until all buffered messages have been sent
producer.flush()

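The above example uses the PEM files we generated in the last step, together with the password for the key file. Since the script extracts key.pem with the -nodes option, the key is written unencrypted, so the password should only matter if you encrypt the key yourself.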

kafkaBrokers='kafka1.xyz.com:443,kafka2.xyz.com:443,kafka3.xyz.com:443'
caRootLocation='CARoot.pem'
certLocation='certificate.pem'
keyLocation='key.pem'
password='welcome123'
producer = KafkaProducer(bootstrap_servers=kafkaBrokers,
                          security_protocol='SSL',
                          ssl_check_hostname=True,
                          ssl_cafile=caRootLocation,
                          ssl_certfile=certLocation,
                          ssl_keyfile=keyLocation,
                          ssl_password=password)
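If you keep all three PEM files in one folder, the connection settings above can be built in one place. The following is a minimal sketch, assuming the file names produced by the script in Step 1. The helper names ssl_client_config, missing_files and make_producer are hypothetical; only the keyword arguments themselves come from kafka-python.

```python
import os


def ssl_client_config(brokers, pem_dir, key_password=None):
    """Build the SSL keyword arguments for kafka-python from a folder of PEM files."""
    config = {
        "bootstrap_servers": brokers,
        "security_protocol": "SSL",
        "ssl_check_hostname": True,
        "ssl_cafile": os.path.join(pem_dir, "CARoot.pem"),
        "ssl_certfile": os.path.join(pem_dir, "certificate.pem"),
        "ssl_keyfile": os.path.join(pem_dir, "key.pem"),
    }
    # Only pass a password when the key file is actually encrypted
    if key_password is not None:
        config["ssl_password"] = key_password
    return config


def missing_files(config):
    """Return the configured PEM paths that do not exist on disk."""
    keys = ("ssl_cafile", "ssl_certfile", "ssl_keyfile")
    return [config[k] for k in keys if not os.path.isfile(config[k])]


def make_producer(config):
    """Create the producer; kafka-python is imported here so the helpers above work without it."""
    from kafka import KafkaProducer
    return KafkaProducer(**config)
```

Calling missing_files before make_producer gives a clearer error than the SSL failure you would otherwise get from a wrong path, such as a misspelled file name.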

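Sending data to a random topic partition

The code snippet below sends data without naming a partition. The partition is then chosen by the producer's default partitioner, which picks one at random when the message has no key.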

producer.send(topic, bytes('Hello Kafka!','utf-8'))
producer.flush()

Sending data to a specific topic partition

To send data to a specific partition, specify the partition as shown in the code snippet below:

producer.send(topic, bytes('Hello Kafka - Partition 0!','utf-8'),partition=0)
producer.flush()
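To check where a message actually landed, you can wait on the future that send() returns. The sketch below assumes a producer object like the one created above. The helper name send_and_report is my own; send() returning a future and get() returning metadata with topic, partition and offset fields is how kafka-python behaves.

```python
def send_and_report(producer, topic, payload, partition=None, timeout=10):
    """Send one message, wait for the broker to acknowledge it, and report where it went."""
    future = producer.send(topic, payload, partition=partition)
    # Blocks until the send is acknowledged; raises an error if it fails or times out
    metadata = future.get(timeout=timeout)
    return metadata.topic, metadata.partition, metadata.offset


# Usage with the producer from this article:
# print(send_and_report(producer, topic, b'Hello Kafka!'))               # partition picked for you
# print(send_and_report(producer, topic, b'Hello Kafka!', partition=0))  # always partition 0
```

Waiting on every message slows the producer down, so this is better suited to testing the connection than to high-volume sending.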

So we have built our Python producer for Kafka. In the next part we will write a consumer to read the messages from the topic.

Discussion (2)

Shubhendu Madhukar • Edited

I have a truststore.jks and password. But I don't have a keystore.jks. I am able to use the truststore and password to connect to the cluster using java code. Will this method work for truststore.jks also or is it only for keystore? I tried to generate those files and got certificate.pem but then I got an error stating "java.lang.Exception: Alias does not exist" and other files were not generated.

Aditya Kanekar (Author)

The method will work for keystore.jks. It appears to me that you are using SSL-enabled Kafka without any ACL enabled. If that's true, you might not be providing ssl_keyfile and ssl_certfile while connecting to Kafka through your Java code. In my view you should only provide the CARoot.pem to connect to Kafka; the other properties might not be required. Please try it and let me know if that works.