I wanted to create a sample for testing Kafka Scaler with ScaledJob. It requires simple go client that will finish once the message has consumed. The reason why I use go is, KEDA Kafka Scaler using Sarama for implementing Kafka Scaler. So that I thought it might help to understand the kafka scaler implementation.
Kafka Broker
I use the Kafka Broker as EventHubs for easily to create a broker. We can use EventHubs with Kafka API.
Configuration
Sarama
Configuration for Sarama with EventHubs are like this. You need to pass EventHubs Connection String as config.Net.SASL.Password. Other consideration, is, config.Consumer.Group.Session.Timeout If you don't configure the value, the client is easy to returns panic: Error from consumer: read tcp 172.26.24.68:34288->40.78.250.71:9093: i/o timeout. By configuring this value, your client works fine.
config := sarama.NewConfig()
config.Version = sarama.V1_0_0_0
config.Consumer.Group.Rebalance.Strategy = sarama.BalanceStrategyRange
config.Net.SASL.Enable = true
config.Net.SASL.User = "$ConnectionString"
config.Net.SASL.Password = os.Getenv(saslPassword)
config.Consumer.Group.Session.Timeout = 60 * time.Second
config.Net.TLS.Enable = true
consumer := Consumer{
ready: make(chan bool),
}
ctx, cancel := context.WithCancel(context.Background())
client, err := sarama.NewConsumerGroup(strings.Split(brokers, ","), group, config)
if err != nil {
log.Panicf("Error creating consumer group client: %v", err)
}
For the Scaled Job, I'd like to finish the pod once the app receive a message. We need to implement callback Struct for processing message. It is rough implementation for sample, however, I notify the message consuming by channel.
// Consumer struct
type Consumer struct {
ready chan bool
}
// Setup method
func (consumer *Consumer) Setup(sarama.ConsumerGroupSession) error {
close(consumer.ready)
return nil
}
// Cleanup method
func (consumer *Consumer) Cleanup(sarama.ConsumerGroupSession) error {
return nil
}
// ConsumeClaim method
func (consumer *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
for message := range claim.Messages() {
log.Printf("Message claimed: value = %s, timestamp = %v, topic = %s\n", string(message.Value), message.Timestamp, message.Topic)
session.MarkMessage(message, "")
consumed <- true
break
}
return nil
}
Scaled Job
Secret
For the Kafka Scaled Job, I create two secrets. One for Authentication for the Kafka Scaler, the other is Environment Variables for the Pod. Replace the value of YOUR_EVENT_HUBS_CONNECTION_STRING_BASE64 with your EventHubs connection string.Don't forget to convert into base64.
secret.yaml
apiVersion: v1
kind: Secret
metadata:
name: keda-kafka-secrets
namespace: default
data:
sasl: "plaintext as base64"
username: "$ConnectionString as base64"
password: "YOUR_EVENT_HUBS_CONNECTION_STRING_BASE64"
tls: "enable as base64"
pod-secret.yaml
apiVersion: v1
kind: Secret
metadata:
name: keda-kafka-pod-secrets
namespace: default
data:
SASL_PASSWORD: "YOUR_EVENT_HUBS_CONNECTION_STRING_BASE64"
BROKER_LIST: "BROKER_LIST_BASE64"
TOPICS: "TOPIC_BASE64"
ScaledJob
apiVersion: keda.sh/v1alpha1
kind: ScaledJob
metadata:
name: kafka-consumer
namespace: default
spec:
jobTargetRef:
template:
spec:
containers:
- name: kafka-consumer
image: tsuyoshiushio/kafka-consumer:0.2
imagePullPolicy: Always
envFrom:
- secretRef:
name: keda-kafka-pod-secrets
restartPolicy: Never
pollingInterval: 5
maxReplicaCount: 10
successfulJobsHistoryLimit: 10
failedJobsHistoryLimit: 10
scalingStrategy:
strategy: accurate
triggers:
- type: kafka
metadata:
bootstrapServers: MY_EVENT_HUB_NAMESPACE.servicebus.windows.net:9093
consumerGroup: $Default
topic: multi-topic
lagThreshold: '5'
authenticationRef:
name: kafka-trigger-auth
---
apiVersion: keda.sh/v1alpha1
kind: TriggerAuthentication
metadata:
name: kafka-trigger-auth
namespace: default
spec:
secretTargetRef:
- parameter: sasl
name: keda-kafka-secrets
key: sasl
- parameter: username
name: keda-kafka-secrets
key: username
- parameter: password
name: keda-kafka-secrets
key: password
- parameter: tls
name: keda-kafka-secrets
key: tls
Run on kubernets
Install KEDA and make sure
two keda pods working. You can see the operator logs by kubectl logs -f keda-operator-6b546dc696-gzc72 in this case. It helps to understand how keda scale jobs works.
kubectl get pods -n keda
NAME READY STATUS RESTARTS AGE
keda-metrics-apiserver-6dff8c4f7f-z9szt 1/1 Running 0 5d1h
keda-operator-6b546dc696-gzc72 1/1 Running 0 51m
Clone and build and publish the image on samples. KafkaScaledJobWithGo
$ docker build -f tsuyoshiushio/kafka-consumer:0.2
$ docker push tsuyoshiushio/kafka-consumer:0.2
If you update the version or name, change the scaledjob.yaml on the image section.
Apply the Secret and ScaledJob.
$ kubectl apply -f secret.yaml
$ kubectl apply -f pod-secret.yaml
$ kubectl apply -f scaledjob.yaml
Then send message to the target EventHub, then you will see the job is created according to the number of the messages.
$ kubectl get pods
NAME READY STATUS RESTARTS AGE
func-deployment-http-7dffc56bc-6494h 1/1 Running 0 4d8h
kafka-consumer-5bmdd-dwb44 0/1 Completed 0 73s
kafka-consumer-n4vvx-gwrhf 0/1 Completed 0 82s
kafka-consumer-t2hth-cthlh 1/1 Running 0 64s
kafka-consumer-vzh4d-rjjt9 0/1 Completed 0 91s
Top comments (0)