DEV Community

Cover image for PostgreSQL to Snowflake: A Hands-On CDC Streaming Guide
Sourabh Gupta
Sourabh Gupta

Posted on

PostgreSQL to Snowflake: A Hands-On CDC Streaming Guide

Most PostgreSQL to Snowflake pipelines run on a schedule: extract every hour, load to S3, copy into Snowflake. That works for reporting. It fails the moment you need to track hard deletes, catch every intermediate row state, or reduce the lag between your operational database and your warehouse.

There are several ways to move data from PostgreSQL to Snowflake: manual CSV exports via COPY INTO, scheduled batch ETL with tools like Airflow, Snowflake's native Openflow connector (built on Apache NiFi), and managed CDC platforms like Fivetran, Airbyte, or Estuary. Each approach trades off differently across latency, delete handling, setup complexity, and cost.

This tutorial focuses on real-time Change data capture (CDC) using Estuary. CDC works by reading directly from PostgreSQL's write-ahead log (WAL). Every INSERT, UPDATE, and DELETE is captured the moment it's committed and streamed to Snowflake continuously: no polling, no missed deletes, no stale data windows. We use Snowpipe Streaming on the Snowflake side for the lowest-latency ingestion path available.

We'll use a demo Postgres instance in a Docker container to walk through logical replication configuration and to ensure a continuous stream of updates. If you want to evaluate the other approaches against what we build here, there's a comparison table at the end of the tutorial.

By the end of this tutorial, you'll have:

  • A running PostgreSQL CDC capture streaming live changes into Estuary collections

  • A Snowflake materialization receiving those changes in near real time

  • A working understanding of WAL-based replication, schema evolution, and Snowflake sync frequency tuning

Prerequisites: Docker, ngrok, a Snowflake account (trial is fine), and a free Estuary account.

đź’ˇ If you want to use an existing Postgres database rather than a demo Docker one, make sure to use PostgreSQL 10.0 or later. Logical replication was introduced in PG 10.

What You'll Build

Here's the architecture at a glance:

  • Source: A self-hosted PostgreSQL instance with logical replication enabled and a live data generator inserting rows continuously

  • Transport: Estuary captures the WAL stream, stores change events as JSON collections in cloud object storage

  • Destination: Estuary materializes those collections into Snowflake tables, with configurable sync frequency to control warehouse credits

CDC pipeline architecture: Source (Docker with Data generator, PostgreSQL, ngrok) to Transport (Estuary Capture, Collections, Materialization) to Destination (Snowpipe Streaming, Snowflake)

What you'll need before starting:

  • Docker (Docker Compose V2 is bundled in modern Docker Desktop, no separate install needed)

  • ngrok (free tier is fine, though you’ll need to authenticate your account to use TCP) — to expose your local Postgres to Estuary's cloud connector

  • A Snowflake account (trial account works perfectly)

  • An Estuary account — sign up free at dashboard.estuary.dev/register

Step 1: Set Up the Source PostgreSQL Database

Spin Up Postgres + Data Generator with Docker Compose

Create a project directory and save the following as docker-compose.yml. It defines two services: a PostgreSQL container configured for logical replication, and a data generator that continuously writes realistic fake product records.

services:

  postgres:

    image: postgres:18

    container_name: postgres_cdc

    hostname: postgres_cdc

    restart: unless-stopped

    user: postgres

    environment:

      POSTGRES_USER: postgres

      POSTGRES_DB: postgres

      POSTGRES_PASSWORD: postgres

    command:

      - "postgres"

      - "-c"

      - "wal_level=logical"

    healthcheck:

      test: ["CMD-SHELL", "pg_isready -U postgres -d postgres"]

      interval: 5s

      timeout: 10s

      retries: 120

    volumes:

      - ./init.sql:/docker-entrypoint-initdb.d/init.sql

    ports:

      - "5432:5432"



  datagen:

    image: materialize/datagen

    container_name: datagen

    restart: unless-stopped

    environment:

      POSTGRES_HOST: postgres_cdc

      POSTGRES_PORT: 5432

      POSTGRES_DB: postgres

      POSTGRES_USER: postgres

      POSTGRES_PASSWORD: postgres

    entrypoint: "datagen -s /app/schemas/products.sql -n 10000 -f postgres -w 1000"

    depends_on:

      postgres:

        condition: service_healthy

    volumes:

      - ./schemas/products.sql:/app/schemas/products.sql
Enter fullscreen mode Exit fullscreen mode

đź’ˇ The -w 1000 flag tells the generator to insert a new row every 1000ms. You can lower it to stress-test the pipeline, or raise it to keep things calm while you explore the UI.

