DEV Community

Vhub Systems

The Web Scraping Data Warehouse Pipeline I Run for $12/Month

Most scraping tutorials end at "save to CSV". Real data work requires a proper pipeline: deduplicate, validate, store with schema, enable fast queries, and refresh incrementally.

Here is the pipeline I run to scrape 8 data sources into a queryable warehouse for $12/month total.

Architecture Overview

Scraper → Staging (SQLite) → Transform (dbt) → Warehouse (DuckDB) → Query (Metabase)

Cost breakdown:

  • Scraping proxies: ~$5/month
  • VPS (2GB): $6/month
  • Everything else: free (open source)

Stage 1: Scraper → Staging

Every scraper writes raw data to a SQLite staging database. Raw HTML is not stored — only parsed fields. This is the "bronze" layer.

import sqlite3
from datetime import datetime
from dataclasses import dataclass, asdict
from typing import Optional

@dataclass
class ProductListing:
    source: str          # Which site scraped from
    external_id: str     # ID on the source site
    title: str
    price: Optional[float]
    currency: str
    url: str
    scraped_at: datetime
    raw_price_text: str  # Keep original for debugging

def write_to_staging(db_path: str, items: list[ProductListing]):
    conn = sqlite3.connect(db_path)
    conn.execute("""
        CREATE TABLE IF NOT EXISTS listings_staging (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            source TEXT NOT NULL,
            external_id TEXT NOT NULL,
            title TEXT,
            price REAL,
            currency TEXT,
            url TEXT,
            scraped_at TEXT,
            raw_price_text TEXT,
            processed BOOLEAN DEFAULT FALSE
        )
    """)
    # SQLite table constraints can't contain expressions like date(),
    # so the one-row-per-day guarantee lives in a unique index instead
    conn.execute("""
        CREATE UNIQUE INDEX IF NOT EXISTS idx_staging_daily
        ON listings_staging (source, external_id, date(scraped_at))
    """)

    conn.executemany("""
        INSERT OR IGNORE INTO listings_staging
        (source, external_id, title, price, currency, url, scraped_at, raw_price_text)
        VALUES (:source, :external_id, :title, :price, :currency, :url, :scraped_at, :raw_price_text)
    """, [{**asdict(item), 'scraped_at': item.scraped_at.isoformat()}
          for item in items])  # store datetimes as ISO strings

    conn.commit()
    conn.close()

Stage 2: Transform With dbt (Sort Of)

I use a lightweight version of dbt's philosophy without the full framework overhead: plain Python scripts that transform staging into clean tables with:

  • Deduplication: keep latest record per (source, external_id) per day
  • Normalisation: standardise currency to EUR, clean price strings
  • Validation: reject nulls in required fields, flag outlier prices
  • Enrichment: add computed columns (price_change_pct, days_listed)
import sqlite3
import duckdb

def transform_listings(staging_db: str, warehouse_db: str):
    """Transform staging records into clean warehouse records."""

    # Connect to both databases; dict-style row access needs a row factory
    staging = sqlite3.connect(staging_db)
    staging.row_factory = sqlite3.Row
    warehouse = duckdb.connect(warehouse_db)

    # Read unprocessed staging records
    new_records = staging.execute("""
        SELECT * FROM listings_staging WHERE processed = FALSE
    """).fetchall()

    # Transform
    clean_records = []
    for record in new_records:
        try:
            clean = {
                'source': record['source'],
                'external_id': record['external_id'],
                'title': record['title'].strip()[:500],  # Truncate
                'price_eur': convert_to_eur(
                    record['price'], record['currency']
                ),
                'url': record['url'],
                'scraped_date': record['scraped_at'][:10],
            }

            # Validate
            if not clean['title'] or clean['price_eur'] is None:
                continue
            if clean['price_eur'] <= 0 or clean['price_eur'] > 100_000:
                continue

            clean_records.append(clean)
        except Exception:
            continue  # Skip malformed records

    # Write to DuckDB warehouse (the listings table must already exist
    # with a primary key for OR REPLACE to upsert against)
    if clean_records:
        warehouse.executemany("""
            INSERT OR REPLACE INTO listings VALUES (?, ?, ?, ?, ?, ?)
        """, [list(r.values()) for r in clean_records])

    # Mark processed
    staging.execute("UPDATE listings_staging SET processed = TRUE WHERE processed = FALSE")
    staging.commit()
    staging.close()
    warehouse.close()

Stage 3: DuckDB as Warehouse

DuckDB is the right choice here:

  • Zero infrastructure (single file, like SQLite)
  • Columnar storage — fast aggregation queries
  • SQL-compatible — no new query language
  • Free
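The INSERT OR REPLACE in the transform step needs a listings table with a primary key to upsert against. A plausible DDL, with the column set inferred from the transform dict (the original post doesn't show it):

```sql
-- Composite key: one row per listing per day, so re-runs upsert
CREATE TABLE IF NOT EXISTS listings (
    source       TEXT,
    external_id  TEXT,
    title        TEXT,
    price_eur    DOUBLE,
    url          TEXT,
    scraped_date DATE,
    PRIMARY KEY (source, external_id, scraped_date)
);
```

With that in place, the analysis queries look like this: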
-- Price trends: weekly stats per listing over the last 30 days
SELECT 
    source,
    title,
    DATE_TRUNC('week', scraped_date) as week,
    AVG(price_eur) as avg_price,
    MIN(price_eur) as min_price,
    MAX(price_eur) as max_price,
    COUNT(*) as observations
FROM listings
WHERE scraped_date >= CURRENT_DATE - INTERVAL '30 days'
GROUP BY 1, 2, 3
ORDER BY source, title, week;

-- Price drop alerts
SELECT 
    source,
    title,
    url,
    yesterday_price,
    today_price,
    ROUND((today_price - yesterday_price) / yesterday_price * 100, 1) as pct_change
FROM (
    SELECT 
        source, title, url, scraped_date,
        price_eur as today_price,
        LAG(price_eur) OVER (PARTITION BY source, external_id ORDER BY scraped_date) as yesterday_price
    FROM listings
)
WHERE yesterday_price IS NOT NULL
  AND today_price < yesterday_price * 0.9  -- 10%+ drop
  AND scraped_date = CURRENT_DATE;

Stage 4: Visualisation

I run Metabase Community Edition (free, Docker) connected to the DuckDB file. Dashboards for:

  • Price trends by category over 90 days
  • Daily scrape success rate by source
  • New listings vs removed listings per day
  • Price drop alerts (refreshes every 4 hours)

Alternative: just query DuckDB directly from a Jupyter notebook. Zero additional infrastructure.

Incremental Refresh Logic

The key to not re-scraping everything every day:

def get_urls_to_refresh(warehouse_db: str, limit: int = 1000) -> list[str]:
    """Return URLs whose most recent scrape is older than today."""
    wh = duckdb.connect(warehouse_db)

    stale = wh.execute("""
        SELECT url
        FROM listings
        GROUP BY url
        HAVING MAX(scraped_date) < CURRENT_DATE
        LIMIT ?
    """, [limit]).fetchall()

    wh.close()
    return [r[0] for r in stale]
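With up to a thousand stale URLs per run, I batch them so the scraper can sleep between batches instead of hammering a source. The driver loop isn't shown in this post; a batching helper like this is one way to pace it (names are mine):

```python
from itertools import islice
from typing import Iterable, Iterator

def batched(urls: Iterable[str], size: int = 50) -> Iterator[list[str]]:
    """Yield URLs in fixed-size batches for polite, pausable crawling."""
    it = iter(urls)
    while batch := list(islice(it, size)):
        yield batch
```

The scrape loop then becomes: for each batch, scrape, write to staging, sleep.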

Cost Breakdown

Component            Cost
VPS 2GB (Hetzner)    €4.15/month
Residential proxies  ~$5/month
DuckDB               Free
SQLite               Free
Metabase Community   Free
Total                ~$9-12/month

For comparison: a managed data pipeline (Fivetran + Snowflake) for this workload costs $200-500/month.

Ready-to-Use Scrapers

The scrapers that feed this pipeline are available pre-built:

Data Pipeline Scraper Bundle — €29

Includes scrapers pre-configured to output the staging schema above, deduplication logic, and the DuckDB warehouse setup guide.


Running a scraping pipeline at scale? What is your storage and query layer? Curious what the community uses.
