DEV Community

Cover image for How to Ingest Your Legacy Database to Snowflake
Reza Brianca
Reza Brianca

Posted on

How to Ingest Your Legacy Database to Snowflake

Overview

I recently came across a discussion with potential customers about their ingestion pain points. While they want to have a near real-time ingestion experience, they are currently using PostgreSQL version 9.2 where it’s already out of support since 8 years ago (2017). It was interesting because given the lack of IT resources (it was a small team), the database version upgrade will be a stretch for them. This is also on top of another challenge to convince management since the result of software upgrade is (most likely) intangible. Currently they have more than 15 TB with more than 200k transactions per day. We were asked on how to do the ingestion without having to upgrade their systems along with the cost consciousness mindset.

In this article, I want to share what was the approach I decided to take given those limitations. This approach may not be the perfect solution (and I welcome to hear more if you have more efficient solutions), but I think this can be relevant for most enterprises since I believe many of them are running a legacy system, the budget is minimal (if any), and there is no direct impact to revenue when the IT team wants to perform an improvement in the systems.

Solution Design

Since the current PostgreSQL version is not supported with the recent CDC tools like Debezium (the stable version from Debezium requires PostgreSQL version 12 and above), we need to create additional tables inside the database to capture these changes. These tables then need to be registered to Kafka in order to enable Snowpipe Streaming. The high level diagram looks like this:
High Level Architecture

Setup Procedure to Capture Table Changes

First, we need to create a table to capture the changes we want to input later. This is the SQL pseudocode:

CREATE TABLE transaction_cdc_log (
    cdc_id SERIAL PRIMARY KEY,
    table_name VARCHAR(100) NOT NULL,
    operation VARCHAR(10) NOT NULL CHECK (operation IN ('INSERT', 'UPDATE', 'DELETE')),
    record_id INTEGER NOT NULL,
    old_data TEXT,
    new_data TEXT,
    change_timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    change_user VARCHAR(100),
    processed BOOLEAN DEFAULT false
);
Enter fullscreen mode Exit fullscreen mode

We can modify the column as needed. The idea is to capture the current and new data for any changes (INSERT, UPDATE or DELETE operation) in the table.

Next, we need to create an intermediary function to convert the old and new data into a full joined text as follows:

CREATE OR REPLACE FUNCTION transaction_row_to_text(rec transactions) RETURNS TEXT AS $$
BEGIN
    RETURN '{' ||
        '"transaction_id":' || COALESCE(rec.transaction_id::TEXT, 'null') || ',' ||
        '"transaction_code":"' || COALESCE(rec.transaction_code, '') || '",' ||
        '"outlet_id":' || COALESCE(rec.outlet_id::TEXT, 'null') || ',' ||
        '"transaction_date":"' || COALESCE(rec.transaction_date::TEXT, '') || '",' ||
        '"total_amount":' || COALESCE(rec.total_amount::TEXT, 'null') || ',' ||
        '"total_discount":' || COALESCE(rec.total_discount::TEXT, 'null') || ',' ||
        '"payment_method":"' || COALESCE(rec.payment_method, '') || '",' ||
        '"cashier_name":"' || COALESCE(rec.cashier_name, '') || '",' ||
        '"customer_id":"' || COALESCE(rec.customer_id, '') || '",' ||
        '"status":"' || COALESCE(rec.status, '') || '",' ||
        '"created_at":"' || COALESCE(rec.created_at::TEXT, '') || '"' ||
        '}';
END;
$$ LANGUAGE plpgsql;
Enter fullscreen mode Exit fullscreen mode

Setup Trigger to Each Table

Once the function is ready, the next step is to create a function that returns a TRIGGER in the database. This function then will be called whenever any changes are made to the table we want to monitor.

CREATE OR REPLACE FUNCTION capture_transaction_changes() RETURNS TRIGGER AS $$
BEGIN
    IF (TG_OP = 'DELETE') THEN
        INSERT INTO transaction_cdc_log (table_name, operation, record_id, old_data, new_data, change_user)
        VALUES (TG_TABLE_NAME, TG_OP, OLD.transaction_id, transaction_row_to_text(OLD), NULL, current_user);
        RETURN OLD;
    ELSIF (TG_OP = 'UPDATE') THEN
        INSERT INTO transaction_cdc_log (table_name, operation, record_id, old_data, new_data, change_user)
        VALUES (TG_TABLE_NAME, TG_OP, NEW.transaction_id, transaction_row_to_text(OLD), transaction_row_to_text(NEW), current_user);
        RETURN NEW;
    ELSIF (TG_OP = 'INSERT') THEN
        INSERT INTO transaction_cdc_log (table_name, operation, record_id, old_data, new_data, change_user)
        VALUES (TG_TABLE_NAME, TG_OP, NEW.transaction_id, NULL, transaction_row_to_text(NEW), current_user);
        RETURN NEW;
    END IF;
    RETURN NULL;
