Overview
Recently, Snowflake acquired Crunchy Data, which provides an enterprise-grade PostgreSQL offering. On top of that news, the team also developed and published pg_lake, a PostgreSQL extension for managing Iceberg tables. The extension provides both read and write access to Iceberg tables from PostgreSQL.
Given the momentum behind open data formats, I was curious: if PostgreSQL can write to Iceberg tables, then a true lakehouse architecture can be achieved in a scalable manner. In this article, I explain how to use Snowflake Postgres (recently GA) together with Snowflake to manage an entire data lakehouse architecture.
Here is the high-level architecture of the lakehouse we will build:

Architecture and Value Proposition
Here are the components involved in this architecture:
- Snowflake Postgres for OLTP writes
- pg_lake as the data lake component
- AWS S3 as the storage layer
- Snowflake as the powerful data and AI platform
By developing this architecture, we hope to achieve the following:
- True openness using Iceberg format
- Transactional consistency and schema evolution supported by pg_lake
- Workload separation between OLTP (on Postgres) and OLAP (on Snowflake)
- A zero-copy architecture
Prerequisites
- Create a Snowflake account where the Snowflake Postgres is available. You can check the region availability here.
- Create an AWS account since we will use AWS S3 and its IAM services.
- A local client application such as psql or DBeaver to run PostgreSQL commands.
Setup in Snowflake Postgres
We can easily spin up the Postgres instance using SQL. By default, the instance cannot be accessed, so we need to add a NETWORK_RULE and NETWORK_POLICY to make it reachable from the client endpoint. We will create two types of table here: normal Postgres tables and Iceberg tables. Note the metadata file path, which we will register when creating the tables in Snowflake later. The PostgreSQL instance creation may take about 5-10 minutes. Here is the SQL we can use to set this up:
CREATE POSTGRES INSTANCE retail_txns
COMPUTE_FAMILY = 'STANDARD_M'
STORAGE_SIZE_GB = 50
AUTHENTICATION_AUTHORITY = POSTGRES
COMMENT = 'Retail simulation with pg_lake Iceberg integration';
-- This will create 2 users (snowflake_admin and application) with their passwords
-- Copy the password since it will only appear once
-- Get the connection host (use this in psql/dBeaver to connect)
SHOW POSTGRES INSTANCES;
-- Check the instance status, wait until `state` changes to `READY`
DESCRIBE POSTGRES INSTANCE retail_txns;
-- The network setup will need a database to attach into
CREATE DATABASE IF NOT EXISTS RETAIL_ANALYTICS;
CREATE SCHEMA IF NOT EXISTS RETAIL_ANALYTICS.NETWORK;
CREATE OR REPLACE NETWORK RULE RETAIL_ANALYTICS.NETWORK.PG_RETAIL_INGRESS
TYPE = IPV4
VALUE_LIST = ('0.0.0.0/0') -- need to change this for production environment
MODE = POSTGRES_INGRESS;
CREATE OR REPLACE NETWORK POLICY PG_RETAIL_ALLOW_ALL
ALLOWED_NETWORK_RULE_LIST = ('RETAIL_ANALYTICS.NETWORK.PG_RETAIL_INGRESS')
COMMENT = 'Allow connections to retail_txns Postgres instance';
-- Attach the network policy to the instance
-- (changes may take up to 2 minutes to take effect)
ALTER POSTGRES INSTANCE retail_txns
SET NETWORK_POLICY = 'PG_RETAIL_ALLOW_ALL';
You should now be able to connect to your PostgreSQL instance using psql or tools like DBeaver. A successful creation will look like this in Snowsight.
Attach Storage Integration to Postgres
Once the instance is ready, we need to create and attach a STORAGE_INTEGRATION (of type POSTGRES_EXTERNAL_STORAGE) so that Postgres can read and write Iceberg tables in the specified S3-compatible bucket. First, we need to create a role in AWS. After logging in to the AWS Console, go to (or search for) the IAM (Identity and Access Management) menu; we will create a new role to register this storage with Postgres.
Start by creating the Policy, which will then be attached to the Role: go to IAM → Policies. You can also follow the documentation here.
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"s3:PutObject",
"s3:GetObject",
"s3:GetObjectVersion",
"s3:DeleteObject",
"s3:DeleteObjectVersion"
],
"Resource": "arn:aws:s3:::retail-lake-ap3/*"
},
{
"Effect": "Allow",
"Action": [
"s3:ListBucket",
"s3:GetBucketLocation"
],
"Resource": "arn:aws:s3:::retail-lake-ap3"
}
]
}
Next, go to IAM → Roles, create a new role, and attach the Policy we just created. Note the role ARN, since we will use it in the Storage Integration setup.
IMPORTANT: Set the IAM role's "Maximum session duration" to 12 hours (default 1 hour). The storage integration will NOT work with the default 1-hour session.
-- This integration type (POSTGRES_EXTERNAL_STORAGE) allows the Postgres
-- instance to write Iceberg data to your own S3 bucket via pg_lake.
CREATE STORAGE INTEGRATION pg_lake_s3_integration
TYPE = POSTGRES_EXTERNAL_STORAGE
STORAGE_PROVIDER = 'S3'
ENABLED = TRUE
STORAGE_AWS_ROLE_ARN = 'arn:aws:iam::012345678910:role/snowflake-retail-lake-role'
STORAGE_ALLOWED_LOCATIONS = ('s3://retail-lake-ap3/');
-- Retrieve the Snowflake IAM user ARN and external ID
DESCRIBE STORAGE INTEGRATION pg_lake_s3_integration;
-- Record these values from the output:
-- STORAGE_AWS_IAM_USER_ARN (e.g., arn:aws:iam::<account>:user/snowflake-postgres-integration-management)
-- STORAGE_AWS_EXTERNAL_ID (e.g., ABC12345_SFCRole=1_abcdefg)
Go back to the AWS Console and open IAM → Roles, then edit the role's trust policy (the Trust relationships tab) with the values from the DESCRIBE output.
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Principal": {
"AWS": "<STORAGE_AWS_IAM_USER_ARN from DESCRIBE output>"
},
"Action": "sts:AssumeRole",
"Condition": {
"StringEquals": {
"sts:ExternalId": "<STORAGE_AWS_EXTERNAL_ID from DESCRIBE output>"
}
}
}
]
}
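If you update the trust policy often while iterating, it can help to render it programmatically instead of hand-editing JSON. Below is a minimal sketch; the helper name `build_trust_policy` is my own, and the example ARN and external ID are placeholders you should replace with your DESCRIBE output.

```python
import json

def build_trust_policy(iam_user_arns, external_ids):
    """Render the IAM role trust policy from the DESCRIBE output values.

    iam_user_arns / external_ids come from DESCRIBE STORAGE INTEGRATION
    (and later DESCRIBE EXTERNAL VOLUME) in Snowflake.
    """
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Principal": {"AWS": list(iam_user_arns)},
                "Action": "sts:AssumeRole",
                "Condition": {
                    "StringEquals": {"sts:ExternalId": list(external_ids)}
                },
            }
        ],
    }

# Placeholder values -- substitute your own DESCRIBE output:
policy = build_trust_policy(
    ["arn:aws:iam::123456789012:user/snowflake-postgres-integration-management"],
    ["ABC12345_SFCRole=1_abcdefg"],
)
print(json.dumps(policy, indent=2))
```

Paste the printed JSON into the role's trust policy; as more integrations are added later, just extend the two lists.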
Finally, we can attach this storage integration to the Snowflake Postgres instance. This allows the Postgres instance to read from and write to the S3 bucket we specified above.
ALTER POSTGRES INSTANCE retail_txns
SET STORAGE_INTEGRATION = pg_lake_s3_integration;
Insert Initial Data in Postgres
We can then connect to the Postgres instance using psql or dBeaver to set up the initial data and Iceberg tables.
psql "host=<host_from_SHOW_POSTGRES_INSTANCE_above> port=5432 dbname=postgres user=snowflake_admin sslmode=require"
Once we are inside the instance, we can run the following commands:
-- Create pg_lake extension (installs all sub-extensions)
CREATE EXTENSION IF NOT EXISTS pg_lake CASCADE;
-- Set the default Iceberg storage location (persists across all sessions)
ALTER DATABASE postgres
SET pg_lake_iceberg.default_location_prefix = 's3://retail-lake-ap3/frompg/tables/';
-- Apply to the current session immediately
SET pg_lake_iceberg.default_location_prefix TO 's3://retail-lake-ap3/frompg/tables/';
-- Verify
SHOW pg_lake_iceberg.default_location_prefix;
-- Expected: s3://retail-lake-ap3/frompg/tables/
We can put some initial data in the database using the familiar DDL.
-- Create retail OLTP tables (standard heap tables for fast inserts)
-- Stores
CREATE TABLE IF NOT EXISTS stores (
store_id SERIAL PRIMARY KEY,
store_name TEXT NOT NULL,
city TEXT NOT NULL,
state TEXT NOT NULL,
opened_at TIMESTAMP DEFAULT now()
);
-- Products
CREATE TABLE IF NOT EXISTS products (
product_id SERIAL PRIMARY KEY,
product_name TEXT NOT NULL,
category TEXT NOT NULL,
unit_price NUMERIC(10,2) NOT NULL,
created_at TIMESTAMP DEFAULT now()
);
-- Customers
CREATE TABLE IF NOT EXISTS customers (
customer_id SERIAL PRIMARY KEY,
first_name TEXT NOT NULL,
last_name TEXT NOT NULL,
email TEXT UNIQUE NOT NULL,
loyalty_tier TEXT DEFAULT 'BRONZE',
created_at TIMESTAMP DEFAULT now()
);
-- Transactions (header)
CREATE TABLE IF NOT EXISTS transactions (
txn_id SERIAL PRIMARY KEY,
store_id INT NOT NULL REFERENCES stores(store_id),
customer_id INT REFERENCES customers(customer_id),
txn_timestamp TIMESTAMP NOT NULL DEFAULT now(),
total_amount NUMERIC(12,2) NOT NULL DEFAULT 0,
payment_method TEXT NOT NULL DEFAULT 'CARD',
synced_at TIMESTAMP -- NULL means not yet synced to Iceberg
);
-- Transaction line items
CREATE TABLE IF NOT EXISTS transaction_items (
item_id SERIAL PRIMARY KEY,
txn_id INT NOT NULL REFERENCES transactions(txn_id),
product_id INT NOT NULL REFERENCES products(product_id),
quantity INT NOT NULL DEFAULT 1,
unit_price NUMERIC(10,2) NOT NULL,
line_total NUMERIC(12,2) NOT NULL,
synced_at TIMESTAMP
);
-- Indexes for efficient sync queries (only scan unsynced rows)
CREATE INDEX IF NOT EXISTS idx_txn_synced ON transactions(synced_at) WHERE synced_at IS NULL;
CREATE INDEX IF NOT EXISTS idx_items_synced ON transaction_items(synced_at) WHERE synced_at IS NULL;
-- Seed reference data
INSERT INTO stores (store_name, city, state) VALUES
('Downtown Flagship', 'Seattle', 'WA'),
('Mall of Pacific', 'San Francisco', 'CA'),
('Eastside Express', 'Bellevue', 'WA'),
('Harbor Point', 'Portland', 'OR'),
('Valley Center', 'San Jose', 'CA');
INSERT INTO products (product_name, category, unit_price) VALUES
('Organic Coffee Beans 1lb', 'Grocery', 14.99),
('Wireless Earbuds Pro', 'Electronics', 79.99),
('Cotton T-Shirt', 'Apparel', 24.99),
('Stainless Water Bottle', 'Home', 19.99),
('Running Shoes', 'Footwear', 129.99),
('Yoga Mat Premium', 'Fitness', 39.99),
('LED Desk Lamp', 'Electronics', 44.99),
('Organic Protein Bar 12pk', 'Grocery', 29.99),
('Backpack 30L', 'Accessories', 59.99),
('Sunscreen SPF50', 'Personal Care', 12.99),
('Bluetooth Speaker', 'Electronics', 49.99),
('Trail Mix 2lb', 'Grocery', 9.99),
('Insulated Lunch Bag', 'Home', 22.99),
('Fitness Tracker Band', 'Electronics', 89.99),
('Bamboo Cutting Board', 'Home', 17.99);
INSERT INTO customers (first_name, last_name, email, loyalty_tier) VALUES
('Alice', 'Chen', 'alice.chen@example.com', 'GOLD'),
('Bob', 'Martinez', 'bob.martinez@example.com', 'SILVER'),
('Carol', 'Johnson', 'carol.j@example.com', 'BRONZE'),
('David', 'Kim', 'david.kim@example.com', 'GOLD'),
('Eve', 'Patel', 'eve.patel@example.com', 'PLATINUM'),
('Frank', 'Lopez', 'frank.lopez@example.com', 'BRONZE'),
('Grace', 'Wang', 'grace.wang@example.com', 'SILVER'),
('Hank', 'Brown', 'hank.brown@example.com', 'BRONZE'),
('Iris', 'Davis', 'iris.davis@example.com', 'GOLD'),
('Jack', 'Wilson', 'jack.wilson@example.com', 'SILVER');
We will then continue by creating the Iceberg tables. Since we already set up the storage integration, creating an Iceberg table is straightforward.
CREATE TABLE iceberg_transactions (
txn_id INT,
store_id INT,
customer_id INT,
txn_timestamp TIMESTAMP,
total_amount NUMERIC(12,2),
payment_method TEXT,
synced_at TIMESTAMP
) USING iceberg;
CREATE TABLE iceberg_transaction_items (
item_id INT,
txn_id INT,
product_id INT,
quantity INT,
unit_price NUMERIC(10,2),
line_total NUMERIC(12,2),
synced_at TIMESTAMP
) USING iceberg;
-- Dimension tables (Iceberg copies for fully self-contained analytics on Snowflake)
CREATE TABLE iceberg_stores (
store_id INT,
store_name TEXT,
city TEXT,
state TEXT,
opened_at TIMESTAMP
) USING iceberg;
CREATE TABLE iceberg_products (
product_id INT,
product_name TEXT,
category TEXT,
unit_price NUMERIC(10,2),
created_at TIMESTAMP
) USING iceberg;
CREATE TABLE iceberg_customers (
customer_id INT,
first_name TEXT,
last_name TEXT,
email TEXT,
loyalty_tier TEXT,
created_at TIMESTAMP
) USING iceberg;
-- Seed dimension data into Iceberg (one-time)
INSERT INTO iceberg_stores SELECT * FROM stores;
INSERT INTO iceberg_products SELECT * FROM products;
INSERT INTO iceberg_customers SELECT * FROM customers;
At this point, we have 10 tables: 5 base tables and 5 Iceberg tables (which appear as foreign tables in PostgreSQL).

Once the Iceberg tables exist inside Postgres, we need to get their metadata paths.
SELECT table_name, metadata_location FROM iceberg_tables;
-- Example metadata_location:
-- s3://retail-lake-ap3/frompg/tables/postgres/public/iceberg_transactions/
-- <OID>/metadata/00000-xxxx.metadata.json
Setup in Snowflake
The setup on the Snowflake side has two parts. The first is to register the metadata path, since open formats like Iceberg use it to capture the latest state of each table. The second is to register the data path itself.
Attach External Volume
Set up an external volume to enable Snowflake to read the data. This is a critical component for supporting open-format tables. We will use a task in the next section to read the latest metadata periodically; this can be automated further if you use another data catalog tool. We can use this command to set it up:
CREATE OR REPLACE EXTERNAL VOLUME retail_lake_volume
STORAGE_LOCATIONS = (
(
NAME = 'retail-s3-ap3'
STORAGE_PROVIDER = 'S3'
STORAGE_BASE_URL = 's3://retail-lake-ap3/'
STORAGE_AWS_ROLE_ARN = 'arn:aws:iam::012345678910:role/snowflake-retail-lake-role'
)
);
-- Retrieve the Snowflake IAM user ARN and external ID
DESCRIBE EXTERNAL VOLUME retail_lake_volume;
-- Copy the STORAGE_AWS_IAM_USER_ARN and STORAGE_AWS_EXTERNAL_ID from the output
We will need to update the IAM role's trust policy again, this time with two principals and two external IDs.
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Principal": {
"AWS": [
"<STORAGE_AWS_IAM_USER_ARN from DESCRIBE STORAGE INTEGRATION pg_lake_s3_integration>",
"<STORAGE_AWS_IAM_USER_ARN from DESCRIBE EXTERNAL VOLUME retail_lake_volume>"
]
},
"Action": "sts:AssumeRole",
"Condition": {
"StringEquals": {
"sts:ExternalId": [
"<STORAGE_AWS_EXTERNAL_ID from DESCRIBE STORAGE INTEGRATION pg_lake_s3_integration>",
"<STORAGE_AWS_EXTERNAL_ID from DESCRIBE EXTERNAL VOLUME retail_lake_volume>"
]
}
}
}
]
}
Once we save this, we can verify the connection.
-- Verify connectivity
SELECT SYSTEM$VERIFY_EXTERNAL_VOLUME('retail_lake_volume');
Attach Catalog and Metadata Path for Iceberg Data
Set up a catalog integration to enable Snowflake to read the actual Parquet data in the S3-compatible bucket. We will need to supply the metadata file path created during the Postgres setup.
CREATE DATABASE IF NOT EXISTS RETAIL_ANALYTICS;
CREATE SCHEMA IF NOT EXISTS RETAIL_ANALYTICS.LAKEHOUSE;
USE DATABASE RETAIL_ANALYTICS;
USE SCHEMA RETAIL_ANALYTICS.LAKEHOUSE;
CREATE OR REPLACE CATALOG INTEGRATION pg_lake_catalog
CATALOG_SOURCE = OBJECT_STORE
TABLE_FORMAT = ICEBERG
ENABLED = TRUE;
Since pg_lake manages the Iceberg metadata (not Snowflake), we use a catalog integration for object storage files and point each table to its metadata file using METADATA_FILE_PATH. This path is relative to the external volume's STORAGE_BASE_URL. Given STORAGE_BASE_URL = 's3://retail-lake-ap3/' and a metadata_location like
s3://retail-lake-ap3/frompg/tables/postgres/public/iceberg_transactions/<OID>/metadata/00000-xxxx.metadata.json, the METADATA_FILE_PATH would be
frompg/tables/postgres/public/iceberg_transactions/<OID>/metadata/00000-xxxx.metadata.json
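Deriving the relative path is just a prefix strip, but it is easy to get wrong by hand across five tables. Here is a small sketch; the helper name `to_metadata_file_path` is my own, and `16384` is a made-up example OID (yours will differ, as will the metadata file name).

```python
def to_metadata_file_path(metadata_location: str, storage_base_url: str) -> str:
    """Strip the external volume's STORAGE_BASE_URL prefix to obtain the
    relative METADATA_FILE_PATH expected by CREATE ICEBERG TABLE."""
    if not metadata_location.startswith(storage_base_url):
        raise ValueError(
            f"{metadata_location!r} is not under {storage_base_url!r}"
        )
    return metadata_location[len(storage_base_url):]

base = "s3://retail-lake-ap3/"
# 16384 is a hypothetical table OID used purely for illustration
loc = (
    "s3://retail-lake-ap3/frompg/tables/postgres/public/"
    "iceberg_transactions/16384/metadata/00000-xxxx.metadata.json"
)
print(to_metadata_file_path(loc, base))
# frompg/tables/postgres/public/iceberg_transactions/16384/metadata/00000-xxxx.metadata.json
```

Feed each metadata_location from `SELECT metadata_location FROM iceberg_tables;` through this helper to build the five CREATE ICEBERG TABLE statements.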
We will then create the Iceberg table in Snowflake using the metadata file path for each table.
-- Transactions fact table
-- Replace METADATA_FILE_PATH with actual path from: SELECT metadata_location FROM iceberg_tables WHERE table_name = 'iceberg_transactions';
CREATE OR REPLACE ICEBERG TABLE iceberg_transactions
EXTERNAL_VOLUME = 'retail_lake_volume'
CATALOG = 'pg_lake_catalog'
METADATA_FILE_PATH = 'frompg/tables/postgres/public/iceberg_transactions/<OID>/metadata/00000-xxxx.metadata.json';
-- Repeat for the other tables with their respective Iceberg table names
At this point, the Iceberg tables created in the Postgres instance are already accessible in Snowflake. We can check this in the Database Explorer to confirm.

However, subsequent updates to the Iceberg tables will not be reflected, since we don't yet have a mechanism to refresh the metadata. Therefore, we will create a task that checks for updates periodically.
-- ---- Storage integration for stages ----
CREATE OR REPLACE STORAGE INTEGRATION iceberg_stage_s3_integration
TYPE = EXTERNAL_STAGE
STORAGE_PROVIDER = 'S3'
ENABLED = TRUE
STORAGE_AWS_ROLE_ARN = 'arn:aws:iam::012345678910:role/snowflake-retail-lake-role'
STORAGE_ALLOWED_LOCATIONS = ('s3://retail-lake-ap3/frompg/tables/');
-- Retrieve the IAM user ARN and external ID for this integration.
-- Add these to your IAM role trust policy
DESCRIBE STORAGE INTEGRATION iceberg_stage_s3_integration;
We will need to update the IAM role's trust policy one last time. This is the final form of the trust policy for this architecture.
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Principal": {
"AWS": [
"<STORAGE_AWS_IAM_USER_ARN from DESCRIBE STORAGE INTEGRATION pg_lake_s3_integration>",
"<STORAGE_AWS_IAM_USER_ARN from DESCRIBE EXTERNAL VOLUME retail_lake_volume>",
"<STORAGE_AWS_IAM_USER_ARN from DESCRIBE STORAGE INTEGRATION iceberg_stage_s3_integration>"
]
},
"Action": "sts:AssumeRole",
"Condition": {
"StringEquals": {
"sts:ExternalId": [
"<STORAGE_AWS_EXTERNAL_ID from DESCRIBE STORAGE INTEGRATION pg_lake_s3_integration>",
"<STORAGE_AWS_EXTERNAL_ID from DESCRIBE EXTERNAL VOLUME retail_lake_volume>",
"<STORAGE_AWS_EXTERNAL_ID from DESCRIBE STORAGE INTEGRATION iceberg_stage_s3_integration>"
]
}
}
}
]
}
We will create an EXTERNAL STAGE pointing to each table's metadata folder in S3.
-- Directory stages (one per Iceberg table's metadata directory)
CREATE OR REPLACE STAGE stg_meta_transactions
URL = 's3://retail-lake-ap3/frompg/tables/postgres/public/iceberg_transactions/<OID>/metadata/'
STORAGE_INTEGRATION = iceberg_stage_s3_integration
DIRECTORY = (ENABLE = TRUE);
-- Repeat for the other tables using different stage names
There will be 5 stages created in total.

To detect metadata changes in the stage, we can use a combination of a Task, a Stream, and a Stored Procedure in Snowflake.
-- Create stream to monitor changes
CREATE OR REPLACE STREAM stream_meta_transactions ON STAGE stg_meta_transactions;
-- Repeat for the other stages
-- Create Procedure to refresh the iceberg tables
CREATE OR REPLACE PROCEDURE sp_refresh_iceberg_transactions()
RETURNS VARCHAR
LANGUAGE SQL
AS
$$
DECLARE
latest_path VARCHAR;
BEGIN
SELECT RELATIVE_PATH INTO :latest_path
FROM DIRECTORY(@stg_meta_transactions)
WHERE RELATIVE_PATH LIKE '%.metadata.json'
ORDER BY LAST_MODIFIED DESC
LIMIT 1;
EXECUTE IMMEDIATE
'ALTER ICEBERG TABLE iceberg_transactions REFRESH '''
|| 'frompg/tables/postgres/public/iceberg_transactions/<OID>/metadata/' || latest_path || '''';
RETURN latest_path;
END;
$$;
-- Repeat for the other Iceberg tables
-- Create task to do this periodically
CREATE OR REPLACE TASK task_refresh_stage_transactions
WAREHOUSE = COMPUTE_WH
SCHEDULE = '2 MINUTES'
AS
ALTER STAGE stg_meta_transactions REFRESH;
CREATE OR REPLACE TASK task_refresh_iceberg_transactions
WAREHOUSE = COMPUTE_WH
AFTER task_refresh_stage_transactions
WHEN SYSTEM$STREAM_HAS_DATA('stream_meta_transactions')
AS
CALL sp_refresh_iceberg_transactions();
-- create the same task pattern for other tables
-- Start the task
ALTER TASK task_refresh_iceberg_transactions RESUME;
ALTER TASK task_refresh_stage_transactions RESUME;
-- Stop the task
-- ALTER TASK task_refresh_iceberg_transactions SUSPEND;
-- ALTER TASK task_refresh_stage_transactions SUSPEND;
-- We can also trigger this manually
ALTER STAGE stg_meta_transactions REFRESH;
CALL sp_refresh_iceberg_transactions();
All of these will be visible in the Database Explorer.
Simulate the Transaction
Feel free to use or modify the sample Python code below.
import argparse
import random
import time
import sys
from datetime import datetime
import psycopg2
from psycopg2.extras import execute_values
# Retail simulation constants
PAYMENT_METHODS = ["CARD", "CASH", "DIGITAL_WALLET", "GIFT_CARD"]
ITEMS_PER_TXN_RANGE = (1, 5) # each transaction has 1-5 line items
def get_reference_data(conn):
"""Load store, product, and customer IDs from the database."""
with conn.cursor() as cur:
cur.execute("SELECT store_id FROM stores")
store_ids = [r[0] for r in cur.fetchall()]
cur.execute("SELECT product_id, unit_price FROM products")
products = [(r[0], float(r[1])) for r in cur.fetchall()]
cur.execute("SELECT customer_id FROM customers")
customer_ids = [r[0] for r in cur.fetchall()]
return store_ids, products, customer_ids
def generate_transaction(store_ids, products, customer_ids):
"""Generate a single random retail transaction with line items."""
store_id = random.choice(store_ids)
# 90% of transactions have a known customer, 10% are anonymous
customer_id = random.choice(customer_ids) if random.random() < 0.9 else None
payment = random.choice(PAYMENT_METHODS)
num_items = random.randint(*ITEMS_PER_TXN_RANGE)
items = []
total = 0.0
selected_products = random.sample(products, min(num_items, len(products)))
for product_id, unit_price in selected_products:
qty = random.randint(1, 3)
line_total = round(unit_price * qty, 2)
total += line_total
items.append((product_id, qty, unit_price, line_total))
return store_id, customer_id, round(total, 2), payment, items
def insert_batch(conn, batch):
"""Insert a batch of transactions and their line items using bulk operations."""
with conn.cursor() as cur:
# Bulk-insert all transactions in one round-trip
txn_rows = [
(store_id, customer_id, total, payment)
for store_id, customer_id, total, payment, _items in batch
]
result = execute_values(
cur,
"""
INSERT INTO transactions (store_id, customer_id, total_amount, payment_method)
VALUES %s
RETURNING txn_id
""",
txn_rows,
fetch=True,
)
txn_ids = [r[0] for r in result]
# Collect all line items across every transaction
all_items = []
for txn_id, (_sid, _cid, _tot, _pay, items) in zip(txn_ids, batch):
for pid, qty, up, lt in items:
all_items.append((txn_id, pid, qty, up, lt))
# Bulk-insert all line items in one round-trip
if all_items:
execute_values(
cur,
"""
INSERT INTO transaction_items (txn_id, product_id, quantity, unit_price, line_total)
VALUES %s
""",
all_items,
)
conn.commit()
def trigger_sync(conn):
"""Manually trigger the Iceberg sync (CTE chains INSERT then UPDATE atomically)."""
with conn.cursor() as cur:
cur.execute(
"WITH synced AS ("
"INSERT INTO iceberg_transactions "
"SELECT txn_id, store_id, customer_id, txn_timestamp, "
"total_amount, payment_method, now() "
"FROM transactions WHERE synced_at IS NULL "
"RETURNING txn_id) "
"UPDATE transactions SET synced_at = now() "
"WHERE txn_id IN (SELECT txn_id FROM synced)"
)
cur.execute(
"WITH synced AS ("
"INSERT INTO iceberg_transaction_items "
"SELECT item_id, txn_id, product_id, quantity, "
"unit_price, line_total, now() "
"FROM transaction_items WHERE synced_at IS NULL "
"RETURNING item_id) "
"UPDATE transaction_items SET synced_at = now() "
"WHERE item_id IN (SELECT item_id FROM synced)"
)
conn.commit()
def print_stats(conn):
"""Print current counts from both heap and Iceberg tables."""
with conn.cursor() as cur:
cur.execute("SELECT COUNT(*) FROM transactions")
heap_txn = cur.fetchone()[0]
cur.execute("SELECT COUNT(*) FROM transactions WHERE synced_at IS NOT NULL")
synced_txn = cur.fetchone()[0]
cur.execute("SELECT COUNT(*) FROM iceberg_transactions")
ice_txn = cur.fetchone()[0]
cur.execute("SELECT COUNT(*) FROM transaction_items")
heap_items = cur.fetchone()[0]
cur.execute("SELECT COUNT(*) FROM iceberg_transaction_items")
ice_items = cur.fetchone()[0]
print(f" Heap transactions: {heap_txn} (synced: {synced_txn})")
print(f" Iceberg transactions: {ice_txn}")
print(f" Heap line items: {heap_items}")
print(f" Iceberg line items: {ice_items}")
def main():
parser = argparse.ArgumentParser(description="Transaction simulator")
parser.add_argument("--host", required=True, help="Postgres host ")
parser.add_argument("--port", type=int, default=5432)
parser.add_argument("--dbname", default="postgres")
parser.add_argument("--user", default="snowflake_admin")
parser.add_argument("--password", required=True, help="Postgres admin password")
parser.add_argument(
"--transactions",
type=int,
default=5000,
help="Total number of transactions to generate",
)
parser.add_argument(
"--batch-size",
type=int,
default=500,
help="Number of transactions per commit batch",
)
parser.add_argument(
"--sync-every",
type=int,
default=500,
help="Trigger manual Iceberg sync after this many transactions (0 = rely on pg_cron only)",
)
args = parser.parse_args()
conn_params = dict(
host=args.host,
port=args.port,
dbname=args.dbname,
user=args.user,
password=args.password,
sslmode="require",
)
conn = psycopg2.connect(**conn_params)
conn.autocommit = False
print(f"Connected to {args.host}:{args.port}/{args.dbname}")
store_ids, products, customer_ids = get_reference_data(conn)
print(
f"Loaded reference data: {len(store_ids)} stores, "
f"{len(products)} products, {len(customer_ids)} customers"
)
total_generated = 0
start_time = time.time()
print(f"\nGenerating {args.transactions} transactions (batch size: {args.batch_size})...\n")
while total_generated < args.transactions:
batch_count = min(args.batch_size, args.transactions - total_generated)
batch = [
generate_transaction(store_ids, products, customer_ids)
for _ in range(batch_count)
]
insert_batch(conn, batch)
total_generated += batch_count
elapsed = time.time() - start_time
tps = total_generated / elapsed if elapsed > 0 else 0
print(
f" [{datetime.now().strftime('%H:%M:%S')}] "
f"{total_generated}/{args.transactions} transactions "
f"({tps:.0f} txn/sec)"
)
# Trigger manual sync at intervals
if args.sync_every > 0 and total_generated % args.sync_every == 0:
print(f" >> Triggering Iceberg sync at {total_generated} transactions...")
sync_start = time.time()
trigger_sync(conn)
print(f" >> Sync completed in {time.time() - sync_start:.2f}s")
# Final sync
print("\nTriggering final Iceberg sync...")
sync_start = time.time()
trigger_sync(conn)
print(f"Final sync completed in {time.time() - sync_start:.2f}s")
elapsed = time.time() - start_time
print(f"\n{'='*60}")
print(f"Simulation complete!")
print(f" Total transactions: {total_generated}")
print(f" Elapsed time: {elapsed:.1f}s")
print(f" Average throughput: {total_generated / elapsed:.0f} txn/sec")
print(f"{'='*60}")
print("\nFinal counts:")
print_stats(conn)
print("\nIceberg metadata locations:")
with conn.cursor() as cur:
cur.execute("SELECT table_name, metadata_location FROM iceberg_tables")
for row in cur.fetchall():
print(f" {row[0]}: {row[1]}")
conn.close()
print("\nDone. Data is now available on both Postgres and Snowflake via shared Iceberg tables.")
if __name__ == "__main__":
main()
The script above performs two tasks:
- Insert transaction data into Postgres. This is a typical INSERT INTO operation against your Postgres database.
- Trigger a periodic data sync into the Iceberg tables. We need to run a periodic INSERT INTO for the Iceberg tables. Since these tables use a columnar format (Parquet), bulk inserts are much more efficient than the row-by-row inserts we do for OLTP. In my simulation, bulk-inserting every 500-1000 Postgres rows into the Iceberg tables consistently completed in under 2 seconds.
Separately, we need to trigger the metadata and Iceberg table refresh. At this point, Snowflake does not know that the data has been updated, which is why we need to fetch the latest metadata along with the latest Iceberg table state. This is handled by the Task, Stream, and Stored Procedure SQL in the previous section.
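The heart of that refresh is the selection the stored procedure performs: among the files in the stage directory, pick the most recently modified `*.metadata.json` (the `ORDER BY LAST_MODIFIED DESC LIMIT 1`). The same logic can be sketched as a pure Python function; the function name `latest_metadata_file` and the sample listing are my own illustration, not part of the pg_lake or Snowflake APIs.

```python
from datetime import datetime

def latest_metadata_file(directory_listing):
    """Pick the most recently modified *.metadata.json from a directory
    listing of (relative_path, last_modified) tuples -- mirroring the
    ORDER BY LAST_MODIFIED DESC LIMIT 1 in the stored procedure."""
    candidates = [
        (path, modified)
        for path, modified in directory_listing
        if path.endswith(".metadata.json")
    ]
    if not candidates:
        return None
    # max by timestamp = newest snapshot of the Iceberg table
    return max(candidates, key=lambda entry: entry[1])[0]

# Hypothetical stage listing: two metadata versions and a snapshot file
listing = [
    ("00000-aaaa.metadata.json", datetime(2025, 1, 1, 10, 0)),
    ("00001-bbbb.metadata.json", datetime(2025, 1, 1, 10, 5)),
    ("snap-123.avro", datetime(2025, 1, 1, 10, 6)),
]
print(latest_metadata_file(listing))  # 00001-bbbb.metadata.json
```

Note that non-metadata files (snapshots, manifests) are filtered out even if they are newer, which is exactly why the procedure filters on `LIKE '%.metadata.json'`.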
Once the sync is complete, we can run some queries to check if the data is flowing as expected.
-- 1. Check row counts match Postgres Iceberg tables
SELECT 'sf_transactions' AS source, COUNT(*) AS row_count FROM iceberg_transactions
UNION ALL
SELECT 'sf_items', COUNT(*) FROM iceberg_transaction_items
UNION ALL
SELECT 'sf_stores', COUNT(*) FROM iceberg_stores
UNION ALL
SELECT 'sf_products', COUNT(*) FROM iceberg_products
UNION ALL
SELECT 'sf_customers', COUNT(*) FROM iceberg_customers;
-- 2. Revenue by store (proves joins work across Iceberg tables)
SELECT
s.store_name,
COUNT(DISTINCT t.txn_id) AS txn_count,
SUM(t.total_amount) AS revenue
FROM iceberg_transactions t
JOIN iceberg_stores s ON t.store_id = s.store_id
GROUP BY s.store_name
ORDER BY revenue DESC;
Closing
Combining Snowflake Postgres with an S3-compatible bucket truly unlocks the lakehouse architecture. Snowflake bills Snowflake Postgres and the Virtual Warehouses used for analytics separately; you can check the details here. We can improve this further with a data catalog tool, so Snowflake can enable the AUTO_REFRESH attribute on the stage. Hope this helps, and feel free to reach out if you have any questions!



