Reza Brianca

Building a True Lakehouse via Snowflake Postgres and pg_lake

Overview

Recently, Snowflake acquired Crunchy Data, which provides an enterprise-grade PostgreSQL offering. On top of that awesome news, the team also developed and published pg_lake, a PostgreSQL extension for managing Iceberg tables. The extension provides read and write access to Iceberg tables directly from PostgreSQL.

Given the trend toward open data formats, I was curious: if PostgreSQL can write to Iceberg tables, then a true lakehouse architecture can be achieved in a scalable manner. In this article, I want to explain how we can use Snowflake Postgres (recently GA) together with Snowflake to manage an entire data lakehouse.
Here is the high-level architecture of the lakehouse we will build:

Lakehouse Architecture

Architecture and Value Proposition

Here are the components involved in this architecture:

  1. Snowflake Postgres for OLTP writes
  2. pg_lake as the data lake component
  3. AWS S3 as the storage layer
  4. Snowflake as the powerful data and AI platform

By developing this architecture, we hope to achieve the following:

  1. True openness using the Iceberg format
  2. Transactional consistency and schema evolution supported by pg_lake
  3. Workload separation between OLTP (on Postgres) and OLAP (on Snowflake)
  4. A zero-copy architecture, since both engines work off the same files

Prerequisites

  1. Create a Snowflake account in a region where Snowflake Postgres is available. You can check the region availability here.
  2. Create an AWS account, since we will use AWS S3 and its IAM services.
  3. A local client application such as psql or DBeaver to run PostgreSQL commands.

Setup in Snowflake Postgres

We can easily spin up a Postgres instance using SQL. By default the instance cannot be accessed, so we need to add a NETWORK_RULE and NETWORK_POLICY to make it reachable from a client endpoint. We will create 2 types of tables here: normal Postgres tables and Iceberg tables. Note the metadata file path of each Iceberg table, since we will register it when creating the corresponding table in Snowflake later. Please note that PostgreSQL instance creation may take about 5-10 minutes. Here is the SQL we can use to set this up:

CREATE POSTGRES INSTANCE retail_txns
  COMPUTE_FAMILY = 'STANDARD_M'
  STORAGE_SIZE_GB = 50
  AUTHENTICATION_AUTHORITY = POSTGRES
  COMMENT = 'Retail simulation with pg_lake Iceberg integration';
-- This will create 2 users: snowflake_admin and application, with passwords
-- Copy the passwords since they will only appear once

-- Get the connection host (use this in psql/dBeaver to connect)
SHOW POSTGRES INSTANCES;

-- Check the instance status, wait until `state` changes to `READY`
DESCRIBE POSTGRES INSTANCE retail_txns;

-- The network rule needs a database and schema to live in
CREATE DATABASE IF NOT EXISTS RETAIL_ANALYTICS;
CREATE SCHEMA IF NOT EXISTS RETAIL_ANALYTICS.NETWORK;

CREATE OR REPLACE NETWORK RULE RETAIL_ANALYTICS.NETWORK.PG_RETAIL_INGRESS
  TYPE = IPV4
  VALUE_LIST = ('0.0.0.0/0') -- restrict this in a production environment
  MODE = POSTGRES_INGRESS;

CREATE OR REPLACE NETWORK POLICY PG_RETAIL_ALLOW_ALL
  ALLOWED_NETWORK_RULE_LIST = ('RETAIL_ANALYTICS.NETWORK.PG_RETAIL_INGRESS')
  COMMENT = 'Allow connections to retail_txns Postgres instance';

-- Attach the network policy to the instance
-- (changes may take up to 2 minutes to take effect)
ALTER POSTGRES INSTANCE retail_txns
  SET NETWORK_POLICY = 'PG_RETAIL_ALLOW_ALL';

You should now be able to connect to your PostgreSQL instance using psql or a tool like DBeaver. A successful creation will look like this in Snowsight.

Snowflake Postgres Instance

Snowflake Postgres Details.

Attach Storage Integration to Postgres

Once the instance is ready, we need to create and attach a STORAGE INTEGRATION of type POSTGRES_EXTERNAL_STORAGE so that Postgres can read and write Iceberg tables in the specified S3-compatible bucket. This requires an IAM role in AWS. After logging in to the AWS Console, go to (or search for) IAM (Identity and Access Management). We will create a new role to register this storage with Postgres.

First, create the Policy that will later be attached to the Role: go to IAM → Policies. You can also follow the documentation here.

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "s3:PutObject",
        "s3:GetObject",
        "s3:GetObjectVersion",
        "s3:DeleteObject",
        "s3:DeleteObjectVersion"
      ],
      "Resource": "arn:aws:s3:::retail-lake-ap3/*"
    },
    {
      "Effect": "Allow",
      "Action": [
        "s3:ListBucket",
        "s3:GetBucketLocation"
      ],
      "Resource": "arn:aws:s3:::retail-lake-ap3"
    }
  ]
}

Next, go to IAM → Roles and create the new role and attach the Policy we just created. Note the role ARN value since we will use this for the Storage Integration setup.

IMPORTANT: Set the IAM role's "Maximum session duration" to 12 hours (default 1 hour). The storage integration will NOT work with the default 1-hour session.

-- This integration type (POSTGRES_EXTERNAL_STORAGE) allows the Postgres
-- instance to write Iceberg data to your own S3 bucket via pg_lake. 

CREATE STORAGE INTEGRATION pg_lake_s3_integration
  TYPE = POSTGRES_EXTERNAL_STORAGE
  STORAGE_PROVIDER = 'S3'
  ENABLED = TRUE
  STORAGE_AWS_ROLE_ARN = 'arn:aws:iam::012345678910:role/snowflake-retail-lake-role'
  STORAGE_ALLOWED_LOCATIONS = ('s3://retail-lake-ap3/');

-- Retrieve the Snowflake IAM user ARN and external ID
DESCRIBE STORAGE INTEGRATION pg_lake_s3_integration;
-- Record these values from the output:
--   STORAGE_AWS_IAM_USER_ARN  (e.g., arn:aws:iam::<account>:user/snowflake-postgres-integration-management)
--   STORAGE_AWS_EXTERNAL_ID   (e.g., ABC12345_SFCRole=1_abcdefg) 

Go back to the AWS Console, open IAM → Roles, and edit the trust policy of the role we created (under the Trust relationships tab):

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Principal": {
        "AWS": "<STORAGE_AWS_IAM_USER_ARN from DESCRIBE output>"
      },
      "Action": "sts:AssumeRole",
      "Condition": {
        "StringEquals": {
          "sts:ExternalId": "<STORAGE_AWS_EXTERNAL_ID from DESCRIBE output>"
        }
      }
    }
  ]
}

Finally, we can attach this storage integration to the Snowflake Postgres instance. This will allow the Postgres instance to read and write the S3 bucket we specified above.

ALTER POSTGRES INSTANCE retail_txns
  SET STORAGE_INTEGRATION = pg_lake_s3_integration;

Insert Initial Data in Postgres

We can then connect to the Postgres instance using psql or DBeaver to set up the initial data and Iceberg tables.

psql "host=<host_from_SHOW_POSTGRES_INSTANCE_above> port=5432 dbname=postgres user=snowflake_admin sslmode=require"

Once we are inside the instance, we can run the following commands:

-- Create pg_lake extension (installs all sub-extensions)
CREATE EXTENSION IF NOT EXISTS pg_lake CASCADE;

-- Set the default Iceberg storage location (persistent across all sessions)
ALTER DATABASE postgres
  SET pg_lake_iceberg.default_location_prefix = 's3://retail-lake-ap3/frompg/tables/';

-- Apply to current session immediately
SET pg_lake_iceberg.default_location_prefix TO 's3://retail-lake-ap3/frompg/tables/';

-- Verify
SHOW pg_lake_iceberg.default_location_prefix;
-- Expected: s3://retail-lake-ap3/frompg/tables/

We can seed some initial data into the database using familiar DDL.

-- Create retail OLTP tables (standard heap tables for fast inserts) 

-- Stores
CREATE TABLE IF NOT EXISTS stores (
    store_id    SERIAL PRIMARY KEY,
    store_name  TEXT NOT NULL,
    city        TEXT NOT NULL,
    state       TEXT NOT NULL,
    opened_at   TIMESTAMP DEFAULT now()
);

-- Products
CREATE TABLE IF NOT EXISTS products (
    product_id    SERIAL PRIMARY KEY,
    product_name  TEXT NOT NULL,
    category      TEXT NOT NULL,
    unit_price    NUMERIC(10,2) NOT NULL,
    created_at    TIMESTAMP DEFAULT now()
);

-- Customers
CREATE TABLE IF NOT EXISTS customers (
    customer_id   SERIAL PRIMARY KEY,
    first_name    TEXT NOT NULL,
    last_name     TEXT NOT NULL,
    email         TEXT UNIQUE NOT NULL,
    loyalty_tier  TEXT DEFAULT 'BRONZE',
    created_at    TIMESTAMP DEFAULT now()
);

-- Transactions (header)
CREATE TABLE IF NOT EXISTS transactions (
    txn_id        SERIAL PRIMARY KEY,
    store_id      INT NOT NULL REFERENCES stores(store_id),
    customer_id   INT REFERENCES customers(customer_id),
    txn_timestamp TIMESTAMP NOT NULL DEFAULT now(),
    total_amount  NUMERIC(12,2) NOT NULL DEFAULT 0,
    payment_method TEXT NOT NULL DEFAULT 'CARD',
    synced_at     TIMESTAMP  -- NULL means not yet synced to Iceberg
);

-- Transaction line items
CREATE TABLE IF NOT EXISTS transaction_items (
    item_id       SERIAL PRIMARY KEY,
    txn_id        INT NOT NULL REFERENCES transactions(txn_id),
    product_id    INT NOT NULL REFERENCES products(product_id),
    quantity      INT NOT NULL DEFAULT 1,
    unit_price    NUMERIC(10,2) NOT NULL,
    line_total    NUMERIC(12,2) NOT NULL,
    synced_at     TIMESTAMP
);

-- Indexes for efficient sync queries (only scan unsynced rows)
CREATE INDEX IF NOT EXISTS idx_txn_synced ON transactions(synced_at) WHERE synced_at IS NULL;
CREATE INDEX IF NOT EXISTS idx_items_synced ON transaction_items(synced_at) WHERE synced_at IS NULL;

-- Seed reference data 

INSERT INTO stores (store_name, city, state) VALUES
    ('Downtown Flagship', 'Seattle', 'WA'),
    ('Mall of Pacific', 'San Francisco', 'CA'),
    ('Eastside Express', 'Bellevue', 'WA'),
    ('Harbor Point', 'Portland', 'OR'),
    ('Valley Center', 'San Jose', 'CA');

INSERT INTO products (product_name, category, unit_price) VALUES
    ('Organic Coffee Beans 1lb',   'Grocery',     14.99),
    ('Wireless Earbuds Pro',       'Electronics', 79.99),
    ('Cotton T-Shirt',             'Apparel',     24.99),
    ('Stainless Water Bottle',     'Home',        19.99),
    ('Running Shoes',              'Footwear',    129.99),
    ('Yoga Mat Premium',           'Fitness',     39.99),
    ('LED Desk Lamp',              'Electronics', 44.99),
    ('Organic Protein Bar 12pk',   'Grocery',     29.99),
    ('Backpack 30L',               'Accessories', 59.99),
    ('Sunscreen SPF50',            'Personal Care', 12.99),
    ('Bluetooth Speaker',          'Electronics', 49.99),
    ('Trail Mix 2lb',              'Grocery',      9.99),
    ('Insulated Lunch Bag',        'Home',        22.99),
    ('Fitness Tracker Band',       'Electronics', 89.99),
    ('Bamboo Cutting Board',       'Home',        17.99);

INSERT INTO customers (first_name, last_name, email, loyalty_tier) VALUES
    ('Alice',   'Chen',     'alice.chen@example.com',    'GOLD'),
    ('Bob',     'Martinez', 'bob.martinez@example.com',  'SILVER'),
    ('Carol',   'Johnson',  'carol.j@example.com',       'BRONZE'),
    ('David',   'Kim',      'david.kim@example.com',     'GOLD'),
    ('Eve',     'Patel',    'eve.patel@example.com',     'PLATINUM'),
    ('Frank',   'Lopez',    'frank.lopez@example.com',   'BRONZE'),
    ('Grace',   'Wang',     'grace.wang@example.com',    'SILVER'),
    ('Hank',    'Brown',    'hank.brown@example.com',    'BRONZE'),
    ('Iris',    'Davis',    'iris.davis@example.com',    'GOLD'),
    ('Jack',    'Wilson',   'jack.wilson@example.com',   'SILVER');

We will then continue by creating the Iceberg tables. Since we already set up the storage integration, creating an Iceberg table is straightforward.

CREATE TABLE iceberg_transactions (
    txn_id         INT,
    store_id       INT,
    customer_id    INT,
    txn_timestamp  TIMESTAMP,
    total_amount   NUMERIC(12,2),
    payment_method TEXT,
    synced_at      TIMESTAMP
) USING iceberg;

CREATE TABLE iceberg_transaction_items (
    item_id      INT,
    txn_id       INT,
    product_id   INT,
    quantity     INT,
    unit_price   NUMERIC(10,2),
    line_total   NUMERIC(12,2),
    synced_at    TIMESTAMP
) USING iceberg;

-- Dimension tables (Iceberg copies for fully self-contained analytics on Snowflake)
CREATE TABLE iceberg_stores (
    store_id    INT,
    store_name  TEXT,
    city        TEXT,
    state       TEXT,
    opened_at   TIMESTAMP
) USING iceberg;

CREATE TABLE iceberg_products (
    product_id    INT,
    product_name  TEXT,
    category      TEXT,
    unit_price    NUMERIC(10,2),
    created_at    TIMESTAMP
) USING iceberg;

CREATE TABLE iceberg_customers (
    customer_id   INT,
    first_name    TEXT,
    last_name     TEXT,
    email         TEXT,
    loyalty_tier  TEXT,
    created_at    TIMESTAMP
) USING iceberg;

-- Seed dimension data into Iceberg (one-time)
INSERT INTO iceberg_stores    SELECT * FROM stores;
INSERT INTO iceberg_products  SELECT * FROM products;
INSERT INTO iceberg_customers SELECT * FROM customers;

At this point, we have 10 tables: 5 base tables and 5 Iceberg tables (the latter appear as foreign tables in PostgreSQL).

Base and foreign tables

Once the Iceberg tables exist inside Postgres, we need to get their metadata paths.

SELECT table_name, metadata_location FROM iceberg_tables;

-- Example metadata_location:
--   s3://retail-lake-ap3/frompg/tables/postgres/public/iceberg_transactions/
--     <OID>/metadata/00000-xxxx.metadata.json 

The result will look like this:

Metadata Path

Setup in Snowflake

The setup on the Snowflake side has 2 parts. The first is to register the metadata path, since open formats like Iceberg use it to capture the latest state of each table. The second is to register the data path itself.

Attach External Volume

Set up the external volume to enable Snowflake to read the data; this is a critical component for supporting open-format tables. We will use a task in the next section to read the latest metadata periodically (this can run automatically if you use another data catalog tool). We can use these commands to set it up:

CREATE OR REPLACE EXTERNAL VOLUME retail_lake_volume
  STORAGE_LOCATIONS = (
    (
      NAME                    = 'retail-s3-ap3'
      STORAGE_PROVIDER        = 'S3'
      STORAGE_BASE_URL        = 's3://retail-lake-ap3/'
      STORAGE_AWS_ROLE_ARN    = 'arn:aws:iam::012345678910:role/snowflake-retail-lake-role'
    )
  );

-- Retrieve the Snowflake IAM user ARN and external ID
DESCRIBE EXTERNAL VOLUME retail_lake_volume;
-- Copy the STORAGE_AWS_IAM_USER_ARN and STORAGE_AWS_EXTERNAL_ID from the output 

We will need to update the IAM role's trust policy again.

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Principal": {
        "AWS": [
          "<STORAGE_AWS_IAM_USER_ARN from DESCRIBE STORAGE INTEGRATION pg_lake_s3_integration>",
          "<STORAGE_AWS_IAM_USER_ARN from DESCRIBE EXTERNAL VOLUME retail_lake_volume>"
        ]
      },
      "Action": "sts:AssumeRole",
      "Condition": {
        "StringEquals": {
          "sts:ExternalId": [
            "<STORAGE_AWS_EXTERNAL_ID from DESCRIBE STORAGE INTEGRATION pg_lake_s3_integration>",
            "<STORAGE_AWS_EXTERNAL_ID from DESCRIBE EXTERNAL VOLUME retail_lake_volume>"
          ]
        }
      }
    }
  ]
}

Once we save this, we can verify the connection.

-- Verify connectivity
SELECT SYSTEM$VERIFY_EXTERNAL_VOLUME('retail_lake_volume');

Attach Catalog and Metadata Path for Iceberg Data

Set up the catalog integration to enable Snowflake to read the actual Parquet data in the S3-compatible bucket. We will need the metadata file paths created during the Postgres setup.

CREATE DATABASE IF NOT EXISTS RETAIL_ANALYTICS;
CREATE SCHEMA IF NOT EXISTS RETAIL_ANALYTICS.LAKEHOUSE;
USE DATABASE RETAIL_ANALYTICS;
USE SCHEMA RETAIL_ANALYTICS.LAKEHOUSE;

CREATE OR REPLACE CATALOG INTEGRATION pg_lake_catalog
  CATALOG_SOURCE = OBJECT_STORE
  TABLE_FORMAT = ICEBERG
  ENABLED = TRUE;

Since pg_lake manages the Iceberg metadata (not Snowflake), we use a catalog integration for object-store files and point each table to its metadata file using METADATA_FILE_PATH. This path is relative to the external volume's STORAGE_BASE_URL. Given STORAGE_BASE_URL = 's3://retail-lake-ap3/' and a metadata_location such as s3://retail-lake-ap3/frompg/tables/postgres/public/iceberg_transactions/<OID>/metadata/00000-xxxx.metadata.json, the METADATA_FILE_PATH would be frompg/tables/postgres/public/iceberg_transactions/<OID>/metadata/00000-xxxx.metadata.json. We then create the Iceberg table in Snowflake using the metadata file path for each table.
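Since this path rewrite is purely mechanical, it can be scripted when you have many tables. Here is a minimal sketch; the function name and sample values are illustrative, not part of pg_lake or Snowflake:

```python
def to_metadata_file_path(metadata_location: str, storage_base_url: str) -> str:
    """Strip the external volume's STORAGE_BASE_URL from an absolute
    s3:// metadata_location, yielding the relative METADATA_FILE_PATH."""
    base = storage_base_url.rstrip("/") + "/"
    if not metadata_location.startswith(base):
        raise ValueError(f"{metadata_location!r} is not under {base!r}")
    return metadata_location[len(base):]
```

You could feed each metadata_location returned by the iceberg_tables query through this helper when building the CREATE ICEBERG TABLE statements.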

-- Transactions fact table
-- Replace METADATA_FILE_PATH with actual path from: SELECT metadata_location FROM iceberg_tables WHERE table_name = 'iceberg_transactions';
CREATE OR REPLACE ICEBERG TABLE iceberg_transactions
  EXTERNAL_VOLUME = 'retail_lake_volume'
  CATALOG = 'pg_lake_catalog'
  METADATA_FILE_PATH = 'frompg/tables/postgres/public/iceberg_transactions/<OID>/metadata/00000-xxxx.metadata.json';

-- Repeat for the other tables with their respective Iceberg table names

At this point, the Iceberg tables created in the Postgres instance are already accessible in Snowflake. We can check this in the Database Explorer to confirm.

Iceberg tables

However, any updates to the Iceberg tables will not be reflected, since we don't yet have a mechanism to refresh the metadata. Therefore, we need to create a task that checks for updates periodically.

-- ---- Storage integration for stages ----
CREATE OR REPLACE STORAGE INTEGRATION iceberg_stage_s3_integration
  TYPE = EXTERNAL_STAGE
  STORAGE_PROVIDER = 'S3'
  ENABLED = TRUE
  STORAGE_AWS_ROLE_ARN = 'arn:aws:iam::012345678910:role/snowflake-retail-lake-role'
  STORAGE_ALLOWED_LOCATIONS = ('s3://retail-lake-ap3/frompg/tables/');

-- Retrieve the IAM user ARN and external ID for this integration.
-- Add these to your IAM role trust policy
DESCRIBE STORAGE INTEGRATION iceberg_stage_s3_integration;

We need to update the IAM role's trust policy one last time. This is the final form of the trust policy for this architecture.

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Principal": {
        "AWS": [
          "<STORAGE_AWS_IAM_USER_ARN from DESCRIBE STORAGE INTEGRATION pg_lake_s3_integration>",
          "<STORAGE_AWS_IAM_USER_ARN from DESCRIBE EXTERNAL VOLUME retail_lake_volume>",
          "<STORAGE_AWS_IAM_USER_ARN from DESCRIBE STORAGE INTEGRATION iceberg_stage_s3_integration>"
        ]
      },
      "Action": "sts:AssumeRole",
      "Condition": {
        "StringEquals": {
          "sts:ExternalId": [
            "<STORAGE_AWS_EXTERNAL_ID from DESCRIBE STORAGE INTEGRATION pg_lake_s3_integration>",
            "<STORAGE_AWS_EXTERNAL_ID from DESCRIBE EXTERNAL VOLUME retail_lake_volume>",
            "<STORAGE_AWS_EXTERNAL_ID from DESCRIBE STORAGE INTEGRATION iceberg_stage_s3_integration>"
          ]
        }
      }
    }
  ]
}

Next, we create an external stage pointing to each Iceberg table's metadata folder in S3.

-- Directory stages (one per Iceberg table's metadata directory)
CREATE OR REPLACE STAGE stg_meta_transactions
  URL = 's3://retail-lake-ap3/frompg/tables/postgres/public/iceberg_transactions/<OID>/metadata/'
  STORAGE_INTEGRATION = iceberg_stage_s3_integration
  DIRECTORY = (ENABLE = TRUE);
-- Repeat for the other tables using different stage names

There will be 5 stages created in total.

Stages
To pick up metadata changes from the stages, we can use a combination of a Task, a Stream, and a Stored Procedure in Snowflake.

-- Create stream to monitor changes
CREATE OR REPLACE STREAM stream_meta_transactions ON STAGE stg_meta_transactions;
-- repeat for the other stages

-- Create Procedure to refresh the iceberg tables
CREATE OR REPLACE PROCEDURE sp_refresh_iceberg_transactions()
RETURNS VARCHAR
LANGUAGE SQL
AS
$$
DECLARE
  latest_path VARCHAR;
BEGIN
  SELECT RELATIVE_PATH INTO :latest_path
  FROM DIRECTORY(@stg_meta_transactions)
  WHERE RELATIVE_PATH LIKE '%.metadata.json'
  ORDER BY LAST_MODIFIED DESC
  LIMIT 1;

  EXECUTE IMMEDIATE
    'ALTER ICEBERG TABLE iceberg_transactions REFRESH '''
    || 'frompg/tables/postgres/public/iceberg_transactions/<OID>/metadata/' || latest_path || '''';
  RETURN latest_path;
END;
$$;
-- repeat for the other Iceberg tables

-- Create task to do this periodically
CREATE OR REPLACE TASK task_refresh_stage_transactions
  WAREHOUSE = COMPUTE_WH
  SCHEDULE  = '2 MINUTE'
AS
  ALTER STAGE stg_meta_transactions REFRESH;

CREATE OR REPLACE TASK task_refresh_iceberg_transactions
  WAREHOUSE = COMPUTE_WH
  AFTER task_refresh_stage_transactions
  WHEN SYSTEM$STREAM_HAS_DATA('stream_meta_transactions')
AS
  CALL sp_refresh_iceberg_transactions();
-- create the same task pattern for other tables

-- Start the task
ALTER TASK task_refresh_iceberg_transactions RESUME;
ALTER TASK task_refresh_stage_transactions RESUME;
-- Stop the task
-- ALTER TASK task_refresh_iceberg_transactions SUSPEND;
-- ALTER TASK task_refresh_stage_transactions SUSPEND;

-- We can also trigger this manually
ALTER STAGE stg_meta_transactions REFRESH;
CALL sp_refresh_iceberg_transactions();
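The stored procedure above picks the newest metadata file by LAST_MODIFIED. Iceberg metadata files also carry a zero-padded, incrementing version prefix (00000-, 00001-, ...), so the lexicographically greatest filename identifies the latest snapshot too, which is a tie-safe fallback if two files share a timestamp. A sketch of that selection logic (names are illustrative, and it assumes the zero-padded naming shown earlier):

```python
def latest_metadata_file(relative_paths):
    """Pick the newest Iceberg metadata file from a stage directory listing.

    Assumes '<version>-<uuid>.metadata.json' names with a zero-padded,
    monotonically increasing version, so the lexicographically greatest
    name is the latest version.
    """
    candidates = [p for p in relative_paths if p.endswith(".metadata.json")]
    return max(candidates) if candidates else None
```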

All of these will be visible in the Database Explorer.

Task and Procedure

Simulate the Transaction

Feel free to use or modify the sample Python code below.

import argparse
import random
import time
import sys
from datetime import datetime

import psycopg2
from psycopg2.extras import execute_values


# Retail simulation constants

PAYMENT_METHODS = ["CARD", "CASH", "DIGITAL_WALLET", "GIFT_CARD"]
ITEMS_PER_TXN_RANGE = (1, 5)  # each transaction has 1-5 line items


def get_reference_data(conn):
    """Load store, product, and customer IDs from the database."""
    with conn.cursor() as cur:
        cur.execute("SELECT store_id FROM stores")
        store_ids = [r[0] for r in cur.fetchall()]

        cur.execute("SELECT product_id, unit_price FROM products")
        products = [(r[0], float(r[1])) for r in cur.fetchall()]

        cur.execute("SELECT customer_id FROM customers")
        customer_ids = [r[0] for r in cur.fetchall()]

    return store_ids, products, customer_ids


def generate_transaction(store_ids, products, customer_ids):
    """Generate a single random retail transaction with line items."""
    store_id = random.choice(store_ids)
    # 90% of transactions have a known customer, 10% are anonymous
    customer_id = random.choice(customer_ids) if random.random() < 0.9 else None
    payment = random.choice(PAYMENT_METHODS)
    num_items = random.randint(*ITEMS_PER_TXN_RANGE)

    items = []
    total = 0.0
    selected_products = random.sample(products, min(num_items, len(products)))

    for product_id, unit_price in selected_products:
        qty = random.randint(1, 3)
        line_total = round(unit_price * qty, 2)
        total += line_total
        items.append((product_id, qty, unit_price, line_total))

    return store_id, customer_id, round(total, 2), payment, items


def insert_batch(conn, batch):
    """Insert a batch of transactions and their line items using bulk operations."""
    with conn.cursor() as cur:
        # Bulk-insert all transactions in one round-trip
        txn_rows = [
            (store_id, customer_id, total, payment)
            for store_id, customer_id, total, payment, _items in batch
        ]
        result = execute_values(
            cur,
            """
            INSERT INTO transactions (store_id, customer_id, total_amount, payment_method)
            VALUES %s
            RETURNING txn_id
            """,
            txn_rows,
            fetch=True,
        )
        txn_ids = [r[0] for r in result]

        # Collect all line items across every transaction
        all_items = []
        for txn_id, (_sid, _cid, _tot, _pay, items) in zip(txn_ids, batch):
            for pid, qty, up, lt in items:
                all_items.append((txn_id, pid, qty, up, lt))

        # Bulk-insert all line items in one round-trip
        if all_items:
            execute_values(
                cur,
                """
                INSERT INTO transaction_items (txn_id, product_id, quantity, unit_price, line_total)
                VALUES %s
                """,
                all_items,
            )
    conn.commit()


def trigger_sync(conn):
    """Manually trigger the Iceberg sync (CTE chains INSERT then UPDATE atomically)."""
    with conn.cursor() as cur:
        cur.execute(
            "WITH synced AS ("
            "INSERT INTO iceberg_transactions "
            "SELECT txn_id, store_id, customer_id, txn_timestamp, "
            "total_amount, payment_method, now() "
            "FROM transactions WHERE synced_at IS NULL "
            "RETURNING txn_id) "
            "UPDATE transactions SET synced_at = now() "
            "WHERE txn_id IN (SELECT txn_id FROM synced)"
        )
        cur.execute(
            "WITH synced AS ("
            "INSERT INTO iceberg_transaction_items "
            "SELECT item_id, txn_id, product_id, quantity, "
            "unit_price, line_total, now() "
            "FROM transaction_items WHERE synced_at IS NULL "
            "RETURNING item_id) "
            "UPDATE transaction_items SET synced_at = now() "
            "WHERE item_id IN (SELECT item_id FROM synced)"
        )
    conn.commit()


def print_stats(conn):
    """Print current counts from both heap and Iceberg tables."""
    with conn.cursor() as cur:
        cur.execute("SELECT COUNT(*) FROM transactions")
        heap_txn = cur.fetchone()[0]
        cur.execute("SELECT COUNT(*) FROM transactions WHERE synced_at IS NOT NULL")
        synced_txn = cur.fetchone()[0]
        cur.execute("SELECT COUNT(*) FROM iceberg_transactions")
        ice_txn = cur.fetchone()[0]
        cur.execute("SELECT COUNT(*) FROM transaction_items")
        heap_items = cur.fetchone()[0]
        cur.execute("SELECT COUNT(*) FROM iceberg_transaction_items")
        ice_items = cur.fetchone()[0]

    print(f"  Heap transactions: {heap_txn} (synced: {synced_txn})")
    print(f"  Iceberg transactions: {ice_txn}")
    print(f"  Heap line items: {heap_items}")
    print(f"  Iceberg line items: {ice_items}")


def main():
    parser = argparse.ArgumentParser(description="Transaction simulator")
    parser.add_argument("--host", required=True, help="Postgres host ")
    parser.add_argument("--port", type=int, default=5432)
    parser.add_argument("--dbname", default="postgres")
    parser.add_argument("--user", default="snowflake_admin")
    parser.add_argument("--password", required=True, help="Postgres admin password")
    parser.add_argument(
        "--transactions",
        type=int,
        default=5000,
        help="Total number of transactions to generate",
    )
    parser.add_argument(
        "--batch-size",
        type=int,
        default=500,
        help="Number of transactions per commit batch",
    )
    parser.add_argument(
        "--sync-every",
        type=int,
        default=500,
        help="Trigger manual Iceberg sync after this many transactions (0 = rely on pg_cron only)",
    )
    args = parser.parse_args()

    conn_params = dict(
        host=args.host,
        port=args.port,
        dbname=args.dbname,
        user=args.user,
        password=args.password,
        sslmode="require",
    )

    conn = psycopg2.connect(**conn_params)
    conn.autocommit = False

    print(f"Connected to {args.host}:{args.port}/{args.dbname}")
    store_ids, products, customer_ids = get_reference_data(conn)
    print(
        f"Loaded reference data: {len(store_ids)} stores, "
        f"{len(products)} products, {len(customer_ids)} customers"
    )

    total_generated = 0
    start_time = time.time()

    print(f"\nGenerating {args.transactions} transactions (batch size: {args.batch_size})...\n")

    while total_generated < args.transactions:
        batch_count = min(args.batch_size, args.transactions - total_generated)
        batch = [
            generate_transaction(store_ids, products, customer_ids)
            for _ in range(batch_count)
        ]

        insert_batch(conn, batch)
        total_generated += batch_count

        elapsed = time.time() - start_time
        tps = total_generated / elapsed if elapsed > 0 else 0
        print(
            f"  [{datetime.now().strftime('%H:%M:%S')}] "
            f"{total_generated}/{args.transactions} transactions "
            f"({tps:.0f} txn/sec)"
        )

        # Trigger manual sync at intervals
        if args.sync_every > 0 and total_generated % args.sync_every == 0:
            print(f"  >> Triggering Iceberg sync at {total_generated} transactions...")
            sync_start = time.time()
            trigger_sync(conn)
            print(f"  >> Sync completed in {time.time() - sync_start:.2f}s")

    # Final sync
    print("\nTriggering final Iceberg sync...")
    sync_start = time.time()
    trigger_sync(conn)
    print(f"Final sync completed in {time.time() - sync_start:.2f}s")

    elapsed = time.time() - start_time
    print(f"\n{'='*60}")
    print(f"Simulation complete!")
    print(f"  Total transactions: {total_generated}")
    print(f"  Elapsed time: {elapsed:.1f}s")
    print(f"  Average throughput: {total_generated / elapsed:.0f} txn/sec")
    print(f"{'='*60}")

    print("\nFinal counts:")
    print_stats(conn)

    print("\nIceberg metadata locations:")
    with conn.cursor() as cur:
        cur.execute("SELECT table_name, metadata_location FROM iceberg_tables")
        for row in cur.fetchall():
            print(f"  {row[0]}: {row[1]}")

    conn.close()
    print("\nDone. Data is now available on both Postgres and Snowflake via shared Iceberg tables.")


if __name__ == "__main__":
    main()

The script above does the following 2 tasks:

  1. Insert transaction data into Postgres. This is the typical INSERT INTO operation against your Postgres database.
  2. Trigger a periodic data sync into the Iceberg tables. We run a periodic INSERT INTO ... SELECT against the Iceberg tables. Since these tables use a columnar format (Parquet), bulk inserts are much more efficient than the row-by-row inserts typical of OLTP. In my simulation, bulk-inserting every 500-1000 Postgres rows into the Iceberg tables consistently completed in under 2 seconds.
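The sync cadence boils down to simple batching: accumulate unsynced OLTP rows and flush them into Iceberg in bulk once a threshold is reached. A minimal sketch of that batch planning (the function name and default are illustrative):

```python
def plan_sync_batches(pending_rows: int, batch_size: int = 500):
    """Split pending_rows unsynced rows into half-open (start, end) ranges.

    Each range becomes one bulk INSERT INTO iceberg_... SELECT statement;
    bulk writes suit Parquet's columnar layout far better than row-by-row
    inserts.
    """
    return [
        (start, min(start + batch_size, pending_rows))
        for start in range(0, pending_rows, batch_size)
    ]
```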

Trigger metadata and Iceberg table refresh

At this point, Snowflake does not know that the data has recently been updated. That's why we need to pick up the latest metadata along with the latest Iceberg table state; this is what the tasks in the previous SQL script take care of.

Once the sync is complete, we can run some queries to check if the data is flowing as expected.

-- 1. Check row counts match Postgres Iceberg tables
SELECT 'sf_transactions' AS source, COUNT(*) AS row_count FROM iceberg_transactions
UNION ALL
SELECT 'sf_items',        COUNT(*) FROM iceberg_transaction_items
UNION ALL
SELECT 'sf_stores',       COUNT(*) FROM iceberg_stores
UNION ALL
SELECT 'sf_products',     COUNT(*) FROM iceberg_products
UNION ALL
SELECT 'sf_customers',    COUNT(*) FROM iceberg_customers;

-- 2. Revenue by store (proves joins work across Iceberg tables)
SELECT
    s.store_name,
    COUNT(DISTINCT t.txn_id)  AS txn_count,
    SUM(t.total_amount)       AS revenue
FROM iceberg_transactions t
JOIN iceberg_stores s ON t.store_id = s.store_id
GROUP BY s.store_name
ORDER BY revenue DESC;

Closing

Combining Snowflake Postgres with an S3-compatible bucket truly unlocks the lakehouse architecture. Snowflake bills Snowflake Postgres and the virtual warehouse used for analytics separately; you can check the details here. We can improve this further with a data catalog tool so Snowflake can enable the AUTO_REFRESH attribute on the stages. Hope this helps, and feel free to reach out if you have any questions!