Define the Data Schema

Create a schemas/ folder and save the following as schemas/products.sql. This defines the table schema that the data generator uses to create records:

-- schemas/products.sql

-- NOTE: COMMENT fields are datagen directives (Faker expressions),

-- not standard SQL. They are only used by the datagen tool, not executed against Postgres.

CREATE TABLE "public"."products" (

  "id"          INT         PRIMARY KEY,

  "name"        VARCHAR     COMMENT 'faker.internet.userName()',

  "merchant_id" INT NOT NULL COMMENT 'faker.datatype.number()',

  "price"       INT         COMMENT 'faker.datatype.number()',

  "status"      VARCHAR     COMMENT 'faker.datatype.boolean()',

  "created_at"  TIMESTAMP   DEFAULT now()

);

Enter fullscreen mode Exit fullscreen mode

⚠️ The COMMENT directives in the schema file are Faker expressions consumed by the datagen tool — they are not executed against PostgreSQL. The actual table is created by init.sql below.

Configure PostgreSQL for CDC Replication

Save the following as init.sql in your project root. This runs automatically when the Postgres container starts and configures everything Estuary needs for log-based CDC:

-- init.sql



-- 1. Create a dedicated replication user (use a strong password in production)

CREATE USER flow_capture WITH PASSWORD 'secret' REPLICATION;



-- 2. Grant read access across all tables

GRANT pg_read_all_data TO flow_capture;



-- 3. Create the products table (the actual table, not the datagen schema)

CREATE TABLE IF NOT EXISTS public.products (

  id        INT PRIMARY KEY,

  name      VARCHAR,

  merchant_id INT NOT NULL,

  price     INT,

  status    VARCHAR,

  created_at  TIMESTAMP DEFAULT now()

);



-- 4. Create the watermarks table

--    Estuary writes small amounts of metadata here to ensure

--    accurate backfill sequencing. Not needed in read-only mode.

CREATE TABLE IF NOT EXISTS public.flow_watermarks (

  slot      TEXT PRIMARY KEY,

  watermark TEXT

);

GRANT ALL PRIVILEGES ON TABLE public.flow_watermarks TO flow_capture;



-- 5. Create and configure the replication publication

CREATE PUBLICATION flow_publication;

ALTER PUBLICATION flow_publication SET (publish_via_partition_root = true);

ALTER PUBLICATION flow_publication ADD TABLE

  public.flow_watermarks,

  public.products;

Enter fullscreen mode Exit fullscreen mode

A few notes on what each block does:

Object Purpose
flow_capture user Dedicated replication user with REPLICATION attribute. Isolated from your app user for security.
pg_read_all_data PostgreSQL built-in role granting SELECT on all tables.
flow_watermarks A small scratch table Estuary writes to during backfill to track replication progress accurately.
flow_publication Defines which tables' WAL events are exposed for replication. publish_via_partition_root is recommended for partitioned tables.

📌 Using an existing database that runs PostgreSQL v13 or earlier? Replace GRANT pg_read_all_data with per-schema grants: GRANT SELECT ON ALL TABLES IN SCHEMA public TO flow_capture; ALTER DEFAULT PRIVILEGES IN SCHEMA public GRANT SELECT ON TABLES TO flow_capture;

Start the Containers

Run the following from your project directory:

docker compose up -d
Enter fullscreen mode Exit fullscreen mode

After a few seconds, verify both containers are healthy:

docker compose ps
Enter fullscreen mode Exit fullscreen mode

You should see postgres_cdc and datagen both in a running state. Confirm rows are accumulating:

# Connect and check row counts a few seconds apart

psql -h localhost -p 5432 -U postgres -d postgres -c "SELECT count(*) FROM products;"
Enter fullscreen mode Exit fullscreen mode

Expose the Database via ngrok

Estuary is a fully managed cloud service. Since your Postgres instance is running locally, you need to expose port 5432 to the internet. ngrok makes this a one-liner:

ngrok tcp 5432
Enter fullscreen mode Exit fullscreen mode

Note the Forwarding address from the output — it will look like 5.tcp.ngrok.io:12345. You'll use the host and port as the connection details in Estuary.

⚠️ Keep the ngrok process running throughout the tutorial. Closing it breaks the connection between Estuary and your database.

Step 2: Create a Capture in Estuary

Log in to the Estuary dashboard and navigate to Captures in the left sidebar. Click + New Capture and select the PostgreSQL connector.

Estuary dashboard Sources page showing the New Capture button and existing captures list

Configure the Connection

Fill in the connection details using your ngrok forwarding address:

Field Value
Address Your ngrok host and port (e.g. 5.tcp.ngrok.io:12345)
Database postgres
User flow_capture
Password secret (or whatever you set in init.sql)

