How to Sync Data from an Oracle Table to Elasticsearch using Kafka Connect

In this post, I'll walk you through a challenge I faced: fetching data from an Oracle database, streaming it into Kafka, and finally consuming and writing that data into Elasticsearch. My hope is that this guide helps other teams build a reliable data pipeline using similar components.
Before we begin, you'll need a working Kafka setup. In my case, I prepared a Kafka cluster and verified that all of its pods were running correctly, so the environment was ready before configuring Kafka Connect and the connectors for Oracle and Elasticsearch.
The general flow we’ll cover is:

  • Prerequisite: Install and run a Kafka cluster using the Strimzi operator.

  • Configure Kafka Connect: Create a Kafka Connect instance.

  • Create a custom Dockerfile: Build a custom image and push it to the local registry.

  • Configure the JDBC connector: Set up the JDBC source connector in the Kafka cluster.

  • Configure the Elasticsearch connector: Add the sink configuration to create an index and insert documents into Elasticsearch.

  • Configure an ingest pipeline (optional): Convert the Oracle timestamp field into an Elasticsearch date field.

By the end of this guide, you should have a running pipeline that automatically streams changes from Oracle to Elasticsearch with minimal manual intervention.
Before starting, ensure that Kafka is installed and running in your cluster. In my setup, I deployed a Kafka cluster and confirmed that all pods are up and running:

list of kafka pods
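If you are using the Strimzi operator, a quick check like the following confirms the state of the pods (assuming the kafka-infra namespace used throughout this guide):

kubectl get pods -n kafka-infra

The Kafka broker/controller pods and the entity operator should all report a Running status before you proceed.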

Afterward, Kafka Connect must be installed using the Strimzi operator. This requires creating a KafkaConnect custom resource on the cluster. The following is a sample configuration:

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnect
metadata:
  annotations:
    strimzi.io/use-connector-resources: 'true'
  name: my-connect-cluster
  namespace: kafka-infra
spec:
  bootstrapServers: 'my-cluster-kafka-bootstrap:9092'
  config:
    config.storage.replication.factor: -1
    config.storage.topic: connect-cluster-configs
    group.id: connect-cluster
    offset.storage.replication.factor: -1
    offset.storage.topic: connect-cluster-offsets
    status.storage.replication.factor: -1
    status.storage.topic: connect-cluster-status
  image: 'private-registery.com/strimzi/kafka-custom:v2'
  replicas: 1
  version: 4.0.0
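Once the custom image referenced in the image field is available (we build it in the next step), the resource can be applied with kubectl; the file name below is just an example:

kubectl apply -f kafka-connect.yaml
kubectl get kafkaconnect my-connect-cluster -n kafka-infra

Strimzi reports the state of the Connect cluster in the resource status, and a Connect pod should appear in the kafka-infra namespace once the deployment is ready.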

The next step, and one of the most critical components of the data pipeline, is the installation of the Elasticsearch sink and JDBC connector plugins within the Kafka Connect pod. By default, Kafka Connect does not include these plugins, which means a custom image must be built.
This process involves creating a Docker image that packages the required connector plugins and then updating the Kafka Connect custom resource to reference this new image.

Doing so ensures that Kafka Connect can interact with both Elasticsearch and relational databases, enabling seamless data integration across the pipeline. The following sample Dockerfile demonstrates how to add the Elasticsearch sink and JDBC connector plugins to the image:

FROM quay.io/strimzi/kafka:0.46.0-kafka-4.0.0

USER root

# Download the connector plugin archives from Confluent Hub
RUN curl -o /tmp/confluentinc-kafka-connect-elasticsearch-15.0.1.zip https://hub-downloads.confluent.io/api/plugins/confluentinc/kafka-connect-elasticsearch/versions/15.0.1/confluentinc-kafka-connect-elasticsearch-15.0.1.zip

RUN curl -o /tmp/confluentinc-kafka-connect-jdbc-10.8.4.zip https://hub-downloads.confluent.io/api/plugins/confluentinc/kafka-connect-jdbc/versions/10.8.4/confluentinc-kafka-connect-jdbc-10.8.4.zip

# Extract the plugins into the Kafka Connect plugin path and remove the archives
RUN unzip /tmp/confluentinc-kafka-connect-jdbc-10.8.4.zip -d /opt/kafka/plugins/ && \
    rm /tmp/confluentinc-kafka-connect-jdbc-10.8.4.zip

RUN unzip /tmp/confluentinc-kafka-connect-elasticsearch-15.0.1.zip -d /opt/kafka/plugins/ && \
    rm /tmp/confluentinc-kafka-connect-elasticsearch-15.0.1.zip

# Drop back to the non-root user expected by the Strimzi image
USER 1001
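A typical way to build and publish this image is shown below; the tag matches the image field of the KafkaConnect resource above, so adjust it to your own registry:

docker build -t private-registery.com/strimzi/kafka-custom:v2 .
docker push private-registery.com/strimzi/kafka-custom:v2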

Next, we need to add a KafkaConnector resource to capture data from Oracle and publish it to Kafka. The configuration below illustrates how to define the JDBC source connector:

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
  labels:
    strimzi.io/cluster: my-connect-cluster
  name: my-source-connector-jdbc-testdb2-v0
  namespace: kafka-infra
spec:
  class: io.confluent.connect.jdbc.JdbcSourceConnector
  config:
    poll.interval.ms: 3600000
    transforms.extractInt.field: ORACLE_TIME_FIELD
    mode: timestamp
    query: select * from schema_name.table_name
    timestamp.column.name: ORACLE_TIME_FIELD
    connection.password: *****
    topic.prefix: oracle-testdb2-audit-table
    connection.user: thinuser
    connection.url: 'jdbc:oracle:driver-name:@hostname:port/db-name' 
  tasksMax: 2

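After saving this manifest (the file name below is only an illustration), apply it and check that Strimzi reports the connector as ready:

kubectl apply -f jdbc-source-connector.yaml
kubectl get kafkaconnector my-source-connector-jdbc-testdb2-v0 -n kafka-infra

Running kubectl describe on the KafkaConnector resource shows the connector and task state reported by Kafka Connect, which is useful when troubleshooting credentials or JDBC URL issues.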

We can now verify in Kafka that the Oracle table topic defined in the JDBC connector configuration has been created successfully and is receiving messages.

List the topics:

bin/kafka-topics.sh --list --bootstrap-server localhost:9092

Output:

oracle-testdb2-audit-table

Consume a sample message:

bin/kafka-console-consumer.sh --topic oracle-testdb2-audit-table --bootstrap-server localhost:9092 --from-beginning --max-messages 1

In a later post, I will provide a Kafka consumer implemented in Go.
The next section covers how to synchronize data from Kafka to Elasticsearch.
First, we need to install an Elasticsearch sink connector that consumes data from Kafka and writes it to Elasticsearch. The connector configuration is shown below:

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
  labels:
    strimzi.io/cluster: my-connect-cluster
  name: my-source-connector-oracle-v1
  namespace: kafka-infra
spec:
  class: io.confluent.connect.elasticsearch.ElasticsearchSinkConnector
  config:
    key.ignore: true
    value.converter: org.apache.kafka.connect.json.JsonConverter
    connection.username: elastic
    value.converter.schemas.enable: false
    name: elasticsearch-sink-connector
    connection.password: ******
    key.converter: org.apache.kafka.connect.storage.StringConverter
    drop.invalid.message: true
    behavior.on.malformed.documents: ignore
    retry.backoff.ms: 1000
    behavior.on.null.values: ignore
    max.retries: 5
    topics.regex: oracle-test.*
    type.name: _doc
    connection.url: 'http://elasticsearch-hostname:9200'
    schema.ignore: true
  tasksMax: 2

After installing the connector, the Elasticsearch index is created automatically. The image below shows the newly created index in Elasticsearch, with the document count indicating how much data has been inserted into the cluster.

show created index

The image shows the oracle-testdb2-audit-table index created.
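You can also confirm this from the command line; the hostname and user below are the ones from the sink connector configuration:

curl -u elastic:****** 'http://elasticsearch-hostname:9200/_cat/indices/oracle-*?v'

The docs.count column should grow as new rows arrive from the JDBC source connector.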

Optional: After completing the steps above, we ran into an additional challenge with the timestamp field. The timestamp column defined in the Oracle database was of type TIMESTAMP, but after passing through the pipeline (from Oracle to Elasticsearch), Elasticsearch interpreted the field as a long. The following steps resolve this issue; a command-line sketch of all three appears after the list.

  1. Create a new Elasticsearch index. The image below demonstrates how to define a custom Elasticsearch index:

create new index with mapping

  2. Add an ingest pipeline in Elasticsearch to convert the long field into a date field. The image below illustrates how to define the ingest pipeline:

add processor

  3. Attach the ingest pipeline to the Elasticsearch index.

attach processor to index
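For reference, here is a minimal command-line sketch of these three steps (in my setup they were done through Kibana, as shown in the images above). It assumes the target index is oracle-testdb2-audit-table, the long field is ORACLE_TIME_FIELD holding epoch milliseconds, and the pipeline name convert-oracle-timestamp is purely illustrative:

# 1. Create the index with an explicit date mapping for the timestamp field
curl -u elastic:****** -X PUT 'http://elasticsearch-hostname:9200/oracle-testdb2-audit-table' \
  -H 'Content-Type: application/json' \
  -d '{"mappings": {"properties": {"ORACLE_TIME_FIELD": {"type": "date"}}}}'

# 2. Define an ingest pipeline that converts the long value into a date
curl -u elastic:****** -X PUT 'http://elasticsearch-hostname:9200/_ingest/pipeline/convert-oracle-timestamp' \
  -H 'Content-Type: application/json' \
  -d '{"processors": [{"date": {"field": "ORACLE_TIME_FIELD", "formats": ["UNIX_MS"], "target_field": "ORACLE_TIME_FIELD"}}]}'

# 3. Attach the pipeline to the index as its default ingest pipeline
curl -u elastic:****** -X PUT 'http://elasticsearch-hostname:9200/oracle-testdb2-audit-table/_settings' \
  -H 'Content-Type: application/json' \
  -d '{"index.default_pipeline": "convert-oracle-timestamp"}'

With index.default_pipeline set, every document written by the sink connector passes through the pipeline, so no change to the connector configuration is required.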

Summary

This document describes how to build a data pipeline that synchronizes data from an Oracle database to Elasticsearch using Kafka and Kafka Connect. It begins with preparing a Kafka cluster deployed via the Strimzi operator and creating a Kafka Connect instance.

The pipeline first uses a JDBC source connector to read data from an Oracle table and publish it to Kafka topics. The setup is verified by checking topic creation and consuming sample messages from Kafka. Next, an Elasticsearch sink connector is configured to consume data from Kafka and automatically create and populate an Elasticsearch index. Finally, an optional ingest pipeline converts the Oracle timestamp field from a long value into a proper date field in Elasticsearch.
