In this document, I’ll walk you through the challenges I faced when fetching data from an Oracle database, streaming it into Kafka, and finally consuming and writing that data into Elasticsearch. My goal is for this guide to help other teams build a reliable data pipeline using similar components.
Before we begin, you’ll need a working Kafka setup. In my case, I prepared a Kafka cluster and verified that all the pods were running correctly. This ensures the environment is ready before configuring Kafka Connect and the connectors for Oracle and Elasticsearch.
The general flow we’ll cover is:
Prerequisite: Install and run a Kafka cluster using the Strimzi operator.
Configure Kafka Connect: Create a Kafka Connect instance.
Create a custom Dockerfile: Build a custom image with the connector plugins and push it to the local registry.
Configure the JDBC connector: Define the JDBC source connector in the Kafka cluster.
Configure the Elasticsearch connector: Add the sink configuration to create an index in Elasticsearch and insert data.
Configure an ingest pipeline (optional): Convert the Oracle timestamp field into an Elasticsearch date field.
By the end of this guide, you should have a running pipeline that automatically streams changes from Oracle to Elasticsearch with minimal manual intervention.
Before starting, ensure that Kafka is installed and running in your cluster. In my setup, I deployed a Kafka cluster and confirmed that all pods are up and running:
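For example, assuming the cluster runs in the kafka-infra namespace used throughout this guide, the pods can be listed with:

kubectl get pods -n kafka-infra

All Kafka broker and operator pods should report a Running status before you continue.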
Afterward, Kafka Connect must be installed using the Strimzi operator. This requires creating a KafkaConnect custom resource on the cluster. The following is a sample configuration:
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnect
metadata:
  annotations:
    strimzi.io/use-connector-resources: 'true'
  name: my-connect-cluster
  namespace: kafka-infra
spec:
  bootstrapServers: 'my-cluster-kafka-bootstrap:9092'
  config:
    config.storage.replication.factor: -1
    config.storage.topic: connect-cluster-configs
    group.id: connect-cluster
    offset.storage.replication.factor: -1
    offset.storage.topic: connect-cluster-offsets
    status.storage.replication.factor: -1
    status.storage.topic: connect-cluster-status
  image: 'private-registery.com/strimzi/kafka-custom:v2'
  replicas: 1
  version: 4.0.0
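Assuming the resource above is saved as kafka-connect.yaml (the file name is arbitrary), it can be applied and its readiness checked with:

kubectl apply -f kafka-connect.yaml
kubectl get kafkaconnect my-connect-cluster -n kafka-infra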
The next step, and one of the most critical components of the data pipeline, is the installation of the Elasticsearch sink and JDBC connector plugins within the Kafka Connect pod. By default, Kafka Connect does not include these plugins, which means a custom image must be built.
This process involves creating a Docker image that packages the required connector plugins and then updating the Kafka Connect custom resource to reference this new image.
Doing so ensures that Kafka Connect can interact with both Elasticsearch and relational databases, enabling seamless data integration across the pipeline. The following sample Dockerfile demonstrates how to add the ElasticsearchSink and JDBCConnector plugins to the image:
FROM quay.io/strimzi/kafka:0.46.0-kafka-4.0.0
USER root
RUN curl -o /tmp/confluentinc-kafka-connect-elasticsearch-15.0.1.zip https://hub-downloads.confluent.io/api/plugins/confluentinc/kafka-connect-elasticsearch/versions/15.0.1/confluentinc-kafka-connect-elasticsearch-15.0.1.zip
RUN curl -o /tmp/confluentinc-kafka-connect-jdbc-10.8.4.zip https://hub-downloads.confluent.io/api/plugins/confluentinc/kafka-connect-jdbc/versions/10.8.4/confluentinc-kafka-connect-jdbc-10.8.4.zip
RUN unzip /tmp/confluentinc-kafka-connect-jdbc-10.8.4.zip -d /opt/kafka/plugins/ && \
rm /tmp/confluentinc-kafka-connect-jdbc-10.8.4.zip
RUN unzip /tmp/confluentinc-kafka-connect-elasticsearch-15.0.1.zip -d /opt/kafka/plugins/ && \
rm /tmp/confluentinc-kafka-connect-elasticsearch-15.0.1.zip
USER 1001
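A typical build-and-push sequence for this image looks like the following; the tag matches the image referenced in the KafkaConnect resource above, so substitute your own registry if it differs:

docker build -t private-registery.com/strimzi/kafka-custom:v2 .
docker push private-registery.com/strimzi/kafka-custom:v2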
Next, we need to add a KafkaConnector resource to capture data from Oracle and publish it to Kafka. The configuration below illustrates how to define the JDBC source connector:
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
  labels:
    strimzi.io/cluster: my-connect-cluster
  name: my-source-connector-jdbc-testdb2-v0
  namespace: kafka-infra
spec:
  class: io.confluent.connect.jdbc.JdbcSourceConnector
  config:
    poll.interval.ms: 3600000
    transforms.extractInt.field: ORACLE_TIME_FIELD
    mode: timestamp
    query: select * from schema_name.table_name
    timestamp.column.name: ORACLE_TIME_FIELD
    connection.password: '*****'
    topic.prefix: oracle-testdb2-audit-table
    connection.user: thinuser
    connection.url: 'jdbc:oracle:driver-name:@hostname:port/db-name'
  tasksMax: 2
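After applying the resource, you can confirm that the connector was accepted and is running by inspecting its status conditions:

kubectl get kafkaconnector -n kafka-infra
kubectl describe kafkaconnector my-source-connector-jdbc-testdb2-v0 -n kafka-infra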
We can now verify in Kafka that the topic for the Oracle table defined in the JDBC connector configuration has been created successfully and is receiving messages.
List the topics by running:
bin/kafka-topics.sh --list --bootstrap-server localhost:9092
Output:
oracle-testdb2-audit-table
Consume a sample message by running:
bin/kafka-console-consumer.sh --topic oracle-testdb2-audit-table --bootstrap-server localhost:9092 --from-beginning --max-messages 1
Later, I will provide a Kafka consumer implemented in Go.
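In the meantime, the sketch below shows what such a consumer could look like using the segmentio/kafka-go client. This is only an illustrative example, not the final implementation; the broker address and topic come from the configuration above, while the consumer group ID is a made-up name.

package main

import (
	"context"
	"fmt"
	"log"

	"github.com/segmentio/kafka-go"
)

func main() {
	// Reader for the topic produced by the JDBC source connector.
	// The broker address matches the Strimzi bootstrap service used above;
	// the group ID is an arbitrary example name.
	r := kafka.NewReader(kafka.ReaderConfig{
		Brokers: []string{"my-cluster-kafka-bootstrap:9092"},
		Topic:   "oracle-testdb2-audit-table",
		GroupID: "oracle-audit-consumer",
	})
	defer r.Close()

	for {
		// ReadMessage blocks until a message arrives and commits offsets
		// automatically when a GroupID is set.
		m, err := r.ReadMessage(context.Background())
		if err != nil {
			log.Fatalf("failed to read message: %v", err)
		}
		fmt.Printf("offset %d: %s\n", m.Offset, string(m.Value))
	}
}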
The next section of the documentation will cover how to synchronize data from Kafka to Elasticsearch.
First, we need to install a Kafka connector to fetch data from Kafka and synchronize it with Elasticsearch. The configuration for the Kafka connector is shown below:
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
  labels:
    strimzi.io/cluster: my-connect-cluster
  name: my-source-connector-oracle-v1
  namespace: kafka-infra
spec:
  class: io.confluent.connect.elasticsearch.ElasticsearchSinkConnector
  config:
    key.ignore: true
    value.converter: org.apache.kafka.connect.json.JsonConverter
    connection.username: elastic
    value.converter.schemas.enable: false
    name: elasticsearch-sink-connector
    connection.password: '******'
    key.converter: org.apache.kafka.connect.storage.StringConverter
    drop.invalid.message: true
    behavior.on.malformed.documents: ignore
    retry.backoff.ms: 1000
    behavior.on.null.values: ignore
    max.retries: 5
    topics.regex: oracle-test.*
    type.name: _doc
    connection.url: 'http://elasticsearch-hostname:9200'
    schema.ignore: true
  tasksMax: 2
After installing the connector, the Elasticsearch index will be created automatically. The image below shows the newly created oracle-testdb2-audit-table index in Elasticsearch, with the document count field indicating the amount of data inserted into the cluster.
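Alternatively, the index and its document count can be checked from the command line with the Elasticsearch _cat API, using the host and credentials from the sink connector configuration:

curl -u elastic:****** 'http://elasticsearch-hostname:9200/_cat/indices/oracle-testdb2-audit-table?v'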
Optional: After completing the steps outlined above, we encountered an additional challenge related to the timestamp field. The timestamp column in the Oracle database was of type TIMESTAMP, but after passing through the data pipeline (from Oracle to Elasticsearch), the field arrived in Elasticsearch as a long value (epoch milliseconds) rather than a date. The following steps resolve this issue.
- Create a new Elasticsearch index. The image below demonstrates how to define a custom Elasticsearch index with an explicit date mapping for the timestamp field.
- Add an ingest pipeline in Elasticsearch to convert the long field into a date field. The image below illustrates how to define the ingest pipeline.
- Attach the ingest pipeline to the Elasticsearch index. A command-line sketch of all three steps follows this list.
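As a reference, here is a minimal command-line sketch of these three steps, assuming the new index is called oracle-testdb2-audit-table-v2, the long field is ORACLE_TIME_FIELD, and the pipeline is named oracle-timestamp-pipeline; all of these names are illustrative and should be adapted to your own setup.

# 1. Create a new index with an explicit date mapping for the converted field
curl -u elastic:****** -X PUT 'http://elasticsearch-hostname:9200/oracle-testdb2-audit-table-v2' \
  -H 'Content-Type: application/json' \
  -d '{"mappings": {"properties": {"@timestamp": {"type": "date"}}}}'

# 2. Define an ingest pipeline that converts the epoch-milliseconds value into a date
curl -u elastic:****** -X PUT 'http://elasticsearch-hostname:9200/_ingest/pipeline/oracle-timestamp-pipeline' \
  -H 'Content-Type: application/json' \
  -d '{"processors": [{"date": {"field": "ORACLE_TIME_FIELD", "target_field": "@timestamp", "formats": ["UNIX_MS"]}}]}'

# 3. Attach the pipeline to the index so every incoming document passes through it
curl -u elastic:****** -X PUT 'http://elasticsearch-hostname:9200/oracle-testdb2-audit-table-v2/_settings' \
  -H 'Content-Type: application/json' \
  -d '{"index.default_pipeline": "oracle-timestamp-pipeline"}'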
Summary
This document describes how to build a data pipeline that synchronizes data from an Oracle database to Elasticsearch using Kafka and Kafka Connect. It begins with preparing a Kafka cluster deployed via the Strimzi operator and creating a Kafka Connect instance.
The pipeline first uses a JDBC Source Connector to read data from an Oracle table and publish it to Kafka topics. The setup is verified by checking topic creation and consuming sample messages from Kafka. Next, an Elasticsearch Sink Connector is configured to consume data from Kafka and automatically create and populate an Elasticsearch index.