END;
$$ LANGUAGE plpgsql;
Enter fullscreen mode Exit fullscreen mode

The function above returns a TRIGGER which then can be used to monitor the desired table like the following:

CREATE TRIGGER transaction_cdc_trigger
AFTER INSERT OR UPDATE OR DELETE ON transactions -- this is the table name
FOR EACH ROW EXECUTE PROCEDURE capture_transaction_changes();
Enter fullscreen mode Exit fullscreen mode

Up until this point, you may realize that we need to create those functions for each table we want to monitor. As mentioned in the Overview section, this is an approach we can use if there is no option to upgrade the database version. Of course I recommend upgrading so these processes can be handled seamlessly using more appropriate tools.

Setup Kafka Connect to CDC Log.

Once the function is ready, we can Kafka Connect with JDBC Source Connectors to perform periodic pooling and publish this as Kafka Topic. The high level configuration file is as follows:

{
  "name": "pos-cdc-source",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
    "tasks.max": "2",
    "connection.url": "jdbc:postgresql://pos-db:5432/pos_db",
    "connection.user": "your_username",
    "connection.password": "your_password",
    "mode": "timestamp+incrementing",
    "incrementing.column.name": "cdc_id",
    "timestamp.column.name": "change_timestamp",
    "table.whitelist": "transaction_cdc_log",
    "topic.prefix": "pos-",
    "poll.interval.ms": "1000",
    "batch.max.rows": "100",
    "timestamp.delay.interval.ms": "0",
    "validate.non.null": "false",
    "transforms": "createKey,extractInt",
    "transforms.createKey.type": "org.apache.kafka.connect.transforms.ValueToKey",
    "transforms.createKey.fields": "cdc_id",
    "transforms.extractInt.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
    "transforms.extractInt.field": "cdc_id"
  }
}
Enter fullscreen mode Exit fullscreen mode

Setup Snowpipe Streaming

Once the data changes are published via Kafka, we can configure Snowpipe Streaming to ingest the data into Snowflake. Please note that to handle ingestion efficiently, we need to use the ‘batch’ approach rather than insert. The batch period will depend on the configuration, so we can set this in seconds (the config above used 1 second). The high level python script is as follows:

#!/usr/bin/env python3
"""
Snowpipe Streaming Client with Batch Processing
Consumes CDC events from Kafka and streams them to Snowflake in batches
"""
import os
import sys
import json
import time
import logging
import signal
from datetime import datetime
from collections import defaultdict
from typing import Optional, Callable
from kafka import KafkaConsumer
import snowflake.connector
from dotenv import load_dotenv

logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[logging.StreamHandler(sys.stdout)]
)
logger = logging.getLogger(__name__)

# Load environment variables
load_dotenv()

