I wanted to create a sample for testing the KEDA Kafka Scaler with ScaledJob. It requires a simple Go client that exits once a message has been consumed. I chose Go because the KEDA Kafka Scaler itself is implemented with Sarama, so writing the client with the same library should also help in understanding the scaler implementation.
Kafka Broker
I use Azure Event Hubs as the Kafka broker because it is easy to set up. Event Hubs exposes a Kafka-compatible API, so a regular Kafka client can connect to it.
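As a small illustration of how the Kafka endpoint relates to the Event Hubs connection string, here is a minimal sketch that derives the broker address (namespace host plus port 9093) from a connection string of the usual `Endpoint=sb://...;SharedAccessKeyName=...;SharedAccessKey=...` form. The helper name `kafkaBrokerFromConnectionString` and the sample namespace are made up for this example; they are not part of the sample repository.

```go
package main

import (
	"errors"
	"fmt"
	"net/url"
	"strings"
)

// kafkaBrokerFromConnectionString is a hypothetical helper: it takes the host
// from the Endpoint field of an Event Hubs connection string and appends
// port 9093, where the Kafka-compatible endpoint listens.
func kafkaBrokerFromConnectionString(connStr string) (string, error) {
	for _, part := range strings.Split(connStr, ";") {
		// Cut on the first "=" only, so base64 padding in key values is kept.
		key, value, found := strings.Cut(part, "=")
		if !found || !strings.EqualFold(strings.TrimSpace(key), "Endpoint") {
			continue
		}
		u, err := url.Parse(strings.TrimSpace(value))
		if err != nil {
			return "", err
		}
		if u.Hostname() == "" {
			return "", errors.New("endpoint has no host")
		}
		return u.Hostname() + ":9093", nil
	}
	return "", errors.New("connection string has no Endpoint field")
}

func main() {
	// Placeholder values only; use your own connection string.
	connStr := "Endpoint=sb://my-namespace.servicebus.windows.net/;" +
		"SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=PLACEHOLDER"
	broker, err := kafkaBrokerFromConnectionString(connStr)
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(broker)
}
```

The resulting value is what goes into the broker list of the client and into bootstrapServers of the Kafka trigger.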
Configuration
Sarama
The configuration for Sarama with Event Hubs looks like this. You need to pass the Event Hubs connection string as config.Net.SASL.Password. Another consideration is config.Consumer.Group.Session.Timeout. If you don't configure this value, the client tends to fail with panic: Error from consumer: read tcp 172.26.24.68:34288->40.78.250.71:9093: i/o timeout. With the value set (60 seconds here), the client works fine.
config := sarama.NewConfig()
config.Version = sarama.V1_0_0_0
config.Consumer.Group.Rebalance.Strategy = sarama.BalanceStrategyRange
config.Net.SASL.Enable = true
config.Net.SASL.User = "$ConnectionString"
config.Net.SASL.Password = os.Getenv(saslPassword)
config.Consumer.Group.Session.Timeout = 60 * time.Second
config.Net.TLS.Enable = true

consumer := Consumer{
	ready: make(chan bool),
}

ctx, cancel := context.WithCancel(context.Background())
client, err := sarama.NewConsumerGroup(strings.Split(brokers, ","), group, config)
if err != nil {
	log.Panicf("Error creating consumer group client: %v", err)
}
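The snippet above takes the broker list and the SASL password from outside the program. A minimal sketch of loading them from environment variables could look like the following. It assumes the variable names BROKER_LIST, TOPICS and SASL_PASSWORD used in the pod secret later in this post; the `settings` struct and the `loadSettings` and `splitList` helpers are illustrative names, not code from the sample.

```go
package main

import (
	"fmt"
	"os"
	"strings"
)

// settings holds the values the consumer needs (illustrative type).
type settings struct {
	Brokers      []string
	Topics       []string
	SASLPassword string
}

// loadSettings reads BROKER_LIST, TOPICS and SASL_PASSWORD through getenv
// (normally os.Getenv) and returns an error if any of them is empty.
func loadSettings(getenv func(string) string) (settings, error) {
	var s settings
	for _, name := range []string{"BROKER_LIST", "TOPICS", "SASL_PASSWORD"} {
		if strings.TrimSpace(getenv(name)) == "" {
			return s, fmt.Errorf("environment variable %s is not set", name)
		}
	}
	s.Brokers = splitList(getenv("BROKER_LIST"))
	s.Topics = splitList(getenv("TOPICS"))
	s.SASLPassword = getenv("SASL_PASSWORD")
	return s, nil
}

// splitList splits a comma-separated value and drops empty entries.
func splitList(v string) []string {
	var out []string
	for _, item := range strings.Split(v, ",") {
		if item = strings.TrimSpace(item); item != "" {
			out = append(out, item)
		}
	}
	return out
}

func main() {
	s, err := loadSettings(os.Getenv)
	if err != nil {
		fmt.Println("configuration error:", err)
		return
	}
	// Never print the password itself.
	fmt.Printf("brokers=%v topics=%v\n", s.Brokers, s.Topics)
}
```

Passing the lookup function as a parameter keeps the helper easy to test without touching the real environment.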
For the ScaledJob, I want the pod to finish once the app has received a message. Sarama requires a handler struct with callbacks for processing messages. This is a rough implementation for a sample; it signals that a message has been consumed through a channel.
// Consumer implements sarama.ConsumerGroupHandler.
type Consumer struct {
	ready chan bool
}

// Setup runs at the beginning of a new session, before ConsumeClaim.
func (consumer *Consumer) Setup(sarama.ConsumerGroupSession) error {
	close(consumer.ready)
	return nil
}

// Cleanup runs at the end of a session, once all ConsumeClaim goroutines have exited.
func (consumer *Consumer) Cleanup(sarama.ConsumerGroupSession) error {
	return nil
}

// ConsumeClaim handles the first message of the claim and then returns.
// consumed is a channel declared elsewhere in the sample.
func (consumer *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
	for message := range claim.Messages() {
		log.Printf("Message claimed: value = %s, timestamp = %v, topic = %s\n", string(message.Value), message.Timestamp, message.Topic)
		session.MarkMessage(message, "")
		consumed <- true
		break
	}
	return nil
}
Scaled Job
Secret
For the Kafka ScaledJob, I create two secrets: one holds the authentication settings for the Kafka Scaler, the other provides environment variables for the pod. Replace YOUR_EVENT_HUBS_CONNECTION_STRING_BASE64 with your Event Hubs connection string. Don't forget to encode it as base64.
secret.yaml
apiVersion: v1
kind: Secret
metadata:
  name: keda-kafka-secrets
  namespace: default
data:
  sasl: "plaintext as base64"
  username: "$ConnectionString as base64"
  password: "YOUR_EVENT_HUBS_CONNECTION_STRING_BASE64"
  tls: "enable as base64"
pod-secret.yaml
apiVersion: v1
kind: Secret
metadata:
  name: keda-kafka-pod-secrets
  namespace: default
data:
  SASL_PASSWORD: "YOUR_EVENT_HUBS_CONNECTION_STRING_BASE64"
  BROKER_LIST: "BROKER_LIST_BASE64"
  TOPICS: "TOPIC_BASE64"
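Every value under data in a Secret has to be base64-encoded. As a minimal sketch, the fixed values of the first secret can be encoded with the Go standard library like this. The helper name `encodeSecretData` is made up for the example; the password entry is left out because it depends on your own connection string.

```go
package main

import (
	"encoding/base64"
	"fmt"
)

// encodeSecretData base64-encodes each value, as the data field of a
// Kubernetes Secret expects.
func encodeSecretData(plain map[string]string) map[string]string {
	out := make(map[string]string, len(plain))
	for k, v := range plain {
		out[k] = base64.StdEncoding.EncodeToString([]byte(v))
	}
	return out
}

func main() {
	encoded := encodeSecretData(map[string]string{
		"sasl":     "plaintext",
		"username": "$ConnectionString",
		"tls":      "enable",
	})
	// Print in a fixed order; map iteration order is random in Go.
	for _, k := range []string{"sasl", "username", "tls"} {
		fmt.Printf("%s: %s\n", k, encoded[k])
	}
}
```

If you encode values in a shell instead, make sure no trailing newline ends up in the encoded value, otherwise authentication can fail in ways that are hard to spot.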
ScaledJob
apiVersion: keda.sh/v1alpha1
kind: ScaledJob
metadata:
  name: kafka-consumer
  namespace: default
spec:
  jobTargetRef:
    template:
      spec:
        containers:
        - name: kafka-consumer
          image: tsuyoshiushio/kafka-consumer:0.2
          imagePullPolicy: Always
          envFrom:
          - secretRef:
              name: keda-kafka-pod-secrets
        restartPolicy: Never
  pollingInterval: 5
  maxReplicaCount: 10
  successfulJobsHistoryLimit: 10
  failedJobsHistoryLimit: 10
  scalingStrategy:
    strategy: accurate
  triggers:
  - type: kafka
    metadata:
      bootstrapServers: MY_EVENT_HUB_NAMESPACE.servicebus.windows.net:9093
      consumerGroup: $Default
      topic: multi-topic
      lagThreshold: '5'
    authenticationRef:
      name: kafka-trigger-auth
---
apiVersion: keda.sh/v1alpha1
kind: TriggerAuthentication
metadata:
  name: kafka-trigger-auth
  namespace: default
spec:
  secretTargetRef:
  - parameter: sasl
    name: keda-kafka-secrets
    key: sasl
  - parameter: username
    name: keda-kafka-secrets
    key: username
  - parameter: password
    name: keda-kafka-secrets
    key: password
  - parameter: tls
    name: keda-kafka-secrets
    key: tls
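To get a feel for how lagThreshold and maxReplicaCount interact, here is a back-of-the-envelope estimate in Go. It is a simplified mental model based on my understanding, not KEDA's actual code: it assumes roughly one job per lagThreshold messages of consumer lag, rounded up, limited by the slots still free under maxReplicaCount. The real scaler has more inputs (for example pending jobs and partition counts) and its behavior differs between KEDA versions. The function name `estimateJobs` is hypothetical.

```go
package main

import "fmt"

// estimateJobs is a simplified mental model, not KEDA's implementation:
// ceil(lag / lagThreshold) jobs, capped by maxReplicaCount minus the jobs
// that are already running.
func estimateJobs(lag, lagThreshold, maxReplicaCount, runningJobs int64) int64 {
	if lag <= 0 || lagThreshold <= 0 {
		return 0
	}
	wanted := (lag + lagThreshold - 1) / lagThreshold // integer ceil
	free := maxReplicaCount - runningJobs
	if free < 0 {
		free = 0
	}
	if wanted > free {
		return free
	}
	return wanted
}

func main() {
	// Values from the ScaledJob above: lagThreshold 5, maxReplicaCount 10.
	fmt.Println(estimateJobs(12, 5, 10, 0))  // lag of 12 messages, nothing running
	fmt.Println(estimateJobs(100, 5, 10, 4)) // large lag, 4 jobs already running
}
```

Treat the numbers as a rough guide only and check the operator logs to see what KEDA actually decides in your cluster.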
Run on Kubernetes
Install KEDA and make sure that the two KEDA pods are running. You can follow the operator logs with kubectl logs -f keda-operator-6b546dc696-gzc72 -n keda (the pod name in this case). The logs help in understanding how KEDA scales jobs.
$ kubectl get pods -n keda
NAME                                      READY   STATUS    RESTARTS   AGE
keda-metrics-apiserver-6dff8c4f7f-z9szt   1/1     Running   0          5d1h
keda-operator-6b546dc696-gzc72            1/1     Running   0          51m
Clone the sample KafkaScaledJobWithGo, then build and publish the image.
$ docker build -t tsuyoshiushio/kafka-consumer:0.2 .
$ docker push tsuyoshiushio/kafka-consumer:0.2
If you change the image name or version, update the image section of scaledjob.yaml accordingly.
Apply the Secret and ScaledJob.
$ kubectl apply -f secret.yaml
$ kubectl apply -f pod-secret.yaml
$ kubectl apply -f scaledjob.yaml
Then send messages to the target Event Hub. You will see jobs created according to the number of messages.
$ kubectl get pods
NAME                                   READY   STATUS      RESTARTS   AGE
func-deployment-http-7dffc56bc-6494h   1/1     Running     0          4d8h
kafka-consumer-5bmdd-dwb44             0/1     Completed   0          73s
kafka-consumer-n4vvx-gwrhf             0/1     Completed   0          82s
kafka-consumer-t2hth-cthlh             1/1     Running     0          64s
kafka-consumer-vzh4d-rjjt9             0/1     Completed   0          91s