Here, you will find a step-by-step tutorial on how to set up a local environment with a Kafka Docker container, where the events of a topic are streamed into a PostgreSQL table using JdbcSinkConnector.
Source code can also be found on GitHub.
Set up Docker containers
Download kafka-connect-jdbc
kafka-connect-jdbc is a plugin that can be mounted into a Kafka Connect container. Technically, the plugin is a .jar file that, when added to the Kafka Connect application, enables communication with databases.
kafka-connect-jdbc can be downloaded from the Confluent website, or by running the commands below:
curl -O https://d2p6pa21dvn84.cloudfront.net/api/plugins/confluentinc/kafka-connect-jdbc/versions/10.8.0/confluentinc-kafka-connect-jdbc-10.8.0.zip
unzip confluentinc-kafka-connect-jdbc-10.8.0.zip
rm confluentinc-kafka-connect-jdbc-10.8.0.zip
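The docker-compose.yml in the next section mounts the connector jar and the bundled PostgreSQL JDBC driver from the extracted lib folder, so it is worth confirming both are there (the driver version may differ between plugin releases):
ls confluentinc-kafka-connect-jdbc-10.8.0/lib | grep -E 'kafka-connect-jdbc|postgresql'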
Docker compose
Copy/paste the docker-compose.yml below next to the confluentinc-kafka-connect-jdbc-10.8.0 folder, then run docker-compose up:
version: '3.6'
services:
  kafka_connect_postgresql:
    image: postgres:17.0-alpine3.19
    ports:
      - 5432:5432
    environment:
      - POSTGRES_PASSWORD=kafka
      - POSTGRES_USER=kafka
  kafka:
    image: confluentinc/cp-kafka:7.7.1
    ports:
      - "9092:9092"
    environment:
      KAFKA_NODE_ID: 1
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: 'CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT'
      KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092'
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_JMX_PORT: 9101
      KAFKA_JMX_HOSTNAME: kafka
      KAFKA_PROCESS_ROLES: 'broker,controller'
      KAFKA_CONTROLLER_QUORUM_VOTERS: '1@kafka:29093'
      KAFKA_LISTENERS: 'PLAINTEXT://kafka:29092,CONTROLLER://kafka:29093,PLAINTEXT_HOST://0.0.0.0:9092'
      KAFKA_INTER_BROKER_LISTENER_NAME: 'PLAINTEXT'
      KAFKA_CONTROLLER_LISTENER_NAMES: 'CONTROLLER'
      CLUSTER_ID: 'MkU3OEVBNTcwNTJENDM2Qk'
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true'
  kafka-connect:
    ports:
      - 8083:8083
    image: confluentinc/cp-kafka-connect:7.7.1
    volumes:
      - ./confluentinc-kafka-connect-jdbc-10.8.0/lib/kafka-connect-jdbc-10.8.0.jar:/etc/kafka-connect/jars/kafka-connect-jdbc-10.8.0.jar
      - ./confluentinc-kafka-connect-jdbc-10.8.0/lib/postgresql-42.4.4.jar:/etc/kafka-connect/jars/postgresql-42.4.4.jar
    environment:
      CONNECT_BOOTSTRAP_SERVERS: kafka:29092
      CONNECT_GROUP_ID: kafka-connect-group
      CONNECT_CONFIG_STORAGE_TOPIC: '__connect-config'
      CONNECT_OFFSET_STORAGE_TOPIC: '__connect-offsets'
      CONNECT_STATUS_STORAGE_TOPIC: '__connect-status'
      CONNECT_KEY_CONVERTER: 'io.confluent.connect.avro.AvroConverter'
      CONNECT_VALUE_CONVERTER: 'io.confluent.connect.avro.AvroConverter'
      CONNECT_REST_ADVERTISED_HOST_NAME: 'kafka-connect'
      CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL: 'http://schema-registry:8085'
      CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: 'http://schema-registry:8085'
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: '1'
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: '1'
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: '1'
  schema-registry:
    image: confluentinc/cp-schema-registry:7.7.1
    depends_on: [kafka]
    ports:
      - "8085:8085"
    environment:
      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: 'PLAINTEXT://kafka:29092'
      SCHEMA_REGISTRY_HOST_NAME: 'schema-registry'
      SCHEMA_REGISTRY_LISTENERS: 'http://0.0.0.0:8085'
Make sure that all the containers are up & running: docker ps should show 4 containers with an Up status:
- schema-registry-1
- kafka-connect-1
- kafka_connect_postgresql-1
- kafka-1
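For a more compact check, docker ps also accepts a custom output format (container names may carry your compose project's prefix):
docker ps --format 'table {{.Names}}\t{{.Status}}'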
Configure Kafka Connect
The Kafka Connect container offers a REST API to manage its connectors. In the scope of this tutorial, we are only interested in the /connectors API.
To get the list of active connectors, run:
curl http://localhost:8083/connectors
Right now the list is just empty. Go ahead and create a new connector:
curl -X POST -H "Content-Type: application/json" -d '{
  "name": "postgres-sink-connector",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "tasks.max": "1",
    "topics": "video-game-topic",
    "connection.url": "jdbc:postgresql://kafka_connect_postgresql:5432/postgres",
    "connection.user": "kafka",
    "connection.password": "kafka",
    "table.name.format": "video_games",
    "insert.mode": "upsert",
    "pk.mode": "record_value",
    "pk.fields": "id",
    "batch.size": "1000",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schemas.enable": "true",
    "value.converter.schema.registry.url": "http://schema-registry:8085",
    "auto.create": "true"
  }
}' http://localhost:8083/connectors
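If the creation succeeded, the connector's status endpoint (a standard Kafka Connect REST endpoint) should report the connector and its single task as RUNNING:
curl http://localhost:8083/connectors/postgres-sink-connector/status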
Implement a producer
Now that the infrastructure is ready, it is time to write a simple Node.js application whose only role is to send Kafka messages.
package.json:
{
  "dependencies": {
    "@kafkajs/confluent-schema-registry": "^3.3.0",
    "avsc": "^5.7.7",
    "kafkajs": "^2.2.4"
  }
}
producer.js:
const {Kafka} = require('kafkajs')
const {SchemaRegistry, SchemaType} = require('@kafkajs/confluent-schema-registry')

const schema = {
  type: "record",
  name: "VideoGame",
  namespace: "video_game.avro",
  fields: [
    {name: "id", type: "int"},
    {name: "title", type: "string"},
    {name: "year", type: "int"},
  ]
}

const registry = new SchemaRegistry({host: 'http://localhost:8085/'})
const message = {id: 2, title: "Crash Bandicoot", year: 1997}
const kafka = new Kafka({clientId: "my-app", brokers: ["localhost:9092"]})
const producer = kafka.producer()

const produce = async (message) => {
  const {id} = await registry.register({
    type: SchemaType.AVRO,
    schema: JSON.stringify(schema)
  }, {subject: "video-game-topic-value"})
  await producer.connect()
  const encodedMessage = await registry.encode(id, message)
  console.log("sending message...")
  await producer.send({
    topic: 'video-game-topic',
    messages: [
      {
        key: message.id.toString(),
        value: encodedMessage
      },
    ],
  })
  console.log("message sent")
  await producer.disconnect()
}

produce(message).then(r => console.log("done")).catch(e => console.error(e))
Try sending some messages to Kafka by running the producer script a couple of times.
npm install
node producer.js
node producer.js
node producer.js
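As an optional check, you can peek at the raw topic from inside the kafka container (Ctrl+C to stop). Since the values are Avro-encoded, they will print as mostly unreadable bytes, but one record should appear per message sent:
docker-compose exec kafka kafka-console-consumer --bootstrap-server kafka:29092 --topic video-game-topic --from-beginning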
At this point the messages are sitting in a topic named video-game-topic. Kafka Connect is configured to consume messages from that topic and write them into a table named video_games in the public schema of the postgres database.
docker-compose exec -it kafka_connect_postgresql psql -U kafka -d postgres
SELECT * FROM video_games;
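Assuming auto.create built the table from the Avro schema, the output should look roughly like this:
 id |      title      | year
----+-----------------+------
  2 | Crash Bandicoot | 1997
(1 row)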
Notice that we found only one row in the table, even though three messages were sent to the Kafka topic. This is because we configured JdbcSinkConnector to use the id field from the message content as the primary key and set the insert mode to upsert. As a result, the first message triggered an insert query, while the other two messages triggered update queries.
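When you are done experimenting, the connector can be removed through the same REST API:
curl -X DELETE http://localhost:8083/connectors/postgres-sink-connector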
Source code can also be found on GitHub.