<?xml version="1.0" encoding="UTF-8"?>
<rss version="2.0" xmlns:atom="http://www.w3.org/2005/Atom" xmlns:dc="http://purl.org/dc/elements/1.1/">
  <channel>
    <title>DEV Community: Ahmed Kooli</title>
    <description>The latest articles on DEV Community by Ahmed Kooli (@ahmed_kooli).</description>
    <link>https://dev.to/ahmed_kooli</link>
    <image>
      <url>https://media2.dev.to/dynamic/image/width=90,height=90,fit=cover,gravity=auto,format=auto/https:%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Fuser%2Fprofile_image%2F91999%2F23896903-4438-4022-8857-9938d0853079.png</url>
      <title>DEV Community: Ahmed Kooli</title>
      <link>https://dev.to/ahmed_kooli</link>
    </image>
    <atom:link rel="self" type="application/rss+xml" href="https://dev.to/feed/ahmed_kooli"/>
    <language>en</language>
    <item>
      <title>Stream Kafka events into a PostgreSQL table</title>
      <dc:creator>Ahmed Kooli</dc:creator>
      <pubDate>Sun, 01 Dec 2024 10:54:05 +0000</pubDate>
      <link>https://dev.to/ahmed_kooli/stream-kafka-events-into-a-postgresql-table-54g1</link>
      <guid>https://dev.to/ahmed_kooli/stream-kafka-events-into-a-postgresql-table-54g1</guid>
      <description>&lt;p&gt;&lt;a href="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2F1co3ba4u2d1z1iolapd9.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2F1co3ba4u2d1z1iolapd9.png" alt="Kafka to PostgreSQL stream" width="800" height="349"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;Here, you will find a step-by-step tutorial on how to set up a local environment with a &lt;strong&gt;Kafka&lt;/strong&gt; Docker container, where the &lt;strong&gt;events&lt;/strong&gt; of a topic are streamed into a &lt;strong&gt;PostgreSQL&lt;/strong&gt; table using &lt;code&gt;JdbcSinkConnector&lt;/code&gt;.&lt;/p&gt;

&lt;p&gt;The source code can also be found on &lt;a href="https://github.com/kooliahmd/kafka-postgres-connect-example" rel="noopener noreferrer"&gt;GitHub&lt;/a&gt;.&lt;/p&gt;

&lt;h2&gt;Setup Docker containers&lt;/h2&gt;

&lt;p&gt;&lt;a href="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fr9w9wc1fbvxkhvso4pc0.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fr9w9wc1fbvxkhvso4pc0.png" alt="Docker containers diagram" width="800" height="288"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;h3&gt;Download &lt;code&gt;kafka-connect-jdbc&lt;/code&gt;&lt;/h3&gt;

&lt;p&gt;&lt;code&gt;kafka-connect-jdbc&lt;/code&gt; is a plugin that can be &lt;strong&gt;mounted&lt;/strong&gt; into a Kafka Connect container. Technically, the plugin is a &lt;code&gt;.jar&lt;/code&gt; file that, when added to the Kafka Connect application, enables communication with databases.&lt;/p&gt;

&lt;p&gt;&lt;code&gt;kafka-connect-jdbc&lt;/code&gt; can be downloaded from the &lt;a href="https://www.confluent.io/hub/confluentinc/kafka-connect-jdbc" rel="noopener noreferrer"&gt;Confluent&lt;/a&gt; website, or by running the commands below:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;curl -O  https://d2p6pa21dvn84.cloudfront.net/api/plugins/confluentinc/kafka-connect-jdbc/versions/10.8.0/confluentinc-kafka-connect-jdbc-10.8.0.zip
unzip confluentinc-kafka-connect-jdbc-10.8.0.zip
rm confluentinc-kafka-connect-jdbc-10.8.0.zip
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;
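&lt;p&gt;After extraction, it is worth a quick sanity check that the connector jar and the bundled PostgreSQL JDBC driver are present, since the compose file below mounts them by exact path. A small sketch (file names assume plugin version 10.8.0; the driver version may differ in other releases):&lt;/p&gt;

```shell
# List the jars that the compose file will mount into the
# Kafka Connect container; both should appear in the output.
ls confluentinc-kafka-connect-jdbc-10.8.0/lib/ | grep -E 'kafka-connect-jdbc|postgresql'
```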



&lt;h3&gt;Docker compose&lt;/h3&gt;

&lt;p&gt;Copy/paste the &lt;code&gt;docker-compose.yml&lt;/code&gt; below next to the &lt;code&gt;confluentinc-kafka-connect-jdbc-10.8.0&lt;/code&gt; folder, then run &lt;code&gt;docker-compose up&lt;/code&gt;.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;version: '3.6'
services:
  kafka_connect_postgresql:
    image: postgres:17.0-alpine3.19
    ports:
      - 5432:5432
    environment:
      - POSTGRES_PASSWORD=kafka
      - POSTGRES_USER=kafka

  kafka:
    image: confluentinc/cp-kafka:7.7.1
    ports:
      - "9092:9092"
    environment:
      KAFKA_NODE_ID: 1
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: 'CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT'
      KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092'
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_JMX_PORT: 9101
      KAFKA_JMX_HOSTNAME: kafka
      KAFKA_PROCESS_ROLES: 'broker,controller'
      KAFKA_CONTROLLER_QUORUM_VOTERS: '1@kafka:29093'
      KAFKA_LISTENERS: 'PLAINTEXT://kafka:29092,CONTROLLER://kafka:29093,PLAINTEXT_HOST://0.0.0.0:9092'
      KAFKA_INTER_BROKER_LISTENER_NAME: 'PLAINTEXT'
      KAFKA_CONTROLLER_LISTENER_NAMES: 'CONTROLLER'
      CLUSTER_ID: 'MkU3OEVBNTcwNTJENDM2Qk'
      TOPIC_AUTO_CREATE: 'true'

  kafka-connect:
    ports:
      - 8083:8083
    image: confluentinc/cp-kafka-connect:7.7.1
    volumes:
      - ./confluentinc-kafka-connect-jdbc-10.8.0/lib/kafka-connect-jdbc-10.8.0.jar:/etc/kafka-connect/jars/kafka-connect-jdbc-10.8.0.jar
      - ./confluentinc-kafka-connect-jdbc-10.8.0/lib/postgresql-42.4.4.jar:/etc/kafka-connect/jars/postgresql-42.4.4.jar
    environment:
      CONNECT_BOOTSTRAP_SERVERS: kafka:29092
      CONNECT_GROUP_ID: kafka-connect-group
      CONNECT_CONFIG_STORAGE_TOPIC: '__connect-config'
      CONNECT_OFFSET_STORAGE_TOPIC: '__connect-offsets'
      CONNECT_STATUS_STORAGE_TOPIC: '__connect-status'
      CONNECT_KEY_CONVERTER: 'io.confluent.connect.avro.AvroConverter'
      CONNECT_VALUE_CONVERTER: 'io.confluent.connect.avro.AvroConverter'
      CONNECT_REST_ADVERTISED_HOST_NAME: 'kafka-connect'
      CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL: 'http://schema-registry:8085'
      CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: 'http://schema-registry:8085'
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: '1'
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: '1'
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: '1'

  schema-registry:
    image: confluentinc/cp-schema-registry:7.7.1
    depends_on: [kafka]
    ports:
      - "8085:8085"
    environment:
      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: 'PLAINTEXT://kafka:29092'
      SCHEMA_REGISTRY_HOST_NAME: 'schema-registry'
      SCHEMA_REGISTRY_LISTENERS: 'http://0.0.0.0:8085'
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Make sure that all the containers are up &amp;amp; running: &lt;code&gt;docker ps&lt;/code&gt; should show &lt;strong&gt;4 containers&lt;/strong&gt; with an &lt;code&gt;Up&lt;/code&gt; status:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;schema-registry-1&lt;/li&gt;
&lt;li&gt;kafka-connect-1&lt;/li&gt;
&lt;li&gt;kafka_connect_postgresql-1&lt;/li&gt;
&lt;li&gt;kafka-1&lt;/li&gt;
&lt;/ul&gt;
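&lt;p&gt;One way to check this at a glance is to narrow the &lt;code&gt;docker ps&lt;/code&gt; output to names and statuses (a sketch using Docker's output formatting):&lt;/p&gt;

```shell
# Print only container names and statuses;
# all four containers should report "Up".
docker ps --format 'table {{.Names}}\t{{.Status}}'
```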

&lt;h3&gt;Configure Kafka Connect&lt;/h3&gt;

&lt;p&gt;The Kafka Connect container offers a &lt;a href="https://docs.confluent.io/platform/current/connect/references/restapi.html" rel="noopener noreferrer"&gt;REST API&lt;/a&gt; to manage its connectors. In the scope of this tutorial we are only interested in the &lt;code&gt;/connectors&lt;/code&gt; endpoint.&lt;br&gt;
To get the list of active connectors, run:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;curl http://localhost:8083/connectors
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Right now the list is empty. Go ahead and create a new connector:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;curl -X POST -H "Content-Type: application/json" -d '{
    "name": "postgres-sink-connector",
    "config": {
        "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
        "tasks.max": "1",
        "topics": "video-game-topic",
        "connection.url": "jdbc:postgresql://kafka_connect_postgresql:5432/postgres",
        "connection.user": "kafka",
        "connection.password": "kafka",
        "table.name.format": "video_games",
        "insert.mode": "upsert",
        "pk.mode": "record_value",
        "pk.fields": "id",
        "batch.size": "1000",
        "key.converter": "org.apache.kafka.connect.storage.StringConverter",
        "value.converter": "io.confluent.connect.avro.AvroConverter",
        "value.converter.schemas.enable": "true",
        "value.converter.schema.registry.url": "http://schema-registry:8085",
        "auto.create": "true"
    }
}' http://localhost:8083/connectors
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;
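&lt;p&gt;The same REST API also exposes a status endpoint. Once the connector above has been created, it should report a &lt;code&gt;RUNNING&lt;/code&gt; state for both the connector and its single task (the connector name matches the one POSTed above):&lt;/p&gt;

```shell
# Inspect the connector and task state; look for "state": "RUNNING".
curl http://localhost:8083/connectors/postgres-sink-connector/status
```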



&lt;h3&gt;Implement a producer&lt;/h3&gt;

&lt;p&gt;Now that the infrastructure is ready, it is time to write a simple Node.js application whose only role is to send Kafka messages.&lt;/p&gt;

&lt;p&gt;&lt;code&gt;package.json&lt;/code&gt; :&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;{
  "dependencies": {
    "@kafkajs/confluent-schema-registry": "^3.3.0",
    "avsc": "^5.7.7",
    "kafkajs": "^2.2.4"
  }
}
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;&lt;code&gt;producer.js&lt;/code&gt; :&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;const {Kafka} = require('kafkajs')
const {SchemaRegistry, SchemaType} = require('@kafkajs/confluent-schema-registry')

const schema = {
    type: "record",
    name: "VideoGame",
    namespace: "video_game.avro",
    fields: [
        {name: "id", type: "int"},
        {name: "title", type: "string"},
        {name: "year", type: "int"},
    ]
}

const registry = new SchemaRegistry({host: 'http://localhost:8085/'})

const message = {id: 2, title: "Crash Bandicoot", year: 1997}

const kafka = new Kafka({clientId: "my-app", brokers: ["localhost:9092"]})
const producer = kafka.producer()
const produce = async (message) =&amp;gt; {
    const {id} = await registry.register({
        type: SchemaType.AVRO,
        schema: JSON.stringify(schema)
    }, {subject: "video-game-topic-value"})
    await producer.connect()
    const encodedMessage = await registry.encode(id, message)
    console.log("sending message...")
    await producer.send({
        topic: 'video-game-topic',
        messages: [
            {
                key: message.id.toString(),
                value: encodedMessage
            },
        ],
    })
    console.log("message sent")
    await producer.disconnect()
}

produce(message).then(r =&amp;gt; console.log("done")).catch(e =&amp;gt; console.error(e))
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Try sending some messages to Kafka by running the &lt;code&gt;producer&lt;/code&gt; script a couple of times.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;npm install
node producer.js
node producer.js
node producer.js
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;
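&lt;p&gt;Before looking at the database, you can peek at the topic itself. A sketch using the console consumer shipped with the &lt;code&gt;cp-kafka&lt;/code&gt; image; note that the payloads are Avro-encoded, so the values will look like binary gibberish unless you use an Avro-aware consumer:&lt;/p&gt;

```shell
# Read everything on the topic from the beginning (Ctrl+C to stop).
docker-compose exec kafka kafka-console-consumer \
  --bootstrap-server kafka:29092 \
  --topic video-game-topic \
  --from-beginning
```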



&lt;p&gt;At this point the messages are sitting on a topic named &lt;code&gt;video-game-topic&lt;/code&gt;. Kafka Connect is configured to consume messages from that topic and write them into a table named &lt;code&gt;video_games&lt;/code&gt; in the &lt;code&gt;public&lt;/code&gt; schema of the &lt;code&gt;postgres&lt;/code&gt; database.&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;docker-compose exec -it kafka_connect_postgresql psql -U kafka -d postgres
SELECT * FROM video_games;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;
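&lt;p&gt;The same query can also be run non-interactively with &lt;code&gt;psql&lt;/code&gt;'s &lt;code&gt;-c&lt;/code&gt; flag, which is handy for quick checks without opening a shell inside the container:&lt;/p&gt;

```shell
# Run the query and exit; prints the table contents to stdout.
docker-compose exec kafka_connect_postgresql psql -U kafka -d postgres \
  -c 'SELECT * FROM video_games;'
```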



&lt;p&gt;&lt;a href="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fakk9d96ihgx4ipb6gf3n.png" class="article-body-image-wrapper"&gt;&lt;img src="https://media2.dev.to/dynamic/image/width=800%2Cheight=%2Cfit=scale-down%2Cgravity=auto%2Cformat=auto/https%3A%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Farticles%2Fakk9d96ihgx4ipb6gf3n.png" alt="SQL Query" width="560" height="182"&gt;&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;Notice that we found only one row in the table, even though three messages were sent to the &lt;strong&gt;Kafka topic&lt;/strong&gt;. This is because we configured &lt;strong&gt;JdbcSinkConnector&lt;/strong&gt; to use the id field from the message content as the primary key and set the insert mode to upsert. As a result, the first message triggered an insert query, while the other two messages triggered update queries.&lt;/p&gt;
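&lt;p&gt;If you want to experiment with other insert modes or primary-key settings, the connector can be deleted and recreated through the same REST API:&lt;/p&gt;

```shell
# Remove the connector; its configuration can then be POSTed again
# with a modified "insert.mode" or "pk.mode".
curl -X DELETE http://localhost:8083/connectors/postgres-sink-connector
```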

&lt;p&gt;The source code can also be found on &lt;a href="https://github.com/kooliahmd/kafka-postgres-connect-example" rel="noopener noreferrer"&gt;GitHub&lt;/a&gt;.&lt;/p&gt;

</description>
    </item>
  </channel>
</rss>