class SnowpipeStreamingClient:
    def __init__(self):
        # Kafka configuration
        self.kafka_bootstrap_servers = os.getenv('KAFKA_BOOTSTRAP_SERVERS', 'kafka:9092')
        self.kafka_group_id = os.getenv('KAFKA_GROUP_ID', 'snowpipe-consumer-group')

        # Snowflake configuration
        self.snowflake_account = os.getenv('SNOWFLAKE_ACCOUNT')
        self.snowflake_user = os.getenv('SNOWFLAKE_USER')
        self.snowflake_password = os.getenv('SNOWFLAKE_PASSWORD')
        self.snowflake_database = os.getenv('SNOWFLAKE_DATABASE', 'KF_DEMO')
        self.snowflake_warehouse = os.getenv('SNOWFLAKE_WAREHOUSE', 'COMPUTE_WH_GEN2')
        self.snowflake_role = os.getenv('SNOWFLAKE_ROLE', 'ACCOUNTADMIN')

        # Batch configuration
        self.batch_size = int(os.getenv('BATCH_SIZE', '50'))
        self.batch_timeout = float(os.getenv('BATCH_TIMEOUT', '2.0'))  # seconds

        # Validate Snowflake credentials
        if not all([self.snowflake_account, self.snowflake_user, self.snowflake_password]):
            raise ValueError("Missing Snowflake credentials. Please set SNOWFLAKE_ACCOUNT, SNOWFLAKE_USER, and SNOWFLAKE_PASSWORD environment variables.")

        # Initialize connections
        self.snowflake_conn = None
        self.consumer = None

        # Batch buffers by topic
        self.batches = {
            'pos-transaction_cdc_log': []
        }
        self.last_flush_time = time.time()

        # Stats
        self.records_processed = 0
        self.batches_flushed = 0
        self.errors = 0
        self.last_report_time = time.time()

        self._shutdown_requested = False
        self._setup_signal_handlers()

        logger.info(f"Snowpipe Streaming Client initialized (Batch Size: {self.batch_size}, Timeout: {self.batch_timeout}s)")

    def _setup_signal_handlers(self) -> None:
        signal.signal(signal.SIGINT, self._signal_handler)
        signal.signal(signal.SIGTERM, self._signal_handler)

    def _signal_handler(self, signum: int, frame) -> None:
        logger.info(f"Received signal {signum}, initiating graceful shutdown...")
        self._shutdown_requested = True

    def _deduplicate_batch(self, records: list, key_field: str) -> list:
        """Deduplicate records by key, keeping only the latest (last) occurrence"""
        seen = {}
        for idx, record in enumerate(records):
            new_data = self.parse_json_field(record.get('new_data'))
            if new_data and record.get('operation') != 'DELETE':
                key = new_data.get(key_field)
                if key is not None:
                    seen[key] = (idx, record)

        return [record for idx, record in sorted(seen.values(), key=lambda x: x[0])]

    def connect_snowflake(self):
        """Connect to Snowflake"""
        max_retries = 5
        for i in range(max_retries):
            try:
                self.snowflake_conn = snowflake.connector.connect(
                    account=self.snowflake_account,
                    user=self.snowflake_user,
                    password=self.snowflake_password,
                    database=self.snowflake_database,
                    warehouse=self.snowflake_warehouse,
                    role=self.snowflake_role
                )
                logger.info(f"Connected to Snowflake: {self.snowflake_account}")
                return
            except Exception as e:
                if i < max_retries - 1:
                    logger.warning(f"Snowflake connection attempt {i+1} failed: {e}")
                    logger.info("Retrying in 10 seconds...")
                    time.sleep(10)
                else:
                    raise

    def connect_kafka(self):
        """Connect to Kafka and subscribe to topics"""
        topics = list(self.batches.keys())

        max_retries = 10
        for i in range(max_retries):
            try:
                self.consumer = KafkaConsumer(
                    *topics,
                    bootstrap_servers=self.kafka_bootstrap_servers,
                    group_id=self.kafka_group_id,
                    auto_offset_reset='earliest',
                    enable_auto_commit=True,
                    auto_commit_interval_ms=1000,
                    max_poll_records=500,
                    max_poll_interval_ms=300000,
                    session_timeout_ms=30000,
                    heartbeat_interval_ms=10000,
                    fetch_min_bytes=1,
                    fetch_max_wait_ms=500,
                    value_deserializer=lambda m: json.loads(m.decode('utf-8'))
                )
                logger.info(f"Connected to Kafka: {self.kafka_bootstrap_servers}")
                logger.info(f"Subscribed to topics: {', '.join(topics)}")
                return
            except Exception as e:
                if i < max_retries - 1:
                    logger.warning(f"Kafka connection attempt {i+1} failed: {e}")
                    logger.info("Retrying in 10 seconds...")
                    time.sleep(10)
                else:
                    raise

    def parse_json_field(self, json_str):
        """Parse JSON-like string from PostgreSQL trigger"""
        if not json_str:
            return None
        try:
            return json.loads(json_str)
        except json.JSONDecodeError:
            return json_str

    def flush_transaction_batch(self, records: list) -> None:
        """Flush a batch of transaction records to Snowflake"""
        if not records:
            return

        deduped_records = self._deduplicate_batch(records, 'transaction_id')
        if not deduped_records:
            return

        try:
            cursor = self.snowflake_conn.cursor()

            values_list = []
            params = []

            for record in deduped_records:
                new_data = self.parse_json_field(record.get('new_data'))

                values_list.append("(%s, %s, %s, %s::TIMESTAMP_NTZ, %s, %s, %s, %s, %s, %s, %s::TIMESTAMP_NTZ)")
                params.extend([
                    new_data.get('transaction_id'),
                    new_data.get('transaction_code'),
                    new_data.get('outlet_id'),
                    new_data.get('transaction_date'),
                    new_data.get('total_amount'),
                    new_data.get('total_discount'),
                    new_data.get('payment_method'),
                    new_data.get('cashier_name'),
                    new_data.get('customer_id'),
                    new_data.get('status'),
                    new_data.get('created_at')
                ])

            if not values_list:
                cursor.close()
                return

            values_clause = ",\n".join(values_list)

            merge_sql = f"""
                MERGE INTO {self.snowflake_database}.POS_DATA.transactions t
                USING (
                    SELECT 
                        column1 as transaction_id,
                        column2 as transaction_code,
                        column3 as outlet_id,
                        column4 as transaction_date,
                        column5 as total_amount,
                        column6 as total_discount,
                        column7 as payment_method,
                        column8 as cashier_name,
                        column9 as customer_id,
                        column10 as status,
                        column11 as created_at
                    FROM VALUES {values_clause}
                ) s
                ON t.transaction_id = s.transaction_id
                WHEN MATCHED THEN UPDATE SET
                    transaction_code = s.transaction_code,
                    outlet_id = s.outlet_id,
                    transaction_date = s.transaction_date,
                    total_amount = s.total_amount,
                    total_discount = s.total_discount,
                    payment_method = s.payment_method,
                    cashier_name = s.cashier_name,
                    customer_id = s.customer_id,
                    status = s.status
                WHEN NOT MATCHED THEN INSERT (
                    transaction_id, transaction_code, outlet_id, transaction_date,
                    total_amount, total_discount, payment_method, cashier_name,
                    customer_id, status, created_at
                ) VALUES (
                    s.transaction_id, s.transaction_code, s.outlet_id, s.transaction_date,
                    s.total_amount, s.total_discount, s.payment_method, s.cashier_name,
                    s.customer_id, s.status, s.created_at
                )
            """

            cursor.execute(merge_sql, params)
            cursor.close()
            self.batches_flushed += 1

        except Exception as e:
            logger.error(f"Error flushing transaction batch ({len(deduped_records)} records): {e}")
            self.errors += len(deduped_records)

    def flush_batches(self, force=False):
        """Flush all batches to Snowflake if they meet size/timeout thresholds"""
        current_time = time.time()
        time_since_last_flush = current_time - self.last_flush_time

        for topic, batch in self.batches.items():
            should_flush = force or len(batch) >= self.batch_size or time_since_last_flush >= self.batch_timeout

            if should_flush and batch:
                if topic == 'pos-transaction_cdc_log':
                    self.flush_transaction_batch(batch)

                self.records_processed += len(batch)
                self.batches[topic].clear()

        if force or time_since_last_flush >= self.batch_timeout:
            self.last_flush_time = current_time

    def process_message(self, message):
        """Add message to appropriate batch buffer"""
        try:
            topic = message.topic
            kafka_value = message.value

            # Extract payload from Kafka Connect wrapper
            if isinstance(kafka_value, dict) and 'payload' in kafka_value:
                record = kafka_value['payload']
            else:
                record = kafka_value

            if not record or not isinstance(record, dict):
                logger.warning(f"Invalid record structure: {record}")
                self.errors += 1
                return

            # Add to batch buffer
            if topic in self.batches:
                self.batches[topic].append(record)
            else:
                logger.warning(f"Unknown topic: {topic}")
                return

            # Flush if any batch reaches size threshold
            self.flush_batches(force=False)

        except Exception as e:
            logger.error(f"Error processing message: {e}")
            self.errors += 1

    def run(self):
        """Main loop to consume and process messages"""
        logger.info("=" * 80)
        logger.info("Snowpipe Streaming Client Started (Batch Mode)")
        logger.info(f"Kafka: {self.kafka_bootstrap_servers}")
        logger.info(f"Snowflake: {self.snowflake_account}/{self.snowflake_database}")
        logger.info(f"Batch Size: {self.batch_size} | Timeout: {self.batch_timeout}s")
        logger.info("=" * 80)

        logger.info("Waiting 30 seconds for Kafka Connect to be ready...")
        time.sleep(30)

        # Connect
        self.connect_snowflake()
        self.connect_kafka()

        logger.info("Ready to process CDC events in batch mode")

        try:
            while not self._shutdown_requested:
                messages = self.consumer.poll(timeout_ms=1000, max_records=self.batch_size)

                for topic_partition, records in messages.items():
                    for message in records:
                        self.process_message(message)

                self.flush_batches(force=False)

                if time.time() - self.last_report_time >= 10:
                    total_buffered = sum(len(b) for b in self.batches.values())
                    logger.info(f"Processed: {self.records_processed} | Batches: {self.batches_flushed} | "
                               f"Buffered: {total_buffered} | Errors: {self.errors}")
                    self.last_report_time = time.time()

        except Exception as e:
            logger.error(f"Fatal error: {e}")
        finally:
            logger.info("Shutting down streaming client...")
            self.flush_batches(force=True)
        self._cleanup()

    def _cleanup(self) -> None:
        if self.consumer:
            self.consumer.close()
        if self.snowflake_conn:
            self.snowflake_conn.close()

        logger.info("=" * 80)
        logger.info("Streaming Client Stopped")
        logger.info(f"Total Records Processed: {self.records_processed}")
        logger.info(f"Total Batches Flushed: {self.batches_flushed}")
        logger.info(f"Total Errors: {self.errors}")
        logger.info("=" * 80)

