Most price monitoring setups are one-off scripts that run on a schedule and dump data into a spreadsheet. That works until it doesn't — when you need to catch a price drop within an hour, track trends across thousands of SKUs, or alert a sales team when a competitor is running a promotion.
Here's the architecture of a production price intelligence pipeline — from data collection to alert delivery.
The Three Tiers of Price Intelligence
Tier 1 — Ad hoc: A script running in a cron job, results in a CSV. Works for <50 SKUs, monthly monitoring. Falls apart at any scale.
Tier 2 — Scheduled pipeline: Database-backed, scheduled collection, dashboards. Works for 50-5,000 SKUs with daily granularity. This is what most teams actually need.
Tier 3 — Real-time streaming: Change detection in near-real-time, event-driven alerts. Necessary for high-volatility markets (consumer electronics, fast fashion, commodity repricing).
This guide covers Tier 2 — the right level for most teams — with a path to Tier 3 if you need it.
Architecture Overview
```
[Scraper Workers] → [Raw Data Queue (Redis)] → [Processor] → [TimescaleDB]
                                                   ↓
                                             [Alert Engine]
                                                   ↓
                                      [Slack / Email / Webhook]
```
Components:
- Scraper workers: async Python processes collecting price data
- Redis queue: decouple collection from processing, handle retries
- TimescaleDB: PostgreSQL extension optimized for time-series data
- Alert engine: rule-based notifications for price changes
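The contract between the scheduler and the workers is a small JSON message on the Redis list. A minimal sketch of that payload — field names match the worker code later in this post; the URL and selector values are illustrative placeholders:

```python
import json

# Example job message the scheduler pushes onto the "price_jobs" list.
job = {
    "sku": "PRODUCT-123",
    "competitor": "CompetitorA",
    "url": "https://example.com/product-123",
    "price_selector": ".price",  # CSS selector for the price node, per competitor
    "scheduled_at": "2024-01-01T00:00:00+00:00",
}

payload = json.dumps(job)  # what actually goes on the queue
print(payload)
```

Keeping the selector in the job payload (rather than hardcoding it in the worker) means adding a competitor is a data change, not a code change.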
Setting Up TimescaleDB
TimescaleDB is PostgreSQL with time-series optimizations — automatic partitioning by time, compression, continuous aggregates. It's free, and it's the right database for this use case.
```sql
-- Create hypertable (auto-partitioned by time)
CREATE TABLE price_observations (
    time            TIMESTAMPTZ NOT NULL,
    sku             TEXT NOT NULL,
    competitor      TEXT NOT NULL,
    price           DECIMAL(10,2) NOT NULL,
    currency        CHAR(3) DEFAULT 'USD',
    availability    TEXT,
    scraper_version TEXT,
    proxy_country   CHAR(2)
);

SELECT create_hypertable('price_observations', 'time');

-- Index for fast competitor + SKU queries
CREATE INDEX ON price_observations (competitor, sku, time DESC);

-- Continuous aggregate: hourly min/max per SKU
CREATE MATERIALIZED VIEW hourly_prices
WITH (timescaledb.continuous) AS
SELECT
    time_bucket('1 hour', time) AS hour,
    sku,
    competitor,
    MIN(price) AS low,
    MAX(price) AS high,
    AVG(price) AS avg,
    LAST(price, time) AS latest
FROM price_observations
GROUP BY hour, sku, competitor;
```
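If you haven't used `time_bucket` before: it truncates each timestamp down to its bucket boundary, so every observation within the same hour groups together. Roughly, in Python terms (an approximation — TimescaleDB also supports custom origins and offsets):

```python
from datetime import datetime, timedelta, timezone

def time_bucket(width: timedelta, ts: datetime) -> datetime:
    """Rough Python equivalent of TimescaleDB's time_bucket():
    truncate ts down to the nearest multiple of `width` since the epoch."""
    epoch = datetime(1970, 1, 1, tzinfo=timezone.utc)
    seconds = (ts - epoch).total_seconds()
    bucketed = seconds - (seconds % width.total_seconds())
    return epoch + timedelta(seconds=bucketed)

ts = datetime(2024, 5, 1, 14, 37, 12, tzinfo=timezone.utc)
print(time_bucket(timedelta(hours=1), ts))  # 2024-05-01 14:00:00+00:00
```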
The Scraper Worker
```python
import asyncio
import json
from datetime import datetime, timezone

import httpx
import redis.asyncio as redis
from selectolax.parser import HTMLParser  # faster than BeautifulSoup


class PriceWorker:
    def __init__(self, redis_url: str, db_pool):
        self.redis = redis.from_url(redis_url)
        self.db = db_pool
        self.queue_name = "price_jobs"
        self.failed_queue = "price_jobs_failed"

    async def run(self):
        async with httpx.AsyncClient(
            follow_redirects=True,
            timeout=15,
            headers={"User-Agent": "Mozilla/5.0 (compatible; PriceBot/1.0)"}
        ) as client:
            while True:
                job = await self.redis.blpop(self.queue_name, timeout=30)
                if not job:
                    continue
                _, job_data = job
                task = json.loads(job_data)
                try:
                    result = await self.scrape_price(client, task)
                    await self.store(result)
                except Exception as e:
                    # Park failures for later inspection instead of losing them
                    await self.redis.rpush(self.failed_queue, json.dumps({
                        **task,
                        "error": str(e),
                        "failed_at": datetime.now(timezone.utc).isoformat()
                    }))

    async def scrape_price(self, client, task: dict) -> dict:
        response = await client.get(task["url"])
        response.raise_for_status()
        tree = HTMLParser(response.text)

        # Adapt selectors per competitor; fail loudly if the page changed
        price_node = tree.css_first(task.get("price_selector", ".price"))
        if price_node is None:
            raise ValueError(f"price selector matched nothing on {task['url']}")
        price_text = price_node.text()
        price = float(price_text.replace("$", "").replace(",", "").strip())

        avail = tree.css_first(task.get("availability_selector", ".availability"))
        availability = avail.text().strip() if avail else "unknown"

        return {
            "time": datetime.now(timezone.utc),
            "sku": task["sku"],
            "competitor": task["competitor"],
            "price": price,
            "availability": availability,
            "url": task["url"]
        }

    async def store(self, result: dict):
        await self.db.execute("""
            INSERT INTO price_observations
                (time, sku, competitor, price, availability)
            VALUES ($1, $2, $3, $4, $5)
        """, result["time"], result["sku"], result["competitor"],
             result["price"], result["availability"])
```
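The `float(price_text.replace("$", ...))` line above assumes US-formatted dollar prices. If you track multiple locales, a more tolerant parser helps. This is an illustrative sketch, not exhaustive — the heuristic assumes the last separator before one or two trailing digits is the decimal point:

```python
import re

def parse_price(text: str) -> float:
    """Extract a price from text like '$1,299.99', '1.299,99 €', or 'EUR 49'."""
    cleaned = re.sub(r"[^\d.,]", "", text)
    if not cleaned:
        raise ValueError(f"no digits found in {text!r}")
    if "," in cleaned and "." in cleaned:
        # Both separators present: the rightmost one is the decimal point
        if cleaned.rfind(",") > cleaned.rfind("."):
            cleaned = cleaned.replace(".", "").replace(",", ".")
        else:
            cleaned = cleaned.replace(",", "")
    elif "," in cleaned:
        # Lone comma: decimal if followed by 1-2 digits, else a thousands separator
        head, _, tail = cleaned.rpartition(",")
        cleaned = head.replace(",", "") + ("." + tail if len(tail) <= 2 else tail)
    return float(cleaned)

print(parse_price("$1,299.99"))   # 1299.99
print(parse_price("1.299,99 €"))  # 1299.99
```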
Scheduling Jobs
Instead of running every check on a fixed cron interval, schedule adaptively based on each SKU's price volatility:
```python
class AdaptiveScheduler:
    """Schedule more frequent checks for volatile SKUs."""

    BASE_INTERVAL = 3600   # 1 hour default
    MIN_INTERVAL = 900     # 15 min minimum
    MAX_INTERVAL = 86400   # 24 hour maximum

    def __init__(self, db_pool):
        self.db = db_pool

    async def get_interval(self, sku: str, competitor: str) -> int:
        # Count price changes over the last 7 days
        result = await self.db.fetchrow("""
            SELECT COUNT(*) AS changes
            FROM (
                SELECT time, price,
                       LAG(price) OVER (ORDER BY time) AS prev_price
                FROM price_observations
                WHERE sku=$1 AND competitor=$2
                  AND time > NOW() - INTERVAL '7 days'
            ) t
            WHERE price != prev_price
        """, sku, competitor)
        changes_per_week = result["changes"] or 0

        if changes_per_week > 20:
            return self.MIN_INTERVAL   # Highly volatile: check every 15 min
        elif changes_per_week > 5:
            return 1800                # Moderately volatile: every 30 min
        else:
            return self.BASE_INTERVAL  # Stable: every hour

    async def enqueue_jobs(self, redis_client):
        # get_active_skus() loads the tracked SKU/competitor/URL list (not shown)
        skus = await self.get_active_skus()
        for sku in skus:
            interval = await self.get_interval(sku["sku"], sku["competitor"])
            job = {
                "sku": sku["sku"],
                "competitor": sku["competitor"],
                "url": sku["url"],
                "price_selector": sku["price_selector"],
                "scheduled_at": datetime.now(timezone.utc).isoformat()
            }
            # SET NX EX acts as a per-SKU lock: only enqueue if not already queued
            key = f"scheduled:{sku['competitor']}:{sku['sku']}"
            if await redis_client.set(key, "1", nx=True, ex=interval):
                await redis_client.rpush("price_jobs", json.dumps(job))
```
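One refinement worth adding: if thousands of SKUs share the same interval, their dedupe keys all expire in the same tick and the workers hammer the target sites in bursts. Jittering each expiry spreads the load. A small sketch — the ±10% spread is an arbitrary choice:

```python
import random

def jittered(interval: int, spread: float = 0.1) -> int:
    """Return interval ± spread, so identical intervals don't expire
    in lockstep and produce bursty scraping traffic."""
    low = int(interval * (1 - spread))
    high = int(interval * (1 + spread))
    return random.randint(low, high)

# e.g. pass ex=jittered(interval) instead of ex=interval in enqueue_jobs
```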
The Alert Engine
```python
class PriceAlertEngine:
    ALERT_RULES = [
        {
            "name": "price_drop_5pct",
            "condition": "price_change_pct < -5",
            "cooldown": 3600,   # Don't re-alert for 1 hour
            "channels": ["slack", "email"]
        },
        {
            "name": "below_our_price",
            "condition": "competitor_price < our_price * 0.95",
            "cooldown": 1800,
            "channels": ["slack"]
        },
        {
            "name": "went_out_of_stock",
            "condition": "availability = 'out_of_stock'",
            "cooldown": 86400,
            "channels": ["email"]
        }
    ]

    async def check_alerts(self, sku: str, new_price: float, competitor: str,
                           availability: str = "unknown"):
        # Previous price: OFFSET 1 skips the observation we just inserted
        prev = await self.db.fetchrow("""
            SELECT price FROM price_observations
            WHERE sku=$1 AND competitor=$2
            ORDER BY time DESC LIMIT 1 OFFSET 1
        """, sku, competitor)
        if not prev:
            return

        prev_price = float(prev["price"])
        change_pct = (new_price - prev_price) / prev_price * 100

        for rule in self.ALERT_RULES:
            # evaluate_rule() tests the condition string against this context
            if self.evaluate_rule(rule["condition"], {
                "price_change_pct": change_pct,
                "competitor_price": new_price,
                "our_price": await self.get_our_price(sku),
                "availability": availability
            }):
                await self.fire_alert(sku, competitor, rule, {
                    "prev_price": prev_price,
                    "new_price": new_price,
                    "change_pct": change_pct
                })

    async def fire_alert(self, sku, competitor, rule, data):
        # Skip if this alert fired recently
        cooldown_key = f"alert_cooldown:{rule['name']}:{sku}:{competitor}"
        if await self.redis.get(cooldown_key):
            return

        message = f"⚠️ {rule['name']}: {competitor} | {sku}\n"
        message += (f"Price: ${data['prev_price']:.2f} → ${data['new_price']:.2f} "
                    f"({data['change_pct']:+.1f}%)")

        for channel in rule["channels"]:
            await self.send_notification(channel, message)

        await self.redis.set(cooldown_key, "1", ex=rule["cooldown"])
```
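The `evaluate_rule` helper is left abstract in the class above. Whatever you do, don't `eval()` rule strings from config. A minimal safe evaluator only needs to handle the comparison shapes the rules actually use; this sketch supports `<`, `>`, `=`, quoted strings, and a single `var * factor` on the right side (anything else raises):

```python
import re

def evaluate_rule(condition: str, ctx: dict) -> bool:
    """Minimal safe evaluator for conditions like "price_change_pct < -5",
    "competitor_price < our_price * 0.95", or "availability = 'out_of_stock'"."""
    m = re.fullmatch(r"\s*(\w+)\s*([<>]|=)\s*(.+?)\s*", condition)
    if not m:
        raise ValueError(f"unsupported condition: {condition!r}")
    left = ctx[m.group(1)]
    op, rhs = m.group(2), m.group(3)

    quoted = re.fullmatch(r"'(.*)'", rhs)
    if quoted:                              # string literal
        right = quoted.group(1)
    else:
        prod = re.fullmatch(r"(\w+)\s*\*\s*([\d.]+)", rhs)
        if prod:                            # variable * factor
            right = ctx[prod.group(1)] * float(prod.group(2))
        elif re.fullmatch(r"-?[\d.]+", rhs):  # numeric literal
            right = float(rhs)
        else:                               # bare variable
            right = ctx[rhs]

    if op == "<":
        return left < right
    if op == ">":
        return left > right
    return left == right
```

Rules stay declarative data, so non-engineers can edit thresholds without touching the engine.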
Querying the Time-Series Data
```sql
-- Price trend for a specific SKU over the last 30 days
SELECT hour, competitor, latest
FROM hourly_prices
WHERE sku = 'PRODUCT-123'
  AND hour > NOW() - INTERVAL '30 days'
ORDER BY hour, competitor;

-- Who has the lowest price right now?
SELECT competitor,
       LAST(price, time) AS current_price,
       MAX(time) AS last_updated
FROM price_observations
WHERE sku = 'PRODUCT-123'
  AND time > NOW() - INTERVAL '2 hours'
GROUP BY competitor
ORDER BY current_price;

-- Correlation: does Competitor A change price after Competitor B?
-- (Restrict to A's observations shortly after B's, or the join explodes.)
SELECT
    b.time AS b_change_time,
    a.time AS a_change_time,
    EXTRACT(EPOCH FROM (a.time - b.time))/3600 AS hours_lag
FROM price_observations a
JOIN price_observations b ON a.sku = b.sku
WHERE a.competitor = 'CompetitorA'
  AND b.competitor = 'CompetitorB'
  AND a.time > b.time
  AND a.time - b.time < INTERVAL '48 hours'
ORDER BY hours_lag;
```
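The correlation query returns raw time pairs; the actual question — does A follow B, and how fast? — is often easier to answer in application code. A sketch that, given sorted change timestamps for two competitors, finds B→A reaction lags within a window (the 48-hour window is an arbitrary choice):

```python
from datetime import datetime, timedelta

def reaction_lags(a_changes, b_changes, window=timedelta(hours=48)):
    """For each price change by B, find the first subsequent change by A
    within `window`. Returns lags in hours. Inputs are sorted datetime lists."""
    lags = []
    for b_time in b_changes:
        follow = [t for t in a_changes if b_time < t <= b_time + window]
        if follow:
            lags.append((follow[0] - b_time).total_seconds() / 3600)
    return lags

b = [datetime(2024, 5, 1, 9), datetime(2024, 5, 3, 9)]
a = [datetime(2024, 5, 1, 14), datetime(2024, 5, 3, 21)]
print(reaction_lags(a, b))  # [5.0, 12.0]
```

A tight, consistent lag distribution is strong evidence of automated repricing against you.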
Deploying With Docker Compose
```yaml
services:
  timescaledb:
    image: timescale/timescaledb:latest-pg16
    environment:
      POSTGRES_DB: prices
      POSTGRES_USER: analyst
      POSTGRES_PASSWORD: ${DB_PASSWORD}
    volumes:
      - timescale_data:/var/lib/postgresql/data

  redis:
    image: redis:7-alpine

  scraper:
    build: ./scraper
    environment:
      - REDIS_URL=redis://redis:6379
      - DATABASE_URL=postgresql://analyst:${DB_PASSWORD}@timescaledb/prices
    deploy:
      replicas: 3  # Run 3 scraper workers in parallel
    depends_on:
      - redis
      - timescaledb

  scheduler:
    build: ./scheduler
    environment:
      - REDIS_URL=redis://redis:6379
      - DATABASE_URL=postgresql://analyst:${DB_PASSWORD}@timescaledb/prices
    depends_on:
      - redis
      - timescaledb

volumes:
  timescale_data:
```
Scale scraper workers up or down with `docker compose up --scale scraper=N`.
What This Gets You
- A complete, timestamped price history for any SKU and competitor
- Adaptive collection frequency based on actual volatility
- Alert delivery within 15 minutes of a qualifying price change
- SQL query access to all historical data via TimescaleDB
- Horizontal scaling by adding scraper worker replicas
The total infrastructure cost: about $15/month on a basic VPS. The competitive advantage of knowing competitor pricing within an hour instead of the next day is harder to put a number on.
Pre-Built Price Scrapers
If you want the data collection layer without building from scratch, I maintain Apify actors for Amazon, eBay, Google Shopping, and major e-commerce platforms.
Apify Scrapers Bundle — €29 — includes the e-commerce monitoring suite with price tracking setup guide.