Ahmed Kooli
Stream Kafka events into a PostgreSQL table

Kafka to PostgreSQL stream

Here you will find a step-by-step tutorial on how to set up a local environment with a Kafka Docker container, where the events of a topic are streamed into a PostgreSQL table using the JdbcSinkConnector.

The source code can also be found on GitHub.

Setup Docker containers

Docker containers diagram

Download kafka-connect-jdbc

kafka-connect-jdbc is a plugin that can be mounted into a Kafka Connect container. Technically, the plugin is a .jar file that, when added to the Kafka Connect application, enables communication with databases over JDBC.

kafka-connect-jdbc can be downloaded from the Confluent website, or by running the commands below:

curl -O  https://d2p6pa21dvn84.cloudfront.net/api/plugins/confluentinc/kafka-connect-jdbc/versions/10.8.0/confluentinc-kafka-connect-jdbc-10.8.0.zip
unzip confluentinc-kafka-connect-jdbc-10.8.0.zip
rm confluentinc-kafka-connect-jdbc-10.8.0.zip
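
After unzipping, you should end up with a confluentinc-kafka-connect-jdbc-10.8.0 folder. The two .jar files that the docker-compose file below mounts into the Kafka Connect container live under its lib/ directory (the bundled PostgreSQL driver version may differ between plugin releases):

ls confluentinc-kafka-connect-jdbc-10.8.0/lib/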

Docker compose

Copy/paste the docker-compose.yml below next to the confluentinc-kafka-connect-jdbc-10.8.0 folder, then run docker-compose up.

version: '3.6'
services:
  kafka_connect_postgresql:
    image: postgres:17.0-alpine3.19
    ports:
      - 5432:5432
    environment:
      - POSTGRES_PASSWORD=kafka
      - POSTGRES_USER=kafka

  kafka:
    image: confluentinc/cp-kafka:7.7.1
    ports:
      - "9092:9092"
    environment:
      KAFKA_NODE_ID: 1
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: 'CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT'
      KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092'
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_JMX_PORT: 9101
      KAFKA_JMX_HOSTNAME: kafka
      KAFKA_PROCESS_ROLES: 'broker,controller'
      KAFKA_CONTROLLER_QUORUM_VOTERS: '1@kafka:29093'
      KAFKA_LISTENERS: 'PLAINTEXT://kafka:29092,CONTROLLER://kafka:29093,PLAINTEXT_HOST://0.0.0.0:9092'
      KAFKA_INTER_BROKER_LISTENER_NAME: 'PLAINTEXT'
      KAFKA_CONTROLLER_LISTENER_NAMES: 'CONTROLLER'
      CLUSTER_ID: 'MkU3OEVBNTcwNTJENDM2Qk'
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true'

  kafka-connect:
    ports:
      - 8083:8083
    image: confluentinc/cp-kafka-connect:7.7.1
    volumes:
      - ./confluentinc-kafka-connect-jdbc-10.8.0/lib/kafka-connect-jdbc-10.8.0.jar:/etc/kafka-connect/jars/kafka-connect-jdbc-10.8.0.jar
      - ./confluentinc-kafka-connect-jdbc-10.8.0/lib/postgresql-42.4.4.jar:/etc/kafka-connect/jars/postgresql-42.4.4.jar
    environment:
      CONNECT_BOOTSTRAP_SERVERS: kafka:29092
      CONNECT_GROUP_ID: kafka-connect-group
      CONNECT_CONFIG_STORAGE_TOPIC: '__connect-config'
      CONNECT_OFFSET_STORAGE_TOPIC: '__connect-offsets'
      CONNECT_STATUS_STORAGE_TOPIC: '__connect-status'
      CONNECT_KEY_CONVERTER: 'io.confluent.connect.avro.AvroConverter'
      CONNECT_VALUE_CONVERTER: 'io.confluent.connect.avro.AvroConverter'
      CONNECT_REST_ADVERTISED_HOST_NAME: 'kafka-connect'
      CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL: 'http://schema-registry:8085'
      CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: 'http://schema-registry:8085'
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: '1'
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: '1'
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: '1'

  schema-registry:
    image: confluentinc/cp-schema-registry:7.7.1
    depends_on: [kafka]
    ports:
      - "8085:8085"
    environment:
      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: 'PLAINTEXT://kafka:29092'
      SCHEMA_REGISTRY_HOST_NAME: 'schema-registry'
      SCHEMA_REGISTRY_LISTENERS: 'http://0.0.0.0:8085'

Make sure that all the containers are up and running; docker ps should show 4 containers with Up status:

  • schema-registry-1
  • kafka-connect-1
  • kafka_connect_postgresql-1
  • kafka-1
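
The Kafka Connect container usually takes the longest to finish starting. As an optional sanity check, you can wait until its REST API answers on port 8083:

curl http://localhost:8083/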

Configure Kafka connect

The Kafka Connect container offers a REST API to manage its connectors. In the scope of this tutorial, we are only interested in the /connectors endpoint.
To get the list of active connectors, run:

curl http://localhost:8083/connectors

Right now the list is empty (the response is an empty JSON array). Go ahead and create a new connector:

curl -X POST -H "Content-Type: application/json" -d '{
    "name": "postgres-sink-connector",
    "config": {
        "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
        "tasks.max": "1",
        "topics": "video-game-topic",
        "connection.url": "jdbc:postgresql://kafka_connect_postgresql:5432/postgres",
        "connection.user": "kafka",
        "connection.password": "kafka",
        "table.name.format": "video_games",
        "insert.mode": "upsert",
        "pk.mode": "record_value",
        "pk.fields": "id",
        "batch.size": "1000",
        "key.converter": "org.apache.kafka.connect.storage.StringConverter",
        "value.converter": "io.confluent.connect.avro.AvroConverter",
        "value.converter.schemas.enable": "true",
        "value.converter.schema.registry.url": "http://schema-registry:8085",
        "auto.create": "true"
    }
}' http://localhost:8083/connectors
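
To confirm that the connector was created and its task is running, you can query the status endpoint of the Kafka Connect REST API (the connector name is the one used in the POST above):

curl http://localhost:8083/connectors/postgres-sink-connector/status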

Implement a producer

Now that the infrastructure is ready, it is time to write a simple Node.js application whose only role is to send Kafka messages.

package.json :

{
  "dependencies": {
    "@kafkajs/confluent-schema-registry": "^3.3.0",
    "avsc": "^5.7.7",
    "kafkajs": "^2.2.4"
  }
}

producer.js :

const {Kafka} = require('kafkajs')
const {SchemaRegistry, SchemaType} = require('@kafkajs/confluent-schema-registry')

// Avro schema describing the message payload; its fields match the columns
// that the JdbcSinkConnector will create in the video_games table.
const schema = {
    type: "record",
    name: "VideoGame",
    namespace: "video_game.avro",
    fields: [
        {name: "id", type: "int"},
        {name: "title", type: "string"},
        {name: "year", type: "int"},
    ]
}

// Schema Registry and the Kafka broker are reachable on localhost thanks to the published ports.
const registry = new SchemaRegistry({host: 'http://localhost:8085/'})

const message = {id: 2, title: "Crash Bandicoot", year: 1997}

const kafka = new Kafka({clientId: "my-app", brokers: ["localhost:9092"]})
const producer = kafka.producer()
const produce = async (message) => {
    // Register the Avro schema under the subject expected by the connector
    // (topic name + "-value"); the returned id is used to encode the value.
    const {id} = await registry.register({
        type: SchemaType.AVRO,
        schema: JSON.stringify(schema)
    }, {subject: "video-game-topic-value"})
    await producer.connect()
    // Encode the value in the Confluent wire format (schema id + Avro payload).
    const encodedMessage = await registry.encode(id, message)
    console.log("sending message...")
    await producer.send({
        topic: 'video-game-topic',
        messages: [
            {
                key: message.id.toString(),
                value: encodedMessage
            },
        ],
    })
    console.log("message sent")
    await producer.disconnect()
}

produce(message).then(() => console.log("done")).catch(e => console.error(e))

Try sending some messages to Kafka by running the producer script a couple of times:

npm install
node producer.js
node producer.js
node producer.js
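
If you want to double-check that the messages reached the broker, you can peek at the topic with the console consumer shipped in the cp-kafka image (the values are Avro-encoded, so they will not be fully human-readable):

docker-compose exec kafka kafka-console-consumer --bootstrap-server kafka:29092 --topic video-game-topic --from-beginning --max-messages 3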

At this point the messages are sitting on a topic named video-game-topic. Kafka Connect is configured to consume messages from that topic and write them into a table named video_games, in the public schema of the postgres database:

docker-compose exec -it kafka_connect_postgresql psql -U kafka -d postgres
SELECT * FROM video_games;

SQL Query

Notice that we found only one row in the table, even though three messages were sent to the Kafka topic. This is because we configured JdbcSinkConnector to use the id field from the message content as the primary key and set the insert mode to upsert. As a result, the first message triggered an insert query, while the other two messages triggered update queries.
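
As a quick experiment, you can change the id field of the message in producer.js to a new value, for example 3, re-run the script, and query the table again; since the primary key is new, the connector issues an insert and a second row should appear:

node producer.js
docker-compose exec -it kafka_connect_postgresql psql -U kafka -d postgres -c 'SELECT * FROM video_games;'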

The source code can also be found on GitHub.
