Most scraping tutorials end at "save to CSV". Real data work requires a proper pipeline: deduplicate, validate, store with schema, enable fast queries, and refresh incrementally.
Here is the pipeline I run to scrape 8 data sources into a queryable warehouse for $12/month total.
Architecture Overview
Scraper → Staging (SQLite) → Transform (dbt-style Python) → Warehouse (DuckDB) → Query (Metabase)
Cost breakdown:
- Scraping proxies: ~$5/month
- VPS (2GB, Hetzner): €4.15/month
- Everything else: free (open source)
Stage 1: Scraper → Staging
Every scraper writes raw data to a SQLite staging database. Raw HTML is not stored — only parsed fields. This is the "bronze" layer.
```python
import sqlite3
from dataclasses import dataclass, asdict
from datetime import datetime
from typing import Optional

@dataclass
class ProductListing:
    source: str             # which site this was scraped from
    external_id: str        # ID on the source site
    title: str
    price: Optional[float]
    currency: str
    url: str
    scraped_at: datetime
    raw_price_text: str     # keep the original string for debugging
```
```python
def write_to_staging(db_path: str, items: list[ProductListing]):
    conn = sqlite3.connect(db_path)
    conn.execute("""
        CREATE TABLE IF NOT EXISTS listings_staging (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            source TEXT NOT NULL,
            external_id TEXT NOT NULL,
            title TEXT,
            price REAL,
            currency TEXT,
            url TEXT,
            scraped_at TEXT,
            raw_price_text TEXT,
            processed BOOLEAN DEFAULT FALSE
        )
    """)
    # SQLite only allows an expression like date(scraped_at) in a UNIQUE
    # *index*, not in a table-level UNIQUE constraint. One row per day:
    conn.execute("""
        CREATE UNIQUE INDEX IF NOT EXISTS idx_one_per_day
        ON listings_staging(source, external_id, date(scraped_at))
    """)
    rows = []
    for item in items:
        row = asdict(item)
        row["scraped_at"] = item.scraped_at.isoformat()  # store as ISO-8601 text
        rows.append(row)
    conn.executemany("""
        INSERT OR IGNORE INTO listings_staging
            (source, external_id, title, price, currency, url, scraped_at, raw_price_text)
        VALUES (:source, :external_id, :title, :price, :currency, :url, :scraped_at, :raw_price_text)
    """, rows)
    conn.commit()
    conn.close()
```
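The one-row-per-day dedup hinges on SQLite accepting expressions in unique indexes. A self-contained sketch of the behaviour (note that SQLite only permits an expression like `date(scraped_at)` in a `UNIQUE` index, not in a table-level `UNIQUE` constraint):

```python
import sqlite3

# Minimal demo of per-day dedup via a unique expression index.
conn = sqlite3.connect(":memory:")
conn.execute("""
    CREATE TABLE listings_staging (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        source TEXT NOT NULL,
        external_id TEXT NOT NULL,
        price REAL,
        scraped_at TEXT
    )
""")
conn.execute("""
    CREATE UNIQUE INDEX one_per_day
    ON listings_staging(source, external_id, date(scraped_at))
""")

rows = [
    ("shop_a", "sku-1", 19.99, "2024-05-01T08:00:00"),
    ("shop_a", "sku-1", 18.50, "2024-05-01T20:00:00"),  # same day: ignored
    ("shop_a", "sku-1", 17.00, "2024-05-02T08:00:00"),  # next day: kept
]
conn.executemany(
    "INSERT OR IGNORE INTO listings_staging (source, external_id, price, scraped_at) "
    "VALUES (?, ?, ?, ?)",
    rows,
)
count = conn.execute("SELECT COUNT(*) FROM listings_staging").fetchone()[0]
print(count)  # 2: one row per (source, external_id, day)
```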
Stage 2: Transform With dbt (Sort Of)
I borrow dbt's philosophy without the full framework overhead: plain Python scripts transform staging → clean tables with:
- Deduplication: keep latest record per (source, external_id) per day
- Normalisation: standardise currency to EUR, clean price strings
- Validation: reject nulls in required fields, flag outlier prices
- Enrichment: add computed columns (price_change_pct, days_listed)
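The transform below calls a `convert_to_eur` helper that isn't shown in this post. A minimal sketch, assuming a small static rate table is good enough (the `EUR_RATES` values are illustrative; in production you would refresh them from an FX API):

```python
from typing import Optional

EUR_RATES = {  # EUR per 1 unit of currency (illustrative, not live rates)
    "EUR": 1.0,
    "USD": 0.92,
    "GBP": 1.17,
    "PLN": 0.23,
}

def convert_to_eur(price: Optional[float], currency: str) -> Optional[float]:
    """Convert a price to EUR, or return None if it can't be converted."""
    if price is None:
        return None
    rate = EUR_RATES.get((currency or "").upper())
    if rate is None:
        return None  # unknown currency: let validation drop the record
    return round(price * rate, 2)

print(convert_to_eur(100.0, "usd"))  # 92.0
```

Returning `None` for unknown currencies (rather than raising) keeps the decision in one place: the transform's validation step already rejects records with a null `price_eur`.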
```python
import duckdb

def transform_listings(staging_db: str, warehouse_db: str):
    """Transform staging records into clean warehouse records."""
    # Connect to both databases
    staging = sqlite3.connect(staging_db)
    staging.row_factory = sqlite3.Row  # lets us access columns by name
    warehouse = duckdb.connect(warehouse_db)

    # Read unprocessed staging records
    new_records = staging.execute(
        "SELECT * FROM listings_staging WHERE processed = FALSE"
    ).fetchall()

    # Transform
    clean_records = []
    seen_ids = []
    for record in new_records:
        seen_ids.append(record["id"])
        try:
            clean = {
                'source': record['source'],
                'external_id': record['external_id'],
                'title': (record['title'] or '').strip()[:500],  # truncate
                'price_eur': convert_to_eur(
                    record['price'], record['currency']
                ),
                'url': record['url'],
                'scraped_date': record['scraped_at'][:10],  # keep the date part
            }
            # Validate
            if not clean['title'] or clean['price_eur'] is None:
                continue
            if clean['price_eur'] <= 0 or clean['price_eur'] > 100_000:
                continue
            clean_records.append(clean)
        except Exception:
            continue  # skip malformed records

    # Write to the DuckDB warehouse
    warehouse.executemany(
        "INSERT OR REPLACE INTO listings VALUES (?, ?, ?, ?, ?, ?)",
        [list(r.values()) for r in clean_records],
    )

    # Mark only the records we actually read, not rows inserted meanwhile
    staging.executemany(
        "UPDATE listings_staging SET processed = TRUE WHERE id = ?",
        [(i,) for i in seen_ids],
    )
    staging.commit()
    staging.close()
    warehouse.close()
```
Stage 3: DuckDB as Warehouse
DuckDB is the right choice here:
- Zero infrastructure (single file, like SQLite)
- Columnar storage — fast aggregation queries
- SQL-compatible — no new query language
- Free
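For `INSERT OR REPLACE` in the transform step to work, the warehouse table needs a primary key to resolve conflicts against. A one-time DDL sketch, with column order matching the transform's dict (table and column names are the ones assumed throughout this post):

```sql
-- One-time warehouse setup. INSERT OR REPLACE resolves conflicts
-- against this primary key: one row per listing per day.
CREATE TABLE IF NOT EXISTS listings (
    source       TEXT NOT NULL,
    external_id  TEXT NOT NULL,
    title        TEXT,
    price_eur    DOUBLE,
    url          TEXT,
    scraped_date DATE,
    PRIMARY KEY (source, external_id, scraped_date)
);
```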
```sql
-- Price trend query: weekly stats over the last 30 days, by source
SELECT
    source,
    title,
    DATE_TRUNC('week', scraped_date) AS week,
    AVG(price_eur) AS avg_price,
    MIN(price_eur) AS min_price,
    MAX(price_eur) AS max_price,
    COUNT(*) AS observations
FROM listings
WHERE scraped_date >= CURRENT_DATE - INTERVAL '30 days'
GROUP BY 1, 2, 3
ORDER BY source, title, week;
```
```sql
-- Price drop alerts
SELECT
    source,
    title,
    url,
    yesterday_price,
    today_price,
    ROUND((today_price - yesterday_price) / yesterday_price * 100, 1) AS pct_change
FROM (
    SELECT
        source, title, url, scraped_date,   -- scraped_date must be exposed here
        price_eur AS today_price,
        LAG(price_eur) OVER (
            PARTITION BY source, external_id ORDER BY scraped_date
        ) AS yesterday_price
    FROM listings
)
WHERE yesterday_price IS NOT NULL
  AND today_price < yesterday_price * 0.9   -- 10%+ drop
  AND scraped_date = CURRENT_DATE;
```
Stage 4: Visualisation
I run Metabase Community Edition (free, Docker) connected to the DuckDB file. Dashboards for:
- Price trends by category over 90 days
- Daily scrape success rate by source
- New listings vs removed listings per day
- Price drop alerts (refreshes every 4 hours)
Alternative: just query DuckDB directly from a Jupyter notebook. Zero additional infrastructure.
Incremental Refresh Logic
The key to not re-scraping everything every day:
```python
def get_urls_to_refresh(warehouse_db: str, batch_size: int = 1000) -> list[str]:
    """Return URLs whose most recent scrape is older than today."""
    wh = duckdb.connect(warehouse_db)
    # Group by URL so a URL already scraped today isn't re-queued just
    # because it also has older rows.
    stale = wh.execute("""
        SELECT url
        FROM listings
        GROUP BY url
        HAVING MAX(scraped_date) < CURRENT_DATE
        LIMIT ?
    """, [batch_size]).fetchall()
    wh.close()
    return [r[0] for r in stale]
```
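Putting the stages together, the daily cron job is little more than a loop. A sketch of the driver, with the post's functions passed in as callables and stubbed here so the call order is visible (`scrape` stands in for your per-site scrapers, which this post doesn't define):

```python
def run_pipeline(sources, scrape, stage, transform, refresh_urls):
    """One incremental pass: scrape stale URLs, land in staging, transform."""
    for source in sources:
        urls = refresh_urls()           # only URLs not scraped today
        stage(scrape(source, urls))     # land raw rows in SQLite staging
    transform()                         # dedupe/validate/load into DuckDB

# Wiring with trivial stubs to show the call order:
calls = []
run_pipeline(
    sources=["shop_a", "shop_b"],
    scrape=lambda s, urls: [f"{s}-item"],
    stage=lambda items: calls.append(("stage", items)),
    transform=lambda: calls.append(("transform", None)),
    refresh_urls=lambda: ["https://example.com/1"],
)
print([c[0] for c in calls])  # ['stage', 'stage', 'transform']
```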
Cost Breakdown
| Component | Cost |
|---|---|
| VPS 2GB (Hetzner) | €4.15/month |
| Residential proxies | ~$5/month |
| DuckDB | Free |
| SQLite | Free |
| Metabase Community | Free |
| Total | ~$9-12/month |
For comparison: a managed data pipeline (Fivetran + Snowflake) for this workload costs $200-500/month.
Ready-to-Use Scrapers
The scrapers that feed this pipeline are available pre-built:
Data Pipeline Scraper Bundle — €29
Includes scrapers pre-configured to output the staging schema above, deduplication logic, and the DuckDB warehouse setup guide.
Running a scraping pipeline at scale? What is your storage and query layer? Curious what the community uses.