
Tsuyoshi Ushio

Kafka ScaledJobs sample for KEDA with EventHubs

I wanted to create a sample for testing the Kafka scaler with ScaledJob. It requires a simple Go client that finishes once a message has been consumed. The reason I use Go is that the KEDA Kafka scaler is implemented with Sarama, so I thought it might also help to understand the scaler implementation.

Kafka Broker

I use Azure Event Hubs as the Kafka broker because it is an easy way to get one. Event Hubs exposes a Kafka-compatible API, so a standard Kafka client can connect to it.

Configuration

Sarama

The Sarama configuration for Event Hubs looks like this. You need to pass the Event Hubs connection string as config.Net.SASL.Password. Another consideration is config.Consumer.Group.Session.Timeout: if you don't set it, the client easily panics with panic: Error from consumer: read tcp 172.26.24.68:34288->40.78.250.71:9093: i/o timeout. With this value configured, the client works fine.

config := sarama.NewConfig()
config.Version = sarama.V1_0_0_0
config.Consumer.Group.Rebalance.Strategy = sarama.BalanceStrategyRange
config.Net.SASL.Enable = true
config.Net.SASL.User = "$ConnectionString"
config.Net.SASL.Password = os.Getenv(saslPassword)
config.Consumer.Group.Session.Timeout = 60 * time.Second
config.Net.TLS.Enable = true

consumer := Consumer{
    ready: make(chan bool),
}

ctx, cancel := context.WithCancel(context.Background())
client, err := sarama.NewConsumerGroup(strings.Split(brokers, ","), group, config)
if err != nil {
    log.Panicf("Error creating consumer group client: %v", err)
}
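The snippet above only creates the consumer group client; driving it is not shown. The following is a minimal sketch of that part, following the usual Sarama consumer group pattern. The topics variable is assumed to hold the comma-separated topic list from the TOPICS environment variable described later.

// Drive the consumer group in a goroutine. Consume has to be called in a
// loop because it returns whenever a rebalance ends the server-side session.
go func() {
    for {
        if err := client.Consume(ctx, strings.Split(topics, ","), &consumer); err != nil {
            log.Panicf("Error from consumer: %v", err)
        }
        // Stop once the context has been cancelled.
        if ctx.Err() != nil {
            return
        }
        consumer.ready = make(chan bool)
    }
}()

<-consumer.ready // wait until the consumer group session has been set up
log.Println("Sarama consumer up and running")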

For the ScaledJob, I'd like the pod to finish once the app receives a message. We need to implement a callback struct (sarama.ConsumerGroupHandler) for processing messages. It is a rough implementation for a sample: I signal message consumption through a channel.


// Consumer struct
type Consumer struct {
    ready chan bool
}

// Setup method
func (consumer *Consumer) Setup(sarama.ConsumerGroupSession) error {
    close(consumer.ready)
    return nil
}

// Cleanup method
func (consumer *Consumer) Cleanup(sarama.ConsumerGroupSession) error {
    return nil
}

// ConsumeClaim method
func (consumer *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
    for message := range claim.Messages() {
        log.Printf("Message claimed: value = %s, timestamp = %v, topic = %s\n", string(message.Value), message.Timestamp, message.Topic)
        session.MarkMessage(message, "")
        consumed <- true
        break
    }
    return nil
}
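The consumed channel written to in ConsumeClaim is not declared in the snippet above. Here is a minimal sketch of the missing piece, with names of my own choosing: a package-level channel plus a helper that main calls after the consume loop has started.

// Package-level channel: ConsumeClaim signals here after handling one message.
var consumed = make(chan bool, 1)

// shutdownAfterFirstMessage is a hypothetical helper called from main: it
// blocks until one message has been processed, then cancels the context and
// closes the client so the process exits and the Job pod is marked Completed.
func shutdownAfterFirstMessage(cancel context.CancelFunc, client sarama.ConsumerGroup) {
    <-consumed
    cancel()
    if err := client.Close(); err != nil {
        log.Panicf("Error closing client: %v", err)
    }
}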

Scaled Job

Secret

For the Kafka ScaledJob, I create two secrets: one for the authentication of the Kafka scaler, the other for the environment variables of the pod. Replace the value of YOUR_EVENT_HUBS_CONNECTION_STRING_BASE64 with your Event Hubs connection string. Don't forget to encode it in base64.

secret.yaml

apiVersion: v1
kind: Secret
metadata:
  name: keda-kafka-secrets
  namespace: default
data:
  sasl: "plaintext as base64"
  username: "$ConnectionString as base64"
  password: "YOUR_EVENT_HUBS_CONNECTION_STRING_BASE64"
  tls: "enable as base64"

pod-secret.yaml

apiVersion: v1
kind: Secret
metadata:
  name: keda-kafka-pod-secrets
  namespace: default
data:
  SASL_PASSWORD: "YOUR_EVENT_HUBS_CONNECTION_STRING_BASE64"
  BROKER_LIST: "BROKER_LIST_BASE64"
  TOPICS: "TOPIC_BASE64"
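These values are injected into the pod through envFrom in the ScaledJob below, and the Go client is expected to read them along these lines (a sketch; the variable names match the ones used in the Sarama config snippet above):

var (
    brokers      = os.Getenv("BROKER_LIST") // e.g. "mynamespace.servicebus.windows.net:9093"
    topics       = os.Getenv("TOPICS")      // comma-separated list of topics to consume
    group        = "$Default"               // Event Hubs' default consumer group
    saslPassword = "SASL_PASSWORD"          // name of the env var holding the connection string
)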

ScaledJob

apiVersion: keda.sh/v1alpha1
kind: ScaledJob
metadata: 
  name: kafka-consumer
  namespace: default
spec:
  jobTargetRef:
    template:
      spec:
        containers:
        - name: kafka-consumer
          image: tsuyoshiushio/kafka-consumer:0.2
          imagePullPolicy: Always
          envFrom:
          - secretRef:
              name: keda-kafka-pod-secrets
        restartPolicy: Never
  pollingInterval: 5
  maxReplicaCount: 10
  successfulJobsHistoryLimit: 10
  failedJobsHistoryLimit: 10
  scalingStrategy:
    strategy: accurate
  triggers:
  - type: kafka
    metadata:
      bootstrapServers: MY_EVENT_HUB_NAMESPACE.servicebus.windows.net:9093
      consumerGroup: $Default
      topic: multi-topic
      lagThreshold: '5'
    authenticationRef:
      name: kafka-trigger-auth
---
apiVersion: keda.sh/v1alpha1
kind: TriggerAuthentication
metadata:
  name: kafka-trigger-auth
  namespace: default
spec:
  secretTargetRef:
  - parameter: sasl
    name: keda-kafka-secrets
    key: sasl
  - parameter: username
    name: keda-kafka-secrets
    key: username
  - parameter: password
    name: keda-kafka-secrets
    key: password    
  - parameter: tls
    name: keda-kafka-secrets
    key: tls

Run on Kubernetes

Install KEDA and make sure the two KEDA pods are running. You can see the operator logs with kubectl logs -f keda-operator-6b546dc696-gzc72 in this case. It helps to understand how KEDA scales jobs.

kubectl get pods -n keda
NAME                                      READY   STATUS    RESTARTS   AGE
keda-metrics-apiserver-6dff8c4f7f-z9szt   1/1     Running   0          5d1h
keda-operator-6b546dc696-gzc72            1/1     Running   0          51m

Clone the sample KafkaScaledJobWithGo, then build and push the image.

$ docker build -t tsuyoshiushio/kafka-consumer:0.2 .
$ docker push tsuyoshiushio/kafka-consumer:0.2

If you change the image name or version, update the image section of scaledjob.yaml accordingly.

Apply the Secret and ScaledJob.

$ kubectl apply -f secret.yaml
$ kubectl apply -f pod-secret.yaml
$ kubectl apply -f scaledjob.yaml

Then send messages to the target Event Hub, and you will see jobs created according to the number of messages.
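If you don't have a producer handy, a minimal Sarama producer sketch to push test messages could look like the following. It reuses the same SASL/TLS settings as the consumer and the environment variable names from pod-secret.yaml; the message content and count are arbitrary.

package main

import (
    "log"
    "os"
    "strings"

    "github.com/Shopify/sarama"
)

func main() {
    config := sarama.NewConfig()
    config.Version = sarama.V1_0_0_0
    config.Net.SASL.Enable = true
    config.Net.SASL.User = "$ConnectionString"
    config.Net.SASL.Password = os.Getenv("SASL_PASSWORD")
    config.Net.TLS.Enable = true
    config.Producer.Return.Successes = true // required by the SyncProducer

    producer, err := sarama.NewSyncProducer(strings.Split(os.Getenv("BROKER_LIST"), ","), config)
    if err != nil {
        log.Panicf("Error creating producer: %v", err)
    }
    defer producer.Close()

    // Use the first topic from the comma-separated TOPICS value.
    topic := strings.Split(os.Getenv("TOPICS"), ",")[0]

    // Send several messages so that multiple jobs get scheduled.
    for i := 0; i < 10; i++ {
        if _, _, err := producer.SendMessage(&sarama.ProducerMessage{
            Topic: topic,
            Value: sarama.StringEncoder("hello from the test producer"),
        }); err != nil {
            log.Printf("Failed to send message: %v", err)
        }
    }
}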

$ kubectl get pods
NAME                                   READY   STATUS      RESTARTS   AGE
func-deployment-http-7dffc56bc-6494h   1/1     Running     0          4d8h
kafka-consumer-5bmdd-dwb44             0/1     Completed   0          73s
kafka-consumer-n4vvx-gwrhf             0/1     Completed   0          82s
kafka-consumer-t2hth-cthlh             1/1     Running     0          64s
kafka-consumer-vzh4d-rjjt9             0/1     Completed   0          91s

Resource