Estuary Create Capture form showing PostgreSQL connector selection, endpoint config with server address, user, and database fields

Click Next. Estuary will connect to your database and auto-discover the tables available in the publication.

Review Collections and Schema Evolution Settings

In the next section, you'll see the discovered tables as Estuary collections. A collection is a real-time data lake of JSON documents backed by cloud object storage (S3, GCS, or Azure Blob). Because it's object storage, data is retained indefinitely and can always be replayed from the beginning.

Estuary infers the JSON schema automatically from your Postgres table definition. You can inspect and override it here if needed.

You'll also see three schema evolution toggles, all enabled by default:

• Automatically keep schemas up to date: Estuary propagates upstream schema changes (new columns, type changes) through to downstream materializations automatically.

• Automatically add new source tables: Any new table added to your publication is picked up and captured without reconfiguration.

• Breaking changes re-version collections: If a change is incompatible (e.g. a column is dropped or renamed), Estuary creates a new versioned collection rather than corrupting existing data.

For this tutorial, leave all defaults and click Next, then Save and Publish. The connector will start and immediately begin backfilling existing rows while simultaneously streaming new changes from the WAL.

âś… Estuary captures both the historical backfill and the live change stream in a single consistent operation. There is no risk of missing events that happened during the initial load.

Step 3: Set Up the Snowflake Materialization

Prepare Snowflake

Open your Snowflake console and create a new SQL worksheet. Paste and run the following setup script. It creates a dedicated role, warehouse, database, schema, and user for Estuary:

-- Snowflake Setup Script for Estuary

-- Run all statements (use "Run All" from the dropdown next to the Run button)



SET database_name   = 'ESTUARY_DB';

SET warehouse_name   = 'ESTUARY_WH';

SET estuary_role    = 'ESTUARY_ROLE';

SET estuary_user    = 'ESTUARY_USER';

SET estuary_schema   = 'ESTUARY_SCHEMA';



-- Role and schema

CREATE ROLE IF NOT EXISTS IDENTIFIER($estuary_role);

GRANT ROLE IDENTIFIER($estuary_role) TO ROLE SYSADMIN;



-- Database

CREATE DATABASE IF NOT EXISTS IDENTIFIER($database_name);

USE DATABASE IDENTIFIER($database_name);

CREATE SCHEMA IF NOT EXISTS IDENTIFIER($estuary_schema);



-- User

CREATE USER IF NOT EXISTS IDENTIFIER($estuary_user)

  DEFAULT_ROLE  = $estuary_role

  DEFAULT_WAREHOUSE = $warehouse_name;

GRANT ROLE IDENTIFIER($estuary_role) TO USER IDENTIFIER($estuary_user);

GRANT ALL ON SCHEMA IDENTIFIER($estuary_schema) TO IDENTIFIER($estuary_role);



-- Warehouse (XS, auto-suspend after 60s to minimize credit usage)

CREATE WAREHOUSE IF NOT EXISTS IDENTIFIER($warehouse_name)

  WAREHOUSE_SIZE = XSMALL

  WAREHOUSE_TYPE = STANDARD

  AUTO_SUSPEND   = 60

  AUTO_RESUME   = TRUE

  INITIALLY_SUSPENDED = TRUE;

GRANT USAGE ON WAREHOUSE IDENTIFIER($warehouse_name) TO ROLE IDENTIFIER($estuary_role);



-- Database access

GRANT CREATE SCHEMA, MONITOR, USAGE

  ON DATABASE IDENTIFIER($database_name)

  TO ROLE IDENTIFIER($estuary_role);



-- Required for Snowflake on GCP only

USE ROLE ACCOUNTADMIN;

GRANT CREATE INTEGRATION ON ACCOUNT TO ROLE IDENTIFIER($estuary_role);

USE ROLE SYSADMIN;



COMMIT;
Enter fullscreen mode Exit fullscreen mode

đź’ˇ The AUTO_SUSPEND = 60 setting suspends the warehouse after 60 seconds of inactivity. Combined with the Sync Frequency setting below, this is the primary lever for controlling Snowflake compute costs.

You will then need to configure JWT authentication for your Snowflake user. You can generate a private-public key pair from the terminal using:

# generate a private key

openssl genrsa 2048 | openssl pkcs8 -topk8 -inform PEM -out rsa_key.p8 -nocrypt

# generate a public key

openssl rsa -in rsa_key.p8 -pubout -out rsa_key.pub
Enter fullscreen mode Exit fullscreen mode

Copy the public key to the clipboard using cat rsa_key.pub.

