Overview
I recently came across a discussion with potential customers about their ingestion pain points. They want a near real-time ingestion experience, but they are currently running PostgreSQL 9.2, which has been out of support since 2017. Given their small team and limited IT resources, a database version upgrade would be a stretch, and on top of that they would have to convince management to fund work whose results are (most likely) intangible. They currently hold more than 15 TB of data and process more than 200k transactions per day. We were asked how to build the ingestion without upgrading their systems, and with cost consciousness in mind.
In this article, I want to share the approach I decided to take given those limitations. It may not be the perfect solution (and I would welcome hearing about more efficient ones), but I think it is relevant for many enterprises: a lot of them run legacy systems, the budget is minimal (if any), and there is no direct revenue impact when the IT team wants to improve its systems.
Solution Design
Since this PostgreSQL version is not supported by recent CDC tools like Debezium (the current stable Debezium releases require PostgreSQL 12 and above, and logical decoding itself only arrived in PostgreSQL 9.4), we need to create additional tables inside the database to capture the changes ourselves. These change records are then published to Kafka so that Snowpipe Streaming can pick them up. The high level diagram looks like this:
(Diagram: PostgreSQL triggers write to a CDC log table → Kafka Connect JDBC source polls the log → Kafka topic → streaming client batches MERGE statements into Snowflake)
Setup Procedure to Capture Table Changes
First, we need to create a table to capture the changes we want to ingest later. Here is the SQL:
CREATE TABLE transaction_cdc_log (
cdc_id SERIAL PRIMARY KEY,
table_name VARCHAR(100) NOT NULL,
operation VARCHAR(10) NOT NULL CHECK (operation IN ('INSERT', 'UPDATE', 'DELETE')),
record_id INTEGER NOT NULL,
old_data TEXT,
new_data TEXT,
change_timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
change_user VARCHAR(100),
processed BOOLEAN DEFAULT false
);
We can modify the columns as needed. The idea is to capture the old and new data for any change (INSERT, UPDATE, or DELETE) made to the table.
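One optional addition (my suggestion, not part of the original requirements) is an index that matches the connector's polling pattern, so the incremental queries stay cheap as the log table grows:
-- Suggested index to support timestamp+incrementing polling on the CDC log
CREATE INDEX idx_transaction_cdc_log_poll
    ON transaction_cdc_log (change_timestamp, cdc_id);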
Next, we need a helper function that serializes a row of the monitored table into a single JSON-like text value:
CREATE OR REPLACE FUNCTION transaction_row_to_text(rec transactions) RETURNS TEXT AS $$
BEGIN
RETURN '{' ||
'"transaction_id":' || COALESCE(rec.transaction_id::TEXT, 'null') || ',' ||
'"transaction_code":"' || COALESCE(rec.transaction_code, '') || '",' ||
'"outlet_id":' || COALESCE(rec.outlet_id::TEXT, 'null') || ',' ||
'"transaction_date":"' || COALESCE(rec.transaction_date::TEXT, '') || '",' ||
'"total_amount":' || COALESCE(rec.total_amount::TEXT, 'null') || ',' ||
'"total_discount":' || COALESCE(rec.total_discount::TEXT, 'null') || ',' ||
'"payment_method":"' || COALESCE(rec.payment_method, '') || '",' ||
'"cashier_name":"' || COALESCE(rec.cashier_name, '') || '",' ||
'"customer_id":"' || COALESCE(rec.customer_id, '') || '",' ||
'"status":"' || COALESCE(rec.status, '') || '",' ||
'"created_at":"' || COALESCE(rec.created_at::TEXT, '') || '"' ||
'}';
END;
$$ LANGUAGE plpgsql;
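As a side note, PostgreSQL 9.2 already ships row_to_json(), so if it is available in your build, the hand-rolled concatenation above can be replaced with a much shorter function that also handles quoting and escaping correctly. This is a sketch, not part of the original setup:
CREATE OR REPLACE FUNCTION transaction_row_to_text(rec transactions) RETURNS TEXT AS $$
BEGIN
    -- row_to_json serializes the whole row and escapes values properly
    RETURN row_to_json(rec)::TEXT;
END;
$$ LANGUAGE plpgsql;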
Setup a Trigger for Each Table
Once the helper function is ready, the next step is to create a trigger function. This function will be called whenever a change is made to the table we want to monitor, and it writes that change into the CDC log table.
CREATE OR REPLACE FUNCTION capture_transaction_changes() RETURNS TRIGGER AS $$
BEGIN
IF (TG_OP = 'DELETE') THEN
INSERT INTO transaction_cdc_log (table_name, operation, record_id, old_data, new_data, change_user)
VALUES (TG_TABLE_NAME, TG_OP, OLD.transaction_id, transaction_row_to_text(OLD), NULL, current_user);
RETURN OLD;
ELSIF (TG_OP = 'UPDATE') THEN
INSERT INTO transaction_cdc_log (table_name, operation, record_id, old_data, new_data, change_user)
VALUES (TG_TABLE_NAME, TG_OP, NEW.transaction_id, transaction_row_to_text(OLD), transaction_row_to_text(NEW), current_user);
RETURN NEW;
ELSIF (TG_OP = 'INSERT') THEN
INSERT INTO transaction_cdc_log (table_name, operation, record_id, old_data, new_data, change_user)
VALUES (TG_TABLE_NAME, TG_OP, NEW.transaction_id, NULL, transaction_row_to_text(NEW), current_user);
RETURN NEW;
END IF;
RETURN NULL;
END;
$$ LANGUAGE plpgsql;
This trigger function can then be attached to the table we want to monitor:
CREATE TRIGGER transaction_cdc_trigger
AFTER INSERT OR UPDATE OR DELETE ON transactions -- this is the table name
FOR EACH ROW EXECUTE PROCEDURE capture_transaction_changes();
By this point you may have noticed that these functions and triggers have to be created for every table we want to monitor. As mentioned in the Overview, this is an approach to use only when upgrading the database version is not an option. If you can upgrade, I recommend doing so, so that this process can be handled seamlessly by more appropriate tools.
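A quick sanity check: after any write to the transactions table, the most recent log entries should reflect it. For example:
-- Any INSERT/UPDATE/DELETE on transactions should now land in the log
SELECT cdc_id, operation, record_id, change_user, change_timestamp
FROM transaction_cdc_log
ORDER BY cdc_id DESC
LIMIT 5;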
Setup Kafka Connect for the CDC Log
Once the triggers are in place, we can use Kafka Connect with the JDBC Source Connector to poll the CDC log periodically and publish the rows to a Kafka topic. The high level configuration file is as follows:
{
"name": "pos-cdc-source",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"tasks.max": "2",
"connection.url": "jdbc:postgresql://pos-db:5432/pos_db",
"connection.user": "your_username",
"connection.password": "your_password",
"mode": "timestamp+incrementing",
"incrementing.column.name": "cdc_id",
"timestamp.column.name": "change_timestamp",
"table.whitelist": "transaction_cdc_log",
"topic.prefix": "pos-",
"poll.interval.ms": "1000",
"batch.max.rows": "100",
"timestamp.delay.interval.ms": "0",
"validate.non.null": "false",
"transforms": "createKey,extractInt",
"transforms.createKey.type": "org.apache.kafka.connect.transforms.ValueToKey",
"transforms.createKey.fields": "cdc_id",
"transforms.extractInt.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
"transforms.extractInt.field": "cdc_id"
}
}
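Under the timestamp+incrementing mode, the connector keeps track of the last (change_timestamp, cdc_id) offset it has published and repeatedly asks for anything newer. Conceptually (this is only an illustration of the idea, not the exact query Kafka Connect generates), each poll behaves like:
-- Illustration only: fetch rows beyond the last committed (timestamp, id) offset
SELECT *
FROM transaction_cdc_log
WHERE change_timestamp > :last_timestamp
   OR (change_timestamp = :last_timestamp AND cdc_id > :last_cdc_id)
ORDER BY change_timestamp, cdc_id
LIMIT 100;  -- roughly what batch.max.rows controls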
Setup Snowpipe Streaming
Once the change records are published to Kafka, we can set up the Snowpipe Streaming client to ingest the data into Snowflake. To handle the ingestion efficiently, the client batches records and applies each batch with a single MERGE rather than issuing row-by-row inserts. The batch size and flush timeout are configurable (the configuration above polls Kafka every second). The high level Python script is as follows:
#!/usr/bin/env python3
"""
Snowpipe Streaming Client with Batch Processing
Consumes CDC events from Kafka and streams them to Snowflake in batches
"""
import os
import sys
import json
import time
import logging
import signal
from datetime import datetime
from collections import defaultdict
from typing import Optional, Callable
from kafka import KafkaConsumer
import snowflake.connector
from dotenv import load_dotenv
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
handlers=[logging.StreamHandler(sys.stdout)]
)
logger = logging.getLogger(__name__)
# Load environment variables
load_dotenv()
class SnowpipeStreamingClient:
def __init__(self):
# Kafka configuration
self.kafka_bootstrap_servers = os.getenv('KAFKA_BOOTSTRAP_SERVERS', 'kafka:9092')
self.kafka_group_id = os.getenv('KAFKA_GROUP_ID', 'snowpipe-consumer-group')
# Snowflake configuration
self.snowflake_account = os.getenv('SNOWFLAKE_ACCOUNT')
self.snowflake_user = os.getenv('SNOWFLAKE_USER')
self.snowflake_password = os.getenv('SNOWFLAKE_PASSWORD')
self.snowflake_database = os.getenv('SNOWFLAKE_DATABASE', 'KF_DEMO')
self.snowflake_warehouse = os.getenv('SNOWFLAKE_WAREHOUSE', 'COMPUTE_WH_GEN2')
self.snowflake_role = os.getenv('SNOWFLAKE_ROLE', 'ACCOUNTADMIN')
# Batch configuration
self.batch_size = int(os.getenv('BATCH_SIZE', '50'))
self.batch_timeout = float(os.getenv('BATCH_TIMEOUT', '2.0')) # seconds
# Validate Snowflake credentials
if not all([self.snowflake_account, self.snowflake_user, self.snowflake_password]):
raise ValueError("Missing Snowflake credentials. Please set SNOWFLAKE_ACCOUNT, SNOWFLAKE_USER, and SNOWFLAKE_PASSWORD environment variables.")
# Initialize connections
self.snowflake_conn = None
self.consumer = None
# Batch buffers by topic
self.batches = {
'pos-transaction_cdc_log': []
}
self.last_flush_time = time.time()
# Stats
self.records_processed = 0
self.batches_flushed = 0
self.errors = 0
self.last_report_time = time.time()
self._shutdown_requested = False
self._setup_signal_handlers()
logger.info(f"Snowpipe Streaming Client initialized (Batch Size: {self.batch_size}, Timeout: {self.batch_timeout}s)")
def _setup_signal_handlers(self) -> None:
signal.signal(signal.SIGINT, self._signal_handler)
signal.signal(signal.SIGTERM, self._signal_handler)
def _signal_handler(self, signum: int, frame) -> None:
logger.info(f"Received signal {signum}, initiating graceful shutdown...")
self._shutdown_requested = True
def _deduplicate_batch(self, records: list, key_field: str) -> list:
"""Deduplicate records by key, keeping only the latest (last) occurrence"""
seen = {}
for idx, record in enumerate(records):
new_data = self.parse_json_field(record.get('new_data'))
if new_data and record.get('operation') != 'DELETE':
key = new_data.get(key_field)
if key is not None:
seen[key] = (idx, record)
return [record for idx, record in sorted(seen.values(), key=lambda x: x[0])]
def connect_snowflake(self):
"""Connect to Snowflake"""
max_retries = 5
for i in range(max_retries):
try:
self.snowflake_conn = snowflake.connector.connect(
account=self.snowflake_account,
user=self.snowflake_user,
password=self.snowflake_password,
database=self.snowflake_database,
warehouse=self.snowflake_warehouse,
role=self.snowflake_role
)
logger.info(f"Connected to Snowflake: {self.snowflake_account}")
return
except Exception as e:
if i < max_retries - 1:
logger.warning(f"Snowflake connection attempt {i+1} failed: {e}")
logger.info("Retrying in 10 seconds...")
time.sleep(10)
else:
raise
def connect_kafka(self):
"""Connect to Kafka and subscribe to topics"""
topics = list(self.batches.keys())
max_retries = 10
for i in range(max_retries):
try:
self.consumer = KafkaConsumer(
*topics,
bootstrap_servers=self.kafka_bootstrap_servers,
group_id=self.kafka_group_id,
auto_offset_reset='earliest',
enable_auto_commit=True,
auto_commit_interval_ms=1000,
max_poll_records=500,
max_poll_interval_ms=300000,
session_timeout_ms=30000,
heartbeat_interval_ms=10000,
fetch_min_bytes=1,
fetch_max_wait_ms=500,
value_deserializer=lambda m: json.loads(m.decode('utf-8'))
)
logger.info(f"Connected to Kafka: {self.kafka_bootstrap_servers}")
logger.info(f"Subscribed to topics: {', '.join(topics)}")
return
except Exception as e:
if i < max_retries - 1:
logger.warning(f"Kafka connection attempt {i+1} failed: {e}")
logger.info("Retrying in 10 seconds...")
time.sleep(10)
else:
raise
def parse_json_field(self, json_str):
"""Parse JSON-like string from PostgreSQL trigger"""
if not json_str:
return None
try:
return json.loads(json_str)
except json.JSONDecodeError:
return json_str
def flush_transaction_batch(self, records: list) -> None:
"""Flush a batch of transaction records to Snowflake"""
if not records:
return
deduped_records = self._deduplicate_batch(records, 'transaction_id')
if not deduped_records:
return
try:
cursor = self.snowflake_conn.cursor()
values_list = []
params = []
for record in deduped_records:
new_data = self.parse_json_field(record.get('new_data'))
values_list.append("(%s, %s, %s, %s::TIMESTAMP_NTZ, %s, %s, %s, %s, %s, %s, %s::TIMESTAMP_NTZ)")
params.extend([
new_data.get('transaction_id'),
new_data.get('transaction_code'),
new_data.get('outlet_id'),
new_data.get('transaction_date'),
new_data.get('total_amount'),
new_data.get('total_discount'),
new_data.get('payment_method'),
new_data.get('cashier_name'),
new_data.get('customer_id'),
new_data.get('status'),
new_data.get('created_at')
])
if not values_list:
cursor.close()
return
values_clause = ",\n".join(values_list)
merge_sql = f"""
MERGE INTO {self.snowflake_database}.POS_DATA.transactions t
USING (
SELECT
column1 as transaction_id,
column2 as transaction_code,
column3 as outlet_id,
column4 as transaction_date,
column5 as total_amount,
column6 as total_discount,
column7 as payment_method,
column8 as cashier_name,
column9 as customer_id,
column10 as status,
column11 as created_at
FROM VALUES {values_clause}
) s
ON t.transaction_id = s.transaction_id
WHEN MATCHED THEN UPDATE SET
transaction_code = s.transaction_code,
outlet_id = s.outlet_id,
transaction_date = s.transaction_date,
total_amount = s.total_amount,
total_discount = s.total_discount,
payment_method = s.payment_method,
cashier_name = s.cashier_name,
customer_id = s.customer_id,
status = s.status
WHEN NOT MATCHED THEN INSERT (
transaction_id, transaction_code, outlet_id, transaction_date,
total_amount, total_discount, payment_method, cashier_name,
customer_id, status, created_at
) VALUES (
s.transaction_id, s.transaction_code, s.outlet_id, s.transaction_date,
s.total_amount, s.total_discount, s.payment_method, s.cashier_name,
s.customer_id, s.status, s.created_at
)
"""
cursor.execute(merge_sql, params)
cursor.close()
self.batches_flushed += 1
except Exception as e:
logger.error(f"Error flushing transaction batch ({len(deduped_records)} records): {e}")
self.errors += len(deduped_records)
def flush_batches(self, force=False):
"""Flush all batches to Snowflake if they meet size/timeout thresholds"""
current_time = time.time()
time_since_last_flush = current_time - self.last_flush_time
for topic, batch in self.batches.items():
should_flush = force or len(batch) >= self.batch_size or time_since_last_flush >= self.batch_timeout
if should_flush and batch:
if topic == 'pos-transaction_cdc_log':
self.flush_transaction_batch(batch)
self.records_processed += len(batch)
self.batches[topic].clear()
if force or time_since_last_flush >= self.batch_timeout:
self.last_flush_time = current_time
def process_message(self, message):
"""Add message to appropriate batch buffer"""
try:
topic = message.topic
kafka_value = message.value
# Extract payload from Kafka Connect wrapper
if isinstance(kafka_value, dict) and 'payload' in kafka_value:
record = kafka_value['payload']
else:
record = kafka_value
if not record or not isinstance(record, dict):
logger.warning(f"Invalid record structure: {record}")
self.errors += 1
return
# Add to batch buffer
if topic in self.batches:
self.batches[topic].append(record)
else:
logger.warning(f"Unknown topic: {topic}")
return
# Flush if any batch reaches size threshold
self.flush_batches(force=False)
except Exception as e:
logger.error(f"Error processing message: {e}")
self.errors += 1
def run(self):
"""Main loop to consume and process messages"""
logger.info("=" * 80)
logger.info("Snowpipe Streaming Client Started (Batch Mode)")
logger.info(f"Kafka: {self.kafka_bootstrap_servers}")
logger.info(f"Snowflake: {self.snowflake_account}/{self.snowflake_database}")
logger.info(f"Batch Size: {self.batch_size} | Timeout: {self.batch_timeout}s")
logger.info("=" * 80)
logger.info("Waiting 30 seconds for Kafka Connect to be ready...")
time.sleep(30)
# Connect
self.connect_snowflake()
self.connect_kafka()
logger.info("Ready to process CDC events in batch mode")
try:
while not self._shutdown_requested:
messages = self.consumer.poll(timeout_ms=1000, max_records=self.batch_size)
for topic_partition, records in messages.items():
for message in records:
self.process_message(message)
self.flush_batches(force=False)
if time.time() - self.last_report_time >= 10:
total_buffered = sum(len(b) for b in self.batches.values())
logger.info(f"Processed: {self.records_processed} | Batches: {self.batches_flushed} | "
f"Buffered: {total_buffered} | Errors: {self.errors}")
self.last_report_time = time.time()
except Exception as e:
logger.error(f"Fatal error: {e}")
finally:
logger.info("Shutting down streaming client...")
self.flush_batches(force=True)
self._cleanup()
def _cleanup(self) -> None:
if self.consumer:
self.consumer.close()
if self.snowflake_conn:
self.snowflake_conn.close()
logger.info("=" * 80)
logger.info("Streaming Client Stopped")
logger.info(f"Total Records Processed: {self.records_processed}")
logger.info(f"Total Batches Flushed: {self.batches_flushed}")
logger.info(f"Total Errors: {self.errors}")
logger.info("=" * 80)
if __name__ == '__main__':
try:
client = SnowpipeStreamingClient()
client.run()
except Exception as e:
logger.error(f"Failed to start streaming client: {e}")
sys.exit(1)
Create Docker Compose
To deploy this easily in a development environment, we can use Docker. Below is a sample docker-compose file (it assumes the source PostgreSQL databases and the transaction generators run separately):
services:
# Zookeeper
zookeeper:
image: confluentinc/cp-zookeeper:7.5.0
container_name: zookeeper
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
ports:
- "2181:2181"
networks:
- pos-network
healthcheck:
test: ["CMD", "nc", "-z", "localhost", "2181"]
interval: 10s
timeout: 5s
retries: 5
# Kafka Broker
kafka:
image: confluentinc/cp-kafka:7.5.0
container_name: kafka
depends_on:
zookeeper:
condition: service_healthy
ports:
- "9092:9092"
- "9093:9093"
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:9093
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"
networks:
- pos-network
healthcheck:
test: ["CMD", "kafka-broker-api-versions", "--bootstrap-server", "localhost:9092"]
interval: 10s
timeout: 10s
retries: 5
# Kafka Connect
kafka-connect:
image: confluentinc/cp-kafka-connect:7.5.0
container_name: kafka-connect
depends_on:
kafka:
condition: service_healthy
ports:
- "8083:8083"
environment:
CONNECT_BOOTSTRAP_SERVERS: kafka:9092
CONNECT_REST_PORT: 8083
CONNECT_GROUP_ID: "connect-group"
CONNECT_CONFIG_STORAGE_TOPIC: "connect-configs"
CONNECT_OFFSET_STORAGE_TOPIC: "connect-offsets"
CONNECT_STATUS_STORAGE_TOPIC: "connect-status"
CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
CONNECT_KEY_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
CONNECT_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
CONNECT_REST_ADVERTISED_HOST_NAME: "kafka-connect"
CONNECT_PLUGIN_PATH: "/usr/share/java,/usr/share/confluent-hub-components"
volumes:
- ./kafka-connect/connectors:/etc/kafka-connect/connectors
networks:
- pos-network
command:
- bash
- -c
- |
echo "Installing JDBC Connector..."
confluent-hub install --no-prompt confluentinc/kafka-connect-jdbc:10.7.4
echo "Downloading PostgreSQL JDBC Driver..."
cd /usr/share/confluent-hub-components/confluentinc-kafka-connect-jdbc/lib
curl -O https://jdbc.postgresql.org/download/postgresql-42.6.0.jar
echo "Starting Kafka Connect..."
/etc/confluent/docker/run
healthcheck:
test: ["CMD", "curl", "-f", "http://localhost:8083/"]
interval: 15s
timeout: 10s
retries: 10
# Snowpipe Streaming Client
snowpipe-streaming:
build:
context: ./snowpipe
dockerfile: Dockerfile
container_name: snowpipe-streaming
depends_on:
- kafka
env_file:
- .env
environment:
KAFKA_BOOTSTRAP_SERVERS: kafka:9092
KAFKA_GROUP_ID: snowpipe-consumer-group
networks:
- pos-network
restart: unless-stopped
# Connector Setup Service (run once to configure Kafka Connect)
connector-setup:
image: curlimages/curl:latest
container_name: connector-setup
depends_on:
kafka-connect:
condition: service_healthy
volumes:
- ./kafka-connect:/kafka-connect
networks:
- pos-network
command:
- /bin/sh
- -c
- |
echo "Waiting for Kafka Connect to be fully ready..."
sleep 30
echo "Creating transaction CDC Source Connector..."
curl -X POST http://kafka-connect:8083/connectors \
-H "Content-Type: application/json" \
-d @/kafka-connect/pos-source.json
echo "Connector setup complete!"
echo "Listing all connectors..."
curl -X GET http://kafka-connect:8083/connectors
networks:
pos-network:
driver: bridge
volumes:
promo_data:
pos_data:
Snowflake Setup
We need to create the proper database, schema, and table to store the ingestion results. This is a sample query we can use:
-- Create database
CREATE DATABASE IF NOT EXISTS DB_DEMO;
USE DATABASE DB_DEMO;
-- Create schemas
CREATE SCHEMA IF NOT EXISTS POS_DATA;
-- ============================================================================
-- POS_DATA Schema
-- ============================================================================
USE SCHEMA POS_DATA;
CREATE TABLE IF NOT EXISTS transactions (
transaction_id INTEGER PRIMARY KEY,
transaction_code VARCHAR(100),
outlet_id INTEGER,
transaction_date TIMESTAMP_NTZ,
total_amount NUMBER(15, 2),
total_discount NUMBER(15, 2),
payment_method VARCHAR(50),
cashier_name VARCHAR(255),
customer_id VARCHAR(100),
status VARCHAR(20),
created_at TIMESTAMP_NTZ,
snowflake_ingested_at TIMESTAMP_LTZ DEFAULT CURRENT_TIMESTAMP()
);
ALTER TABLE transactions ADD CONSTRAINT unique_transaction_code UNIQUE (transaction_code);
We also need to store the credentials in a .env file so the streaming client can connect securely:
SNOWFLAKE_ACCOUNT=ORGNAME-ACCOUNTNAME
SNOWFLAKE_USER=YOUR_USER
SNOWFLAKE_PASSWORD=USE_PAT_TOKEN
SNOWFLAKE_DATABASE=DB_DEMO
SNOWFLAKE_SCHEMA=POS_DATA
SNOWFLAKE_WAREHOUSE=YOUR_COMPUTE
SNOWFLAKE_ROLE=YOUR_ROLE
# Kafka Configuration
KAFKA_BOOTSTRAP_SERVERS=kafka:9092
KAFKA_GROUP_ID=snowpipe-consumer-group
# PostgreSQL - POS Database
POS_DB_HOST=pos-db
POS_DB_PORT=5432
POS_DB_NAME=pos_db
POS_DB_USER=your_username
POS_DB_PASSWORD=your_password
Create Shell Script
Once everything is ready, we can create a shell script to start the whole process:
#!/bin/bash
# CDC Demo Staged Startup Script
# Starts services in stages for better reliability with dynamic waiting
set -e
# Helper function: Wait for service with health check
wait_for_service() {
local SERVICE_NAME=$1
local CHECK_COMMAND=$2
local MAX_WAIT=${3:-120}
local WAIT_TIME=0
echo "⏳ Waiting for ${SERVICE_NAME} to be ready..."
while [ $WAIT_TIME -lt $MAX_WAIT ]; do
if eval "$CHECK_COMMAND" > /dev/null 2>&1; then
echo "✓ ${SERVICE_NAME} is ready (waited ${WAIT_TIME}s)"
return 0
fi
sleep 5
WAIT_TIME=$((WAIT_TIME + 5))
if [ $((WAIT_TIME % 15)) -eq 0 ]; then
echo " Still waiting... (${WAIT_TIME}s/${MAX_WAIT}s)"
fi
done
echo "❌ Timeout: ${SERVICE_NAME} didn't start within ${MAX_WAIT} seconds"
return 1
}
echo "======================================================================"
echo " Kimia Farma CDC Demo - Staged Startup"
echo "======================================================================"
echo ""
# Check Docker
if ! command -v docker &> /dev/null; then
echo "❌ Error: Docker is not installed"
echo " Please install Docker from https://www.docker.com/get-started"
exit 1
fi
if ! docker info &> /dev/null; then
echo "❌ Error: Docker daemon is not running"
echo " Please start Docker Desktop"
exit 1
fi
echo "✓ Docker is running"
# Check Docker Compose
if ! command -v docker-compose &> /dev/null; then
echo "❌ Error: Docker Compose is not installed"
exit 1
fi
echo "✓ Docker Compose is available"
# Check .env file
if [ ! -f .env ]; then
echo ""
echo "⚠️ Warning: .env file not found"
echo " Creating .env from env.example..."
if [ -f env.example ]; then
cp env.example .env
echo "📝 Created .env from env.example"
echo " Please edit .env and add your Snowflake credentials:"
echo " - SNOWFLAKE_ACCOUNT"
echo " - SNOWFLAKE_USER"
echo " - SNOWFLAKE_PASSWORD"
echo ""
read -p "Press Enter after updating .env file..."
else
echo "❌ Error: env.example not found"
exit 1
fi
fi
echo "✓ .env file exists"
# Check Snowflake credentials
if grep -q "your-account" .env 2>/dev/null; then
echo "⚠️ Warning: Snowflake credentials may not be configured"
read -p "Continue anyway? (y/N) " -n 1 -r
echo
if [[ ! $REPLY =~ ^[Yy]$ ]]; then
exit 1
fi
else
echo "✓ Snowflake credentials are configured"
fi
# Check if containers are already running
if docker-compose ps 2>/dev/null | grep -q "Up"; then
echo ""
echo "⚠️ Some containers are already running"
read -p "Stop and restart all containers? (y/N) " -n 1 -r
echo
if [[ $REPLY =~ ^[Yy]$ ]]; then
echo "Stopping existing containers..."
docker-compose down
else
echo "Exiting..."
exit 0
fi
fi
echo ""
echo "======================================================================"
echo " STAGE 1: Starting Kafka Infrastructure"
echo "======================================================================"
echo ""
echo "Starting Zookeeper, Kafka, and Kafka Connect..."
docker-compose up -d zookeeper kafka kafka-connect
# Wait for Kafka to be ready
if ! wait_for_service "Kafka broker" \
"docker exec kafka kafka-broker-api-versions --bootstrap-server localhost:9092" \
120; then
echo " Check logs: docker-compose logs kafka"
exit 1
fi
# Wait for Kafka Connect to be ready
if ! wait_for_service "Kafka Connect" \
"curl -s http://localhost:8083/" \
120; then
echo " Check logs: docker-compose logs kafka-connect"
exit 1
fi
echo "Registering Kafka Connect connectors..."
docker-compose up -d connector-setup
# Wait for connectors to be registered (with automatic fallback)
MAX_WAIT=120
WAIT_TIME=0
echo "⏳ Waiting for connector registration..."
while [ $WAIT_TIME -lt $MAX_WAIT ]; do
CONNECTORS=$(curl -s http://localhost:8083/connectors 2>/dev/null || echo "[]")
if echo "$CONNECTORS" | grep -q "pos-cdc-source"; then
echo "✓ Connectors registered successfully (waited ${WAIT_TIME}s)"
break
fi
sleep 5
WAIT_TIME=$((WAIT_TIME + 5))
if [ $((WAIT_TIME % 15)) -eq 0 ]; then
echo " Still waiting... (${WAIT_TIME}s/${MAX_WAIT}s)"
fi
done
# If auto-registration failed, try manual registration
if [ $WAIT_TIME -ge $MAX_WAIT ]; then
echo "⚠️ Auto-registration timed out, attempting manual registration..."
# Manual registration of POS connector
curl -X POST http://localhost:8083/connectors \
-H "Content-Type: application/json" \
-d @kafka-connect/pos-source.json > /dev/null 2>&1 || true
sleep 5
# Verify manual registration
CONNECTORS=$(curl -s http://localhost:8083/connectors 2>/dev/null || echo "[]")
if echo "$CONNECTORS" | grep -q "pos-cdc-source"; then
echo "✓ Connectors manually registered successfully"
else
echo "❌ Failed to register connectors automatically"
echo " You can register manually later with:"
echo " curl -X POST http://localhost:8083/connectors -H 'Content-Type: application/json' -d @kafka-connect/pos-source.json"
fi
fi
# Show registered connectors
REGISTERED=$(curl -s http://localhost:8083/connectors 2>/dev/null || echo "[]")
echo "📋 Registered connectors: $REGISTERED"
echo ""
echo "======================================================================"
echo " STAGE 2: Starting Data Pipeline"
echo "======================================================================"
echo ""
echo "Starting Snowpipe Streaming..."
docker-compose up -d snowpipe-streaming
# Give the streaming client a moment to initialize
echo "⏳ Waiting for the streaming client to initialize..."
sleep 5
# Check if the streaming client is running
if docker-compose ps | grep -q "snowpipe-streaming.*Up"; then
echo "✓ Snowpipe streaming client is running"
else
echo "⚠️ Warning: Snowpipe streaming may not have started properly"
fi
echo ""
echo "======================================================================"
echo " ✓ All Stages Complete!"
echo "======================================================================"
echo ""
echo "Services Status:"
docker-compose ps
echo ""
echo "======================================================================"
echo "Monitoring Commands:"
echo "======================================================================"
echo ""
echo "Pipeline Status:"
echo " • Snowpipe Streaming: docker-compose logs -f snowpipe-streaming"
echo " • Kafka Connect: docker-compose logs -f kafka-connect"
echo ""
echo "Health Checks:"
echo " • View all services: docker-compose ps"
echo " • Check connectors: curl http://localhost:8083/connectors"
echo " • Connector status: curl http://localhost:8083/connectors/pos-cdc-source/status | jq"
echo " • Check Kafka topics: docker exec kafka kafka-topics --list --bootstrap-server localhost:9092"
echo " • View CDC logs: docker exec pos-db psql -U your_user -d pos_db -c 'SELECT COUNT(*) FROM transaction_cdc_log;'"
echo ""
echo "Database Access:"
echo " • POS Database: docker exec -it pos-db psql -U your_user -d pos_db"
echo " • Or via local: psql -h localhost -p 5434 -U your_user -d pos_db"
echo " • Password: your_password"
echo ""
echo "Next Steps:"
echo " 1. Verify CDC is working:"
echo " docker exec kafka kafka-console-consumer --bootstrap-server localhost:9092 --topic pos-transaction_cdc_log --max-messages 5"
echo " 2. Monitor data flow:"
echo " docker-compose logs -f snowpipe-streaming"
echo " 3. Check Snowflake: Run sample queries"
echo ""
echo "To stop the demo:"
echo " docker-compose down"
echo ""
echo "To clean everything and start fresh:"
echo " docker-compose down -v"
echo ""
echo "======================================================================"
Ingestion Result
Once we run the start_demo.sh script, all changes to the monitored table are reflected in the Snowflake table within seconds. I simulated roughly 5 TPS against a single table, and the latency between a row being inserted in PostgreSQL and appearing in Snowflake was under 10 seconds!
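If you want to verify the latency yourself, a simple check (a hypothetical query; it assumes created_at in PostgreSQL and the Snowflake session use the same timezone) is to compare the source timestamp with the ingestion timestamp of the most recent rows:
-- Rough end-to-end latency of the latest ingested rows
SELECT
    transaction_id,
    created_at,
    snowflake_ingested_at,
    DATEDIFF('second', created_at, snowflake_ingested_at::TIMESTAMP_NTZ) AS latency_seconds
FROM DB_DEMO.POS_DATA.transactions
ORDER BY snowflake_ingested_at DESC
LIMIT 10;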
Potential Improvements
This approach provides a way to deliver near real-time ingestion from a legacy database. In the future, the design can be extended to cover the following:
Scalability
While this workaround handles 5 TPS from a single table easily, thorough testing is required to monitor database performance, CPU and I/O in particular. Another component to tune is Kafka Connect, especially the number of workers and tasks needed to meet the expected latency and throughput in a production environment.
Observability
Several metrics should be captured when deploying this in production, such as lag behind the source database, Kafka consumer lag, and Snowpipe Streaming error rates. These metrics and logs can be exported to observability tools like Prometheus/Grafana or other managed services.
Dynamic Function
At production scale, creating the functions and triggers by hand does not scale, so their generation may need to be automated. By querying the information_schema metadata views, the supporting functions and triggers can be created dynamically without writing each script manually; a rough sketch of this idea is shown below.
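Here is what that automation could look like (the watch list and the naming convention are assumptions, and it presumes a capture_<table>_changes() function already exists for each table):
-- Sketch: (re)create CDC triggers for every table in a watch list
DO $$
DECLARE
    t RECORD;
BEGIN
    FOR t IN
        SELECT table_name
        FROM information_schema.tables
        WHERE table_schema = 'public'
          AND table_name IN ('transactions')  -- hypothetical watch list
    LOOP
        EXECUTE format('DROP TRIGGER IF EXISTS %I ON %I;',
                       t.table_name || '_cdc_trigger', t.table_name);
        EXECUTE format('CREATE TRIGGER %I
                        AFTER INSERT OR UPDATE OR DELETE ON %I
                        FOR EACH ROW EXECUTE PROCEDURE %I();',
                       t.table_name || '_cdc_trigger', t.table_name,
                       'capture_' || t.table_name || '_changes');
    END LOOP;
END $$;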
Conclusion
This article showed a workaround for ingesting data from a legacy system. The approach chosen here may not be the most ideal way to ingest the data, but when upgrading the PostgreSQL version is not possible, it held up well against a simulated real-world workload. Please note that the additional CDC log table will consume extra storage in your database, so you may need a scheduler to truncate or prune it periodically (a simple example follows). Hope you find this article useful!
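For instance, a scheduled cleanup job (the seven-day retention window is an arbitrary assumption; run it from cron or whatever scheduler you already have) could be as simple as:
-- Prune CDC log entries older than the retention window
DELETE FROM transaction_cdc_log
WHERE change_timestamp < NOW() - INTERVAL '7 days';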