if __name__ == '__main__':
    try:
        client = SnowpipeStreamingClient()
        client.run()
    except Exception as e:
        logger.error(f"Failed to start streaming client: {e}")
        sys.exit(1)
Enter fullscreen mode Exit fullscreen mode

Create Docker Compose

To deploy this in a development environment easily, we can use Docker. This is the sample docker compose file we can use (assuming the update or new transactions are done separately).

services:
  # Zookeeper
  zookeeper:
    image: confluentinc/cp-zookeeper:7.5.0
    container_name: zookeeper
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
    ports:
      - "2181:2181"
    networks:
      - pharma-network
    healthcheck:
      test: ["CMD", "nc", "-z", "localhost", "2181"]
      interval: 10s
      timeout: 5s
      retries: 5

  # Kafka Broker
  kafka:
    image: confluentinc/cp-kafka:7.5.0
    container_name: kafka
    depends_on:
      zookeeper:
        condition: service_healthy
    ports:
      - "9092:9092"
      - "9093:9093"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:9093
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"
    networks:
      - pharma-network
    healthcheck:
      test: ["CMD", "kafka-broker-api-versions", "--bootstrap-server", "localhost:9092"]
      interval: 10s
      timeout: 10s
      retries: 5

  # Kafka Connect
  kafka-connect:
    image: confluentinc/cp-kafka-connect:7.5.0
    container_name: kafka-connect
    depends_on:
      kafka:
        condition: service_healthy
      pos-db:
        condition: service_started
      promo-db:
        condition: service_started
    ports:
      - "8083:8083"
    environment:
      CONNECT_BOOTSTRAP_SERVERS: kafka:9092
      CONNECT_REST_PORT: 8083
      CONNECT_GROUP_ID: "connect-group"
      CONNECT_CONFIG_STORAGE_TOPIC: "connect-configs"
      CONNECT_OFFSET_STORAGE_TOPIC: "connect-offsets"
      CONNECT_STATUS_STORAGE_TOPIC: "connect-status"
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_KEY_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      CONNECT_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      CONNECT_REST_ADVERTISED_HOST_NAME: "kafka-connect"
      CONNECT_PLUGIN_PATH: "/usr/share/java,/usr/share/confluent-hub-components"
    volumes:
      - ./kafka-connect/connectors:/etc/kafka-connect/connectors
    networks:
      - pos-network
    command:
      - bash
      - -c
      - |
        echo "Installing JDBC Connector..."
        confluent-hub install --no-prompt confluentinc/kafka-connect-jdbc:10.7.4
        echo "Downloading PostgreSQL JDBC Driver..."
        cd /usr/share/confluent-hub-components/confluentinc-kafka-connect-jdbc/lib
        curl -O https://jdbc.postgresql.org/download/postgresql-42.6.0.jar
        echo "Starting Kafka Connect..."
        /etc/confluent/docker/run
    healthcheck:
      test: ["CMD", "curl", "-f", "http://localhost:8083/"]
      interval: 15s
      timeout: 10s
      retries: 10

  # Snowpipe Streaming Client
  snowpipe-streaming:
    build:
      context: ./snowpipe
      dockerfile: Dockerfile
    container_name: snowpipe-streaming
    depends_on:
      - kafka
    env_file:
      - .env
    environment:
      KAFKA_BOOTSTRAP_SERVERS: kafka:9092
      KAFKA_GROUP_ID: snowpipe-consumer-group
    networks:
      - pos-network
    restart: unless-stopped

  # Connector Setup Service (run once to configure Kafka Connect)
  connector-setup:
    image: curlimages/curl:latest
    container_name: connector-setup
    depends_on:
      kafka-connect:
        condition: service_healthy
    volumes:
      - ./kafka-connect:/kafka-connect
    networks:
      - pos-network
    command:
      - /bin/sh
      - -c
      - |
        echo "Waiting for Kafka Connect to be fully ready..."
        sleep 30

        echo "Creating transaction CDC Source Connector..."
        curl -X POST http://kafka-connect:8083/connectors \
          -H "Content-Type: application/json" \
          -d @/kafka-connect/pos-source.json

        echo "Connector setup complete!"

        echo "Listing all connectors..."
        curl -X GET http://kafka-connect:8083/connectors