In Snowflake, you can then associate this key with your user. Run:

ALTER USER identifier($estuary_user) SET RSA_PUBLIC_KEY='<value you copied>'
Enter fullscreen mode Exit fullscreen mode

Create the Materialization

Back in Estuary, navigate to Destinations and click + New Materialization.

Estuary dashboard Destinations page showing the New Materialization button and existing materializations list

Select Snowflake and fill in the connection details using the values from the script above.

Estuary Create Materialization form showing Snowflake connector selection and endpoint config with host, database, and schema fields

Field Value
Host Your Snowflake host URL (.snowflakecomputing.com)
Database ESTUARY_DB
Schema ESTUARY_SCHEMA
Warehouse ESTUARY_WH
Timestamp Type Choose how timestamp columns should be stored, such as using TIMESTAMP_LTZ
User ESTUARY_USER
Private Key The private key from the key-pair you generated

Configure Sync Frequency (Cost Optimization)

Snowflake materialization settings showing timestamp type, JWT authentication, and sync frequency configuration set to 30m

Scroll down to Advanced Options. The Sync Frequency parameter controls how long Estuary waits before flushing the latest data into Snowflake. This is a critical cost lever:

Sync Frequency Warehouse Behavior Best For
0 seconds Continuous Real-time dashboards, operational use cases
30 minutes (default) Wakes every 30 min, ~1 credit/hr on XS Near-real-time analytics
2 hours Wakes every 2 hrs, suspends in between Daily reporting, cost-conscious setups
24 hours Once daily flush Archival, low-priority data

Set the frequency that matches your latency requirements.

To test out real-time streaming, for example, go ahead and set the sync frequency to 0s. This will push data as soon as it’s available.

These types of continuous updates are best suited to ingestion via Snowpipe Streaming, Snowflake’s lowest-latency ingestion option. Otherwise, continuous updates can negatively affect costs, as your warehouse may not be able to suspend activity between data loads. We’ll make sure we’re using Snowpipe Streaming in the next section.

Link Your Capture

In the “Source Collections” section, toggle on “Delta Updates” to be the default setting for all newly added collections.

💡 Delta updates is an append-only materialization mode that cleanly fits with Snowpipe Streaming’s low latency streaming capabilities. All bindings in your Snowflake materialization that use delta updates automatically ingest data via Snowpipe Streaming on the back end.

Click Source from Capture and select the PostgreSQL capture you created in Step 2. Estuary will automatically link the collections and propose a mapping to Snowflake tables. You can also toggle delta updates mode individually for each binding here.

Click Next, then Save and Publish. Watch the deployment logs — a successful publish will show the connector handshake completing and the first materialization batch starting.

Verify the Pipeline

Once the materialization is deployed, open Snowflake and run:

USE DATABASE ESTUARY_DB;

USE SCHEMA ESTUARY_SCHEMA;



-- Check that the table exists and has data

SELECT count(*) FROM products;



-- Inspect a sample of rows

SELECT * FROM products LIMIT 10;



-- Watch rows arriving in real time

SELECT count(*) FROM products; -- run this repeatedly
Enter fullscreen mode Exit fullscreen mode

You should see the row count climbing as the datagen service continues writing to Postgres. The Estuary dashboard's collection view shows the change stream in real time; the Snowflake side reflects it after the next sync interval.

🔬 Want to verify a full end-to-end delete? Run DELETE FROM products WHERE id = 1; against Postgres and watch Estuary propagate the delete to Snowflake after the next sync. Batch pipelines miss these entirely — CDC captures every one.

What Happens When Your Schema Changes?

One of the reasons teams avoid CDC is fear of schema drift breaking the pipeline. Estuary handles this automatically. Here's what happens in practice:

• New column added: Estuary detects it via schema auto-discovery, updates the collection schema, and adds the column to the Snowflake table. Existing rows show NULL for the new column.

• Column type widened (e.g. INT to BIGINT): Handled automatically. Estuary updates the schema without interruption.

• Breaking change (column dropped, primary key changed): Estuary creates a new versioned collection (e.g. products_v2) rather than corrupting the existing one. You can choose when to cut over.

All three toggles on the capture's Collections tab control this behavior. The defaults are safe for most teams.

Cleanup

Postgres and ngrok

# Stop and remove containers

docker compose down -v



# Terminate ngrok (Ctrl+C in the ngrok terminal)
Enter fullscreen mode Exit fullscreen mode

Snowflake

-- Run in Snowflake to remove all resources created by this tutorial

REVOKE ALL PRIVILEGES ON SCHEMA ESTUARY_SCHEMA FROM ROLE ESTUARY_ROLE;

