DEV Community

Abhishek Gupta for Microsoft Azure

Posted on

Kafka on Kubernetes, the Strimzi way! (Part 3)

Over the course of the first two parts of this blog series, we setup a single-node Kafka cluster on Kubernetes, secured it using TLS encryption and accessed the broker using both internal and external clients. Let's keep iterating! In this post, we will continue the Kafka on Kubernetes journey with Strimzi and cover:

  • How to apply different authentication types: TLS and SASL SCRAM-SHA-512
  • Use Strimzi Entity operator to manage Kafka users and topics
  • How to configure Kafka CLI and Go client applications to securely connect to the Kafka cluster

The code is available on GitHub - https://github.com/abhirockzz/kafka-kubernetes-strimzi/

What do I need to go through this tutorial?

kubectl - https://kubernetes.io/docs/tasks/tools/install-kubectl/

I will be using Azure Kubernetes Service (AKS) to demonstrate the concepts, but by and large it is independent of the Kubernetes provider (e.g. feel free to use a local setup such as minikube). If you want to use AKS, all you need is a Microsoft Azure account which you can get for FREE if you don't have one already.

I will not be repeating some of the common sections (such as Installation/Setup (Helm, Strimzi, Azure Kubernetes Service), Strimzi overview) in this or subsequent part of this series and would request you to refer to part one

Create a Kafka cluster with TLS authentication

To enforce 2-way mutual TLS auth, all we need to do is tweak the Strimzi Kafka resource. I am highlighting the key part below. The other parts remain the same (here is the manifest from part 2) i.e. single node Kafka and Zookeeper, ephemeral storage along with TLS encryption

      external:
        type: loadbalancer
        tls: true
        authentication:
          type: tls
Enter fullscreen mode Exit fullscreen mode

All we did is all the tls authentication type as a property of the external listener. In addition to this, we also include the entityOperator configuration as such:

  entityOperator:
    userOperator: {}
    topicOperator: {}
Enter fullscreen mode Exit fullscreen mode

This activates the Strimzi Entity Operator which in turn comprises of the Topic Operator and User Operator. Just as the Kafka CRD allows you to control Kafka clusters on Kubernetes, a Topic Operator allows you to manage topics in a Kafka cluster through a custom resource called KafkaTopic i.e. you can create, delete and update topics in your Kafka cluster.

The interesting part is that it's a two-way sync i.e. you can still create topics by accessing the Kafka cluster directly and it would reflect in the KafkaTopic resources being created/updated/deleted

The goal of the User Operator is to make Kafka user management easier with help of a KafkaUser CRD. All you do is create instances of KafkaUser CRDs and Strimzi takes care of the Kafka specific user management parts

Unlike Topic Operator, this is not a two-way sync

Read more about Entity Operator here https://strimzi.io/docs/operators/master/using.html#assembly-kafka-entity-operator-deployment-configuration-kafka

We will dive into the practical bit of these two operators in upcoming sections.

To create the Kafka cluster:

kubectl apply -f https://raw.githubusercontent.com/abhirockzz/kafka-kubernetes-strimzi/master/part-3/kafka-tls-auth.yaml
Enter fullscreen mode Exit fullscreen mode

What did the Strimzi Operator do for us in this case?

We covered most of these in part 1 - StatefulSet (and Pods), LoadBalancer Service, ConfigMap, Secret etc. How is the TLS auth config enforced? To figure that out, let's introspect the Kafka server configuration

As explained in part 1, this is stored in a ConfigMap

export CLUSTER_NAME=my-kafka-cluster
kubectl get configmap/${CLUSTER_NAME}-kafka-config -o yaml
Enter fullscreen mode Exit fullscreen mode

Look at the External listener section in server.config:

    listener.name.external-9094.ssl.client.auth=required
    listener.name.external-9094.ssl.truststore.location=/tmp/kafka/clients.truststore.p12
    listener.name.external-9094.ssl.truststore.password=${CERTS_STORE_PASSWORD}
    listener.name.external-9094.ssl.truststore.type=PKCS12
Enter fullscreen mode Exit fullscreen mode

The snippet highlighted above is the part which was added - notice listener.name.external-9094.ssl.client.auth=required was added along with the truststore details.