networks:
  pos-network:
    driver: bridge

volumes:
  promo_data:
  pos_data:
Enter fullscreen mode Exit fullscreen mode

Snowflake Setup

We need to create the proper database, schema and tables so store the ingestion result. This is a sample query we can use:

-- Create database
CREATE DATABASE IF NOT EXISTS DB_DEMO;
USE DATABASE DB_DEMO;

-- Create schemas
CREATE SCHEMA IF NOT EXISTS POS_DATA;

-- ============================================================================
-- POS_DATA Schema
-- ============================================================================
USE SCHEMA POS_DATA;

CREATE TABLE IF NOT EXISTS transactions (
    transaction_id INTEGER PRIMARY KEY,
    transaction_code VARCHAR(100),
    outlet_id INTEGER,
    transaction_date TIMESTAMP_NTZ,
    total_amount NUMBER(15, 2),
    total_discount NUMBER(15, 2),
    payment_method VARCHAR(50),
    cashier_name VARCHAR(255),
    customer_id VARCHAR(100),
    status VARCHAR(20),
    created_at TIMESTAMP_NTZ,
    snowflake_ingested_at TIMESTAMP_LTZ DEFAULT CURRENT_TIMESTAMP()
);

ALTER TABLE transactions ADD CONSTRAINT unique_transaction_code UNIQUE (transaction_code);
Enter fullscreen mode Exit fullscreen mode