REVOKE ALL PRIVILEGES ON DATABASE ESTUARY_DB FROM ROLE ESTUARY_ROLE;

DROP WAREHOUSE IF EXISTS ESTUARY_WH;

DROP USER IF EXISTS ESTUARY_USER;

DROP ROLE IF EXISTS ESTUARY_ROLE;

DROP SCHEMA IF EXISTS ESTUARY_SCHEMA;

DROP DATABASE IF EXISTS ESTUARY_DB;
Enter fullscreen mode Exit fullscreen mode

Estuary

From the Estuary dashboard, disable or delete the capture and materialization you created.

How Does This Compare to Other Approaches?

The CDC pipeline you just built is one of several ways to get data from PostgreSQL into Snowflake. Here's how the major methods compare:

Method Latency Captures Deletes Schema Evolution Setup Complexity Best For
Manual CSV/S3 + COPY INTO Hours to days (scheduled) No. Requires full reload or custom tracking. Manual. You manage column changes yourself. Low initial, high ongoing maintenance. One-time migrations or very small datasets.
Batch ETL (Airflow, custom scripts) Minutes to hours (schedule-dependent) Only with soft-delete flags or full reloads. Manual. DAG changes required per schema change. Medium. Requires Airflow infrastructure and DAG maintenance. Scheduled reporting where freshness is not critical.
Snowflake Openflow (PostgreSQL connector) Near real-time (CDC via WAL). Yes. Captures INSERT, UPDATE, DELETE via logical replication. Tracks DDL changes and replicates downstream. Medium-high. Requires Openflow runtime setup (Snowflake Deployment or BYOC), Apache NiFi familiarity helps. Enterprise account required. Teams standardized on Snowflake wanting a native, single-vendor CDC path.
Fivetran 1 min to hours (plan-dependent). 1-min sync requires Enterprise or Business Critical plan. Yes, with logical replication. Query-Based mode may miss short-lived rows. Automatic schema migration, though some edge cases (e.g., ADD COLUMN SET DEFAULT) require manual re-sync. Low. Fully managed, point-and-click setup. Teams wanting managed simplicity and a large connector catalog, with budget for consumption-based pricing.
Estuary Sub-second to configurable intervals. Snowpipe Streaming enabled by default for delta updates. Yes. Captures every INSERT, UPDATE, DELETE from the WAL, including hard deletes. Automatic. New columns propagate, type widenings handled, breaking changes create versioned collections. Low. Web UI setup, no infrastructure to manage. Real-time analytics, operational use cases, teams needing low-latency CDC with Snowpipe Streaming and cost control via sync frequency.

A few things worth noting:

  • Snowflake's Openflow connector is relatively new. It replaces the older Snowflake Connector for PostgreSQL (now in maintenance mode). Openflow is built on Apache NiFi and uses Snowpipe Streaming for ingestion. It's a strong option if you want everything inside Snowflake's ecosystem, but it requires an Enterprise account and Openflow runtime configuration.
  • Fivetran's 1-minute sync frequency is only available on Enterprise and Business Critical plans. On lower tiers, the minimum is 5 minutes. Their PostgreSQL connector supports both logical replication and a Query-Based fallback, but Query-Based mode can miss rows that exist for less than the sync interval.
  • Estuary's sync frequency is configurable from 0 seconds (continuous via Snowpipe Streaming) to 24 hours, on any plan. The tutorial above uses 0s for demonstration, but production deployments typically use 30 minutes or longer to control Snowflake warehouse credits.
  • The manual CSV/S3 approach is essentially free in tooling costs but scales poorly. It's viable for a one-time migration but not for ongoing replication.

Where to Go From Here

You now have a working CDC pipeline from Postgres to Snowflake. A few directions worth exploring next:

  1. Connect your real database: Replace the Docker Postgres with your production instance. Estuary supports RDS, Aurora, Cloud SQL, Azure Database for PostgreSQL, Supabase, and Neon out of the box.
  2. Add more tables: Modify the ALTER PUBLICATION flow_publication ADD TABLE statement and Estuary will auto-discover the new bindings.
  3. Try read-only mode: If you're connecting to a replica or a database where you can't create the watermarks table, Estuary supports a read-only capture mode. No schema modifications required on the source.
  4. Explore transformations: Estuary supports inline derivations using TypeScript, Python, or SQL. You can reshape, filter, or join collections before they land in Snowflake.
  5. Add other destinations: The same Postgres capture can fan out to BigQuery, Redshift, Databricks, or Kafka simultaneously. No duplicate infrastructure.

Check the full Estuary documentation for connector-specific setup guides, and join the Estuary Slack community if you run into questions

Top comments (0)