Let's not forget the Entity Operator

The Entity Operator runs a separate Deployment

export CLUSTER_NAME=my-kafka-cluster
kubectl get deployment $CLUSTER_NAME-entity-operator
kubectl get pod -l=app.kubernetes.io/name=entity-operator

NAME                                                READY   STATUS     
my-kafka-cluster-entity-operator-666f8758f6-gj54h   3/3     Running         
Enter fullscreen mode Exit fullscreen mode

The entity operator Pod runs three containers - topic-operator, user-operator, tls-sidecar

We have configured our cluster to authenticate client connections, but what about the user credentials which will be used by client apps?

Time to use the User Operator!

The User Operator allows us to create KafkaUsers to represent client authentication credentials. As mentioned in the beginning of the blog post, supported authentication types include TLS and SCRAM-SHA-512. Behind the scenes, a Kubernetes Secret is created by Strimzi to store the credentials

OAuth 2.0 is also supported but its not handled by the User Operator

Let's create a KafkaUser to store client credentials for TLS auth. Here is what the user info looks like:

apiVersion: kafka.strimzi.io/v1beta1
kind: KafkaUser
metadata:
  name: kafka-tls-client-credentials
  labels:
    strimzi.io/cluster: my-kafka-cluster
spec:
  authentication:
    type: tls
Enter fullscreen mode Exit fullscreen mode

We name the user kafka-tls-client-credentials, associate with the Kafka cluster we created earlier (using the label strimzi.io/cluster: my-kafka-cluster) and specify the tls authentication type

You can also define authorization rules (not covered in this blog) within a KafkaUser definition - see https://strimzi.io/docs/operators/master/using.html#type-KafkaUser-reference

kubectl apply -f https://raw.githubusercontent.com/abhirockzz/kafka-kubernetes-strimzi/master/part-3/user-tls-auth.yaml
Enter fullscreen mode Exit fullscreen mode

Introspect the Secret (it has the same name as the KafkaUser):

kubectl get secret/kafka-tls-client-credentials -o yaml
Enter fullscreen mode Exit fullscreen mode

TLS client authentication

That's it! Now its up to the client to use the credentials. We will use a Kafka CLI and Go client application to try this out. First things first:

Extract and configure the user credentials

export KAFKA_USER_NAME=kafka-tls-client-credentials
kubectl get secret $KAFKA_USER_NAME -o jsonpath='{.data.user\.crt}' | base64 --decode > user.crt
kubectl get secret $KAFKA_USER_NAME -o jsonpath='{.data.user\.key}' | base64 --decode > user.key
kubectl get secret $KAFKA_USER_NAME -o jsonpath='{.data.user\.p12}' | base64 --decode > user.p12
kubectl get secret $KAFKA_USER_NAME -o jsonpath='{.data.user\.password}' | base64 --decode > user.password
Enter fullscreen mode Exit fullscreen mode

Import the entry in user.p12 into another keystore

export USER_P12_FILE_PATH=user.p12
export USER_KEY_PASSWORD_FILE_PATH=user.password
export KEYSTORE_NAME=kafka-auth-keystore.jks
export KEYSTORE_PASSWORD=foobar
export PASSWORD=`cat $USER_KEY_PASSWORD_FILE_PATH`

sudo keytool -importkeystore -deststorepass $KEYSTORE_PASSWORD -destkeystore $KEYSTORE_NAME -srckeystore $USER_P12_FILE_PATH -srcstorepass $PASSWORD -srcstoretype PKCS12

sudo keytool -list -alias $KAFKA_USER_NAME -keystore $KEYSTORE_NAME
Enter fullscreen mode Exit fullscreen mode

Just like we did in part 2, TLS encryption config requires importing the cluster CA cert in the client truststore

Extract and configure server CA cert

Extract the cluster CA certificate and password

export CLUSTER_NAME=my-kafka-cluster

kubectl get secret $CLUSTER_NAME-cluster-ca-cert -o jsonpath='{.data.ca\.crt}' | base64 --decode > ca.crt
kubectl get secret $CLUSTER_NAME-cluster-ca-cert -o jsonpath='{.data.ca\.password}' | base64 --decode > ca.password
Enter fullscreen mode Exit fullscreen mode

