<?xml version="1.0" encoding="UTF-8"?>
<rss version="2.0" xmlns:atom="http://www.w3.org/2005/Atom" xmlns:dc="http://purl.org/dc/elements/1.1/">
  <channel>
    <title>DEV Community: sisiodos</title>
    <description>The latest articles on DEV Community by sisiodos (@sisiodos).</description>
    <link>https://dev.to/sisiodos</link>
    <image>
      <url>https://media2.dev.to/dynamic/image/width=90,height=90,fit=cover,gravity=auto,format=auto/https:%2F%2Fdev-to-uploads.s3.amazonaws.com%2Fuploads%2Fuser%2Fprofile_image%2F3148210%2Fd6559171-8185-4322-a2fd-0def314aa783.png</url>
      <title>DEV Community: sisiodos</title>
      <link>https://dev.to/sisiodos</link>
    </image>
    <atom:link rel="self" type="application/rss+xml" href="https://dev.to/feed/sisiodos"/>
    <language>en</language>
    <item>
      <title>Connecting RDBs and Search Engines — Chapter 5</title>
      <dc:creator>sisiodos</dc:creator>
      <pubDate>Sat, 10 May 2025 22:38:23 +0000</pubDate>
      <link>https://dev.to/sisiodos/connecting-rdbs-and-search-engines-chapter-5-5eb9</link>
      <guid>https://dev.to/sisiodos/connecting-rdbs-and-search-engines-chapter-5-5eb9</guid>
      <description>&lt;h2&gt;
  
  
  Chapter 5: Implementing a CDC Join Pipeline with Flink SQL
&lt;/h2&gt;

&lt;p&gt;In this chapter, we build a full pipeline that captures CDC streams from PostgreSQL via Debezium, joins the data using Flink SQL, and stores the results in OpenSearch.&lt;/p&gt;

&lt;h2&gt;
  
  
  Architecture Overview
&lt;/h2&gt;



&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;PostgreSQL
   ↓ (CDC)
Debezium (Kafka Connect)
   ↓ (debezium-json)
Kafka Topics (dbserver1.public.orders, dbserver1.public.products)
   ↓
Flink SQL
   ↓
OpenSearch (orders_with_products)
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;h2&gt;
  
  
  PostgreSQL Table Setup
&lt;/h2&gt;

&lt;p&gt;Add the following to &lt;code&gt;postgres/01-init.sql&lt;/code&gt;:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight sql"&gt;&lt;code&gt;&lt;span class="c1"&gt;-- Products table&lt;/span&gt;
&lt;span class="k"&gt;CREATE&lt;/span&gt; &lt;span class="k"&gt;TABLE&lt;/span&gt; &lt;span class="n"&gt;products&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;
  &lt;span class="n"&gt;product_id&lt;/span&gt; &lt;span class="nb"&gt;VARCHAR&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="mi"&gt;32&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="k"&gt;PRIMARY&lt;/span&gt; &lt;span class="k"&gt;KEY&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
  &lt;span class="n"&gt;product_name&lt;/span&gt; &lt;span class="nb"&gt;VARCHAR&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="mi"&gt;255&lt;/span&gt;&lt;span class="p"&gt;),&lt;/span&gt;
  &lt;span class="n"&gt;category_id&lt;/span&gt; &lt;span class="nb"&gt;VARCHAR&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="mi"&gt;32&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
&lt;span class="p"&gt;);&lt;/span&gt;
&lt;span class="k"&gt;ALTER&lt;/span&gt; &lt;span class="k"&gt;TABLE&lt;/span&gt; &lt;span class="n"&gt;products&lt;/span&gt; &lt;span class="n"&gt;REPLICA&lt;/span&gt; &lt;span class="k"&gt;IDENTITY&lt;/span&gt; &lt;span class="k"&gt;FULL&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;

&lt;span class="c1"&gt;-- Initial product data&lt;/span&gt;
&lt;span class="k"&gt;INSERT&lt;/span&gt; &lt;span class="k"&gt;INTO&lt;/span&gt; &lt;span class="n"&gt;products&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;product_id&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;product_name&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;category_id&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="k"&gt;VALUES&lt;/span&gt;
&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s1"&gt;'P001'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="s1"&gt;'Sneaker X'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="s1"&gt;'C001'&lt;/span&gt;&lt;span class="p"&gt;),&lt;/span&gt;
&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s1"&gt;'P002'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="s1"&gt;'Jacket Y'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="s1"&gt;'C002'&lt;/span&gt;&lt;span class="p"&gt;);&lt;/span&gt;

&lt;span class="c1"&gt;-- Orders table&lt;/span&gt;
&lt;span class="k"&gt;CREATE&lt;/span&gt; &lt;span class="k"&gt;TABLE&lt;/span&gt; &lt;span class="n"&gt;orders&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;
  &lt;span class="n"&gt;order_id&lt;/span&gt; &lt;span class="nb"&gt;VARCHAR&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="mi"&gt;32&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="k"&gt;PRIMARY&lt;/span&gt; &lt;span class="k"&gt;KEY&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
  &lt;span class="n"&gt;order_time&lt;/span&gt; &lt;span class="nb"&gt;TIMESTAMP&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
  &lt;span class="n"&gt;customer_id&lt;/span&gt; &lt;span class="nb"&gt;VARCHAR&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="mi"&gt;32&lt;/span&gt;&lt;span class="p"&gt;),&lt;/span&gt;
  &lt;span class="n"&gt;product_id&lt;/span&gt; &lt;span class="nb"&gt;VARCHAR&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="mi"&gt;32&lt;/span&gt;&lt;span class="p"&gt;),&lt;/span&gt;
  &lt;span class="n"&gt;quantity&lt;/span&gt; &lt;span class="nb"&gt;INT&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
  &lt;span class="n"&gt;price&lt;/span&gt; &lt;span class="nb"&gt;NUMERIC&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="mi"&gt;10&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="mi"&gt;2&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
&lt;span class="p"&gt;);&lt;/span&gt;
&lt;span class="k"&gt;ALTER&lt;/span&gt; &lt;span class="k"&gt;TABLE&lt;/span&gt; &lt;span class="n"&gt;orders&lt;/span&gt; &lt;span class="n"&gt;REPLICA&lt;/span&gt; &lt;span class="k"&gt;IDENTITY&lt;/span&gt; &lt;span class="k"&gt;FULL&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;

&lt;span class="c1"&gt;-- Initial order data&lt;/span&gt;
&lt;span class="k"&gt;INSERT&lt;/span&gt; &lt;span class="k"&gt;INTO&lt;/span&gt; &lt;span class="n"&gt;orders&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;order_id&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;order_time&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;customer_id&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;product_id&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;quantity&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;price&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="k"&gt;VALUES&lt;/span&gt;
&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s1"&gt;'O1001'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="s1"&gt;'2025-04-27 10:00:00'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="s1"&gt;'CUST01'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="s1"&gt;'P001'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="mi"&gt;1&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="mi"&gt;9800&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="mi"&gt;00&lt;/span&gt;&lt;span class="p"&gt;),&lt;/span&gt;
&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="s1"&gt;'O1002'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="s1"&gt;'2025-04-27 10:05:00'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="s1"&gt;'CUST02'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="s1"&gt;'P002'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="mi"&gt;2&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="mi"&gt;15800&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="mi"&gt;00&lt;/span&gt;&lt;span class="p"&gt;);&lt;/span&gt;

&lt;span class="c1"&gt;-- Grant permissions to Debezium user&lt;/span&gt;
&lt;span class="k"&gt;GRANT&lt;/span&gt; &lt;span class="k"&gt;SELECT&lt;/span&gt; &lt;span class="k"&gt;ON&lt;/span&gt; &lt;span class="n"&gt;products&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;orders&lt;/span&gt; &lt;span class="k"&gt;TO&lt;/span&gt; &lt;span class="n"&gt;debezium&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;

&lt;span class="c1"&gt;-- Add tables to existing publication&lt;/span&gt;
&lt;span class="k"&gt;ALTER&lt;/span&gt; &lt;span class="n"&gt;PUBLICATION&lt;/span&gt; &lt;span class="n"&gt;debezium_pub&lt;/span&gt; &lt;span class="k"&gt;ADD&lt;/span&gt; &lt;span class="k"&gt;TABLE&lt;/span&gt; &lt;span class="n"&gt;products&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;orders&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;h2&gt;
  
  
  Register Debezium Connector
&lt;/h2&gt;

&lt;p&gt;After Kafka Connect is running, register the connector using a script such as &lt;code&gt;scripts/02-debezium-table-table-join.sh&lt;/code&gt;. Make sure &lt;code&gt;table.include.list&lt;/code&gt; in the connector config covers the two new tables:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight json"&gt;&lt;code&gt;&lt;span class="nl"&gt;"table.include.list"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"public.testtable,public.products,public.orders"&lt;/span&gt;&lt;span class="w"&gt;
&lt;/span&gt;&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;
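
&lt;p&gt;As a rough sketch of what such a registration script might send, the snippet below builds a Kafka Connect payload for the Debezium PostgreSQL connector. The connector name, host, database name, and password are placeholders we made up. The &lt;code&gt;topic.prefix&lt;/code&gt; key assumes Debezium 2.x, and &lt;code&gt;dbserver1&lt;/code&gt; is inferred from the topic names used later in this chapter.&lt;/p&gt;

```python
import json


def build_connector_config(tables, name="postgres-connector"):
    """Build a Kafka Connect registration payload (illustrative values only)."""
    return {
        "name": name,  # hypothetical connector name
        "config": {
            "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
            "plugin.name": "pgoutput",
            "database.hostname": "postgres",  # assumed Docker Compose service name
            "database.port": "5432",
            "database.user": "debezium",  # the user granted SELECT above
            "database.password": "change-me",  # placeholder
            "database.dbname": "postgres",  # assumed database name
            "topic.prefix": "dbserver1",  # assumes Debezium 2.x naming
            "publication.name": "debezium_pub",
            "table.include.list": ",".join(tables),
        },
    }


payload = build_connector_config(
    ["public.testtable", "public.products", "public.orders"]
)
print(json.dumps(payload, indent=2))
```

&lt;p&gt;The printed JSON could then be sent with a POST request to the Kafka Connect REST endpoint &lt;code&gt;/connectors&lt;/code&gt; (port 8083 by default, if your setup keeps the default).&lt;/p&gt;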



&lt;h2&gt;
  
  
  Create OpenSearch Index
&lt;/h2&gt;

&lt;p&gt;Prepare &lt;code&gt;opensearch/orders_with_products-mapping.json&lt;/code&gt;:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight json"&gt;&lt;code&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="w"&gt;
  &lt;/span&gt;&lt;span class="nl"&gt;"mappings"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="w"&gt;
    &lt;/span&gt;&lt;span class="nl"&gt;"properties"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="w"&gt;
      &lt;/span&gt;&lt;span class="nl"&gt;"order_id"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="nl"&gt;"type"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"keyword"&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="p"&gt;},&lt;/span&gt;&lt;span class="w"&gt;
      &lt;/span&gt;&lt;span class="nl"&gt;"product_id"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="nl"&gt;"type"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"keyword"&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="p"&gt;},&lt;/span&gt;&lt;span class="w"&gt;
      &lt;/span&gt;&lt;span class="nl"&gt;"product_name"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="w"&gt;
        &lt;/span&gt;&lt;span class="nl"&gt;"type"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"text"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
        &lt;/span&gt;&lt;span class="nl"&gt;"fields"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="nl"&gt;"raw"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="nl"&gt;"type"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"keyword"&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="p"&gt;}&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="p"&gt;}&lt;/span&gt;&lt;span class="w"&gt;
      &lt;/span&gt;&lt;span class="p"&gt;},&lt;/span&gt;&lt;span class="w"&gt;
      &lt;/span&gt;&lt;span class="nl"&gt;"category_id"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="nl"&gt;"type"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"keyword"&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="p"&gt;},&lt;/span&gt;&lt;span class="w"&gt;
      &lt;/span&gt;&lt;span class="nl"&gt;"quantity"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="nl"&gt;"type"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"integer"&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="p"&gt;},&lt;/span&gt;&lt;span class="w"&gt;
      &lt;/span&gt;&lt;span class="nl"&gt;"price"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="nl"&gt;"type"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"double"&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="p"&gt;}&lt;/span&gt;&lt;span class="w"&gt;
    &lt;/span&gt;&lt;span class="p"&gt;}&lt;/span&gt;&lt;span class="w"&gt;
  &lt;/span&gt;&lt;span class="p"&gt;}&lt;/span&gt;&lt;span class="w"&gt;
&lt;/span&gt;&lt;span class="p"&gt;}&lt;/span&gt;&lt;span class="w"&gt;
&lt;/span&gt;&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Create the index:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;curl &lt;span class="nt"&gt;-X&lt;/span&gt; PUT &lt;span class="s2"&gt;"http://localhost:9200/orders_with_products"&lt;/span&gt; &lt;span class="se"&gt;\&lt;/span&gt;
  &lt;span class="nt"&gt;-H&lt;/span&gt; &lt;span class="s2"&gt;"Content-Type: application/json"&lt;/span&gt; &lt;span class="se"&gt;\&lt;/span&gt;
  &lt;span class="nt"&gt;-d&lt;/span&gt; @opensearch/orders_with_products-mapping.json
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;blockquote&gt;
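&lt;p&gt;✅ &lt;code&gt;product_name&lt;/code&gt; is a multi-field: the &lt;code&gt;text&lt;/code&gt; field supports full-text search, and the &lt;code&gt;product_name.raw&lt;/code&gt; keyword sub-field supports exact matching.&lt;/p&gt;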
&lt;/blockquote&gt;
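
&lt;p&gt;To make the difference concrete, here is a minimal sketch that builds the two kinds of search bodies for this mapping. The helper names are ours, not part of any library. The bodies use the standard &lt;code&gt;match&lt;/code&gt; and &lt;code&gt;term&lt;/code&gt; queries.&lt;/p&gt;

```python
import json


def full_text_query(text):
    """Analyzed search against the text field (tokenized, case-insensitive
    with the default analyzer)."""
    return {"query": {"match": {"product_name": text}}}


def exact_query(value):
    """Exact match against the keyword sub-field defined as 'raw' in the mapping."""
    return {"query": {"term": {"product_name.raw": value}}}


# Bodies that could be passed to curl with -d, as in the validation steps.
print(json.dumps(full_text_query("sneaker")))
print(json.dumps(exact_query("Sneaker X")))
```

&lt;p&gt;With the sample data, the first body should match "Sneaker X" through the analyzed field, while the second only matches when the whole value is identical.&lt;/p&gt;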

&lt;h2&gt;
  
  
  Example Flink SQL
&lt;/h2&gt;

&lt;p&gt;Create &lt;code&gt;flink/sql/table-table-join.sql&lt;/code&gt;:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight sql"&gt;&lt;code&gt;&lt;span class="c1"&gt;-- Products table&lt;/span&gt;
&lt;span class="k"&gt;CREATE&lt;/span&gt; &lt;span class="k"&gt;TABLE&lt;/span&gt; &lt;span class="n"&gt;products&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;
  &lt;span class="n"&gt;product_id&lt;/span&gt; &lt;span class="n"&gt;STRING&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
  &lt;span class="n"&gt;product_name&lt;/span&gt; &lt;span class="n"&gt;STRING&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
  &lt;span class="n"&gt;category_id&lt;/span&gt; &lt;span class="n"&gt;STRING&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
  &lt;span class="k"&gt;PRIMARY&lt;/span&gt; &lt;span class="k"&gt;KEY&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;product_id&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="k"&gt;NOT&lt;/span&gt; &lt;span class="n"&gt;ENFORCED&lt;/span&gt;
&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="k"&gt;WITH&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;
  &lt;span class="s1"&gt;'connector'&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="s1"&gt;'kafka'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
  &lt;span class="s1"&gt;'topic'&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="s1"&gt;'dbserver1.public.products'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
  &lt;span class="s1"&gt;'properties.bootstrap.servers'&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="s1"&gt;'kafka:9092'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
  &lt;span class="s1"&gt;'format'&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="s1"&gt;'debezium-json'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
  &lt;span class="s1"&gt;'scan.startup.mode'&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="s1"&gt;'earliest-offset'&lt;/span&gt;
&lt;span class="p"&gt;);&lt;/span&gt;

&lt;span class="c1"&gt;-- Orders table&lt;/span&gt;
&lt;span class="k"&gt;CREATE&lt;/span&gt; &lt;span class="k"&gt;TABLE&lt;/span&gt; &lt;span class="n"&gt;orders&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;
  &lt;span class="n"&gt;order_id&lt;/span&gt; &lt;span class="n"&gt;STRING&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
  &lt;span class="n"&gt;order_time&lt;/span&gt; &lt;span class="n"&gt;STRING&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
  &lt;span class="n"&gt;customer_id&lt;/span&gt; &lt;span class="n"&gt;STRING&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
  &lt;span class="n"&gt;product_id&lt;/span&gt; &lt;span class="n"&gt;STRING&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
  &lt;span class="n"&gt;quantity&lt;/span&gt; &lt;span class="nb"&gt;INT&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
  &lt;span class="n"&gt;price&lt;/span&gt; &lt;span class="nb"&gt;DOUBLE&lt;/span&gt;
&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="k"&gt;WITH&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;
  &lt;span class="s1"&gt;'connector'&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="s1"&gt;'kafka'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
  &lt;span class="s1"&gt;'topic'&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="s1"&gt;'dbserver1.public.orders'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
  &lt;span class="s1"&gt;'properties.bootstrap.servers'&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="s1"&gt;'kafka:9092'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
  &lt;span class="s1"&gt;'format'&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="s1"&gt;'debezium-json'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
  &lt;span class="s1"&gt;'scan.startup.mode'&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="s1"&gt;'earliest-offset'&lt;/span&gt;
&lt;span class="p"&gt;);&lt;/span&gt;

&lt;span class="c1"&gt;-- OpenSearch Sink&lt;/span&gt;
&lt;span class="k"&gt;CREATE&lt;/span&gt; &lt;span class="k"&gt;TABLE&lt;/span&gt; &lt;span class="n"&gt;orders_with_products&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;
  &lt;span class="n"&gt;order_id&lt;/span&gt; &lt;span class="n"&gt;STRING&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
  &lt;span class="n"&gt;product_id&lt;/span&gt; &lt;span class="n"&gt;STRING&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
  &lt;span class="n"&gt;product_name&lt;/span&gt; &lt;span class="n"&gt;STRING&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
  &lt;span class="n"&gt;category_id&lt;/span&gt; &lt;span class="n"&gt;STRING&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
  &lt;span class="n"&gt;quantity&lt;/span&gt; &lt;span class="nb"&gt;INT&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
  &lt;span class="n"&gt;price&lt;/span&gt; &lt;span class="nb"&gt;DOUBLE&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
  &lt;span class="k"&gt;PRIMARY&lt;/span&gt; &lt;span class="k"&gt;KEY&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;order_id&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;product_id&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="k"&gt;NOT&lt;/span&gt; &lt;span class="n"&gt;ENFORCED&lt;/span&gt;
&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="k"&gt;WITH&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;
  &lt;span class="s1"&gt;'connector'&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="s1"&gt;'opensearch-2'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
  &lt;span class="s1"&gt;'hosts'&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="s1"&gt;'http://opensearch:9200'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
  &lt;span class="s1"&gt;'allow-insecure'&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="s1"&gt;'true'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
  &lt;span class="s1"&gt;'index'&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="s1"&gt;'orders_with_products'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
  &lt;span class="s1"&gt;'document-id.key-delimiter'&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="s1"&gt;'$'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
  &lt;span class="s1"&gt;'sink.bulk-flush.max-size'&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="s1"&gt;'1mb'&lt;/span&gt;
&lt;span class="p"&gt;);&lt;/span&gt;

&lt;span class="c1"&gt;-- Join View&lt;/span&gt;
&lt;span class="k"&gt;CREATE&lt;/span&gt; &lt;span class="k"&gt;VIEW&lt;/span&gt; &lt;span class="n"&gt;orders_with_products_view&lt;/span&gt; &lt;span class="k"&gt;AS&lt;/span&gt;
&lt;span class="k"&gt;SELECT&lt;/span&gt;
  &lt;span class="n"&gt;o&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;order_id&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
  &lt;span class="n"&gt;o&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;product_id&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
  &lt;span class="n"&gt;p&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;product_name&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
  &lt;span class="n"&gt;p&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;category_id&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
  &lt;span class="n"&gt;o&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;quantity&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
  &lt;span class="n"&gt;o&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;price&lt;/span&gt;
&lt;span class="k"&gt;FROM&lt;/span&gt; &lt;span class="n"&gt;orders&lt;/span&gt; &lt;span class="n"&gt;o&lt;/span&gt;
&lt;span class="k"&gt;INNER&lt;/span&gt; &lt;span class="k"&gt;JOIN&lt;/span&gt; &lt;span class="n"&gt;products&lt;/span&gt; &lt;span class="n"&gt;p&lt;/span&gt; &lt;span class="k"&gt;ON&lt;/span&gt; &lt;span class="n"&gt;o&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;product_id&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="n"&gt;p&lt;/span&gt;&lt;span class="p"&gt;.&lt;/span&gt;&lt;span class="n"&gt;product_id&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;

&lt;span class="c1"&gt;-- Insert into OpenSearch&lt;/span&gt;
&lt;span class="k"&gt;INSERT&lt;/span&gt; &lt;span class="k"&gt;INTO&lt;/span&gt; &lt;span class="n"&gt;orders_with_products&lt;/span&gt;
&lt;span class="k"&gt;SELECT&lt;/span&gt; &lt;span class="o"&gt;*&lt;/span&gt; &lt;span class="k"&gt;FROM&lt;/span&gt; &lt;span class="n"&gt;orders_with_products_view&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;
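
&lt;p&gt;The sink declares a composite primary key and sets &lt;code&gt;document-id.key-delimiter&lt;/code&gt; to &lt;code&gt;$&lt;/code&gt;. As we understand the connector documentation, the document ID is formed by concatenating the key values in declaration order with that delimiter. The sketch below only imitates that rule for the sample rows. It is not the connector's code, and &lt;code&gt;document_id&lt;/code&gt; is a hypothetical helper.&lt;/p&gt;

```python
def document_id(key_values, delimiter="$"):
    """Join primary-key values in declaration order with the configured delimiter."""
    return delimiter.join(str(value) for value in key_values)


# (order_id, product_id) pairs from the initial order data
sample_keys = [("O1001", "P001"), ("O1002", "P002")]
doc_ids = [document_id(keys) for keys in sample_keys]
print(doc_ids)
```

&lt;p&gt;If that reading is right, these are the values you would expect to see in the &lt;code&gt;_id&lt;/code&gt; field when querying the index.&lt;/p&gt;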



&lt;h2&gt;
  
  
  Run the Flink Job
&lt;/h2&gt;

&lt;p&gt;Execute from inside the Flink container:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;sql-client.sh &lt;span class="nt"&gt;-f&lt;/span&gt; /opt/flink/sql/table-table-join.sql
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Or from the host via Docker Compose:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;docker compose &lt;span class="nb"&gt;exec &lt;/span&gt;flink-jobmanager sql-client.sh &lt;span class="nt"&gt;-f&lt;/span&gt; /opt/flink/sql/table-table-join.sql
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;h2&gt;
  
  
  Validation Steps
&lt;/h2&gt;

&lt;ul&gt;
&lt;li&gt;Verify CDC data in Kafka topics:
&lt;/li&gt;
&lt;/ul&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;docker compose &lt;span class="nb"&gt;exec &lt;/span&gt;kafka kafka-console-consumer &lt;span class="nt"&gt;--bootstrap-server&lt;/span&gt; localhost:9092 &lt;span class="nt"&gt;--topic&lt;/span&gt; dbserver1.public.orders &lt;span class="nt"&gt;--from-beginning&lt;/span&gt;
docker compose &lt;span class="nb"&gt;exec &lt;/span&gt;kafka kafka-console-consumer &lt;span class="nt"&gt;--bootstrap-server&lt;/span&gt; localhost:9092 &lt;span class="nt"&gt;--topic&lt;/span&gt; dbserver1.public.products &lt;span class="nt"&gt;--from-beginning&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;ul&gt;
&lt;li&gt;Verify data in OpenSearch:
&lt;/li&gt;
&lt;/ul&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;curl &lt;span class="nt"&gt;-X&lt;/span&gt; GET &lt;span class="s2"&gt;"http://localhost:9200/orders_with_products/_search?pretty"&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;With a custom query:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;curl &lt;span class="nt"&gt;-s&lt;/span&gt; &lt;span class="nt"&gt;-X&lt;/span&gt; GET &lt;span class="s2"&gt;"http://localhost:9200/orders_with_products/_search?pretty"&lt;/span&gt; &lt;span class="se"&gt;\&lt;/span&gt;
  &lt;span class="nt"&gt;-H&lt;/span&gt; &lt;span class="s1"&gt;'Content-Type: application/json'&lt;/span&gt; &lt;span class="se"&gt;\&lt;/span&gt;
  &lt;span class="nt"&gt;-d&lt;/span&gt; &lt;span class="s1"&gt;'{ "query": { "match_all": {} } }'&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;For convenience, use a script like &lt;code&gt;scripts/opensearch-query.sh&lt;/code&gt;:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;&lt;span class="c"&gt;#!/bin/bash&lt;/span&gt;

osq&lt;span class="o"&gt;()&lt;/span&gt; &lt;span class="o"&gt;{&lt;/span&gt;
  &lt;span class="nb"&gt;local &lt;/span&gt;&lt;span class="nv"&gt;index&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="s2"&gt;"&lt;/span&gt;&lt;span class="k"&gt;${&lt;/span&gt;&lt;span class="nv"&gt;1&lt;/span&gt;&lt;span class="k"&gt;:-&lt;/span&gt;&lt;span class="nv"&gt;_all&lt;/span&gt;&lt;span class="k"&gt;}&lt;/span&gt;&lt;span class="s2"&gt;"&lt;/span&gt;
  &lt;span class="nb"&gt;local &lt;/span&gt;&lt;span class="nv"&gt;size&lt;/span&gt;&lt;span class="o"&gt;=&lt;/span&gt;&lt;span class="s2"&gt;"&lt;/span&gt;&lt;span class="k"&gt;${&lt;/span&gt;&lt;span class="nv"&gt;2&lt;/span&gt;&lt;span class="k"&gt;:-&lt;/span&gt;&lt;span class="nv"&gt;10&lt;/span&gt;&lt;span class="k"&gt;}&lt;/span&gt;&lt;span class="s2"&gt;"&lt;/span&gt;

  curl &lt;span class="nt"&gt;-s&lt;/span&gt; &lt;span class="nt"&gt;-X&lt;/span&gt; GET &lt;span class="s2"&gt;"http://localhost:9200/&lt;/span&gt;&lt;span class="k"&gt;${&lt;/span&gt;&lt;span class="nv"&gt;index&lt;/span&gt;&lt;span class="k"&gt;}&lt;/span&gt;&lt;span class="s2"&gt;/_search?pretty"&lt;/span&gt; &lt;span class="se"&gt;\&lt;/span&gt;
    &lt;span class="nt"&gt;-H&lt;/span&gt; &lt;span class="s1"&gt;'Content-Type: application/json'&lt;/span&gt; &lt;span class="se"&gt;\&lt;/span&gt;
    &lt;span class="nt"&gt;-d&lt;/span&gt; &lt;span class="s2"&gt;"{&lt;/span&gt;&lt;span class="se"&gt;\"&lt;/span&gt;&lt;span class="s2"&gt;query&lt;/span&gt;&lt;span class="se"&gt;\"&lt;/span&gt;&lt;span class="s2"&gt;: { &lt;/span&gt;&lt;span class="se"&gt;\"&lt;/span&gt;&lt;span class="s2"&gt;match_all&lt;/span&gt;&lt;span class="se"&gt;\"&lt;/span&gt;&lt;span class="s2"&gt;: {} }, &lt;/span&gt;&lt;span class="se"&gt;\"&lt;/span&gt;&lt;span class="s2"&gt;size&lt;/span&gt;&lt;span class="se"&gt;\"&lt;/span&gt;&lt;span class="s2"&gt;: &lt;/span&gt;&lt;span class="k"&gt;${&lt;/span&gt;&lt;span class="nv"&gt;size&lt;/span&gt;&lt;span class="k"&gt;}&lt;/span&gt;&lt;span class="s2"&gt; }"&lt;/span&gt;
&lt;span class="o"&gt;}&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Example usage:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;&lt;span class="nb"&gt;source &lt;/span&gt;scripts/opensearch-query.sh
osq orders_with_products
osq orders_with_products 20
osq orders_with_products 100 | jq &lt;span class="s1"&gt;'.hits.hits[]._id'&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;






&lt;blockquote&gt;
&lt;p&gt;In Chapter 6 and beyond, we will explore topics such as &lt;strong&gt;deduplication&lt;/strong&gt;, &lt;strong&gt;batch processing&lt;/strong&gt;, &lt;strong&gt;DLQ (Dead Letter Queue)&lt;/strong&gt;, and &lt;strong&gt;OpenSearch index partitioning strategies&lt;/strong&gt; for production-grade pipelines.&lt;/p&gt;
&lt;/blockquote&gt;

</description>
      <category>flink</category>
      <category>kafka</category>
      <category>opensearch</category>
      <category>books</category>
    </item>
    <item>
      <title>Connecting RDBs and Search Engines — Chapter 4 Part 2</title>
      <dc:creator>sisiodos</dc:creator>
      <pubDate>Sat, 10 May 2025 22:38:11 +0000</pubDate>
      <link>https://dev.to/sisiodos/connecting-rdbs-and-search-engines-chapter-4-part-2-863</link>
      <guid>https://dev.to/sisiodos/connecting-rdbs-and-search-engines-chapter-4-part-2-863</guid>
      <description>&lt;h1&gt;
  
  
  Chapter 4 (Part 2): Integrating Kafka CDC Data with OpenSearch Using Flink
&lt;/h1&gt;

&lt;p&gt;In this chapter, we use Flink SQL to process CDC data in Debezium JSON format from Kafka and write it to OpenSearch.&lt;/p&gt;




&lt;h2&gt;
  
  
  1. Architecture Overview
&lt;/h2&gt;



&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;graph TD
  subgraph Source
    PG[PostgreSQL]
  end
  subgraph CDC
    DBZ[Debezium Connect]
  end
  subgraph Stream
    TOPIC[Kafka Topic]
  end
  subgraph Flink
    FLINK[Flink SQL Kafka Source]
    OS_SINK[Flink OpenSearch Sink]
  end
  subgraph Search
    OS[OpenSearch]
  end

  PG --&amp;gt; DBZ --&amp;gt; TOPIC --&amp;gt; FLINK --&amp;gt; OS_SINK --&amp;gt; OS
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Flink consumes CDC events from Kafka, transforms them, and upserts the data into OpenSearch.&lt;/p&gt;
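
&lt;p&gt;To illustrate what "consumes CDC events and upserts" means, the sketch below mimics how the &lt;code&gt;debezium-json&lt;/code&gt; format is documented to turn each event's &lt;code&gt;op&lt;/code&gt; field into changelog rows. This is a simplified model, not Flink's implementation. The &lt;code&gt;to_changelog&lt;/code&gt; helper and the &lt;code&gt;message&lt;/code&gt; field in the sample event are hypothetical.&lt;/p&gt;

```python
def to_changelog(event):
    """Map one Debezium event to a list of (row kind, row) pairs."""
    op = event["op"]
    if op in ("c", "r"):  # create, or snapshot read
        return [("+I", event["after"])]
    if op == "u":  # update: retract the old row, then emit the new one
        return [("-U", event["before"]), ("+U", event["after"])]
    if op == "d":  # delete
        return [("-D", event["before"])]
    raise ValueError("unsupported op: " + repr(op))


update_event = {
    "op": "u",
    "before": {"id": 1, "message": "old"},
    "after": {"id": 1, "message": "new"},
}
rows = to_changelog(update_event)
for kind, row in rows:
    print(kind, row)
```

&lt;p&gt;A sink with a primary key can then apply the &lt;code&gt;+I&lt;/code&gt; and &lt;code&gt;+U&lt;/code&gt; rows as upserts and the &lt;code&gt;-D&lt;/code&gt; rows as deletes.&lt;/p&gt;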




&lt;h2&gt;
  
  
  2. Prerequisites
&lt;/h2&gt;

&lt;p&gt;Ensure the following OSS components are running (e.g., via Docker Compose):&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;PostgreSQL&lt;/li&gt;
&lt;li&gt;Apache Kafka&lt;/li&gt;
&lt;li&gt;Kafka Connect (Debezium)&lt;/li&gt;
&lt;li&gt;ZooKeeper&lt;/li&gt;
&lt;li&gt;Flink (1.19)&lt;/li&gt;
&lt;li&gt;OpenSearch (2.13)&lt;/li&gt;
&lt;/ul&gt;

&lt;blockquote&gt;
&lt;p&gt;💡 See &lt;code&gt;docker-compose.yaml&lt;/code&gt; for details.&lt;/p&gt;
&lt;/blockquote&gt;




&lt;h2&gt;
  
  
  3. Flink JAR Setup
&lt;/h2&gt;

&lt;p&gt;Place the following JARs in the &lt;code&gt;flink/plugins/kafka/&lt;/code&gt; and &lt;code&gt;flink/plugins/opensearch/&lt;/code&gt; directories, respectively.&lt;/p&gt;

&lt;h3&gt;
  
  
  Required Connector JARs
&lt;/h3&gt;

&lt;ul&gt;
&lt;li&gt;&lt;code&gt;flink-sql-connector-kafka-3.3.0-1.19.jar&lt;/code&gt;&lt;/li&gt;
&lt;li&gt;&lt;code&gt;flink-sql-connector-opensearch2-2.0.0-1.19.jar&lt;/code&gt;&lt;/li&gt;
&lt;/ul&gt;

&lt;blockquote&gt;
&lt;p&gt;💡 Note: Why use the &lt;code&gt;plugins&lt;/code&gt; directory?&lt;/p&gt;

&lt;p&gt;Flink's SQL Client loads connectors via a &lt;strong&gt;plugin classloader&lt;/strong&gt;.&lt;br&gt;
Placing JARs directly into &lt;code&gt;flink/lib&lt;/code&gt; can lead to &lt;strong&gt;class conflicts when used alongside the DataStream API&lt;/strong&gt;.&lt;br&gt;
Placing connector JARs in &lt;code&gt;plugins/{connector}&lt;/code&gt; is the safer approach because it &lt;strong&gt;prevents classloading issues&lt;/strong&gt;.&lt;/p&gt;
&lt;/blockquote&gt;




&lt;h2&gt;
  
  
  4. Create OpenSearch Index
&lt;/h2&gt;

&lt;p&gt;Create &lt;code&gt;opensearch/test-index-mapping.json&lt;/code&gt;:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight json"&gt;&lt;code&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="w"&gt;
  &lt;/span&gt;&lt;span class="nl"&gt;"mappings"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="w"&gt;
    &lt;/span&gt;&lt;span class="nl"&gt;"properties"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="w"&gt;
      &lt;/span&gt;&lt;span class="nl"&gt;"doc_id"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="nl"&gt;"type"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"keyword"&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="p"&gt;},&lt;/span&gt;&lt;span class="w"&gt;
      &lt;/span&gt;&lt;span class="nl"&gt;"id"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="nl"&gt;"type"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"integer"&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="p"&gt;},&lt;/span&gt;&lt;span class="w"&gt;
      &lt;/span&gt;&lt;span class="nl"&gt;"message"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="nl"&gt;"type"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"text"&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="p"&gt;}&lt;/span&gt;&lt;span class="w"&gt;
    &lt;/span&gt;&lt;span class="p"&gt;}&lt;/span&gt;&lt;span class="w"&gt;
  &lt;/span&gt;&lt;span class="p"&gt;}&lt;/span&gt;&lt;span class="w"&gt;
&lt;/span&gt;&lt;span class="p"&gt;}&lt;/span&gt;&lt;span class="w"&gt;
&lt;/span&gt;&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Run:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;curl &lt;span class="nt"&gt;-X&lt;/span&gt; PUT &lt;span class="s2"&gt;"http://localhost:9200/test-index"&lt;/span&gt; &lt;span class="se"&gt;\&lt;/span&gt;
  &lt;span class="nt"&gt;-H&lt;/span&gt; &lt;span class="s2"&gt;"Content-Type: application/json"&lt;/span&gt; &lt;span class="se"&gt;\&lt;/span&gt;
  &lt;span class="nt"&gt;-d&lt;/span&gt; @opensearch/test-index-mapping.json
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;blockquote&gt;
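&lt;p&gt;✅ Defining the mapping explicitly keeps OpenSearch's dynamic mapping from indexing each string field as &lt;code&gt;text&lt;/code&gt; with an automatically generated &lt;code&gt;keyword&lt;/code&gt; sub-field.&lt;/p&gt;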
&lt;/blockquote&gt;




&lt;h2&gt;
  
  
  5. Create Flink SQL Script
&lt;/h2&gt;

&lt;p&gt;Save the following as &lt;code&gt;flink/sql/cdc_to_opensearch.sql&lt;/code&gt;:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight sql"&gt;&lt;code&gt;&lt;span class="k"&gt;CREATE&lt;/span&gt; &lt;span class="k"&gt;TABLE&lt;/span&gt; &lt;span class="n"&gt;cdc_source&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;
  &lt;span class="n"&gt;id&lt;/span&gt; &lt;span class="nb"&gt;INT&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
  &lt;span class="n"&gt;message&lt;/span&gt; &lt;span class="n"&gt;STRING&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
  &lt;span class="k"&gt;PRIMARY&lt;/span&gt; &lt;span class="k"&gt;KEY&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;id&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="k"&gt;NOT&lt;/span&gt; &lt;span class="n"&gt;ENFORCED&lt;/span&gt;
&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="k"&gt;WITH&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;
  &lt;span class="s1"&gt;'connector'&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="s1"&gt;'kafka'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
  &lt;span class="s1"&gt;'topic'&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="s1"&gt;'dbserver1.public.testtable'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
  &lt;span class="s1"&gt;'properties.bootstrap.servers'&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="s1"&gt;'kafka:9092'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
  &lt;span class="s1"&gt;'format'&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="s1"&gt;'debezium-json'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
  &lt;span class="s1"&gt;'scan.startup.mode'&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="s1"&gt;'earliest-offset'&lt;/span&gt;
&lt;span class="p"&gt;);&lt;/span&gt;

&lt;span class="k"&gt;CREATE&lt;/span&gt; &lt;span class="k"&gt;TABLE&lt;/span&gt; &lt;span class="n"&gt;opensearch_sink&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;
  &lt;span class="n"&gt;doc_id&lt;/span&gt; &lt;span class="n"&gt;STRING&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
  &lt;span class="n"&gt;id&lt;/span&gt; &lt;span class="nb"&gt;INT&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
  &lt;span class="n"&gt;message&lt;/span&gt; &lt;span class="n"&gt;STRING&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
  &lt;span class="k"&gt;PRIMARY&lt;/span&gt; &lt;span class="k"&gt;KEY&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;doc_id&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="k"&gt;NOT&lt;/span&gt; &lt;span class="n"&gt;ENFORCED&lt;/span&gt;
&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="k"&gt;WITH&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;
  &lt;span class="s1"&gt;'connector'&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="s1"&gt;'opensearch-2'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
  &lt;span class="s1"&gt;'hosts'&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="s1"&gt;'http://opensearch:9200'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
  &lt;span class="s1"&gt;'allow-insecure'&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="s1"&gt;'true'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
  &lt;span class="s1"&gt;'index'&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="s1"&gt;'test-index'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
  &lt;span class="s1"&gt;'document-id.key-delimiter'&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="s1"&gt;'$'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
  &lt;span class="s1"&gt;'sink.bulk-flush.max-size'&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="s1"&gt;'1mb'&lt;/span&gt;
&lt;span class="p"&gt;);&lt;/span&gt;

&lt;span class="k"&gt;INSERT&lt;/span&gt; &lt;span class="k"&gt;INTO&lt;/span&gt; &lt;span class="n"&gt;opensearch_sink&lt;/span&gt;
&lt;span class="k"&gt;SELECT&lt;/span&gt;
  &lt;span class="n"&gt;MD5&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="k"&gt;CAST&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;id&lt;/span&gt; &lt;span class="k"&gt;AS&lt;/span&gt; &lt;span class="n"&gt;STRING&lt;/span&gt;&lt;span class="p"&gt;))&lt;/span&gt; &lt;span class="k"&gt;AS&lt;/span&gt; &lt;span class="n"&gt;doc_id&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
  &lt;span class="n"&gt;id&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
  &lt;span class="n"&gt;message&lt;/span&gt;
&lt;span class="k"&gt;FROM&lt;/span&gt; &lt;span class="n"&gt;cdc_source&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;
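
&lt;p&gt;The &lt;code&gt;doc_id&lt;/code&gt; column is a hex digest of the row's &lt;code&gt;id&lt;/code&gt; cast to a string. The following Python sketch, run outside Flink, computes such digests for &lt;code&gt;id = 1&lt;/code&gt; so that the values returned by OpenSearch can be cross-checked. The helper name &lt;code&gt;hex_digest&lt;/code&gt; is hypothetical, and the sketch assumes the digest is taken over the UTF-8 decimal string and rendered as lowercase hex.&lt;/p&gt;

```python
import hashlib


def hex_digest(algorithm, row_id):
    """Hash the decimal string form of row_id and return lowercase hex."""
    data = str(row_id).encode("utf-8")
    return hashlib.new(algorithm, data).hexdigest()


# Digests of the string "1" under two common algorithms.
md5_id = hex_digest("md5", 1)
sha256_id = hex_digest("sha256", 1)

print("md5   ", md5_id)     # 32 hex characters
print("sha256", sha256_id)  # 64 hex characters
```

&lt;p&gt;The MD5 digest of &lt;code&gt;"1"&lt;/code&gt; is &lt;code&gt;c4ca4238a0b923820dcc509a6f75849b&lt;/code&gt;, the 32-character value shown in this chapter's sample output; a SHA-256 digest is 64 characters long.&lt;/p&gt;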






&lt;h2&gt;
  
  
  6. Run the Flink Job
&lt;/h2&gt;



&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;docker compose &lt;span class="nb"&gt;exec &lt;/span&gt;flink-jobmanager bash
sql-client.sh &lt;span class="nt"&gt;-f&lt;/span&gt; /opt/flink/sql/cdc_to_opensearch.sql &lt;span class="nt"&gt;-l&lt;/span&gt; /opt/flink/plugins/kafka &lt;span class="nt"&gt;-l&lt;/span&gt; /opt/flink/plugins/
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;






&lt;h2&gt;
  
  
  7. Verify OpenSearch Output
&lt;/h2&gt;



&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;curl &lt;span class="nt"&gt;-X&lt;/span&gt; GET &lt;span class="s2"&gt;"http://localhost:9200/test-index/_search?pretty"&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Sample output:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight json"&gt;&lt;code&gt;&lt;span class="nl"&gt;"hits"&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="w"&gt;
  &lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="w"&gt;
    &lt;/span&gt;&lt;span class="nl"&gt;"_index"&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"test-index"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
    &lt;/span&gt;&lt;span class="nl"&gt;"_id"&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"c4ca4238a0b923820dcc509a6f75849b"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
    &lt;/span&gt;&lt;span class="nl"&gt;"_source"&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="w"&gt;
      &lt;/span&gt;&lt;span class="nl"&gt;"doc_id"&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"c4ca4238a0b923820dcc509a6f75849b"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
      &lt;/span&gt;&lt;span class="nl"&gt;"id"&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="mi"&gt;1&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
      &lt;/span&gt;&lt;span class="nl"&gt;"message"&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"CDC test row"&lt;/span&gt;&lt;span class="w"&gt;
    &lt;/span&gt;&lt;span class="p"&gt;}&lt;/span&gt;&lt;span class="w"&gt;
  &lt;/span&gt;&lt;span class="p"&gt;}&lt;/span&gt;&lt;span class="w"&gt;
&lt;/span&gt;&lt;span class="p"&gt;]&lt;/span&gt;&lt;span class="w"&gt;
&lt;/span&gt;&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;






&lt;p&gt;In this chapter, we built and verified a pipeline using Flink SQL to upsert CDC data into OpenSearch.&lt;/p&gt;

&lt;blockquote&gt;
&lt;p&gt;&lt;em&gt;(Coming soon: Chapter 5 — Implementing a CDC Join Pipeline with Flink SQL)&lt;/em&gt;  &lt;/p&gt;
&lt;/blockquote&gt;

</description>
      <category>flink</category>
      <category>kafka</category>
      <category>opensearch</category>
      <category>books</category>
    </item>
    <item>
      <title>Connecting RDBs and Search Engines — Chapter 4 Part 1</title>
      <dc:creator>sisiodos</dc:creator>
      <pubDate>Sat, 10 May 2025 22:37:58 +0000</pubDate>
      <link>https://dev.to/sisiodos/connecting-rdbs-and-search-engines-chapter-4-part-1-540c</link>
      <guid>https://dev.to/sisiodos/connecting-rdbs-and-search-engines-chapter-4-part-1-540c</guid>
      <description>&lt;h1&gt;
  
  
  Chapter 4 (Part 1): Outputting Kafka CDC Data to Console with Flink
&lt;/h1&gt;

&lt;p&gt;In this chapter, we will process the CDC data delivered to Kafka using Flink SQL and print the output to the console. Before persisting to OpenSearch, we visually verify that Flink is correctly consuming and processing the data from Kafka.&lt;/p&gt;




&lt;h2&gt;
  
  
  1. Prerequisites
&lt;/h2&gt;

&lt;p&gt;Ensure the following components are already running:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;PostgreSQL&lt;/li&gt;
&lt;li&gt;Apache Kafka&lt;/li&gt;
&lt;li&gt;Kafka Connect&lt;/li&gt;
&lt;li&gt;ZooKeeper&lt;/li&gt;
&lt;li&gt;Flink&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;Refer to &lt;a href="https://dev.to/sisiodos/connecting-rdbs-and-search-engines-chapter-3-3jh1"&gt;Chapter 3&lt;/a&gt; for details on setting up the Debezium → Kafka pipeline.&lt;/p&gt;




&lt;h2&gt;
  
  
  2. Architecture Overview
&lt;/h2&gt;



&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;graph TD
  subgraph source
    PG[PostgreSQL]
  end
  subgraph Change Data Capture
    DBZ[Debezium Connect]
  end
  subgraph stream platform
    TOPIC[Kafka Topic]
  end
  subgraph stream processing
    FLINK[Flink SQL Kafka Source]
    PRINT[Flink Print Sink Console]
  end

  PG --&amp;gt; DBZ --&amp;gt; TOPIC --&amp;gt; FLINK --&amp;gt; PRINT
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;We verify that CDC events flow from Kafka to Flink and appear in the standard output in a format like &lt;code&gt;+I[...]&lt;/code&gt;.&lt;/p&gt;




&lt;h2&gt;
  
  
  3. Add Kafka Connector to Flink
&lt;/h2&gt;

&lt;p&gt;Add the Kafka SQL connector JAR to Flink:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;&lt;code&gt;flink-sql-connector-kafka-3.3.0-1.19.jar&lt;/code&gt;&lt;/li&gt;
&lt;/ul&gt;

&lt;blockquote&gt;
&lt;p&gt;⚠️ To avoid interference with Flink core libraries, place the JAR in &lt;code&gt;flink/plugins/{connector}&lt;/code&gt;.&lt;/p&gt;
&lt;/blockquote&gt;

&lt;h3&gt;
  
  
  &lt;code&gt;docker-compose.yaml&lt;/code&gt; Excerpt
&lt;/h3&gt;



&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight yaml"&gt;&lt;code&gt;&lt;span class="na"&gt;flink-jobmanager&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt;
  &lt;span class="na"&gt;image&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="s"&gt;flink:1.19&lt;/span&gt;
  &lt;span class="na"&gt;command&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="pi"&gt;[&lt;/span&gt;&lt;span class="s2"&gt;"&lt;/span&gt;&lt;span class="s"&gt;/bin/bash"&lt;/span&gt;&lt;span class="pi"&gt;,&lt;/span&gt; &lt;span class="s2"&gt;"&lt;/span&gt;&lt;span class="s"&gt;/jobmanager-entrypoint.sh"&lt;/span&gt;&lt;span class="pi"&gt;]&lt;/span&gt;
  &lt;span class="na"&gt;volumes&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt;
    &lt;span class="pi"&gt;-&lt;/span&gt; &lt;span class="s"&gt;./flink/jobmanager-entrypoint.sh:/jobmanager-entrypoint.sh&lt;/span&gt;
    &lt;span class="pi"&gt;-&lt;/span&gt; &lt;span class="s"&gt;./flink/sql:/opt/flink/sql&lt;/span&gt;
    &lt;span class="pi"&gt;-&lt;/span&gt; &lt;span class="s"&gt;./flink/plugins/kafka:/opt/flink/plugins/kafka&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;blockquote&gt;
&lt;p&gt;💡 Note: Why use the &lt;code&gt;plugins&lt;/code&gt; directory?&lt;/p&gt;

&lt;p&gt;Flink's SQL Client loads connectors via a &lt;strong&gt;plugin classloader&lt;/strong&gt;.&lt;br&gt;
Placing JARs directly into &lt;code&gt;flink/lib&lt;/code&gt; can lead to &lt;strong&gt;class conflicts when used alongside the DataStream API&lt;/strong&gt;.&lt;br&gt;
Placing connector JARs in &lt;code&gt;plugins/{connector}&lt;/code&gt; is the safer approach because it &lt;strong&gt;prevents classloading issues&lt;/strong&gt;.&lt;/p&gt;
&lt;/blockquote&gt;




&lt;h2&gt;
  
  
  4. Write Flink SQL Script
&lt;/h2&gt;

&lt;p&gt;Save the following SQL in &lt;code&gt;flink/sql/cdc_to_console.sql&lt;/code&gt;:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight sql"&gt;&lt;code&gt;&lt;span class="k"&gt;CREATE&lt;/span&gt; &lt;span class="k"&gt;TABLE&lt;/span&gt; &lt;span class="n"&gt;cdc_source&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;
  &lt;span class="n"&gt;id&lt;/span&gt; &lt;span class="nb"&gt;INT&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
  &lt;span class="n"&gt;message&lt;/span&gt; &lt;span class="n"&gt;STRING&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
  &lt;span class="k"&gt;PRIMARY&lt;/span&gt; &lt;span class="k"&gt;KEY&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;id&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="k"&gt;NOT&lt;/span&gt; &lt;span class="n"&gt;ENFORCED&lt;/span&gt;
&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="k"&gt;WITH&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;
  &lt;span class="s1"&gt;'connector'&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="s1"&gt;'kafka'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
  &lt;span class="s1"&gt;'topic'&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="s1"&gt;'dbserver1.public.testtable'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
  &lt;span class="s1"&gt;'properties.bootstrap.servers'&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="s1"&gt;'kafka:9092'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
  &lt;span class="s1"&gt;'format'&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="s1"&gt;'debezium-json'&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
  &lt;span class="s1"&gt;'scan.startup.mode'&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="s1"&gt;'earliest-offset'&lt;/span&gt;
&lt;span class="p"&gt;);&lt;/span&gt;

&lt;span class="k"&gt;CREATE&lt;/span&gt; &lt;span class="k"&gt;TABLE&lt;/span&gt; &lt;span class="n"&gt;print_sink&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;
  &lt;span class="n"&gt;id&lt;/span&gt; &lt;span class="nb"&gt;INT&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
  &lt;span class="n"&gt;message&lt;/span&gt; &lt;span class="n"&gt;STRING&lt;/span&gt;
&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="k"&gt;WITH&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;
  &lt;span class="s1"&gt;'connector'&lt;/span&gt; &lt;span class="o"&gt;=&lt;/span&gt; &lt;span class="s1"&gt;'print'&lt;/span&gt;
&lt;span class="p"&gt;);&lt;/span&gt;

&lt;span class="k"&gt;INSERT&lt;/span&gt; &lt;span class="k"&gt;INTO&lt;/span&gt; &lt;span class="n"&gt;print_sink&lt;/span&gt;
&lt;span class="k"&gt;SELECT&lt;/span&gt; &lt;span class="o"&gt;*&lt;/span&gt; &lt;span class="k"&gt;FROM&lt;/span&gt; &lt;span class="n"&gt;cdc_source&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;h3&gt;
  
  
  Explanation of Flink SQL Tables
&lt;/h3&gt;

&lt;h4&gt;
  
  
  Kafka Source Table: &lt;code&gt;cdc_source&lt;/code&gt;
&lt;/h4&gt;

&lt;div class="table-wrapper-paragraph"&gt;&lt;table&gt;
&lt;thead&gt;
&lt;tr&gt;
&lt;th&gt;Property&lt;/th&gt;
&lt;th&gt;Description&lt;/th&gt;
&lt;/tr&gt;
&lt;/thead&gt;
&lt;tbody&gt;
&lt;tr&gt;
&lt;td&gt;&lt;code&gt;connector = 'kafka'&lt;/code&gt;&lt;/td&gt;
&lt;td&gt;Reads data from a Kafka topic&lt;/td&gt;
&lt;/tr&gt;
&lt;tr&gt;
&lt;td&gt;&lt;code&gt;format = 'debezium-json'&lt;/code&gt;&lt;/td&gt;
&lt;td&gt;Handles JSON messages in Debezium format&lt;/td&gt;
&lt;/tr&gt;
&lt;tr&gt;
&lt;td&gt;&lt;code&gt;scan.startup.mode = 'earliest-offset'&lt;/code&gt;&lt;/td&gt;
&lt;td&gt;Reads from the earliest offset&lt;/td&gt;
&lt;/tr&gt;
&lt;tr&gt;
&lt;td&gt;&lt;code&gt;PRIMARY KEY (...) NOT ENFORCED&lt;/code&gt;&lt;/td&gt;
&lt;td&gt;Defines a primary key without enforcement&lt;/td&gt;
&lt;/tr&gt;
&lt;/tbody&gt;
&lt;/table&gt;&lt;/div&gt;
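
&lt;p&gt;As a rough illustration of what the &lt;code&gt;debezium-json&lt;/code&gt; format does with each message, the Python sketch below maps a Debezium change event to changelog rows. The function name &lt;code&gt;to_changelog&lt;/code&gt; is hypothetical, and the sample event is a trimmed envelope (no &lt;code&gt;source&lt;/code&gt; or &lt;code&gt;ts_ms&lt;/code&gt; fields) assumed for illustration.&lt;/p&gt;

```python
import json


def to_changelog(envelope):
    """Map one Debezium change event to Flink-style changelog rows."""
    op = envelope["op"]
    before = envelope.get("before")
    after = envelope.get("after")
    if op in ("c", "r"):
        # Creates and snapshot reads both become inserts.
        return [("+I", after)]
    if op == "u":
        # An update becomes an update-before / update-after pair.
        return [("-U", before), ("+U", after)]
    if op == "d":
        # A delete carries the old row image in "before".
        return [("-D", before)]
    raise ValueError("unsupported op: " + repr(op))


# Trimmed snapshot event for the row inserted in Chapter 3.
raw = '{"before": null, "after": {"id": 1, "message": "CDC test row"}, "op": "r"}'
rows = to_changelog(json.loads(raw))

for kind, row in rows:
    # Render each row in the same notation the print sink uses.
    print(kind + "[" + ", ".join(str(v) for v in row.values()) + "]")
```

&lt;p&gt;For the snapshot event above, the sketch prints &lt;code&gt;+I[1, CDC test row]&lt;/code&gt;.&lt;/p&gt;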

&lt;h4&gt;
  
  
  Console Sink Table: &lt;code&gt;print_sink&lt;/code&gt;
&lt;/h4&gt;

&lt;ul&gt;
&lt;li&gt;Uses Flink's built-in &lt;code&gt;print&lt;/code&gt; connector to write each row to the TaskManager's stdout.&lt;/li&gt;
&lt;/ul&gt;




&lt;h2&gt;
  
  
  5. Run the Flink SQL Job
&lt;/h2&gt;



&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;docker compose &lt;span class="nb"&gt;exec &lt;/span&gt;flink-jobmanager bash
sql-client.sh &lt;span class="nt"&gt;-f&lt;/span&gt; /opt/flink/sql/cdc_to_console.sql &lt;span class="nt"&gt;-l&lt;/span&gt; /opt/flink/plugins/kafka
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;h3&gt;
  
  
  Verify Running Jobs
&lt;/h3&gt;



&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;docker compose &lt;span class="nb"&gt;exec &lt;/span&gt;flink-jobmanager bash
flink list
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Expected output:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;------------------ Running/Restarting Jobs -------------------
&amp;lt;job-id&amp;gt; : insert-into_default_catalog.default_database.print_sink (RUNNING)
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;






&lt;h2&gt;
  
  
  6. Check the Output
&lt;/h2&gt;



&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;docker compose logs flink-taskmanager
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Expected output:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;flink-taskmanager-1 | +I[1, CDC test row]
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;h3&gt;
  
  
  Why TaskManager?
&lt;/h3&gt;

&lt;ul&gt;
&lt;li&gt;The print sink outputs to the logs of the TaskManager running the job.&lt;/li&gt;
&lt;li&gt;Use &lt;code&gt;docker compose logs flink-taskmanager&lt;/code&gt; to view the output.&lt;/li&gt;
&lt;/ul&gt;




&lt;h2&gt;
  
  
  7. Troubleshooting
&lt;/h2&gt;

&lt;h3&gt;
  
  
  SQL Client Doesn’t Start
&lt;/h3&gt;

&lt;ul&gt;
&lt;li&gt;Check if JobManager is running:
&lt;/li&gt;
&lt;/ul&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;docker compose logs flink-jobmanager
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;h3&gt;
  
  
  No CDC Output Appears
&lt;/h3&gt;

&lt;ul&gt;
&lt;li&gt;Is the Kafka topic name correct?&lt;/li&gt;
&lt;li&gt;Is there data in the Kafka topic?&lt;/li&gt;
&lt;li&gt;Is &lt;code&gt;scan.startup.mode&lt;/code&gt; set to &lt;code&gt;earliest-offset&lt;/code&gt;?&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;Check topic content:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;kafka-console-consumer &lt;span class="nt"&gt;--bootstrap-server&lt;/span&gt; localhost:9092 &lt;span class="se"&gt;\&lt;/span&gt;
  &lt;span class="nt"&gt;--topic&lt;/span&gt; dbserver1.public.testtable &lt;span class="se"&gt;\&lt;/span&gt;
  &lt;span class="nt"&gt;--from-beginning&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;






&lt;h2&gt;
  
  
  Bonus: Observing Parallelism
&lt;/h2&gt;

&lt;p&gt;If you scale Flink to use multiple TaskManagers, you’ll see output distributed across their logs.&lt;/p&gt;

&lt;p&gt;This allows you to observe parallel execution, slot allocation, and subtask distribution.&lt;/p&gt;




&lt;p&gt;In this chapter, we confirmed the flow from Kafka → Flink → console output. Next, we will write the results to OpenSearch for persistence.&lt;/p&gt;

&lt;blockquote&gt;
&lt;p&gt;&lt;em&gt;(Coming soon: Chapter 4 Part 2 — Integrating Kafka CDC Data with OpenSearch Using Flink)&lt;/em&gt;  &lt;/p&gt;
&lt;/blockquote&gt;

</description>
      <category>flink</category>
      <category>kafka</category>
      <category>opensearch</category>
      <category>books</category>
    </item>
    <item>
      <title>Connecting RDBs and Search Engines — Chapter 3</title>
      <dc:creator>sisiodos</dc:creator>
      <pubDate>Sat, 10 May 2025 22:37:45 +0000</pubDate>
      <link>https://dev.to/sisiodos/connecting-rdbs-and-search-engines-chapter-3-3jh1</link>
      <guid>https://dev.to/sisiodos/connecting-rdbs-and-search-engines-chapter-3-3jh1</guid>
      <description>&lt;h2&gt;
  
  
  Chapter 3: Verifying the Flow from PostgreSQL to Debezium to Kafka
&lt;/h2&gt;

&lt;p&gt;In this chapter, we will test the end-to-end process of capturing change data from PostgreSQL with Debezium and delivering it to Kafka.&lt;/p&gt;




&lt;h2&gt;
  
  
  1. Starting the Environment
&lt;/h2&gt;

&lt;p&gt;Before beginning this chapter, ensure that all necessary services are running via Docker Compose:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;docker compose up &lt;span class="nt"&gt;-d&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Verify that the following components are running:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;PostgreSQL&lt;/li&gt;
&lt;li&gt;Apache Kafka&lt;/li&gt;
&lt;li&gt;Debezium Connect (Kafka Connect)&lt;/li&gt;
&lt;li&gt;ZooKeeper&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;See &lt;code&gt;docker-compose.yaml&lt;/code&gt; in the repository for service configuration details.&lt;/p&gt;




&lt;h2&gt;
  
  
  2. PostgreSQL Setup
&lt;/h2&gt;

&lt;p&gt;First, we create the table that Debezium will monitor. Add the following to &lt;code&gt;postgres/00-init.sql&lt;/code&gt;:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight sql"&gt;&lt;code&gt;&lt;span class="c1"&gt;-- Create Debezium user&lt;/span&gt;
&lt;span class="k"&gt;CREATE&lt;/span&gt; &lt;span class="k"&gt;ROLE&lt;/span&gt; &lt;span class="n"&gt;debezium&lt;/span&gt; &lt;span class="k"&gt;WITH&lt;/span&gt; &lt;span class="n"&gt;LOGIN&lt;/span&gt; &lt;span class="n"&gt;PASSWORD&lt;/span&gt; &lt;span class="s1"&gt;'dbz'&lt;/span&gt; &lt;span class="n"&gt;REPLICATION&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;

&lt;span class="c1"&gt;-- Create target table&lt;/span&gt;
&lt;span class="k"&gt;CREATE&lt;/span&gt; &lt;span class="k"&gt;TABLE&lt;/span&gt; &lt;span class="n"&gt;testtable&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;
  &lt;span class="n"&gt;id&lt;/span&gt; &lt;span class="nb"&gt;INTEGER&lt;/span&gt; &lt;span class="k"&gt;PRIMARY&lt;/span&gt; &lt;span class="k"&gt;KEY&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;
  &lt;span class="n"&gt;message&lt;/span&gt; &lt;span class="nb"&gt;VARCHAR&lt;/span&gt;&lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="mi"&gt;255&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt;
&lt;span class="p"&gt;);&lt;/span&gt;
&lt;span class="k"&gt;INSERT&lt;/span&gt; &lt;span class="k"&gt;INTO&lt;/span&gt; &lt;span class="n"&gt;testtable&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="n"&gt;id&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="n"&gt;message&lt;/span&gt;&lt;span class="p"&gt;)&lt;/span&gt; &lt;span class="k"&gt;VALUES&lt;/span&gt; &lt;span class="p"&gt;(&lt;/span&gt;&lt;span class="mi"&gt;1&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt; &lt;span class="s1"&gt;'CDC test row'&lt;/span&gt;&lt;span class="p"&gt;);&lt;/span&gt;

&lt;span class="c1"&gt;-- Grant SELECT permission&lt;/span&gt;
&lt;span class="k"&gt;GRANT&lt;/span&gt; &lt;span class="k"&gt;SELECT&lt;/span&gt; &lt;span class="k"&gt;ON&lt;/span&gt; &lt;span class="n"&gt;testtable&lt;/span&gt; &lt;span class="k"&gt;TO&lt;/span&gt; &lt;span class="n"&gt;debezium&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;

&lt;span class="c1"&gt;-- Create publication (for pgoutput plugin)&lt;/span&gt;
&lt;span class="k"&gt;CREATE&lt;/span&gt; &lt;span class="n"&gt;PUBLICATION&lt;/span&gt; &lt;span class="n"&gt;debezium_pub&lt;/span&gt; &lt;span class="k"&gt;FOR&lt;/span&gt; &lt;span class="k"&gt;TABLE&lt;/span&gt; &lt;span class="n"&gt;testtable&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;This script runs &lt;strong&gt;only once&lt;/strong&gt;, when the container first initializes an empty data directory.&lt;br&gt;
To apply later changes to the script, delete the persistent volumes and start the services again:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;docker compose down &lt;span class="nt"&gt;--volumes&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;






&lt;h2&gt;
  
  
  3. PostgreSQL Container Configuration
&lt;/h2&gt;

&lt;p&gt;To enable CDC via the WAL (Write-Ahead Log), add the following to the PostgreSQL service in &lt;code&gt;docker-compose.yaml&lt;/code&gt;:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight yaml"&gt;&lt;code&gt;&lt;span class="na"&gt;command&lt;/span&gt;&lt;span class="pi"&gt;:&lt;/span&gt; &lt;span class="pi"&gt;&amp;gt;&lt;/span&gt;
  &lt;span class="s"&gt;postgres&lt;/span&gt;
  &lt;span class="s"&gt;-c wal_level=logical&lt;/span&gt;
  &lt;span class="s"&gt;-c max_replication_slots=4&lt;/span&gt;
  &lt;span class="s"&gt;-c max_wal_senders=4&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;To restart PostgreSQL without losing data:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;docker compose restart postgres
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;To rerun initialization SQL, you’ll need to remove volumes:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;docker compose down &lt;span class="nt"&gt;--volumes&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;






&lt;h2&gt;
  
  
  4. Registering the Debezium Connector
&lt;/h2&gt;

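&lt;p&gt;Once Debezium Connect is running, register the connector with the following command. The configuration targets Debezium 1.9; it also sets &lt;code&gt;topic.prefix&lt;/code&gt;, the property Debezium 2.x uses in place of &lt;code&gt;database.server.name&lt;/code&gt;:&lt;br&gt;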
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;curl &lt;span class="nt"&gt;-X&lt;/span&gt; POST &lt;span class="s2"&gt;"localhost:8083/connectors"&lt;/span&gt; &lt;span class="se"&gt;\&lt;/span&gt;
  &lt;span class="nt"&gt;-H&lt;/span&gt; &lt;span class="s2"&gt;"Content-Type: application/json"&lt;/span&gt; &lt;span class="se"&gt;\&lt;/span&gt;
  &lt;span class="nt"&gt;-d&lt;/span&gt; &lt;span class="s1"&gt;'{
    "name": "postgres-connector",
    "config": {
      "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
      "database.hostname": "postgres",
      "database.port": "5432",
      "database.user": "debezium",
      "database.password": "dbz",
      "database.dbname": "postgres",
      "database.server.name": "dbserver1",
      "topic.prefix": "dbserver1",
      "plugin.name": "pgoutput",
      "publication.name": "debezium_pub",
      "slot.name": "debezium_slot",
      "slot.drop.on.stop": "true",
      "table.include.list": "public.testtable",
      "snapshot.mode": "initial",
      "tombstones.on.delete": "false",
      "key.converter": "org.apache.kafka.connect.json.JsonConverter",
      "value.converter": "org.apache.kafka.connect.json.JsonConverter",
      "key.converter.schemas.enable": "false",
      "value.converter.schemas.enable": "false"
    }
  }'&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;
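
&lt;p&gt;The connector settings also determine the Kafka topic name that the Flink chapters read from. The Python sketch below derives it from a subset of the configuration above; the helper name &lt;code&gt;topic_names&lt;/code&gt; is hypothetical, and it assumes &lt;code&gt;table.include.list&lt;/code&gt; holds plain &lt;code&gt;schema.table&lt;/code&gt; entries rather than regular expressions.&lt;/p&gt;

```python
# Subset of the connector settings registered above.
config = {
    "topic.prefix": "dbserver1",
    "table.include.list": "public.testtable",
}


def topic_names(cfg):
    """Return one topic name per captured table: prefix.schema.table."""
    tables = [t.strip() for t in cfg["table.include.list"].split(",")]
    return [cfg["topic.prefix"] + "." + table for table in tables]


print(topic_names(config))
```

&lt;p&gt;For this configuration the result is &lt;code&gt;dbserver1.public.testtable&lt;/code&gt;, the topic used by the Kafka source tables in Chapter 4.&lt;/p&gt;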



&lt;h3&gt;
  
  
  About &lt;code&gt;snapshot.mode&lt;/code&gt;
&lt;/h3&gt;

&lt;p&gt;The &lt;code&gt;snapshot.mode&lt;/code&gt; option controls whether the connector reads the existing contents of the captured tables when it first starts.&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;
&lt;code&gt;initial&lt;/code&gt;: Load all existing data once, then capture changes (default)&lt;/li&gt;
&lt;li&gt;
&lt;code&gt;never&lt;/code&gt;: Skip initial load; capture only subsequent changes&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;In this guide, we use &lt;code&gt;initial&lt;/code&gt;, so existing rows are sent to Kafka with &lt;code&gt;op: r&lt;/code&gt; (read).&lt;/p&gt;




&lt;h2&gt;
  
  
  5. Checking Connector Status
&lt;/h2&gt;

&lt;p&gt;Check the connector status:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;curl http://localhost:8083/connectors/postgres-connector/status
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Expected output:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight json"&gt;&lt;code&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="w"&gt;
  &lt;/span&gt;&lt;span class="nl"&gt;"name"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"postgres-connector"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
  &lt;/span&gt;&lt;span class="nl"&gt;"connector"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="w"&gt;
    &lt;/span&gt;&lt;span class="nl"&gt;"state"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"RUNNING"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
    &lt;/span&gt;&lt;span class="nl"&gt;"worker_id"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"10.4.1.81:8083"&lt;/span&gt;&lt;span class="w"&gt;
  &lt;/span&gt;&lt;span class="p"&gt;},&lt;/span&gt;&lt;span class="w"&gt;
  &lt;/span&gt;&lt;span class="nl"&gt;"tasks"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="p"&gt;[&lt;/span&gt;&lt;span class="w"&gt;
    &lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="w"&gt;
      &lt;/span&gt;&lt;span class="nl"&gt;"id"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="mi"&gt;0&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
      &lt;/span&gt;&lt;span class="nl"&gt;"state"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"RUNNING"&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
      &lt;/span&gt;&lt;span class="nl"&gt;"worker_id"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"10.4.1.81:8083"&lt;/span&gt;&lt;span class="w"&gt;
    &lt;/span&gt;&lt;span class="p"&gt;}&lt;/span&gt;&lt;span class="w"&gt;
  &lt;/span&gt;&lt;span class="p"&gt;],&lt;/span&gt;&lt;span class="w"&gt;
  &lt;/span&gt;&lt;span class="nl"&gt;"type"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"source"&lt;/span&gt;&lt;span class="w"&gt;
&lt;/span&gt;&lt;span class="p"&gt;}&lt;/span&gt;&lt;span class="w"&gt;
&lt;/span&gt;&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;
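
&lt;p&gt;To script this check instead of reading the JSON by eye, one option is a small filter over the status response. The sketch below is a hypothetical helper (&lt;code&gt;all_running&lt;/code&gt; is an assumed name); it relies only on &lt;code&gt;grep&lt;/code&gt; and &lt;code&gt;sed&lt;/code&gt; and succeeds only when every &lt;code&gt;state&lt;/code&gt; field in the response is &lt;code&gt;RUNNING&lt;/code&gt;:&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;# Hypothetical helper: reads the status JSON from stdin and returns 0
# only if at least one "state" field exists and all of them are RUNNING.
all_running() {
  # Extract every "state": "VALUE" pair, then keep just VALUE
  states=$(grep -o '"state"[[:space:]]*:[[:space:]]*"[A-Z_]*"' | sed 's/.*"\([A-Z_]*\)"$/\1/')
  [ -n "$states" ] || return 1
  for s in $states; do
    [ "$s" = "RUNNING" ] || return 1
  done
  return 0
}

# Example usage (requires the running environment):
#   curl -s http://localhost:8083/connectors/postgres-connector/status | all_running
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;

&lt;p&gt;A failed task makes the function return a non-zero exit status, even when the connector itself still reports &lt;code&gt;RUNNING&lt;/code&gt;.&lt;/p&gt;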






&lt;h2&gt;
  
  
  6. Checking Kafka Topics
&lt;/h2&gt;

&lt;p&gt;List Kafka topics:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;&lt;span class="c"&gt;# Enter Kafka container&lt;/span&gt;
docker compose &lt;span class="nb"&gt;exec &lt;/span&gt;kafka bash

&lt;span class="c"&gt;# List topics&lt;/span&gt;
kafka-topics &lt;span class="nt"&gt;--bootstrap-server&lt;/span&gt; localhost:9092 &lt;span class="nt"&gt;--list&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Expected output:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;dbserver1.public.testtable
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;
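
&lt;p&gt;The topic name is not arbitrary: the Debezium PostgreSQL connector names each topic by joining the topic prefix, the schema, and the table with dots. A minimal sketch of that rule, using a hypothetical helper name (&lt;code&gt;debezium_topic&lt;/code&gt;):&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;# Hypothetical helper: build the topic name for a captured table.
# $1 = value of topic.prefix in the connector config
# $2 = schema-qualified table name, as written in table.include.list
debezium_topic() {
  printf '%s.%s\n' "$1" "$2"
}

debezium_topic dbserver1 public.testtable
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;

&lt;p&gt;With the values from the connector config in this guide, this prints &lt;code&gt;dbserver1.public.testtable&lt;/code&gt;, matching the topic listed above.&lt;/p&gt;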






&lt;h2&gt;
  
  
  7. Viewing CDC Events from Kafka
&lt;/h2&gt;

&lt;p&gt;Use the following to consume events from the topic:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;kafka-console-consumer &lt;span class="nt"&gt;--bootstrap-server&lt;/span&gt; localhost:9092 &lt;span class="se"&gt;\&lt;/span&gt;
  &lt;span class="nt"&gt;--topic&lt;/span&gt; dbserver1.public.testtable &lt;span class="se"&gt;\&lt;/span&gt;
  &lt;span class="nt"&gt;--from-beginning&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



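&lt;p&gt;Sample snapshot output (abbreviated; real events also carry metadata fields such as &lt;code&gt;source&lt;/code&gt; and &lt;code&gt;ts_ms&lt;/code&gt;):&lt;br&gt;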
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight json"&gt;&lt;code&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="w"&gt;
  &lt;/span&gt;&lt;span class="nl"&gt;"before"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="kc"&gt;null&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
  &lt;/span&gt;&lt;span class="nl"&gt;"after"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="w"&gt;
    &lt;/span&gt;&lt;span class="nl"&gt;"id"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="mi"&gt;1&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
    &lt;/span&gt;&lt;span class="nl"&gt;"message"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"CDC test row"&lt;/span&gt;&lt;span class="w"&gt;
  &lt;/span&gt;&lt;span class="p"&gt;},&lt;/span&gt;&lt;span class="w"&gt;
  &lt;/span&gt;&lt;span class="nl"&gt;"op"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"r"&lt;/span&gt;&lt;span class="w"&gt;
&lt;/span&gt;&lt;span class="p"&gt;}&lt;/span&gt;&lt;span class="w"&gt;
&lt;/span&gt;&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;






&lt;h2&gt;
  
  
  8. Confirming Live Data Changes
&lt;/h2&gt;

&lt;p&gt;Add a new row in PostgreSQL:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;&lt;span class="c"&gt;# Log into PostgreSQL&lt;/span&gt;
docker compose &lt;span class="nb"&gt;exec &lt;/span&gt;postgres psql &lt;span class="nt"&gt;-U&lt;/span&gt; postgres

&lt;span class="c"&gt;# Insert new row&lt;/span&gt;
INSERT INTO testtable &lt;span class="o"&gt;(&lt;/span&gt;&lt;span class="nb"&gt;id&lt;/span&gt;, message&lt;span class="o"&gt;)&lt;/span&gt; VALUES &lt;span class="o"&gt;(&lt;/span&gt;2, &lt;span class="s1"&gt;'inserted row'&lt;/span&gt;&lt;span class="o"&gt;)&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Expected Kafka output (abbreviated):&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight json"&gt;&lt;code&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="w"&gt;
  &lt;/span&gt;&lt;span class="nl"&gt;"before"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="kc"&gt;null&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
  &lt;/span&gt;&lt;span class="nl"&gt;"after"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="w"&gt;
    &lt;/span&gt;&lt;span class="nl"&gt;"id"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="mi"&gt;2&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
    &lt;/span&gt;&lt;span class="nl"&gt;"message"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"inserted row"&lt;/span&gt;&lt;span class="w"&gt;
  &lt;/span&gt;&lt;span class="p"&gt;},&lt;/span&gt;&lt;span class="w"&gt;
  &lt;/span&gt;&lt;span class="nl"&gt;"op"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"c"&lt;/span&gt;&lt;span class="w"&gt;
&lt;/span&gt;&lt;span class="p"&gt;}&lt;/span&gt;&lt;span class="w"&gt;
&lt;/span&gt;&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;h3&gt;
  
  
  Debezium JSON Event Format
&lt;/h3&gt;

&lt;p&gt;Each Debezium change event carries an &lt;code&gt;op&lt;/code&gt; field that identifies the type of change:&lt;/p&gt;

&lt;div class="table-wrapper-paragraph"&gt;&lt;table&gt;
&lt;thead&gt;
&lt;tr&gt;
&lt;th&gt;
&lt;code&gt;op&lt;/code&gt; value&lt;/th&gt;
&lt;th&gt;Meaning&lt;/th&gt;
&lt;/tr&gt;
&lt;/thead&gt;
&lt;tbody&gt;
&lt;tr&gt;
&lt;td&gt;&lt;code&gt;r&lt;/code&gt;&lt;/td&gt;
&lt;td&gt;Snapshot read (initial data)&lt;/td&gt;
&lt;/tr&gt;
&lt;tr&gt;
&lt;td&gt;&lt;code&gt;c&lt;/code&gt;&lt;/td&gt;
&lt;td&gt;Insert (create)&lt;/td&gt;
&lt;/tr&gt;
&lt;tr&gt;
&lt;td&gt;&lt;code&gt;u&lt;/code&gt;&lt;/td&gt;
&lt;td&gt;Update&lt;/td&gt;
&lt;/tr&gt;
&lt;tr&gt;
&lt;td&gt;&lt;code&gt;d&lt;/code&gt;&lt;/td&gt;
&lt;td&gt;Delete&lt;/td&gt;
&lt;/tr&gt;
&lt;/tbody&gt;
&lt;/table&gt;&lt;/div&gt;

&lt;p&gt;For example, the following JSON means a new row was inserted:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight json"&gt;&lt;code&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="w"&gt;
  &lt;/span&gt;&lt;span class="nl"&gt;"before"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="kc"&gt;null&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
  &lt;/span&gt;&lt;span class="nl"&gt;"after"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="p"&gt;{&lt;/span&gt;&lt;span class="w"&gt;
    &lt;/span&gt;&lt;span class="nl"&gt;"id"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="mi"&gt;2&lt;/span&gt;&lt;span class="p"&gt;,&lt;/span&gt;&lt;span class="w"&gt;
    &lt;/span&gt;&lt;span class="nl"&gt;"message"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"inserted row"&lt;/span&gt;&lt;span class="w"&gt;
  &lt;/span&gt;&lt;span class="p"&gt;},&lt;/span&gt;&lt;span class="w"&gt;
  &lt;/span&gt;&lt;span class="nl"&gt;"op"&lt;/span&gt;&lt;span class="p"&gt;:&lt;/span&gt;&lt;span class="w"&gt; &lt;/span&gt;&lt;span class="s2"&gt;"c"&lt;/span&gt;&lt;span class="w"&gt;
&lt;/span&gt;&lt;span class="p"&gt;}&lt;/span&gt;&lt;span class="w"&gt;
&lt;/span&gt;&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;
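
&lt;p&gt;When inspecting events by hand, it can help to translate the &lt;code&gt;op&lt;/code&gt; code into words. The following sketch encodes the table above as a shell function; &lt;code&gt;describe_op&lt;/code&gt; is a hypothetical name, and any code not listed in the table falls through to &lt;code&gt;unknown&lt;/code&gt;:&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;# Hypothetical helper: map a Debezium "op" code to a readable label.
describe_op() {
  case "$1" in
    r) echo "snapshot read" ;;
    c) echo "insert" ;;
    u) echo "update" ;;
    d) echo "delete" ;;
    *) echo "unknown" ;;   # codes not covered by the table above
  esac
}

describe_op c
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;

&lt;p&gt;For the event shown above, whose &lt;code&gt;op&lt;/code&gt; is &lt;code&gt;c&lt;/code&gt;, this prints &lt;code&gt;insert&lt;/code&gt;.&lt;/p&gt;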






&lt;h2&gt;
  
  
  Troubleshooting
&lt;/h2&gt;

&lt;h3&gt;
  
  
  Topic Not Created in Kafka
&lt;/h3&gt;

&lt;p&gt;If &lt;code&gt;dbserver1.public.testtable&lt;/code&gt; doesn’t appear, check:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;That &lt;code&gt;debezium_pub&lt;/code&gt; exists in PostgreSQL:
&lt;/li&gt;
&lt;/ul&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight sql"&gt;&lt;code&gt;&lt;span class="k"&gt;SELECT&lt;/span&gt; &lt;span class="o"&gt;*&lt;/span&gt; &lt;span class="k"&gt;FROM&lt;/span&gt; &lt;span class="n"&gt;pg_publication&lt;/span&gt;&lt;span class="p"&gt;;&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;ul&gt;
&lt;li&gt;&lt;p&gt;That &lt;code&gt;publication.name&lt;/code&gt; and &lt;code&gt;slot.name&lt;/code&gt; are correctly set in the connector config&lt;/p&gt;&lt;/li&gt;
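&lt;li&gt;&lt;p&gt;That the &lt;code&gt;00-init.sql&lt;/code&gt; script ran. PostgreSQL init scripts run only when the data directory is empty, so reset the volumes if needed (this deletes all existing data):&lt;br&gt;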
&lt;/p&gt;&lt;/li&gt;
&lt;/ul&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;docker compose down &lt;span class="nt"&gt;--volumes&lt;/span&gt;
docker compose up &lt;span class="nt"&gt;-d&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;ul&gt;
&lt;li&gt;That Kafka Connect logs are error-free:
&lt;/li&gt;
&lt;/ul&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;docker compose logs connect
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;h3&gt;
  
  
  No Events Displayed from Kafka
&lt;/h3&gt;

&lt;p&gt;If CDC events aren’t visible in Kafka:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Ensure the connector is &lt;code&gt;RUNNING&lt;/code&gt; (see step 5)&lt;/li&gt;
&lt;li&gt;Ensure the inserted row has a unique ID; an &lt;code&gt;INSERT&lt;/code&gt; that PostgreSQL rejects produces no change event&lt;/li&gt;
&lt;li&gt;Make sure you’re consuming from the beginning:
&lt;/li&gt;
&lt;/ul&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;kafka-console-consumer &lt;span class="nt"&gt;--bootstrap-server&lt;/span&gt; localhost:9092 &lt;span class="se"&gt;\&lt;/span&gt;
  &lt;span class="nt"&gt;--topic&lt;/span&gt; dbserver1.public.testtable &lt;span class="se"&gt;\&lt;/span&gt;
  &lt;span class="nt"&gt;--from-beginning&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;






&lt;p&gt;In this chapter, we confirmed the flow of change data from PostgreSQL to Kafka using Debezium. In the next chapter, we'll use Flink to process this data.&lt;/p&gt;

&lt;blockquote&gt;
&lt;p&gt;&lt;em&gt;(Coming soon: Chapter 4 Part 1 — Outputting Kafka CDC Data to Console with Flink)&lt;/em&gt;  &lt;/p&gt;
&lt;/blockquote&gt;

</description>
      <category>flink</category>
      <category>kafka</category>
      <category>opensearch</category>
      <category>books</category>
    </item>
    <item>
      <title>Connecting RDBs and Search Engines — Chapter 2</title>
      <dc:creator>sisiodos</dc:creator>
      <pubDate>Sat, 10 May 2025 22:36:31 +0000</pubDate>
      <link>https://dev.to/sisiodos/connecting-rdbs-and-search-engines-chapter-2-4hno</link>
      <guid>https://dev.to/sisiodos/connecting-rdbs-and-search-engines-chapter-2-4hno</guid>
      <description>&lt;h2&gt;
  
  
  Chapter 2: Understanding the Architecture and Component Relationships
&lt;/h2&gt;

&lt;p&gt;Below is a diagram representing the architecture discussed in this chapter:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight plaintext"&gt;&lt;code&gt;graph TD
  subgraph Source
    PG[PostgreSQL]
  end

  subgraph Change Data Capture
    Connect[Debezium Connect]
  end

  subgraph Stream Platform
    ZK[ZooKeeper] --&amp;gt; K[Kafka]
  end

  subgraph Stream Processing
    subgraph Flink
      JM[JobManager] --&amp;gt; TM[TaskManager]
    end
  end

  subgraph Sink
    OS[OpenSearch]
  end

  PG --&amp;gt; Connect --&amp;gt; K --&amp;gt; TM --&amp;gt; OS
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;This chapter explains how each component interacts to enable real-time data flow into the search engine.&lt;/p&gt;

&lt;h2&gt;
  
  
  PostgreSQL
&lt;/h2&gt;

&lt;p&gt;PostgreSQL serves as the relational database and source of change data. Initial data is inserted via &lt;code&gt;00-init.sql&lt;/code&gt;, and subsequent changes (INSERT, UPDATE, DELETE) are detected by Debezium.&lt;/p&gt;

&lt;p&gt;Debezium leverages PostgreSQL's &lt;strong&gt;WAL (Write-Ahead Log)&lt;/strong&gt; to capture changes. WAL is a transaction log that records all data modifications before they're applied to the actual tables. Because Debezium reads the WAL through logical decoding, changes are captured without any modification to application code.&lt;/p&gt;

&lt;h2&gt;
  
  
  Debezium Connect
&lt;/h2&gt;

&lt;p&gt;Debezium operates as a connector on top of the Kafka Connect framework. Kafka Connect simplifies data integration with Kafka through reusable connectors. These connectors can be registered and managed via Kafka Connect's REST API (covered in the next chapter).&lt;/p&gt;

&lt;p&gt;In this setup, the Debezium PostgreSQL connector reads from WAL and emits change events to Kafka.&lt;/p&gt;

&lt;h2&gt;
  
  
  Kafka + ZooKeeper
&lt;/h2&gt;

&lt;p&gt;Kafka is a high-throughput messaging system used for processing and relaying real-time data. Debezium publishes change events to Kafka topics, which are then consumed by Flink or other downstream systems.&lt;/p&gt;

&lt;p&gt;Kafka manages data by topics. Each consumer subscribes to one or more topics to receive data streams. In this case, Flink uses the topic as a streaming source. Topic design is a crucial part of data modeling in streaming systems.&lt;/p&gt;

&lt;p&gt;ZooKeeper assists Kafka by managing configuration and cluster state. In this setup, we use a single-node ZooKeeper for simplicity.&lt;/p&gt;

&lt;h2&gt;
  
  
  Flink JobManager / TaskManager
&lt;/h2&gt;

&lt;p&gt;Flink is a distributed stream processing engine developed as an Apache Software Foundation project. It supports both stream and batch processing with low latency and high throughput.&lt;/p&gt;

&lt;p&gt;Flink jobs consume data from Kafka, perform transformations, and output it to OpenSearch.&lt;/p&gt;

&lt;p&gt;JobManager handles job orchestration, while TaskManager executes tasks in parallel.&lt;/p&gt;

&lt;h2&gt;
  
  
  OpenSearch
&lt;/h2&gt;

&lt;p&gt;OpenSearch is an open-source search and analytics engine forked from Elasticsearch. It can store structured and semi-structured data, and provides fast full-text search and aggregation capabilities.&lt;/p&gt;

&lt;p&gt;Data is stored in &lt;strong&gt;indexes&lt;/strong&gt;, similar to tables in relational databases. Each index holds multiple &lt;strong&gt;documents&lt;/strong&gt;, which represent individual records.&lt;/p&gt;

&lt;p&gt;Stored data can be queried and visualized using tools like OpenSearch Dashboards.&lt;/p&gt;

&lt;h2&gt;
  
  
  Explanation
&lt;/h2&gt;

&lt;p&gt;There are two main reasons behind this architecture. Let’s examine them:&lt;/p&gt;

&lt;h3&gt;
  
  
  Normalized Data vs. Search Performance
&lt;/h3&gt;

&lt;p&gt;Some may wonder: “Why not query PostgreSQL directly? Isn’t OpenSearch redundant?”&lt;/p&gt;

&lt;p&gt;This architecture deliberately separates the source and the search engine because of differing performance and design goals.&lt;/p&gt;

&lt;p&gt;Relational databases like PostgreSQL often use normalized data models to ensure consistency and avoid redundancy. While beneficial for updates, these models require joins across multiple tables, which can slow down search queries.&lt;/p&gt;

&lt;p&gt;In contrast, OpenSearch allows data to be &lt;strong&gt;denormalized&lt;/strong&gt; and restructured for optimal search performance. This avoids costly joins and improves response times.&lt;/p&gt;

&lt;p&gt;Thus, by separating the update-optimized PostgreSQL from the search-optimized OpenSearch, and managing the transformation in between, the overall system becomes more efficient.&lt;/p&gt;
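
&lt;p&gt;To make the idea of denormalization concrete, here is a toy sketch that is independent of the actual pipeline. It assumes two hypothetical normalized record types, &lt;code&gt;product&lt;/code&gt; and &lt;code&gt;order&lt;/code&gt;, given as comma-separated lines with products listed first, and merges them into one flat JSON document per order. The record layout and the &lt;code&gt;denormalize&lt;/code&gt; name are illustrative assumptions:&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;# Toy sketch with hypothetical data: join orders to products at write time,
# so a search query never has to join at read time.
# Input lines (products must come before the orders that reference them):
#   product,PRODUCT_ID,PRODUCT_NAME
#   order,ORDER_ID,PRODUCT_ID
denormalize() {
  awk -F, '
    # Remember each product name by its id
    $1 == "product" { name[$2] = $3; next }
    # Emit one flat document per order, with the product name embedded
    $1 == "order" {
      printf "{\"order_id\": %s, \"product_id\": %s, \"product_name\": \"%s\"}\n", $2, $3, name[$3]
    }
  '
}

printf 'product,10,Keyboard\norder,1,10\n' | denormalize
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;

&lt;p&gt;The output is a single self-contained document, &lt;code&gt;{"order_id": 1, "product_id": 10, "product_name": "Keyboard"}&lt;/code&gt;, which is the shape a search index can serve without a join. In the real pipeline, this transformation is the job of the stream processor rather than a shell script.&lt;/p&gt;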

&lt;h3&gt;
  
  
  The Value of Real-Time Stream Processing
&lt;/h3&gt;

&lt;p&gt;In this architecture, PostgreSQL ensures consistency, while OpenSearch provides high-performance search over transformed data.&lt;/p&gt;

&lt;p&gt;The pipeline—Debezium detecting changes, Kafka transporting them, Flink transforming them, and OpenSearch indexing them—enables real-time propagation of updates.&lt;/p&gt;

&lt;p&gt;The benefits of this streaming approach include:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;
&lt;strong&gt;Real-time responsiveness&lt;/strong&gt;: Unlike scheduled batch jobs, each change propagates shortly after it is committed, so search results stay close to up to date.&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;Asynchronous replication&lt;/strong&gt;: Changes are replicated outside the application's transaction path, so the impact on application and database performance stays small.&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;Flexible transformation&lt;/strong&gt;: Flink enables complex filtering, joins, and aggregations tailored to search use cases.&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;Change Data Capture (CDC) with stream processing is a key design pattern in modern data infrastructure.&lt;/p&gt;

&lt;p&gt;In the next chapter, we'll create tables in PostgreSQL, generate changes, and observe how events are streamed to Kafka topics.&lt;/p&gt;

&lt;blockquote&gt;
&lt;p&gt;&lt;em&gt;(Coming soon: Chapter 3 — Verifying the Flow from PostgreSQL to Debezium to Kafka)&lt;/em&gt;  &lt;/p&gt;
&lt;/blockquote&gt;

</description>
      <category>flink</category>
      <category>kafka</category>
      <category>opensearch</category>
      <category>books</category>
    </item>
    <item>
      <title>Connecting RDBs and Search Engines — Chapter 1</title>
      <dc:creator>sisiodos</dc:creator>
      <pubDate>Sat, 10 May 2025 22:35:18 +0000</pubDate>
      <link>https://dev.to/sisiodos/connecting-rdbs-and-search-engines-chapter-1-42hf</link>
      <guid>https://dev.to/sisiodos/connecting-rdbs-and-search-engines-chapter-1-42hf</guid>
      <description>&lt;h1&gt;
  
  
  Chapter 1: Setting Up the Environment with Docker Compose
&lt;/h1&gt;

&lt;h2&gt;
  
  
  Purpose of This Chapter
&lt;/h2&gt;

&lt;p&gt;The goal of this chapter is to help you build and understand a data pipeline that streams change data from PostgreSQL through stream processing and into OpenSearch, all within your local environment.&lt;/p&gt;

&lt;h2&gt;
  
  
  About the Code (Repeat)
&lt;/h2&gt;

&lt;p&gt;The &lt;code&gt;docker-compose.yaml&lt;/code&gt; file and PostgreSQL initialization scripts used in this chapter are available in the following repository:&lt;/p&gt;

&lt;p&gt;👉 &lt;a href="https://github.com/sisiodos/rdb-to-search-pipeline-with-flink" rel="noopener noreferrer"&gt;https://github.com/sisiodos/rdb-to-search-pipeline-with-flink&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;If you haven't cloned the repository yet, you can set it up with:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;git clone https://github.com/sisiodos/rdb-to-search-pipeline-with-flink.git
&lt;span class="nb"&gt;cd &lt;/span&gt;rdb-to-search-pipeline-with-flink
docker compose up &lt;span class="nt"&gt;-d&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;The &lt;code&gt;docker/&lt;/code&gt; directory contains the configuration files, and the &lt;code&gt;postgres/&lt;/code&gt; directory contains SQL scripts for initializing sample data.&lt;/p&gt;

&lt;h2&gt;
  
  
  Architecture Overview
&lt;/h2&gt;

&lt;p&gt;Here’s a brief explanation of each technology used in this pipeline, so even readers unfamiliar with these tools can follow along:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;
&lt;strong&gt;PostgreSQL&lt;/strong&gt;: An open-source relational database. It serves as the source of data changes in this setup.&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;Debezium&lt;/strong&gt;: A CDC (Change Data Capture) tool that detects changes in the database and publishes them to Kafka in real time.&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;Kafka&lt;/strong&gt;: A high-throughput messaging system that temporarily stores the change data from Debezium and relays it to Flink.&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;Flink&lt;/strong&gt;: A distributed stream processing engine. It processes the change data from Kafka and sends it to OpenSearch.&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;OpenSearch&lt;/strong&gt;: A search engine where the processed data from Flink is stored and made searchable.&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;This pipeline demonstrates the end-to-end flow of capturing changes from PostgreSQL and delivering them to OpenSearch via Kafka and Flink.&lt;/p&gt;

&lt;h2&gt;
  
  
  Building the Environment with Docker Compose
&lt;/h2&gt;

&lt;blockquote&gt;
&lt;p&gt;&lt;strong&gt;Note:&lt;/strong&gt; This guide assumes a local environment (Mac / Windows / Linux) with Docker Desktop installed. If you're unfamiliar with Docker CLI or Compose, refer to the &lt;a href="https://www.docker.com/" rel="noopener noreferrer"&gt;official Docker documentation&lt;/a&gt;.&lt;/p&gt;
&lt;/blockquote&gt;

&lt;p&gt;Docker Compose is a tool for defining and managing multi-container Docker applications. With a single &lt;code&gt;docker-compose.yaml&lt;/code&gt; file, you can describe multiple services, including images, ports, environment variables, and dependencies.&lt;/p&gt;

&lt;p&gt;In this project, the &lt;code&gt;docker-compose.yaml&lt;/code&gt; file defines the following open-source services:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;PostgreSQL (sample database)&lt;/li&gt;
&lt;li&gt;Debezium Connect (captures changes from PostgreSQL and sends to Kafka)&lt;/li&gt;
&lt;li&gt;Kafka + ZooKeeper (messaging infrastructure)&lt;/li&gt;
&lt;li&gt;Flink (JobManager and TaskManager)&lt;/li&gt;
&lt;li&gt;OpenSearch (search engine)&lt;/li&gt;
&lt;/ul&gt;

&lt;h3&gt;
  
  
  Starting the Services
&lt;/h3&gt;

&lt;p&gt;To start all services:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;docker compose up &lt;span class="nt"&gt;-d&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;h3&gt;
  
  
  Stopping the Services
&lt;/h3&gt;

&lt;p&gt;To stop all services:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;docker compose down
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;h3&gt;
  
  
  Managing Individual Services
&lt;/h3&gt;

&lt;p&gt;You can control individual services (e.g., restart, stop, start, check logs) like this:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;docker compose restart kafka            &lt;span class="c"&gt;# Restart Kafka service only&lt;/span&gt;
docker compose stop connect             &lt;span class="c"&gt;# Stop Connect service only&lt;/span&gt;
docker compose start connect            &lt;span class="c"&gt;# Start Connect service only&lt;/span&gt;
docker compose logs flink-taskmanager   &lt;span class="c"&gt;# View logs from Flink TaskManager&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;h3&gt;
  
  
  Pause and Resume
&lt;/h3&gt;

&lt;p&gt;To temporarily suspend or resume all services:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;docker compose pause
docker compose unpause
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;This is useful when you want to reduce CPU or I/O usage temporarily, or to freeze processing while you inspect logs.&lt;/p&gt;

&lt;h3&gt;
  
  
  Remove Persistent Volumes (Full Cleanup)
&lt;/h3&gt;

&lt;p&gt;To completely clean up the environment, including persistent volumes (e.g., PostgreSQL data), use:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;docker compose down &lt;span class="nt"&gt;--volumes&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;This removes the containers together with the volumes declared in the Compose file, resetting everything to its initial state. See the &lt;a href="https://docs.docker.com/compose/" rel="noopener noreferrer"&gt;official documentation&lt;/a&gt; for more details.&lt;/p&gt;

&lt;h2&gt;
  
  
  Related Links
&lt;/h2&gt;

&lt;ul&gt;
&lt;li&gt;
&lt;strong&gt;Debezium&lt;/strong&gt;: &lt;a href="https://debezium.io/" rel="noopener noreferrer"&gt;Official Site&lt;/a&gt;
&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;Apache Kafka&lt;/strong&gt;: &lt;a href="https://kafka.apache.org/" rel="noopener noreferrer"&gt;Official Site&lt;/a&gt;
&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;Apache Flink&lt;/strong&gt;: &lt;a href="https://flink.apache.org/" rel="noopener noreferrer"&gt;Official Site&lt;/a&gt;
&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;OpenSearch&lt;/strong&gt;: &lt;a href="https://opensearch.org/" rel="noopener noreferrer"&gt;Official Site&lt;/a&gt;
&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;With this, you now have the foundational environment for building a local data pipeline. In the next chapter, we will explore the structure and relationships between each component.&lt;/p&gt;

&lt;blockquote&gt;
&lt;p&gt;&lt;em&gt;(Coming soon: Chapter 2 — Understanding the Architecture and Component Relationships)&lt;/em&gt;  &lt;/p&gt;
&lt;/blockquote&gt;

</description>
      <category>flink</category>
      <category>kafka</category>
      <category>opensearch</category>
      <category>books</category>
    </item>
    <item>
      <title>Connecting RDBs and Search Engines — Preface</title>
      <dc:creator>sisiodos</dc:creator>
      <pubDate>Sat, 10 May 2025 22:33:28 +0000</pubDate>
      <link>https://dev.to/sisiodos/connecting-rdbs-and-search-engines-preface-12hm</link>
      <guid>https://dev.to/sisiodos/connecting-rdbs-and-search-engines-preface-12hm</guid>
      <description>&lt;h1&gt;
  
  
  Introduction
&lt;/h1&gt;

&lt;h2&gt;
  
  
  Why I Wrote This Guide
&lt;/h2&gt;

&lt;p&gt;In today's system architectures, streaming massive amounts of data into a search engine in real time has become increasingly common. However, the underlying mechanisms—stream processing, data transformation, and integration across systems—pose many hidden complexities.&lt;/p&gt;

&lt;p&gt;This guide organizes practical knowledge about OSS-based data pipelines using Kafka, Flink, and OpenSearch, and presents it in a way that allows anyone to learn by actually running and observing the system.&lt;/p&gt;

&lt;h2&gt;
  
  
  Intended Audience
&lt;/h2&gt;

&lt;p&gt;This guide is aimed at individuals who:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Prefer to experiment and validate systems locally rather than in the cloud&lt;/li&gt;
&lt;li&gt;Are interested in stream processing and real-time data integration&lt;/li&gt;
&lt;li&gt;Are new to Kafka, Flink, or OpenSearch&lt;/li&gt;
&lt;li&gt;Want to try out OSS-based architectures using Docker Compose&lt;/li&gt;
&lt;li&gt;Are interested in designing and verifying a data infrastructure but don't know where to start&lt;/li&gt;
&lt;/ul&gt;

&lt;p&gt;Even beginners can safely follow along, as the guide explains the background and architecture of each component in a clear and thorough way.&lt;/p&gt;

&lt;h2&gt;
  
  
  What You'll Learn
&lt;/h2&gt;

&lt;p&gt;After completing this guide, you will be able to:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Build a Kafka → Flink → OpenSearch pipeline using Docker Compose&lt;/li&gt;
&lt;li&gt;Understand the basics of Flink's Event Time and Watermark processing, and how to inspect logs&lt;/li&gt;
&lt;li&gt;Gain insights into detecting and visualizing data ingestion delays into the search engine&lt;/li&gt;
&lt;/ul&gt;

&lt;h2&gt;
  
  
  Required Knowledge
&lt;/h2&gt;

&lt;p&gt;This guide assumes basic familiarity with:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;Linux command-line operations&lt;/li&gt;
&lt;li&gt;Basic Docker usage&lt;/li&gt;
&lt;/ul&gt;

&lt;h2&gt;
  
  
  Guide Structure
&lt;/h2&gt;

&lt;p&gt;The guide is organized into the following chapters:&lt;/p&gt;

&lt;ol&gt;
&lt;li&gt;Setting up the environment using Docker Compose&lt;/li&gt;
&lt;li&gt;Understanding the architecture and relationship between components&lt;/li&gt;
&lt;li&gt;Extracting change data with Debezium and outputting it to Kafka&lt;/li&gt;
&lt;li&gt;Running Flink jobs and analyzing logs&lt;/li&gt;
&lt;li&gt;Sending data to a search engine and verifying it&lt;/li&gt;
&lt;/ol&gt;

&lt;p&gt;Each chapter includes diagrams and runnable examples to bridge theory with practice.&lt;/p&gt;

&lt;h2&gt;
  
  
  About the Code
&lt;/h2&gt;

&lt;p&gt;All code, SQL scripts, and Docker Compose configuration files in this guide are available on GitHub:&lt;/p&gt;

&lt;p&gt;👉 &lt;a href="https://github.com/sisiodos/rdb-to-search-pipeline-with-flink" rel="noopener noreferrer"&gt;GitHub repository&lt;/a&gt;&lt;/p&gt;

&lt;p&gt;Each chapter corresponds to a runnable configuration, so you can follow along and execute each step as you read.&lt;/p&gt;

&lt;p&gt;If Docker Desktop is installed, you can get started with:&lt;br&gt;
&lt;/p&gt;

&lt;div class="highlight js-code-highlight"&gt;
&lt;pre class="highlight shell"&gt;&lt;code&gt;git clone https://github.com/sisiodos/rdb-to-search-pipeline-with-flink.git
&lt;span class="nb"&gt;cd &lt;/span&gt;rdb-to-search-pipeline-with-flink
docker compose up &lt;span class="nt"&gt;-d&lt;/span&gt;
&lt;/code&gt;&lt;/pre&gt;

&lt;/div&gt;



&lt;p&gt;Chapter-specific files and commands are indicated in the guide to help you follow and try them hands-on.&lt;/p&gt;

&lt;h2&gt;
  
  
  Future Expansion (Planning Notes)
&lt;/h2&gt;

&lt;p&gt;While this guide focuses on the fundamentals of connecting stream processing with search engines using Flink, we are also planning to explore the following advanced topics:&lt;/p&gt;

&lt;ul&gt;
&lt;li&gt;&lt;strong&gt;Flink parallel execution and TaskManager slot configuration&lt;/strong&gt;&lt;/li&gt;
&lt;li&gt;&lt;strong&gt;Data distribution and visualization across multiple TaskManagers&lt;/strong&gt;&lt;/li&gt;
&lt;li&gt;
&lt;strong&gt;Order guarantee and consistency design for Flink → OpenSearch writes&lt;/strong&gt;

&lt;ul&gt;
&lt;li&gt;Upsert control using &lt;code&gt;_id&lt;/code&gt; and &lt;code&gt;op&lt;/code&gt; fields&lt;/li&gt;
&lt;/ul&gt;


&lt;/li&gt;

&lt;li&gt;&lt;strong&gt;Deduplication strategies for handling duplicate messages&lt;/strong&gt;&lt;/li&gt;

&lt;li&gt;&lt;strong&gt;Checkpointing, exactly-once processing, and two-phase commit sinks&lt;/strong&gt;&lt;/li&gt;

&lt;li&gt;

&lt;strong&gt;Design for stable operation and backpressure mitigation&lt;/strong&gt;

&lt;ul&gt;
&lt;li&gt;Analyzing bottlenecks via metrics such as &lt;code&gt;backPressuredTimeMsPerSecond&lt;/code&gt;, &lt;code&gt;busyTimeMsPerSecond&lt;/code&gt;, and I/O wait&lt;/li&gt;
&lt;li&gt;Tuning parallelism, slot placement, and resource balancing&lt;/li&gt;
&lt;li&gt;Operator chaining, buffer timeouts, and watermark tuning&lt;/li&gt;
&lt;li&gt;Managing large state (joins, deduplication): key selection, TTL, partial aggregation&lt;/li&gt;
&lt;li&gt;Tuning the RocksDB state backend and controlling spill to disk&lt;/li&gt;
&lt;li&gt;Using asynchronous I/O for sink writes and tuning bulk flush and batch sizes&lt;/li&gt;
&lt;li&gt;Monitoring and visualization using Flink Web UI and Grafana + Prometheus&lt;/li&gt;
&lt;li&gt;Alerting on checkpoint growth and state bloat&lt;/li&gt;
&lt;/ul&gt;


&lt;/li&gt;

&lt;/ul&gt;

&lt;p&gt;These topics go beyond the basics and involve more architectural thinking.&lt;/p&gt;

&lt;h2&gt;
  
  
  Behind the Scenes
&lt;/h2&gt;

&lt;p&gt;This guide was written in collaboration with OpenAI's conversational AI, ChatGPT-4o. From structure and technical verification to polishing the wording, AI played a major role as a thinking partner throughout the writing process.&lt;/p&gt;

&lt;p&gt;That said, the final architecture, structure, verification, and all editorial decisions are the author's responsibility.&lt;/p&gt;

&lt;p&gt;Documenting and delivering technical insight is no longer the sole domain of human effort. I hope this guide serves as an example of what's possible when a human and an AI collaborate, and that it supports you in your own technical explorations.&lt;/p&gt;

&lt;p&gt;For what it's worth, during the writing process, ChatGPT-4o used "I" as its pronoun. Somewhere along the way, it became something like a co-author. I leave that note here, quietly.&lt;/p&gt;

&lt;blockquote&gt;
&lt;p&gt;&lt;em&gt;(Coming soon: Chapter 1 — Setting Up the Environment with Docker Compose)&lt;/em&gt;  &lt;/p&gt;
&lt;/blockquote&gt;

</description>
      <category>flink</category>
      <category>kafka</category>
      <category>opensearch</category>
      <category>books</category>
    </item>
  </channel>
</rss>