We also need to store the credentials in an .env file to enable the secure connection

SNOWFLAKE_ACCOUNT=ORGNAME-ACCOUNTNAME
SNOWFLAKE_USER=YOUR_USER
SNOWFLAKE_PASSWORD=USE_PAT_TOKEN
SNOWFLAKE_DATABASE=DB_DEMO
SNOWFLAKE_SCHEMA=POS_DATA
SNOWFLAKE_WAREHOUSE=YOUR_COMPUTE
SNOWFLAKE_ROLE=YOUR_ROLE

# Kafka Configuration
KAFKA_BOOTSTRAP_SERVERS=kafka:9092
KAFKA_GROUP_ID=snowpipe-consumer-group

# PostgreSQL - POS Database
POS_DB_HOST=pos-db
POS_DB_PORT=5432
POS_DB_NAME=pos_db
POS_DB_USER=your_username
POS_DB_PASSWORD=your_password
Enter fullscreen mode Exit fullscreen mode

Create Shell Script

Once everything is ready, we can set create another script to run the whole process.

#!/bin/bash

# CDC Demo Staged Startup Script
# Starts services in stages for better reliability with dynamic waiting

set -e

# Helper function: Wait for service with health check
wait_for_service() {
    local SERVICE_NAME=$1
    local CHECK_COMMAND=$2
    local MAX_WAIT=${3:-120}
    local WAIT_TIME=0

    echo "⏳ Waiting for ${SERVICE_NAME} to be ready..."

    while [ $WAIT_TIME -lt $MAX_WAIT ]; do
        if eval "$CHECK_COMMAND" > /dev/null 2>&1; then
            echo "✓ ${SERVICE_NAME} is ready (waited ${WAIT_TIME}s)"
            return 0
        fi
        sleep 5
        WAIT_TIME=$((WAIT_TIME + 5))
        if [ $((WAIT_TIME % 15)) -eq 0 ]; then
            echo "  Still waiting... (${WAIT_TIME}s/${MAX_WAIT}s)"
        fi
    done

    echo "❌ Timeout: ${SERVICE_NAME} didn't start within ${MAX_WAIT} seconds"
    return 1
}