Import it into truststore - I am using the built-in truststore which comes in with a JDK (Java) installation - but this is just for convenience and you're free to use other truststore

export CERT_FILE_PATH=ca.crt
export CERT_PASSWORD_FILE_PATH=ca.password

# replace this with the path to your truststore

export KEYSTORE_LOCATION=/Library/Java/JavaVirtualMachines/jdk1.8.0_221.jdk/Contents/Home/jre/lib/security/cacerts
export PASSWORD=`cat $CERT_PASSWORD_FILE_PATH`

# you will prompted for the truststore password. for JDK truststore, the default password is "changeit"
# Type yes in response to the 'Trust this certificate? [no]:' prompt

sudo keytool -importcert -alias strimzi-kafka-cert -file $CERT_FILE_PATH -keystore $KEYSTORE_LOCATION -keypass $PASSWORD

sudo keytool -list -alias strimzi-kafka-cert -keystore $KEYSTORE_LOCATION
Enter fullscreen mode Exit fullscreen mode

You should now be able to authenticate to the Kafka cluster using the Kafka CLI client

Please note that the configuration steps for the Kafka CLI as detailed below will also work for the Java clients as well - feel free to try that out as well

Create properties file for Kafka CLI clients

Extract the LoadBalancer public IP for Kafka cluster

export KAFKA_CLUSTER_NAME=my-kafka-cluster

kubectl get service/${KAFKA_CLUSTER_NAME}-kafka-external-bootstrap --output=jsonpath={.status.loadBalancer.ingress[0].ip}
Enter fullscreen mode Exit fullscreen mode

Create a file called client-ssl-auth.properties with the following contents:

bootstrap.servers=[LOADBALANCER_PUBLIC_IP]:9094
security.protocol=SSL
ssl.truststore.location=[TRUSTSTORE_LOCATION]
ssl.truststore.password=changeit
ssl.keystore.location=kafka-auth-keystore.jks
ssl.keystore.password=foobar
ssl.key.password=[contents of user.password file]
Enter fullscreen mode Exit fullscreen mode

changeit is the default truststore password. Please use a different one if needed

Download Kafka if you don't have it already - https://kafka.apache.org/downloads

One last thing before you proceed

Create a KafkaTopic

As I mentioned earlier, the Topic Operator makes this possible to embed topic info in form of a KafkaTopic manifest as such:

apiVersion: kafka.strimzi.io/v1beta1
kind: KafkaTopic
metadata:
  name: strimzi-test-topic
  labels:
    strimzi.io/cluster: my-kafka-cluster
spec:
  partitions: 3
  replicas: 1
Enter fullscreen mode Exit fullscreen mode

To create the topic:

kubectl apply -f https://raw.githubusercontent.com/abhirockzz/kafka-kubernetes-strimzi/master/part-3/topic.yaml
Enter fullscreen mode Exit fullscreen mode

Here is the reference for a KafkaTopic CRD https://strimzi.io/docs/operators/master/using.html#type-KafkaTopic-reference

All you need to do is use the kafka-console-producer and kafka-console-consumer by pointing it to the client-ssl-auth.properties file you just created

export KAFKA_HOME=[replace with kafka installation] e.g. /Users/foobar/kafka_2.12-2.3.0
export LOADBALANCER_PUBLIC_IP=[replace with public-ip]
export TOPIC_NAME=strimzi-test-topic

# on a terminal, start producer and send a few messages
$KAFKA_HOME/bin/kafka-console-producer.sh --broker-list $LOADBALANCER_PUBLIC_IP:9094 --topic $TOPIC_NAME --producer.config client-ssl-auth.properties

# on another terminal, start consumer
$KAFKA_HOME/bin/kafka-console-consumer.sh --bootstrap-server $LOADBALANCER_PUBLIC_IP:9094 --topic $TOPIC_NAME --consumer.config client-ssl-auth.properties --from-beginning
Enter fullscreen mode Exit fullscreen mode

You should see producer and consumer working in tandem. Great!

If you face SSL Handshake errors, please check whether keys and certificates has been correctly imported and you're using the correct password. If the Kafka cluster is not reachable, ensure you are using the right value for the public IP

