Most PostgreSQL to Snowflake pipelines run on a schedule: extract every hour, load to S3, copy into Snowflake. That works for reporting. It fails the moment you need to track hard deletes, catch every intermediate row state, or reduce the lag between your operational database and your warehouse.
There are several ways to move data from PostgreSQL to Snowflake: manual CSV exports via COPY INTO, scheduled batch ETL with tools like Airflow, Snowflake's native Openflow connector (built on Apache NiFi), and managed CDC platforms like Fivetran, Airbyte, or Estuary. Each approach trades off differently across latency, delete handling, setup complexity, and cost.
This tutorial focuses on real-time Change data capture (CDC) using Estuary. CDC works by reading directly from PostgreSQL's write-ahead log (WAL). Every INSERT, UPDATE, and DELETE is captured the moment it's committed and streamed to Snowflake continuously: no polling, no missed deletes, no stale data windows. We use Snowpipe Streaming on the Snowflake side for the lowest-latency ingestion path available.
We'll use a demo Postgres instance in a Docker container to walk through logical replication configuration and to ensure a continuous stream of updates. If you want to evaluate the other approaches against what we build here, there's a comparison table at the end of the tutorial.
By the end of this tutorial, you'll have:
A running PostgreSQL CDC capture streaming live changes into Estuary collections
A Snowflake materialization receiving those changes in near real time
A working understanding of WAL-based replication, schema evolution, and Snowflake sync frequency tuning
Prerequisites: Docker, ngrok, a Snowflake account (trial is fine), and a free Estuary account.
đź’ˇ If you want to use an existing Postgres database rather than a demo Docker one, make sure to use PostgreSQL 10.0 or later. Logical replication was introduced in PG 10.
What You'll Build
Here's the architecture at a glance:
Source: A self-hosted PostgreSQL instance with logical replication enabled and a live data generator inserting rows continuously
Transport: Estuary captures the WAL stream, stores change events as JSON collections in cloud object storage
Destination: Estuary materializes those collections into Snowflake tables, with configurable sync frequency to control warehouse credits
What you'll need before starting:
Docker (Docker Compose V2 is bundled in modern Docker Desktop, no separate install needed)
ngrok (free tier is fine, though you’ll need to authenticate your account to use TCP) — to expose your local Postgres to Estuary's cloud connector
A Snowflake account (trial account works perfectly)
An Estuary account — sign up free at dashboard.estuary.dev/register
Step 1: Set Up the Source PostgreSQL Database
Spin Up Postgres + Data Generator with Docker Compose
Create a project directory and save the following as docker-compose.yml. It defines two services: a PostgreSQL container configured for logical replication, and a data generator that continuously writes realistic fake product records.
services:
postgres:
image: postgres:18
container_name: postgres_cdc
hostname: postgres_cdc
restart: unless-stopped
user: postgres
environment:
POSTGRES_USER: postgres
POSTGRES_DB: postgres
POSTGRES_PASSWORD: postgres
command:
- "postgres"
- "-c"
- "wal_level=logical"
healthcheck:
test: ["CMD-SHELL", "pg_isready -U postgres -d postgres"]
interval: 5s
timeout: 10s
retries: 120
volumes:
- ./init.sql:/docker-entrypoint-initdb.d/init.sql
ports:
- "5432:5432"
datagen:
image: materialize/datagen
container_name: datagen
restart: unless-stopped
environment:
POSTGRES_HOST: postgres_cdc
POSTGRES_PORT: 5432
POSTGRES_DB: postgres
POSTGRES_USER: postgres
POSTGRES_PASSWORD: postgres
entrypoint: "datagen -s /app/schemas/products.sql -n 10000 -f postgres -w 1000"
depends_on:
postgres:
condition: service_healthy
volumes:
- ./schemas/products.sql:/app/schemas/products.sql
đź’ˇ The -w 1000 flag tells the generator to insert a new row every 1000ms. You can lower it to stress-test the pipeline, or raise it to keep things calm while you explore the UI.
Define the Data Schema
Create a schemas/ folder and save the following as schemas/products.sql. This defines the table schema that the data generator uses to create records:
-- schemas/products.sql
-- NOTE: COMMENT fields are datagen directives (Faker expressions),
-- not standard SQL. They are only used by the datagen tool, not executed against Postgres.
CREATE TABLE "public"."products" (
"id" INT PRIMARY KEY,
"name" VARCHAR COMMENT 'faker.internet.userName()',
"merchant_id" INT NOT NULL COMMENT 'faker.datatype.number()',
"price" INT COMMENT 'faker.datatype.number()',
"status" VARCHAR COMMENT 'faker.datatype.boolean()',
"created_at" TIMESTAMP DEFAULT now()
);
⚠️ The COMMENT directives in the schema file are Faker expressions consumed by the datagen tool — they are not executed against PostgreSQL. The actual table is created by init.sql below.
Configure PostgreSQL for CDC Replication
Save the following as init.sql in your project root. This runs automatically when the Postgres container starts and configures everything Estuary needs for log-based CDC:
-- init.sql
-- 1. Create a dedicated replication user (use a strong password in production)
CREATE USER flow_capture WITH PASSWORD 'secret' REPLICATION;
-- 2. Grant read access across all tables
GRANT pg_read_all_data TO flow_capture;
-- 3. Create the products table (the actual table, not the datagen schema)
CREATE TABLE IF NOT EXISTS public.products (
id INT PRIMARY KEY,
name VARCHAR,
merchant_id INT NOT NULL,
price INT,
status VARCHAR,
created_at TIMESTAMP DEFAULT now()
);
-- 4. Create the watermarks table
-- Estuary writes small amounts of metadata here to ensure
-- accurate backfill sequencing. Not needed in read-only mode.
CREATE TABLE IF NOT EXISTS public.flow_watermarks (
slot TEXT PRIMARY KEY,
watermark TEXT
);
GRANT ALL PRIVILEGES ON TABLE public.flow_watermarks TO flow_capture;
-- 5. Create and configure the replication publication
CREATE PUBLICATION flow_publication;
ALTER PUBLICATION flow_publication SET (publish_via_partition_root = true);
ALTER PUBLICATION flow_publication ADD TABLE
public.flow_watermarks,
public.products;
A few notes on what each block does:
| Object | Purpose |
|---|---|
| flow_capture user | Dedicated replication user with REPLICATION attribute. Isolated from your app user for security. |
| pg_read_all_data | PostgreSQL built-in role granting SELECT on all tables. |
| flow_watermarks | A small scratch table Estuary writes to during backfill to track replication progress accurately. |
| flow_publication | Defines which tables' WAL events are exposed for replication. publish_via_partition_root is recommended for partitioned tables. |
📌 Using an existing database that runs PostgreSQL v13 or earlier? Replace GRANT pg_read_all_data with per-schema grants: GRANT SELECT ON ALL TABLES IN SCHEMA public TO flow_capture; ALTER DEFAULT PRIVILEGES IN SCHEMA public GRANT SELECT ON TABLES TO flow_capture;
Start the Containers
Run the following from your project directory:
docker compose up -d
After a few seconds, verify both containers are healthy:
docker compose ps
You should see postgres_cdc and datagen both in a running state. Confirm rows are accumulating:
# Connect and check row counts a few seconds apart
psql -h localhost -p 5432 -U postgres -d postgres -c "SELECT count(*) FROM products;"
Expose the Database via ngrok
Estuary is a fully managed cloud service. Since your Postgres instance is running locally, you need to expose port 5432 to the internet. ngrok makes this a one-liner:
ngrok tcp 5432
Note the Forwarding address from the output — it will look like 5.tcp.ngrok.io:12345. You'll use the host and port as the connection details in Estuary.
⚠️ Keep the ngrok process running throughout the tutorial. Closing it breaks the connection between Estuary and your database.
Step 2: Create a Capture in Estuary
Log in to the Estuary dashboard and navigate to Captures in the left sidebar. Click + New Capture and select the PostgreSQL connector.
Configure the Connection
Fill in the connection details using your ngrok forwarding address:
| Field | Value |
|---|---|
| Address | Your ngrok host and port (e.g. 5.tcp.ngrok.io:12345) |
| Database | postgres |
| User | flow_capture |
| Password | secret (or whatever you set in init.sql) |
Click Next. Estuary will connect to your database and auto-discover the tables available in the publication.
Review Collections and Schema Evolution Settings
In the next section, you'll see the discovered tables as Estuary collections. A collection is a real-time data lake of JSON documents backed by cloud object storage (S3, GCS, or Azure Blob). Because it's object storage, data is retained indefinitely and can always be replayed from the beginning.
Estuary infers the JSON schema automatically from your Postgres table definition. You can inspect and override it here if needed.
You'll also see three schema evolution toggles, all enabled by default:
• Automatically keep schemas up to date: Estuary propagates upstream schema changes (new columns, type changes) through to downstream materializations automatically.
• Automatically add new source tables: Any new table added to your publication is picked up and captured without reconfiguration.
• Breaking changes re-version collections: If a change is incompatible (e.g. a column is dropped or renamed), Estuary creates a new versioned collection rather than corrupting existing data.
For this tutorial, leave all defaults and click Next, then Save and Publish. The connector will start and immediately begin backfilling existing rows while simultaneously streaming new changes from the WAL.
âś… Estuary captures both the historical backfill and the live change stream in a single consistent operation. There is no risk of missing events that happened during the initial load.
Step 3: Set Up the Snowflake Materialization
Prepare Snowflake
Open your Snowflake console and create a new SQL worksheet. Paste and run the following setup script. It creates a dedicated role, warehouse, database, schema, and user for Estuary:
-- Snowflake Setup Script for Estuary
-- Run all statements (use "Run All" from the dropdown next to the Run button)
SET database_name = 'ESTUARY_DB';
SET warehouse_name = 'ESTUARY_WH';
SET estuary_role = 'ESTUARY_ROLE';
SET estuary_user = 'ESTUARY_USER';
SET estuary_schema = 'ESTUARY_SCHEMA';
-- Role and schema
CREATE ROLE IF NOT EXISTS IDENTIFIER($estuary_role);
GRANT ROLE IDENTIFIER($estuary_role) TO ROLE SYSADMIN;
-- Database
CREATE DATABASE IF NOT EXISTS IDENTIFIER($database_name);
USE DATABASE IDENTIFIER($database_name);
CREATE SCHEMA IF NOT EXISTS IDENTIFIER($estuary_schema);
-- User
CREATE USER IF NOT EXISTS IDENTIFIER($estuary_user)
DEFAULT_ROLE = $estuary_role
DEFAULT_WAREHOUSE = $warehouse_name;
GRANT ROLE IDENTIFIER($estuary_role) TO USER IDENTIFIER($estuary_user);
GRANT ALL ON SCHEMA IDENTIFIER($estuary_schema) TO IDENTIFIER($estuary_role);
-- Warehouse (XS, auto-suspend after 60s to minimize credit usage)
CREATE WAREHOUSE IF NOT EXISTS IDENTIFIER($warehouse_name)
WAREHOUSE_SIZE = XSMALL
WAREHOUSE_TYPE = STANDARD
AUTO_SUSPEND = 60
AUTO_RESUME = TRUE
INITIALLY_SUSPENDED = TRUE;
GRANT USAGE ON WAREHOUSE IDENTIFIER($warehouse_name) TO ROLE IDENTIFIER($estuary_role);
-- Database access
GRANT CREATE SCHEMA, MONITOR, USAGE
ON DATABASE IDENTIFIER($database_name)
TO ROLE IDENTIFIER($estuary_role);
-- Required for Snowflake on GCP only
USE ROLE ACCOUNTADMIN;
GRANT CREATE INTEGRATION ON ACCOUNT TO ROLE IDENTIFIER($estuary_role);
USE ROLE SYSADMIN;
COMMIT;
đź’ˇ The AUTO_SUSPEND = 60 setting suspends the warehouse after 60 seconds of inactivity. Combined with the Sync Frequency setting below, this is the primary lever for controlling Snowflake compute costs.
You will then need to configure JWT authentication for your Snowflake user. You can generate a private-public key pair from the terminal using:
# generate a private key
openssl genrsa 2048 | openssl pkcs8 -topk8 -inform PEM -out rsa_key.p8 -nocrypt
# generate a public key
openssl rsa -in rsa_key.p8 -pubout -out rsa_key.pub
Copy the public key to the clipboard using cat rsa_key.pub.
In Snowflake, you can then associate this key with your user. Run:
ALTER USER identifier($estuary_user) SET RSA_PUBLIC_KEY='<value you copied>'
Create the Materialization
Back in Estuary, navigate to Destinations and click + New Materialization.
Select Snowflake and fill in the connection details using the values from the script above.
| Field | Value |
|---|---|
| Host | Your Snowflake host URL (.snowflakecomputing.com) |
| Database | ESTUARY_DB |
| Schema | ESTUARY_SCHEMA |
| Warehouse | ESTUARY_WH |
| Timestamp Type | Choose how timestamp columns should be stored, such as using TIMESTAMP_LTZ
|
| User | ESTUARY_USER |
| Private Key | The private key from the key-pair you generated |
Configure Sync Frequency (Cost Optimization)
Scroll down to Advanced Options. The Sync Frequency parameter controls how long Estuary waits before flushing the latest data into Snowflake. This is a critical cost lever:
| Sync Frequency | Warehouse Behavior | Best For |
|---|---|---|
| 0 seconds | Continuous | Real-time dashboards, operational use cases |
| 30 minutes (default) | Wakes every 30 min, ~1 credit/hr on XS | Near-real-time analytics |
| 2 hours | Wakes every 2 hrs, suspends in between | Daily reporting, cost-conscious setups |
| 24 hours | Once daily flush | Archival, low-priority data |
Set the frequency that matches your latency requirements.
To test out real-time streaming, for example, go ahead and set the sync frequency to 0s. This will push data as soon as it’s available.
These types of continuous updates are best suited to ingestion via Snowpipe Streaming, Snowflake’s lowest-latency ingestion option. Otherwise, continuous updates can negatively affect costs, as your warehouse may not be able to suspend activity between data loads. We’ll make sure we’re using Snowpipe Streaming in the next section.
Link Your Capture
In the “Source Collections” section, toggle on “Delta Updates” to be the default setting for all newly added collections.
💡 Delta updates is an append-only materialization mode that cleanly fits with Snowpipe Streaming’s low latency streaming capabilities. All bindings in your Snowflake materialization that use delta updates automatically ingest data via Snowpipe Streaming on the back end.
Click Source from Capture and select the PostgreSQL capture you created in Step 2. Estuary will automatically link the collections and propose a mapping to Snowflake tables. You can also toggle delta updates mode individually for each binding here.
Click Next, then Save and Publish. Watch the deployment logs — a successful publish will show the connector handshake completing and the first materialization batch starting.
Verify the Pipeline
Once the materialization is deployed, open Snowflake and run:
USE DATABASE ESTUARY_DB;
USE SCHEMA ESTUARY_SCHEMA;
-- Check that the table exists and has data
SELECT count(*) FROM products;
-- Inspect a sample of rows
SELECT * FROM products LIMIT 10;
-- Watch rows arriving in real time
SELECT count(*) FROM products; -- run this repeatedly
You should see the row count climbing as the datagen service continues writing to Postgres. The Estuary dashboard's collection view shows the change stream in real time; the Snowflake side reflects it after the next sync interval.
🔬 Want to verify a full end-to-end delete? Run DELETE FROM products WHERE id = 1; against Postgres and watch Estuary propagate the delete to Snowflake after the next sync. Batch pipelines miss these entirely — CDC captures every one.
What Happens When Your Schema Changes?
One of the reasons teams avoid CDC is fear of schema drift breaking the pipeline. Estuary handles this automatically. Here's what happens in practice:
• New column added: Estuary detects it via schema auto-discovery, updates the collection schema, and adds the column to the Snowflake table. Existing rows show NULL for the new column.
• Column type widened (e.g. INT to BIGINT): Handled automatically. Estuary updates the schema without interruption.
• Breaking change (column dropped, primary key changed): Estuary creates a new versioned collection (e.g. products_v2) rather than corrupting the existing one. You can choose when to cut over.
All three toggles on the capture's Collections tab control this behavior. The defaults are safe for most teams.
Cleanup
Postgres and ngrok
# Stop and remove containers
docker compose down -v
# Terminate ngrok (Ctrl+C in the ngrok terminal)
Snowflake
-- Run in Snowflake to remove all resources created by this tutorial
REVOKE ALL PRIVILEGES ON SCHEMA ESTUARY_SCHEMA FROM ROLE ESTUARY_ROLE;
REVOKE ALL PRIVILEGES ON DATABASE ESTUARY_DB FROM ROLE ESTUARY_ROLE;
DROP WAREHOUSE IF EXISTS ESTUARY_WH;
DROP USER IF EXISTS ESTUARY_USER;
DROP ROLE IF EXISTS ESTUARY_ROLE;
DROP SCHEMA IF EXISTS ESTUARY_SCHEMA;
DROP DATABASE IF EXISTS ESTUARY_DB;
Estuary
From the Estuary dashboard, disable or delete the capture and materialization you created.
How Does This Compare to Other Approaches?
The CDC pipeline you just built is one of several ways to get data from PostgreSQL into Snowflake. Here's how the major methods compare:
| Method | Latency | Captures Deletes | Schema Evolution | Setup Complexity | Best For |
|---|---|---|---|---|---|
| Manual CSV/S3 + COPY INTO | Hours to days (scheduled) | No. Requires full reload or custom tracking. | Manual. You manage column changes yourself. | Low initial, high ongoing maintenance. | One-time migrations or very small datasets. |
| Batch ETL (Airflow, custom scripts) | Minutes to hours (schedule-dependent) | Only with soft-delete flags or full reloads. | Manual. DAG changes required per schema change. | Medium. Requires Airflow infrastructure and DAG maintenance. | Scheduled reporting where freshness is not critical. |
| Snowflake Openflow (PostgreSQL connector) | Near real-time (CDC via WAL). | Yes. Captures INSERT, UPDATE, DELETE via logical replication. | Tracks DDL changes and replicates downstream. | Medium-high. Requires Openflow runtime setup (Snowflake Deployment or BYOC), Apache NiFi familiarity helps. Enterprise account required. | Teams standardized on Snowflake wanting a native, single-vendor CDC path. |
| Fivetran | 1 min to hours (plan-dependent). 1-min sync requires Enterprise or Business Critical plan. | Yes, with logical replication. Query-Based mode may miss short-lived rows. | Automatic schema migration, though some edge cases (e.g., ADD COLUMN SET DEFAULT) require manual re-sync. | Low. Fully managed, point-and-click setup. | Teams wanting managed simplicity and a large connector catalog, with budget for consumption-based pricing. |
| Estuary | Sub-second to configurable intervals. Snowpipe Streaming enabled by default for delta updates. | Yes. Captures every INSERT, UPDATE, DELETE from the WAL, including hard deletes. | Automatic. New columns propagate, type widenings handled, breaking changes create versioned collections. | Low. Web UI setup, no infrastructure to manage. | Real-time analytics, operational use cases, teams needing low-latency CDC with Snowpipe Streaming and cost control via sync frequency. |
A few things worth noting:
- Snowflake's Openflow connector is relatively new. It replaces the older Snowflake Connector for PostgreSQL (now in maintenance mode). Openflow is built on Apache NiFi and uses Snowpipe Streaming for ingestion. It's a strong option if you want everything inside Snowflake's ecosystem, but it requires an Enterprise account and Openflow runtime configuration.
- Fivetran's 1-minute sync frequency is only available on Enterprise and Business Critical plans. On lower tiers, the minimum is 5 minutes. Their PostgreSQL connector supports both logical replication and a Query-Based fallback, but Query-Based mode can miss rows that exist for less than the sync interval.
- Estuary's sync frequency is configurable from 0 seconds (continuous via Snowpipe Streaming) to 24 hours, on any plan. The tutorial above uses 0s for demonstration, but production deployments typically use 30 minutes or longer to control Snowflake warehouse credits.
- The manual CSV/S3 approach is essentially free in tooling costs but scales poorly. It's viable for a one-time migration but not for ongoing replication.
Where to Go From Here
You now have a working CDC pipeline from Postgres to Snowflake. A few directions worth exploring next:
- Connect your real database: Replace the Docker Postgres with your production instance. Estuary supports RDS, Aurora, Cloud SQL, Azure Database for PostgreSQL, Supabase, and Neon out of the box.
- Add more tables: Modify the ALTER PUBLICATION flow_publication ADD TABLE statement and Estuary will auto-discover the new bindings.
- Try read-only mode: If you're connecting to a replica or a database where you can't create the watermarks table, Estuary supports a read-only capture mode. No schema modifications required on the source.
- Explore transformations: Estuary supports inline derivations using TypeScript, Python, or SQL. You can reshape, filter, or join collections before they land in Snowflake.
- Add other destinations: The same Postgres capture can fan out to BigQuery, Redshift, Databricks, or Kafka simultaneously. No duplicate infrastructure.
Check the full Estuary documentation for connector-specific setup guides, and join the Estuary Slack community if you run into questions






Top comments (0)