echo "======================================================================"
echo "  Kimia Farma CDC Demo - Staged Startup"
echo "======================================================================"
echo ""

# Check Docker
if ! command -v docker &> /dev/null; then
    echo "❌ Error: Docker is not installed"
    echo "   Please install Docker from https://www.docker.com/get-started"
    exit 1
fi

if ! docker info &> /dev/null; then
    echo "❌ Error: Docker daemon is not running"
    echo "   Please start Docker Desktop"
    exit 1
fi

echo "✓ Docker is running"

# Check Docker Compose
if ! command -v docker-compose &> /dev/null; then
    echo "❌ Error: Docker Compose is not installed"
    exit 1
fi

echo "✓ Docker Compose is available"

# Check .env file
if [ ! -f .env ]; then
    echo ""
    echo "⚠️  Warning: .env file not found"
    echo "   Creating .env from env.example..."

    if [ -f env.example ]; then
        cp env.example .env
        echo "📝 Created .env from env.example"
        echo "   Please edit .env and add your Snowflake credentials:"
        echo "   - SNOWFLAKE_ACCOUNT"
        echo "   - SNOWFLAKE_USER"
        echo "   - SNOWFLAKE_PASSWORD"
        echo ""
        read -p "Press Enter after updating .env file..."
    else
        echo "❌ Error: env.example not found"
        exit 1
    fi
fi

echo "✓ .env file exists"

# Check Snowflake credentials
if grep -q "your-account" .env 2>/dev/null; then
    echo "⚠️  Warning: Snowflake credentials may not be configured"
    read -p "Continue anyway? (y/N) " -n 1 -r
    echo
    if [[ ! $REPLY =~ ^[Yy]$ ]]; then
        exit 1
    fi
else
    echo "✓ Snowflake credentials are configured"
fi

# Check if containers are already running
if docker-compose ps 2>/dev/null | grep -q "Up"; then
    echo ""
    echo "⚠️  Some containers are already running"
    read -p "Stop and restart all containers? (y/N) " -n 1 -r
    echo
    if [[ $REPLY =~ ^[Yy]$ ]]; then
        echo "Stopping existing containers..."
        docker-compose down
    else
        echo "Exiting..."
        exit 0
    fi
fi

echo ""
echo "======================================================================"
echo "  STAGE 1: Starting Kafka Infrastructure"
echo "======================================================================"
echo ""

echo "Starting Zookeeper, Kafka, and Kafka Connect..."
docker-compose up -d zookeeper kafka kafka-connect

# Wait for Kafka to be ready
if ! wait_for_service "Kafka broker" \
    "docker exec kafka kafka-broker-api-versions --bootstrap-server localhost:9092" \
    120; then
    echo "   Check logs: docker-compose logs kafka"
    exit 1
fi

# Wait for Kafka Connect to be ready
if ! wait_for_service "Kafka Connect" \
    "curl -s http://localhost:8083/" \
    120; then
    echo "   Check logs: docker-compose logs kafka-connect"
    exit 1
fi

echo "Registering Kafka Connect connectors..."
docker-compose up -d connector-setup

# Wait for connectors to be registered (with automatic fallback)
MAX_WAIT=120
WAIT_TIME=0
echo "⏳ Waiting for connector registration..."