Now, let's try a programmatic client. Since the Java client behavior (required config properties) are same as the CLI, I am using a Go client to try something different. Don't worry, if you are not a Go programmer, it should be easy to follow along.

I will not walk through the entire program, just the part where we create the connection related configuration. Here is the snippet:

    bootstrapServers = os.Getenv("KAFKA_BOOTSTRAP_SERVERS")
    caLocation = os.Getenv("CA_CERT_LOCATION")
    topic = os.Getenv("KAFKA_TOPIC")

    userCertLocation = os.Getenv("USER_CERT_LOCATION")
    userKeyLocation = os.Getenv("USER_KEY_LOCATION")
    userKeyPassword = os.Getenv("USER_KEY_PASSWORD")

    producerConfig := &kafka.ConfigMap{"bootstrap.servers": bootstrapServers, "security.protocol": "SSL", "ssl.ca.location": caLocation, "ssl.certificate.location": userCertLocation, "ssl.key.location": userKeyLocation, "ssl.key.password": userKeyPassword}
Enter fullscreen mode Exit fullscreen mode

Notice that the bootstrap.servers and security.protocol are the same as ones you used in the Kafka CLI client (same for Java as well).

  • For TLS encryption: ssl.ca.location is used to point to the CA certificate directly as opposed to a truststore
  • For client authentication: ssl.certificate.location, ssl.key.location and ssl.key.password refer to the user certificate, user key and password respectively

If you have Go installed, you can try it out. Clone the Git repo

git clone https://github.com/abhirockzz/kafka-kubernetes-strimzi
cd part-3/go-client-app
Enter fullscreen mode Exit fullscreen mode

.. and run the program:

export KAFKA_BOOTSTRAP_SERVERS=[replace with public-ip:9094] e.g. 20.43.176.7:9094
export CA_CERT_LOCATION=[replace with location of ca.crt file] e.g. /Users/code/kafka-kubernetes-strimzi/part-3/ca.crt
export KAFKA_TOPIC=test-strimzi-topic

export USER_CERT_LOCATION=[path to user.crt file] e.g. /Users/code/kafka-kubernetes-strimzi/part-3/user.crt
export USER_KEY_LOCATION=[path to user.key file] e.g. /Users/code/kafka-kubernetes-strimzi/part-3/user.key
export USER_KEY_PASSWORD=[contents of user.password file]

go run kafka-tls-auth-client.go
Enter fullscreen mode Exit fullscreen mode

The logs should confirm whether messages are being produced and consumed

Enforce SCRAM-SHA-512 auth

SCRAM stands for "Salted Challenge Response Authentication Mechanism". I will not pretend to be a security or SCRAM expert, but do want to highlight that it is one of the supported and commonly used authentication mechanism in Kafka (in addition to other such as PLAIN)

Please note that Strimzi does not support SASL PLAIN auth at the time of writing

Update the Kafka cluster

To apply the SCRAM authentication scheme - all you need is to set the authentication.type to scram-sha-512

      external:
        type: loadbalancer
        tls: true
        authentication:
          type: scram-sha-512
Enter fullscreen mode Exit fullscreen mode

Update the Kafka cluster to use SCRAM-SHA authentication

kubectl apply -f https://raw.githubusercontent.com/abhirockzz/kafka-kubernetes-strimzi/master/part-3/kafka-tls-auth.yaml
Enter fullscreen mode Exit fullscreen mode

Let's take a look at how the Kafka server config looks like in this case:

export CLUSTER_NAME=my-kafka-cluster
kubectl get configmap/${CLUSTER_NAME}-kafka-config -o yaml
Enter fullscreen mode Exit fullscreen mode

Introspect External listener section in server.config and notice how the the config has been updated to reflect

    listener.name.external-9094.scram-sha-512.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required;
    listener.name.external-9094.sasl.enabled.mechanisms=SCRAM-SHA-512
Enter fullscreen mode Exit fullscreen mode

Create SCRAM credentials (KafkaUser)

Just like we did with TLS auth, we need to create client credentials for SCRAM as well. It only differs from its TLS equivalent in terms of name and the type (of course!)

