Mohamed Rasvi

Deploy Kafka connector on GKE cluster

Deploy the Kafka connector on GKE and sync it with Pub/Sub.

Here are some scenarios in which you might use the Pub/Sub Group Kafka Connector:

  1. You are migrating a Kafka-based architecture to Google Cloud.
  2. You have a frontend system that stores events in Kafka outside of Google Cloud, but you also use Google Cloud to run some of your backend services, which need to receive the Kafka events.
  3. You collect logs from an on-premises Kafka solution and send them to Google Cloud for data analytics.
  4. You have a frontend system that uses Google Cloud, but you also store data on-premises using Kafka.

As with Kafka, you can use Pub/Sub to communicate between components in your cloud architecture.

The Pub/Sub Group Kafka Connector allows you to integrate these two systems. The following connectors are packaged in the Connector JAR:

  1. The sink connector reads records from one or more Kafka topics and publishes them to Pub/Sub.
  2. The source connector reads messages from a Pub/Sub topic and publishes them to Kafka.

In this document, we walk through how to set up the sink connector.
More information

This section walks you through the following tasks:

  • Configure the Pub/Sub Group Kafka Connector.
  • Send events from Kafka to Pub/Sub.

Authenticate
The Pub/Sub Group Kafka Connector must authenticate with Pub/Sub in order to send Pub/Sub messages. To set up authentication, perform the following steps:

Grant the IAM role roles/pubsub.admin to your Google service account.
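For example, with the gcloud CLI (PROJECT_ID and the service account name are placeholders you must replace):

gcloud projects add-iam-policy-binding PROJECT_ID \
    --member="serviceAccount:SA_NAME@PROJECT_ID.iam.gserviceaccount.com" \
    --role="roles/pubsub.admin"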

git clone https://github.com/googleapis/java-pubsub-group-kafka-connector.git
cd java-pubsub-group-kafka-connector

Download the connector JAR

Download JAR

cp config/* [path to Kafka installation]/config/

Update your Kafka Connect configuration

  • Navigate to your Kafka directory.
  • Open the file named config/connect-standalone.properties in a text editor.
  • If the plugin.path property is commented out, uncomment it.
  • Update the plugin.path property to include the path to the connector JAR. Example:
plugin.path=/home/PubSubKafkaConnector/pubsub-group-kafka-connector-1.0.0.jar

Set the offset.storage.file.filename property to a local file name. In standalone mode, Kafka uses this file to store offset data.
Example:

offset.storage.file.filename=/tmp/connect.offsets
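Putting the pieces together, the relevant lines of config/connect-standalone.properties might look like the following (a minimal sketch; the broker address and paths are assumptions, adjust them to your environment):

bootstrap.servers=localhost:9092
offset.storage.file.filename=/tmp/connect.offsets
plugin.path=/home/PubSubKafkaConnector/pubsub-group-kafka-connector-1.0.0.jar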

Forward events from Kafka to Pub/Sub

  1. Open the file /config/cps-sink-connector.properties in a text editor. Add values for the following properties, which are marked "TODO" in the comments:

topics=KAFKA_TOPICS
cps.project=PROJECT_ID
cps.topic=PUBSUB_TOPIC
gcp.credentials.file.path=PATH
gcp.credentials.json=JSON_FILE

  2. Replace the following:
  • KAFKA_TOPICS: A comma-separated list of Kafka topics to read from.
  • PROJECT_ID: The Google Cloud project that contains your Pub/Sub topic.
  • PUBSUB_TOPIC: The Pub/Sub topic that receives the messages from Kafka.
  • PATH: Optional. The path to a file that stores Google Cloud credentials for authenticating with Pub/Sub (the docs mention Pub/Sub Lite, but it should also work with Pub/Sub).
  • JSON_FILE: Optional. A JSON blob that contains Google Cloud credentials for authenticating with Pub/Sub (the docs mention Pub/Sub Lite, but it should also work with Pub/Sub).

More info about settings
File: cps-sink-connector.properties

# Copyright 2022 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# Unique name for the Pub/Sub sink connector.
name=CPSSinkConnector
# The Java class for the Pub/Sub sink connector.
connector.class=com.google.pubsub.kafka.sink.CloudPubSubSinkConnector
# The maximum number of tasks that should be created for this connector.
tasks.max=10
# Set the key converter for the Pub/Sub sink connector.
key.converter=org.apache.kafka.connect.storage.StringConverter
# Set the value converter for the Pub/Sub sink connector.
value.converter=org.apache.kafka.connect.converters.ByteArrayConverter
# A comma-separated list of Kafka topics to use as input for the connector.
# TODO (developer): update to your Kafka topic name(s).
topics=scotia-wealth
cps.project=PROJECT-ID
# TODO (developer): update to your Pub/Sub topic ID, e.g.
# where data should be written.
cps.topic=kafka-topic-consumer
# Optional. A JSON file path or JSON blob that contains Google Cloud credentials for authenticating with Pub/Sub.
gcp.credentials.file.path=PATH
gcp.credentials.json=JSON_FILE
  1. From the Kafka directory, run the following command:
bin/connect-standalone.sh \
  config/connect-standalone.properties \
  config/cps-sink-connector.properties
  1. Follow the steps in the Apache Kafka quickstart to write some events to your Kafka topic (see the console producer example after this list).
  2. Use the gcloud CLI to read the events from Pub/Sub:

gcloud pubsub subscriptions pull PUBSUB_SUBSCRIPTION --auto-ack
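For step 1, a quick way to write test events is the console producer that ships with Kafka (the topic name scotia-wealth matches the example config above; replace it with your own):

bin/kafka-console-producer.sh --topic scotia-wealth --bootstrap-server localhost:9092

Each line you type is published as a Kafka event and, once the sink connector picks it up, should appear in the Pub/Sub subscription pulled above.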

More info about Kafka
GCP repo: https://github.com/googleapis/java-pubsub-group-kafka-connector

We have now tested the Kafka connector locally with Pub/Sub. Next, it is time to containerize it and deploy the Kafka connector on GKE.

Deploy the Kafka connector on GKE as follows:

  • Build the image from the following base image:
https://github.com/bitnami/containers/tree/main/bitnami/kafka

Build a custom image that includes the Pub/Sub Group Kafka Connector JAR.
Example Dockerfile:

# Start from the Bitnami Kafka base image
FROM bitnami/kafka:3.4.0-debian-11-r15 AS build-stage
# Copy the Pub/Sub Group Kafka Connector JAR and its config into the Kafka installation
COPY YOUR_pubsubKafkaConnector_FOLDER /opt/bitnami/kafka/
# Copy any setup scripts into the Kafka installation
COPY setup /opt/bitnami/kafka/
  • Create a Pub/Sub topic and subscription, for example:
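gcloud pubsub topics create PUBSUB_TOPIC
gcloud pubsub subscriptions create PUBSUB_SUBSCRIPTION --topic=PUBSUB_TOPIC

Here PUBSUB_TOPIC and PUBSUB_SUBSCRIPTION are placeholders for your own topic and subscription names.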

  • Deploy the Helm chart:

Installing the Chart

Build the image from the Dockerfile:
## Prerequisites
https://cloud.google.com/artifact-registry/docs/docker/pushing-and-pulling
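## Assumed one-time setup: authenticate Docker with Artifact Registry for your region
gcloud auth configure-docker LOCATION-docker.pkg.dev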

docker build -t LOCATION-docker.pkg.dev/PROJECT-ID/REPOSITORY/IMAGE:TAG .
docker push LOCATION-docker.pkg.dev/PROJECT-ID/REPOSITORY/IMAGE:TAG

## Helm installs the Kafka instance with the Pub/Sub connector from the custom image we built previously
helm repo add bitnami https://charts.bitnami.com/bitnami

Change the Docker image in the Kafka chart (kafka/values.yaml):

## Bitnami Kafka image version
## ref: https://hub.docker.com/r/bitnami/kafka/tags/
## @param image.registry Kafka image registry
## @param image.repository Kafka image repository
## @param image.tag Kafka image tag (immutable tags are recommended)
## @param image.digest Kafka image digest in the way sha256:aa.... Please note this parameter, if set, will override the tag
## @param image.pullPolicy Kafka image pull policy
## @param image.pullSecrets Specify docker-registry secret names as an array
## @param image.debug Specify if debug values should be set
##
image:
  registry: LOCATION-docker.pkg.dev/PROJECT-ID/REPOSITORY
  repository: IMAGE
  tag: TAG
  digest: ""
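With the image values updated, install the chart. A minimal sketch, assuming the Bitnami chart was pulled locally into a kafka/ folder and the release is deployed into a kafka namespace (release and namespace names are assumptions):

helm install kafka ./kafka --namespace kafka --create-namespace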

After deploying the Helm chart, your GKE cluster looks like the following:

1. StatefulSets
     Kafka instance
     Kafka ZooKeeper

2. K8s Service
     Exposed GKE service to publish and consume Kafka topics and messages
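You can verify this with kubectl (assuming the chart was deployed into the kafka namespace):

kubectl get statefulsets,services -n kafka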
  • Impersonate the GCP service account via Workload Identity

    Use a service account that has permission to publish messages (the
    Kubernetes SA will impersonate the GCP SA via Workload Identity). more info

For example:

## Workload identity to impersonate gcp SA
gcloud iam service-accounts add-iam-policy-binding $GSA \
    --role roles/iam.workloadIdentityUser \
    --member "serviceAccount:$PROJECT_ID.svc.id.goog[kafka/kafka]"


## Annotate k8s serviceaccount kafka
kubectl annotate serviceaccount kafka \
    --namespace kafka \
    iam.gke.io/gcp-service-account=$GSA
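The binding above assumes the GCP service account $GSA already exists and was granted the Pub/Sub role from the Authenticate section. If not, create it first (the account name below is only an illustration):

gcloud iam service-accounts create pubsub-kafka-connector \
    --display-name="Pub/Sub Kafka connector"
export GSA=pubsub-kafka-connector@$PROJECT_ID.iam.gserviceaccount.com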
  • Create the ConfigMap from the previously downloaded config files.
  • Following the example, we created a folder called config and saved the config files there:
kubectl create configmap pubsub-connector --from-file=config/ -n kafka
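To confirm the ConfigMap contains the expected connector configuration files:

kubectl describe configmap pubsub-connector -n kafka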