while [ $WAIT_TIME -lt $MAX_WAIT ]; do
    CONNECTORS=$(curl -s http://localhost:8083/connectors 2>/dev/null || echo "[]")
    if echo "$CONNECTORS" | grep -q "pos-cdc-source"; then
        echo "✓ Connectors registered successfully (waited ${WAIT_TIME}s)"
        break
    fi
    sleep 5
    WAIT_TIME=$((WAIT_TIME + 5))
    if [ $((WAIT_TIME % 15)) -eq 0 ]; then
        echo "  Still waiting... (${WAIT_TIME}s/${MAX_WAIT}s)"
    fi
done

# If auto-registration failed, try manual registration
if [ $WAIT_TIME -ge $MAX_WAIT ]; then
    echo "⚠️  Auto-registration timed out, attempting manual registration..."

    # Manual registration of POS connector
    curl -X POST http://localhost:8083/connectors \
      -H "Content-Type: application/json" \
      -d @kafka-connect/pos-source.json > /dev/null 2>&1 || true

    sleep 5

    # Verify manual registration
    CONNECTORS=$(curl -s http://localhost:8083/connectors 2>/dev/null || echo "[]")
    if echo "$CONNECTORS" | grep -q "pos-cdc-source"; then
        echo "✓ Connectors manually registered successfully"
    else
        echo "❌ Failed to register connectors automatically"
        echo "   You can register manually later with:"
        echo "   curl -X POST http://localhost:8083/connectors -H 'Content-Type: application/json' -d @kafka-connect/pos-source.json"
    fi
fi

# Show registered connectors
REGISTERED=$(curl -s http://localhost:8083/connectors 2>/dev/null || echo "[]")
echo "📋 Registered connectors: $REGISTERED"

echo ""
echo "======================================================================"
echo "  STAGE 2: Starting Data Pipeline"
echo "======================================================================"
echo ""

echo "Starting Snowpipe Streaming..."
docker-compose up -d snowpipe-streaming

# Give generators a moment to initialize
echo "⏳ Waiting for generators to initialize..."
sleep 5

# Check if generators are running
if docker-compose ps | grep -q "snowpipe-streaming.*Up"; then
    echo "✓ Snowpipe streaming client is running"
else
    echo "⚠️  Warning: Snowpipe streaming may not have started properly"
fi

echo ""
echo "======================================================================"
echo "  ✓ All Stages Complete!"
echo "======================================================================"
echo ""
echo "Services Status:"
docker-compose ps

echo ""
echo "======================================================================"
echo "Monitoring Commands:"
echo "======================================================================"
echo ""
echo "Pipeline Status:"
echo "  • Snowpipe Streaming:    docker-compose logs -f snowpipe-streaming"
echo "  • Kafka Connect:         docker-compose logs -f kafka-connect"
echo ""
echo "Health Checks:"
echo "  • View all services:     docker-compose ps"
echo "  • Check connectors:      curl http://localhost:8083/connectors"
echo "  • Connector status:      curl http://localhost:8083/connectors/pos-cdc-source/status | jq"
echo "  • Check Kafka topics:    docker exec kafka kafka-topics --list --bootstrap-server localhost:9092"
echo "  • View CDC logs:         docker exec pos-db psql -U your_user -d pos_db -c 'SELECT COUNT(*) FROM transaction_cdc_log;'"
echo ""
echo "Database Access:"
echo "  • POS Database:          docker exec -it pos-db psql -U your_user -d pos_db"
echo "  • Or via local:          psql -h localhost -p 5434 -U your_user -d pos_db"
echo "  • Password:              your_password"
echo ""
echo "Next Steps:"
echo "  1. Verify CDC is working:"
echo "     docker exec kafka kafka-console-consumer --bootstrap-server localhost:9092 --topic pos-transaction_cdc_log --max-messages 5"
echo "  2. Monitor data flow:"
echo "     docker-compose logs -f snowpipe-streaming"
echo "  3. Check Snowflake: Run sample queries"
echo ""
echo "To stop the demo:"
echo "  docker-compose down"
echo ""
echo "To clean everything and start fresh:"
echo "  docker-compose down -v"
echo ""
echo "======================================================================"
Enter fullscreen mode Exit fullscreen mode

Ingestion Result

Once we run the start_demo.sh script, all of the changes in our monitored table will be reflected in the Snowflake tables within seconds. I try to simulate with 5 TPS for one and the latency is less than 10 seconds between the inserted data in PostgreSQL until it appears in Snowflake!

Potential Improvements

This approach provides a solution to deliver near real-time ingestion using a legacy database. In the future, we can add more detailed solution to cover the following:

Scalability

While this workaround can handle 5 TPS easily from a single table, a thorough testing is required to monitor the database performance like CPU and I/O. Another component to configure is the Kafka Connect, especially the number of workers to support expected latency and throughput in the production environment.

Observability

Several metrics may need to be captured when deploying this in production, such as lag behind the source DB, Kafka consumer lag, and Snowpipe Streaming error rates. The log can be stored and consumed by observability tools like Prometheus/Grafana, or other managed services.

Dynamic Function

For production scale, the function creation or trigger generation may require another automation process. By querying table information_schema which contains metadata, the supporting functions and triggers can be created dynamically without manual script development.

Conclusion

This article showed how we can provide a workaround when facing legacy systems. The approach chosen here may not be the most ideal way to ingest the data, but when upgrading the PostgreSQL version is not possible due to many factors, this approach works quite well to simulate the real-world workload. Please note that the additional table will consume more storage in your database, so we may need another scheduler to truncate the table periodically. Hope you find this article useful!

Top comments (0)