apiVersion: kafka.strimzi.io/v1beta1
kind: KafkaUser
metadata:
  name: kafka-scram-client-credentials
  labels:
    strimzi.io/cluster: my-kafka-cluster
spec:
  authentication:
    type: scram-sha-512
Enter fullscreen mode Exit fullscreen mode

notice that authentication.type is scram-sha-512

Create the KafkaUser

kubectl apply -f https://raw.githubusercontent.com/abhirockzz/kafka-kubernetes-strimzi/master/part-3/user-scram-auth.yaml
Enter fullscreen mode Exit fullscreen mode

Introspect the Secret (it has the same name as the KafkaUser):

kubectl get secret/kafka-scram-client-credentials -o yaml
Enter fullscreen mode Exit fullscreen mode

The Secret contains the password in base64 encoded form

apiVersion: v1
kind: Secret
name: kafka-scram-client-credentials
data:
  password: SnpteEQwek1DNkdi
...
Enter fullscreen mode Exit fullscreen mode

Username is same as the KafkaUser/Secret name, which is kafka-scram-client-credentials in this example

Run client applications

In order run the client examples, download the the password:

export USER_NAME=kafka-scram-client-credentials
kubectl get secret $USER_NAME -o jsonpath='{.data.password}' | base64 --decode > user-scram.password
Enter fullscreen mode Exit fullscreen mode

To test the Kafka CLI client, create a file client-scram-auth.properties with the following contents:

bootstrap.servers=[replace with public-ip:9094]
security.protocol=SASL_SSL
sasl.mechanism=SCRAM-SHA-512
ssl.truststore.location=[replace with path to truststore with kafka CA cert]
# "changeit" is the default password for JDK truststore, please use the one applicable to yours
ssl.truststore.password=changeit
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="kafka-scram-client-credentials" password="[replace with contents of user-scram.password file]";
Enter fullscreen mode Exit fullscreen mode

Refer to the instructions above to run the console producer and consumer

please make sure you use the client-scram-auth.properties and not the client-tls-auth.properties file

Before wrapping up, lets look at the Go client and see how it handles SCRAM authentication. As always, I will only highlight the part which showcases the configuration:

    bootstrapServers = os.Getenv("KAFKA_BOOTSTRAP_SERVERS")
    caLocation = os.Getenv("CA_CERT_LOCATION")
    topic = os.Getenv("KAFKA_TOPIC")

    kafkaScramUsername = os.Getenv("SCRAM_USERNAME")
    kafkaScramPassword = os.Getenv("SCRAM_PASSWORD")

    producerConfig := &kafka.ConfigMap{"bootstrap.servers": bootstrapServers, "security.protocol": "SASL_SSL", "ssl.ca.location": caLocation, "sasl.mechanism": "SCRAM-SHA-512", "sasl.username": kafkaScramUsername, "sasl.password": kafkaScramPassword}
Enter fullscreen mode Exit fullscreen mode

The security.protocol and sasl.mechanism have been updated to SASL_SSL and SCRAM-SHA-512 respectively. Along with that, we use the sasl.username and sasl.password to specify the client credentials

To run the Go client app:

export KAFKA_BOOTSTRAP_SERVERS=[replace with public-ip:9094]
export CA_CERT_LOCATION=[path to ca.crt file] f.g. /Users/code/kafka-kubernetes-strimzi/part-3/ca.crt
export KAFKA_TOPIC=strimzi-test-topic

export SCRAM_USERNAME=kafka-scram-client-credentials
export SCRAM_PASSWORD=[contents of user-scram.password file]

go run kafka-scram-auth-client.go
Enter fullscreen mode Exit fullscreen mode

Wrap up.. for now

This post covered a decent amount of ground! We learnt how to apply different authentication types, use Entity Operators to manage Kafka users and topics and more importantly, understand how client applications need to configured to connect securely using a combination of TLS encryption and the chosen authentication scheme.

We're far from done! All this while, we've been creating ephemeral clusters with no persistence - we will fix that in upcoming posts.

Latest comments (2)

Collapse
 
archupsg03 profile image
archupsg03

What are the configurations that I need to do for internal consumer. Like *.properties, should we have something ?

Collapse
 
martineliasq profile image
MartinEliasQ

Thank you very much, the publications are really good, I can't wait to see the next ones.