SQL Debezium Kafka Snowflake

Anuj Manjhi

(Completely self-contained. No Azure. No Elasticsearch.)

1️⃣ BEGINNER THEORY SECTION (Only What We Use)
🟦 What is CDC (Change Data Capture)?

CDC = CCTV camera for your database

Imagine your SQL database is a bank vault.

CDC acts like:

🎥 A CCTV camera recording every insert, update, delete.

Instead of scanning the whole database again and again,
it only captures what changed.

Why useful?

Real-time data pipelines

No heavy full table scans

Efficient replication
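To see why log-based capture beats repeated scans, here is a toy Python comparison (the table and rows are invented for illustration):

```python
# Toy comparison: full-table diffing vs. reading a change log.
old_snapshot = {1: "sunny", 2: "rainy"}
new_snapshot = {1: "sunny", 2: "cloudy", 3: "windy"}

# Without CDC: diff entire snapshots every poll -- cost grows with table size.
full_scan_changes = {
    k: new_snapshot[k]
    for k in new_snapshot
    if old_snapshot.get(k) != new_snapshot[k]
}

# With CDC: the database hands us exactly the changed rows -- cost grows
# only with the number of changes.
change_log = [
    {"op": "u", "key": 2, "value": "cloudy"},
    {"op": "c", "key": 3, "value": "windy"},
]

print(full_scan_changes)               # {2: 'cloudy', 3: 'windy'}
print([e["key"] for e in change_log])  # [2, 3]
```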

🟦 What is Debezium?

Debezium = Smart CDC Reader

If CDC is CCTV footage,
Debezium is the security officer watching that footage and sending alerts.

It:

Reads database transaction logs

Converts changes into events

Sends events to Kafka
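Each event Debezium sends carries the row state before and after the change, plus an `op` code (`c` = insert, `u` = update, `d` = delete). A minimal Python sketch of unpacking one (the payload below is hand-written for illustration, not captured output):

```python
import json

# Hand-written example of a Debezium-style UPDATE event payload.
event = json.loads("""
{
  "op": "u",
  "before": {"WeatherDate": "2026-01-01", "Temperature": 31.0, "Humidity": 55.0},
  "after":  {"WeatherDate": "2026-01-01", "Temperature": 32.5, "Humidity": 60.0},
  "source": {"schema": "dbo", "table": "DailyWeatherFact"}
}
""")

OPS = {"c": "insert", "u": "update", "d": "delete"}
action = OPS[event["op"]]
# For deletes only "before" is populated; otherwise "after" is the new row.
row = event["before"] if event["op"] == "d" else event["after"]
print(action, row["WeatherDate"])  # update 2026-01-01
```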

🟦 What is Apache Kafka?

Kafka = Post Office Sorting Center 📬

Imagine thousands of letters arriving per second.

Kafka:

Receives messages (events)

Stores them in order

Lets multiple systems read them independently

Kafka does NOT modify data.
It stores streams.

🟦 What is a Kafka Topic?

Topic = Mailbox

Example:

sql2016.dbo.DailyWeatherFact

Each table becomes a topic.

Kafka keeps events in ordered logs.

🟦 What is Kafka Connect?

Kafka Connect = Automation Robot 🤖

Instead of writing custom code to:

Read from Kafka

Send to Snowflake

Kafka Connect runs connectors that:

Pull from source

Push to destination

🟦 What is a Sink Connector?

Sink = Delivery Truck 🚛

Kafka stores mail.
Sink connector delivers mail to final destination.

In this project:
Kafka → Snowflake

🟦 What is Snowflake?

Snowflake = Digital Library Warehouse 📚

It:

Stores massive data

Supports analytics

Scales independently

Why not write directly to Snowflake?

Because:

Kafka gives:

Buffering

Replay

Fault tolerance

Scalability
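Buffering and replay fall out of one design decision: the topic is an append-only log and every consumer just remembers its own position. A conceptual sketch (nothing here is the real Kafka API):

```python
# Minimal model of a Kafka topic: an ordered, append-only log that
# consumers read at their own offsets.
log = []                                      # the topic
offsets = {"snowflake-sink": 0, "audit": 0}   # per-consumer positions

def produce(event):
    log.append(event)  # producers only ever append

def poll(consumer):
    """Return this consumer's unread events and advance its offset."""
    start = offsets[consumer]
    offsets[consumer] = len(log)
    return log[start:]

produce({"op": "c", "key": "2026-01-01"})
produce({"op": "u", "key": "2026-01-01"})

print(poll("snowflake-sink"))  # both events delivered to the sink
offsets["audit"] = 0           # replay: rewind one consumer...
print(poll("audit"))           # ...and it re-reads the same two events
```

Rewinding one consumer's offset did not affect the other consumer or the log itself, which is exactly why a sink can be re-run from scratch at any time.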

2️⃣ ARCHITECTURE OVERVIEW
🧭 Text Diagram
```
+------------------+
|    SQL Server    |
|  (Climatology)   |
+--------+---------+
         |
         | CDC (transaction logs)
         v
+------------------+
|     Debezium     |
| Source Connector |
+--------+---------+
         |
         v
+------------------+
|      Kafka       |
| Topic:           |
| sql2016.dbo...   |
+--------+---------+
         |
         v
+------------------+
|  Kafka Connect   |
|  Snowflake Sink  |
+--------+---------+
         |
         v
+------------------+
|    Snowflake     |
| DAILYWEATHERFACT |
+------------------+
```
🔌 Ports Used

| Component      | Port  |
|----------------|-------|
| SQL Server     | 1433  |
| Zookeeper      | 2181  |
| Kafka Internal | 29092 |
| Kafka External | 9092  |
| Kafka Connect  | 8083  |
| Snowflake      | 443   |
🔁 Data Flow Step-by-Step

Insert row into SQL

SQL writes change to transaction log

Debezium reads log

Sends event to Kafka topic

Kafka stores event

Snowflake sink reads topic

Inserts into Snowflake table

🔊 Internal vs External Listeners

Kafka runs inside Docker.

Internal communication:

kafka:29092

External (host machine):

localhost:9092

Why needed?
Because:

Containers talk using internal

Your machine talks using external
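The listener split can be mimicked in code: the broker advertises a different address depending on which listener a client connects through. A toy lookup (names mirror the compose file used in this project):

```python
# Toy model of Kafka advertised listeners: the broker tells each client
# which address to use, based on where that client lives.
ADVERTISED = {
    "INTERNAL": "kafka:29092",     # resolvable only on the Docker network
    "EXTERNAL": "localhost:9092",  # resolvable from the host machine
}

def bootstrap_for(client_location: str) -> str:
    """Pick the right bootstrap address for a client's network."""
    listener = "INTERNAL" if client_location == "docker" else "EXTERNAL"
    return ADVERTISED[listener]

print(bootstrap_for("docker"))  # kafka:29092   (e.g. the Connect container)
print(bootstrap_for("host"))    # localhost:9092 (e.g. console tools)
```

If a container is handed `localhost:9092` it will try to connect to itself, which is the classic "listener not reachable" failure mode.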

🏷 Topic Naming Pattern

<server-prefix>.<schema>.<table>

sql2016.dbo.DailyWeatherFact
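The pattern is mechanical enough to express as a one-liner:

```python
def topic_name(prefix: str, schema: str, table: str) -> str:
    """Debezium names topics <server-prefix>.<schema>.<table>."""
    return f"{prefix}.{schema}.{table}"

print(topic_name("sql2016", "dbo", "DailyWeatherFact"))
# sql2016.dbo.DailyWeatherFact
```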
3️⃣ REAL WORLD USE CASE

Enterprises use this for:

ERP → Data warehouse sync

Banking transactions

Inventory changes

Real-time analytics

Why Kafka before Snowflake?

Because:

| Without Kafka  | With Kafka     |
|----------------|----------------|
| Tight coupling | Decoupled      |
| No replay      | Replay anytime |
| No buffering   | Safe buffering |
| Hard scaling   | Easy scaling   |

Kafka stores history.
You can replay events anytime.

4️⃣ FULL IMPLEMENTATION GUIDE
Step 0 – Prerequisites

OS: Windows 10/11 or Linux
Docker: 24+
RAM: Minimum 8GB (16GB recommended)
Open Ports: 1433, 2181, 9092, 29092, 8083

Step 1 – SQL Setup
Create Test Table

```sql
CREATE TABLE dbo.DailyWeatherFact (
    WeatherDate DATE PRIMARY KEY,
    Temperature FLOAT,
    Humidity FLOAT
);
```

Enable CDC

```sql
USE Climatology;
GO

EXEC sys.sp_cdc_enable_db;
GO

EXEC sys.sp_cdc_enable_table
    @source_schema = 'dbo',
    @source_name   = 'DailyWeatherFact',
    @role_name     = NULL;
GO
```

Why?
Enables transaction-log tracking for the database, then for the table. The table must exist before `sp_cdc_enable_table` runs, so create it first. SQL Server Agent must be running for the CDC capture job to populate the change tables.

Create User

```sql
CREATE LOGIN sa2 WITH PASSWORD = '23Hammer';
CREATE USER sa2 FOR LOGIN sa2;
ALTER SERVER ROLE sysadmin ADD MEMBER sa2;
```

Why sysadmin?
It is the simplest way to guarantee Debezium can read the transaction log and CDC tables in a demo; use a least-privileged login in production.

Step 2 – Docker Compose Setup

📁 docker-compose.yml

```yaml
version: '3.8'

services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.4.0
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
    ports:
      - "2181:2181"

  kafka:
    image: confluentinc/cp-kafka:7.4.0
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka:29092,EXTERNAL://localhost:9092
      KAFKA_LISTENERS: INTERNAL://0.0.0.0:29092,EXTERNAL://0.0.0.0:9092
      KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
      # Required for a single broker: internal topics default to replication factor 3
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1

  connect:
    image: debezium/connect:2.5
    depends_on:
      - kafka
    ports:
      - "8083:8083"
    environment:
      BOOTSTRAP_SERVERS: kafka:29092
      GROUP_ID: 1
      CONFIG_STORAGE_TOPIC: connect-configs
      OFFSET_STORAGE_TOPIC: connect-offsets
      STATUS_STORAGE_TOPIC: connect-status
```
Step 3 – Debezium Source Connector

📁 debezium-connector.json

```json
{
  "name": "sqlserver-cdc",
  "config": {
    "connector.class": "io.debezium.connector.sqlserver.SqlServerConnector",
    "database.hostname": "host.docker.internal",
    "database.port": "1433",
    "database.user": "sa2",
    "database.password": "23Hammer",
    "database.names": "Climatology",
    "topic.prefix": "sql2016",
    "table.include.list": "dbo.DailyWeatherFact",
    "schema.history.internal.kafka.bootstrap.servers": "kafka:29092",
    "schema.history.internal.kafka.topic": "schema-changes.sql2016",
    "snapshot.mode": "schema_only",
    "database.encrypt": "false",
    "database.trustServerCertificate": "true"
  }
}
```

Explanation:

topic.prefix → prefix for all topic names (Debezium 2.x replaced the older `database.server.name` with this property)

snapshot.mode = schema_only → capture the table schema but no pre-existing rows

schema history topic → records DDL changes so the connector can decode older log entries

Step 4 – Snowflake Sink Connector

📁 snowflake-sink.json


```json
{
  "name": "snowflake-sink",
  "config": {
    "connector.class": "com.snowflake.kafka.connector.SnowflakeSinkConnector",
    "tasks.max": "1",
    "topics": "sql2016.dbo.DailyWeatherFact",
    "consumer.auto.offset.reset": "earliest",
    "snowflake.url.name": "anmbgrd-cv13135.snowflakecomputing.com",
    "snowflake.user.name": "ANUJUSER",
    "snowflake.private.key": "YOUR_PRIVATE_KEY",
    "snowflake.database.name": "CDC_DB",
    "snowflake.schema.name": "RAW",
    "snowflake.warehouse.name": "CDC_WH",
    "snowflake.topic2table.map": "sql2016.dbo.DailyWeatherFact:DAILYWEATHERFACT_CDC",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "false",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "buffer.count.records": "10000",
    "buffer.flush.time": "30"
  }
}
```

Buffer settings batch records before writing to Snowflake, trading a little latency for throughput. Note that the `debezium/connect` image does not bundle the Snowflake connector jar; download it into the Connect plugin path before registering the sink.
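The flush policy behind `buffer.count.records` and `buffer.flush.time` is "whichever threshold trips first". A sketch of that logic in Python (thresholds shrunk for illustration):

```python
# Sketch of the sink's two flush triggers: a record-count threshold
# and a time threshold. Whichever is hit first forces a write.
MAX_RECORDS = 3       # cf. buffer.count.records
MAX_AGE_SECONDS = 30  # cf. buffer.flush.time

def should_flush(buffered_count: int, oldest_age_seconds: float) -> bool:
    """Flush when the buffer is full OR the oldest record is too old."""
    return buffered_count >= MAX_RECORDS or oldest_age_seconds >= MAX_AGE_SECONDS

print(should_flush(2, 5))   # False: neither threshold hit
print(should_flush(3, 5))   # True: count threshold
print(should_flush(1, 30))  # True: time threshold
```

The time threshold bounds end-to-end latency even when traffic is light; the count threshold bounds memory use when traffic is heavy.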

Step 5 – Commands

Start containers:

```bash
docker-compose up -d
```

Check running:

```bash
docker ps
```

Create Debezium connector:

```bash
curl -X POST http://localhost:8083/connectors \
  -H "Content-Type: application/json" \
  -d @debezium-connector.json
```

Create Snowflake sink:

```bash
curl -X POST http://localhost:8083/connectors \
  -H "Content-Type: application/json" \
  -d @snowflake-sink.json
```

Insert test:

```sql
INSERT INTO dbo.DailyWeatherFact VALUES ('2026-01-01', 32.5, 60);
```

Check topic:

```bash
docker-compose exec kafka kafka-console-consumer \
  --bootstrap-server localhost:9092 \
  --topic sql2016.dbo.DailyWeatherFact \
  --from-beginning
```
Step 6 – Validation

✔ Insert in SQL
✔ See event in Kafka
✔ Check Snowflake:

```sql
SELECT * FROM RAW.DAILYWEATHERFACT_CDC;
```
Step 7 – Common Errors
| Error                     | Fix                      |
|---------------------------|--------------------------|
| Listener not reachable    | Fix advertised listeners |
| Connector class not found | Use correct image        |
| Snowflake auth error      | Check private key        |
| No data flowing           | Check consumer offsets   |
Step 8 – Production Hardening

Enable SSL

Use SASL auth

Replication factor ≥ 3

Dead Letter Queue

Retry configs

Multi-broker cluster

Monitor lag
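"Monitor lag" has a precise meaning here: for each partition, lag is the broker's log-end offset minus the offset the sink has committed. A tiny Python sketch with invented numbers:

```python
# Consumer lag per partition = log-end offset minus committed offset.
# The offsets below are invented for illustration.
log_end_offsets = {0: 1500, 1: 980}  # newest offset the broker holds
committed       = {0: 1450, 1: 980}  # what the sink has acknowledged

lag = {p: log_end_offsets[p] - committed[p] for p in log_end_offsets}
total_lag = sum(lag.values())

print(lag)        # {0: 50, 1: 0}
print(total_lag)  # 50
```

A total lag that keeps growing means the sink cannot keep up with the source; a flat non-zero lag usually just means batching.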

✅ PROJECT 1 COMPLETE
