Vhub Systems

Building a Real-Time Competitor Price Intelligence Pipeline: The Complete Stack

Most price monitoring setups are one-off scripts that run on a schedule and dump data into a spreadsheet. That works until it doesn't — when you need to catch a price drop within an hour, track trends across thousands of SKUs, or alert a sales team when a competitor is running a promotion.

Here's the architecture of a production price intelligence pipeline — from data collection to alert delivery.

The Three Tiers of Price Intelligence

Tier 1 — Ad hoc: A script running in a cron job, results in a CSV. Works for <50 SKUs, monthly monitoring. Falls apart at any scale.

Tier 2 — Scheduled pipeline: Database-backed, scheduled collection, dashboards. Works for 50-5,000 SKUs with daily granularity. This is what most teams actually need.

Tier 3 — Real-time streaming: Change detection in near-real-time, event-driven alerts. Necessary for high-volatility markets (consumer electronics, fast fashion, commodity repricing).

This guide covers Tier 2 — the right level for most teams — with a path to Tier 3 if you need it.

Architecture Overview

[Scraper Workers] → [Raw Data Queue (Redis)] → [Processor] → [TimescaleDB]
                                                                     ↓
                                                           [Alert Engine]
                                                                     ↓
                                                    [Slack / Email / Webhook]

Components:

  • Scraper workers: async Python processes collecting price data
  • Redis queue: decouple collection from processing, handle retries
  • TimescaleDB: PostgreSQL extension optimized for time-series data
  • Alert engine: rule-based notifications for price changes
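
The queue only decouples collection from processing if both sides agree on a message shape. Here is a minimal sketch of that contract. The `PriceJob` name, the `attempts` field, the `MAX_ATTEMPTS` limit, and the `next_queue` helper are assumptions for illustration, not part of the pipeline described in this post; the other fields mirror the job dictionary the scheduler builds further down.

```python
import json
from dataclasses import dataclass, asdict

MAX_ATTEMPTS = 3  # hypothetical retry limit, tune to taste


@dataclass
class PriceJob:
    sku: str
    competitor: str
    url: str
    price_selector: str = ".price"
    scheduled_at: str = ""
    attempts: int = 0  # hypothetical field: how many times this job has failed

    def to_json(self) -> str:
        """Serialize for RPUSH onto the Redis list."""
        return json.dumps(asdict(self))

    @classmethod
    def from_json(cls, raw) -> "PriceJob":
        """Rebuild a job from the str or bytes payload returned by BLPOP."""
        return cls(**json.loads(raw))


def next_queue(job: PriceJob) -> str:
    """Record a failure and pick where the job goes next: retry or dead-letter."""
    job.attempts += 1
    return "price_jobs" if job.attempts < MAX_ATTEMPTS else "price_jobs_failed"
```

Carrying the attempt counter inside the message keeps retry state out of the workers, so any replica can pick up a retried job.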

Setting Up TimescaleDB

TimescaleDB is a PostgreSQL extension that adds time-series optimizations: automatic partitioning by time, compression, and continuous aggregates. It's free to self-host, you keep the full PostgreSQL feature set, and it's the right database for this use case.

-- Create hypertable (auto-partitioned by time)
CREATE TABLE price_observations (
    time TIMESTAMPTZ NOT NULL,
    sku TEXT NOT NULL,
    competitor TEXT NOT NULL,
    price DECIMAL(10,2) NOT NULL,
    currency CHAR(3) DEFAULT 'USD',
    availability TEXT,
    scraper_version TEXT,
    proxy_country CHAR(2)
);

SELECT create_hypertable('price_observations', 'time');

-- Index for fast competitor + SKU queries
CREATE INDEX ON price_observations (competitor, sku, time DESC);

-- Continuous aggregate: hourly min/max per SKU
CREATE MATERIALIZED VIEW hourly_prices
WITH (timescaledb.continuous) AS
SELECT
    time_bucket('1 hour', time) AS hour,
    sku,
    competitor,
    MIN(price) AS low,
    MAX(price) AS high,
    AVG(price) AS avg,
    LAST(price, time) AS latest
FROM price_observations
GROUP BY hour, sku, competitor;
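
If the continuous aggregate feels opaque, it helps to see the same rollup in plain Python. This is only a sketch of what `hourly_prices` computes, not how TimescaleDB implements it; the `hourly_rollup` function is hypothetical and assumes timestamps are already in UTC.

```python
from collections import defaultdict
from datetime import datetime, timezone


def hourly_rollup(observations):
    """Group (time, sku, competitor, price) tuples into hourly buckets."""
    buckets = defaultdict(list)
    for ts, sku, competitor, price in observations:
        # Floor to the hour, the same idea as time_bucket('1 hour', time)
        hour = ts.replace(minute=0, second=0, microsecond=0)
        buckets[(hour, sku, competitor)].append((ts, price))

    rows = {}
    for key, points in buckets.items():
        points.sort()  # order by timestamp so the final price is the latest
        prices = [price for _, price in points]
        rows[key] = {
            "low": min(prices),
            "high": max(prices),
            "avg": sum(prices) / len(prices),
            "latest": prices[-1],  # same role as LAST(price, time)
        }
    return rows
```

The database version does this incrementally as new rows arrive, which is why dashboards can read `hourly_prices` without rescanning raw observations.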

The Scraper Worker

import asyncio
import httpx
import redis.asyncio as redis
import json
from datetime import datetime, timezone
from selectolax.parser import HTMLParser  # faster than BeautifulSoup

class PriceWorker:
    def __init__(self, redis_url: str, db_pool):
        self.redis = redis.from_url(redis_url)
        self.db = db_pool
        self.queue_name = "price_jobs"
        self.failed_queue = "price_jobs_failed"

    async def run(self):
        async with httpx.AsyncClient(
            follow_redirects=True,
            timeout=15,
            headers={"User-Agent": "Mozilla/5.0 (compatible; PriceBot/1.0)"}
        ) as client:
            while True:
                job = await self.redis.blpop(self.queue_name, timeout=30)
                if not job:
                    continue

                _, job_data = job
                task = json.loads(job_data)

                try:
                    result = await self.scrape_price(client, task)
                    await self.store(result)
                except Exception as e:
                    await self.redis.rpush(self.failed_queue, json.dumps({
                        **task,
                        "error": str(e),
                        "failed_at": datetime.now(timezone.utc).isoformat()
                    }))

    async def scrape_price(self, client, task: dict) -> dict:
        response = await client.get(task["url"])
        response.raise_for_status()

        tree = HTMLParser(response.text)

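        # Adapt selectors per competitor
        price_node = tree.css_first(task.get("price_selector", ".price"))
        if price_node is None:
            # Fail with a clear message instead of an AttributeError on None
            raise ValueError(f"price selector matched nothing at {task['url']}")
        price = float(price_node.text().replace("$", "").replace(",", "").strip())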

        avail = tree.css_first(task.get("availability_selector", ".availability"))
        availability = avail.text().strip() if avail else "unknown"

        return {
            "time": datetime.now(timezone.utc),
            "sku": task["sku"],
            "competitor": task["competitor"],
            "price": price,
            "availability": availability,
            "url": task["url"]
        }

    async def store(self, result: dict):
        await self.db.execute("""
            INSERT INTO price_observations 
            (time, sku, competitor, price, availability)
            VALUES ($1, $2, $3, $4, $5)
        """, result["time"], result["sku"], result["competitor"],
             result["price"], result["availability"])
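
The worker's price parsing only strips "$" and commas, which breaks on other currency symbols and on European number formats. Here is one possible replacement, as a sketch: `parse_price` is a hypothetical helper, and its separator heuristic is an assumption that will misread ambiguous inputs such as "1.299" (treated as 1.299, not 1299). It returns a `Decimal`, which maps cleanly onto the `DECIMAL(10,2)` column.

```python
import re
from decimal import Decimal, InvalidOperation


def parse_price(text: str) -> Decimal:
    """Extract a price from text like '$1,299.99', '€ 1.299,99' or 'USD 45'."""
    match = re.search(r"\d[\d.,]*", text)
    if not match:
        raise ValueError(f"no price found in {text!r}")
    number = match.group().rstrip(".,")  # drop trailing sentence punctuation

    last_dot, last_comma = number.rfind("."), number.rfind(",")
    if last_dot != -1 and last_comma != -1:
        # Both separators present: whichever appears last is the decimal mark
        decimal_sep = "." if last_dot > last_comma else ","
    elif last_comma != -1 and len(number) - last_comma - 1 == 2:
        # Only commas, with exactly two digits after the last one: '12,50'
        decimal_sep = ","
    else:
        # Only dots, or commas acting as thousands separators: '12.50', '1,299'
        decimal_sep = "."

    thousands_sep = "," if decimal_sep == "." else "."
    cleaned = number.replace(thousands_sep, "").replace(decimal_sep, ".")
    try:
        return Decimal(cleaned)
    except InvalidOperation as exc:
        raise ValueError(f"could not parse price from {text!r}") from exc
```

If a competitor's format is known in advance, a per-competitor parser stored alongside the selector is safer than any heuristic.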

Scheduling Jobs

Instead of running on a fixed cron interval, the scheduler adapts how often it checks each SKU to that SKU's recent price volatility:

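import json
from datetime import datetime, timezone

class AdaptiveScheduler:
    """Schedule more frequent checks for volatile SKUs."""

    BASE_INTERVAL = 3600  # 1 hour default
    MIN_INTERVAL = 900    # 15 min minimum
    MAX_INTERVAL = 86400  # 24 hour maximum (not used by the tiers below)

    def __init__(self, db_pool):
        self.db = db_pool  # connection pool used by get_interval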

    async def get_interval(self, sku: str, competitor: str) -> int:
        # Get price change frequency from last 7 days
        result = await self.db.fetchrow("""
            SELECT COUNT(*) as changes
            FROM (
                SELECT time, price,
                       LAG(price) OVER (ORDER BY time) as prev_price
                FROM price_observations
                WHERE sku=$1 AND competitor=$2 
                AND time > NOW() - INTERVAL '7 days'
            ) t
            WHERE price != prev_price
        """, sku, competitor)

        changes_per_week = result["changes"] or 0

        if changes_per_week > 20:
            return self.MIN_INTERVAL  # Highly volatile: check every 15 min
        elif changes_per_week > 5:
            return 1800               # Moderately volatile: every 30 min
        else:
            return self.BASE_INTERVAL # Stable: every hour

    async def enqueue_jobs(self, redis_client):
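        # get_active_skus() is not shown here; it is assumed to return rows
        # with sku, competitor, url, and price_selector
        skus = await self.get_active_skus()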
        for sku in skus:
            interval = await self.get_interval(sku["sku"], sku["competitor"])
            job = {
                "sku": sku["sku"],
                "competitor": sku["competitor"],
                "url": sku["url"],
                "price_selector": sku["price_selector"],
                "scheduled_at": datetime.now(timezone.utc).isoformat()
            }
            # SET NX with a TTL: enqueue at most once per interval for this SKU
            key = f"scheduled:{sku['competitor']}:{sku['sku']}"
            if await redis_client.set(key, "1", nx=True, ex=interval):
                await redis_client.rpush("price_jobs", json.dumps(job))
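
The tiering logic is easier to test when it is separated from the database. Here is a sketch that mirrors the SQL `LAG` comparison and the thresholds above in plain Python; `count_price_changes` and `pick_interval` are hypothetical helper names, not part of the scheduler class.

```python
BASE_INTERVAL = 3600  # 1 hour default
MIN_INTERVAL = 900    # 15 min minimum


def count_price_changes(prices):
    """Count observations whose price differs from the one before it.

    `prices` must be ordered by time, like the LAG(price) window in the query.
    """
    return sum(1 for prev, cur in zip(prices, prices[1:]) if cur != prev)


def pick_interval(changes_per_week: int) -> int:
    """Map weekly change count to a check interval in seconds."""
    if changes_per_week > 20:
        return MIN_INTERVAL   # highly volatile: every 15 min
    if changes_per_week > 5:
        return 1800           # moderately volatile: every 30 min
    return BASE_INTERVAL      # stable: every hour
```

For example, a week of observations `[10, 10, 9, 9, 11]` contains two changes, so that SKU stays on the hourly tier.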

The Alert Engine

class PriceAlertEngine:

    ALERT_RULES = [
        {
            "name": "price_drop_5pct",
            "condition": "price_change_pct < -5",
            "cooldown": 3600,  # Don't re-alert for 1 hour
            "channels": ["slack", "email"]
        },
        {
            "name": "below_our_price",
            "condition": "competitor_price < our_price * 0.95",
            "cooldown": 1800,
            "channels": ["slack"]
        },
        {
            "name": "went_out_of_stock",
            "condition": "availability = 'out_of_stock'",
            "cooldown": 86400,
            "channels": ["email"]
        }
    ]

    async def check_alerts(self, sku: str, new_price: float, competitor: str,
                           availability: str = "unknown"):
        # Get previous price (OFFSET 1 skips the row just stored for new_price)
        prev = await self.db.fetchrow("""
            SELECT price FROM price_observations
            WHERE sku=$1 AND competitor=$2
            ORDER BY time DESC LIMIT 1 OFFSET 1
        """, sku, competitor)

        if not prev:
            return

        prev_price = float(prev["price"])
        if prev_price == 0:
            return  # avoid dividing by zero on a bad observation
        change_pct = (new_price - prev_price) / prev_price * 100

        # Build the context once so every rule sees the same values,
        # including availability for the out-of-stock rule
        context = {
            "price_change_pct": change_pct,
            "competitor_price": new_price,
            "our_price": await self.get_our_price(sku),
            "availability": availability
        }

        for rule in self.ALERT_RULES:
            if self.evaluate_rule(rule["condition"], context):
                await self.fire_alert(sku, competitor, rule, {
                    "prev_price": prev_price,
                    "new_price": new_price,
                    "change_pct": change_pct
                })

    async def fire_alert(self, sku, competitor, rule, data):
        # Check cooldown
        cooldown_key = f"alert_cooldown:{rule['name']}:{sku}:{competitor}"
        if await self.redis.get(cooldown_key):
            return

        message = f"⚠️ {rule['name']}: {competitor} | {sku}\n"
        message += f"Price: ${data['prev_price']:.2f} → ${data['new_price']:.2f} ({data['change_pct']:+.1f}%)"

        for channel in rule["channels"]:
            await self.send_notification(channel, message)

        await self.redis.set(cooldown_key, "1", ex=rule["cooldown"])
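
The engine calls `evaluate_rule` without showing it, and the tempting shortcut is `eval()`, which is unsafe on anything loaded from config. Here is one way it could work, as a sketch: a small evaluator built on Python's `ast` module that allows only names, constants, arithmetic, and a single comparison. It assumes conditions use Python operators, so the out-of-stock rule would need `==` instead of `=`. It is written as a plain function; attaching it to the class as a static method is one option.

```python
import ast
import operator

_BIN_OPS = {ast.Add: operator.add, ast.Sub: operator.sub,
            ast.Mult: operator.mul, ast.Div: operator.truediv}
_CMP_OPS = {ast.Lt: operator.lt, ast.LtE: operator.le,
            ast.Gt: operator.gt, ast.GtE: operator.ge,
            ast.Eq: operator.eq, ast.NotEq: operator.ne}


def evaluate_rule(condition: str, context: dict) -> bool:
    """Evaluate a condition such as 'price_change_pct < -5' against context."""

    def ev(node):
        if isinstance(node, ast.Expression):
            return ev(node.body)
        if isinstance(node, ast.Constant):
            return node.value
        if isinstance(node, ast.Name):
            return context[node.id]  # KeyError if the rule uses an unknown name
        if isinstance(node, ast.UnaryOp) and isinstance(node.op, ast.USub):
            return -ev(node.operand)
        if isinstance(node, ast.BinOp) and type(node.op) in _BIN_OPS:
            return _BIN_OPS[type(node.op)](ev(node.left), ev(node.right))
        if (isinstance(node, ast.Compare) and len(node.ops) == 1
                and type(node.ops[0]) in _CMP_OPS):
            compare = _CMP_OPS[type(node.ops[0])]
            return compare(ev(node.left), ev(node.comparators[0]))
        # Function calls, attribute access, etc. are rejected outright
        raise ValueError(f"unsupported expression in rule: {condition!r}")

    return bool(ev(ast.parse(condition, mode="eval")))
```

A rule that references `our_price` will raise a `TypeError` if no price is on file for that SKU, so the caller may want to skip such rules rather than let the whole check fail.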

Querying the Time-Series Data

-- Price trend for a specific SKU over the last 30 days
SELECT hour, competitor, latest
FROM hourly_prices
WHERE sku = 'PRODUCT-123'
AND hour > NOW() - INTERVAL '30 days'
ORDER BY hour, competitor;

-- Who has the lowest price right now?
SELECT competitor, 
       LAST(price, time) as current_price,
       MAX(time) as last_updated
FROM price_observations
WHERE sku = 'PRODUCT-123'
AND time > NOW() - INTERVAL '2 hours'
GROUP BY competitor
ORDER BY current_price;

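-- Correlation: does Competitor A change price after Competitor B?
-- Compares actual price changes only; the 48-hour window is an assumption to tune.
WITH changes AS (
    SELECT sku, competitor, time FROM (
        SELECT sku, competitor, time, price,
               LAG(price) OVER (PARTITION BY sku, competitor ORDER BY time) AS prev_price
        FROM price_observations
    ) t
    WHERE price <> prev_price
)
SELECT 
    b.time as b_change_time,
    a.time as a_change_time,
    EXTRACT(EPOCH FROM (a.time - b.time))/3600 as hours_lag
FROM changes a
JOIN changes b ON a.sku = b.sku
WHERE a.competitor = 'CompetitorA'
AND b.competitor = 'CompetitorB'
AND a.time > b.time AND a.time <= b.time + INTERVAL '48 hours'
ORDER BY hours_lag;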
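
The follow-the-leader question can also be answered in application code once you have the timestamps at which each competitor changed price. Here is a rough sketch: `follow_lags` is a hypothetical helper, and the 48-hour default window is an assumption, not a recommendation.

```python
from datetime import datetime, timedelta, timezone
from statistics import median


def follow_lags(leader_changes, follower_changes, window=timedelta(hours=48)):
    """For each leader change, find the first follower change inside the window.

    Returns the lags in hours; leader changes with no follow-up are skipped.
    """
    follower_changes = sorted(follower_changes)
    lags = []
    for lead_time in sorted(leader_changes):
        follow_time = next(
            (t for t in follower_changes if lead_time < t <= lead_time + window),
            None,
        )
        if follow_time is not None:
            lags.append((follow_time - lead_time).total_seconds() / 3600)
    return lags
```

Taking `median(follow_lags(b_changes, a_changes))` gives a single typical lag; a tight cluster of lags suggests automated repricing, while a wide spread may just be coincidence.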

Deploying With Docker Compose

services:
  timescaledb:
    image: timescale/timescaledb:latest-pg16
    environment:
      POSTGRES_DB: prices
      POSTGRES_USER: analyst
      POSTGRES_PASSWORD: ${DB_PASSWORD}
    volumes:
      - timescale_data:/var/lib/postgresql/data

  redis:
    image: redis:7-alpine

  scraper:
    build: ./scraper
    environment:
      - REDIS_URL=redis://redis:6379
      - DATABASE_URL=postgresql://analyst:${DB_PASSWORD}@timescaledb/prices
    deploy:
      replicas: 3  # Run 3 scraper workers in parallel
    depends_on:
      - redis
      - timescaledb

  scheduler:
    build: ./scheduler
    environment:
      - REDIS_URL=redis://redis:6379
      - DATABASE_URL=postgresql://analyst:${DB_PASSWORD}@timescaledb/prices
    depends_on:
      - redis
      - timescaledb

volumes:
  timescale_data:

Scale scraper workers up/down with docker compose up --scale scraper=N.

What This Gets You

  • Price history for any SKU, with each observation timestamped to the microsecond
  • Adaptive collection frequency based on actual volatility
  • Alert delivery within roughly 15 minutes of a qualifying price change for the most volatile SKUs, and within an hour for stable ones
  • SQL query access to all historical data via TimescaleDB
  • Horizontal scaling by adding scraper worker replicas

The total infrastructure cost: about $15/month on a basic VPS. The competitive advantage of knowing competitor pricing within an hour instead of the next day is harder to put a number on.


Pre-Built Price Scrapers

If you want the data collection layer without building from scratch, I maintain Apify actors for Amazon, eBay, Google Shopping, and major e-commerce platforms.

Apify Scrapers Bundle — €29 — includes the e-commerce monitoring suite with price tracking setup guide.
