ZyVOP

Posted on • Originally published at zyvop.com

Build a Price Monitoring System with Python: Amazon, Shopify & eBay (2026)

Why Price Monitoring Is the Most Practical Scraping Project

Amazon remains the largest e-commerce platform in the world, and extracting product data from it — prices, reviews, seller information — is one of the most common web scraping use cases. Whether you're building a deal-finder for personal use, doing competitive analysis for a business, or building a side product that alerts users when prices drop, a price monitoring system is arguably the most immediately useful scraper you can build.

Here's what makes it interesting technically in 2026:

Modern e-commerce sites — Shopify stores, Amazon, direct-to-consumer brands — render prices with JavaScript, hide them behind A/B tests, and protect their pages with sophisticated bot detection. A naive scraper that worked in 2022 breaks instantly today.

Amazon reprices millions of products every day, and according to industry monitoring data, 37 percent of active Amazon monitors check at least hourly and 12 percent run at five-minute intervals. Electronics and trending categories can move dozens of times per day.

This guide builds a complete, production-ready price monitoring system step by step — from scraping the price off a single page all the way to a scheduled multi-platform tracker with email alerts.


What We'll Build

By the end of this guide you'll have:

  • A multi-platform scraper that monitors prices on Amazon, Shopify stores, and eBay

  • A SQLite price history database that tracks every price change over time

  • A price-drop alert system that emails you when a product hits your target price

  • A scheduler that re-runs the monitor every 6 hours without you touching it

  • A trend report showing price movements over time per product


Understanding the Platforms

Each platform has different scraping characteristics:

| Platform | Rendering | Anti-Bot | Price Location | Difficulty |
| --- | --- | --- | --- | --- |
| Amazon | Mostly server-side | Very aggressive | .a-price-whole + .a-price-fraction | Hard |
| Shopify | Server + JS hybrid | Low–Medium | /products/[handle].json API | Easy |
| eBay | Server-side | Moderate | .x-price-primary | Medium |
| Generic sites | Varies | Low | Custom per site | Varies |

Shopify is the easiest: by default, a Shopify storefront exposes a clean JSON endpoint at /products/[handle].json that returns the full product, including all variant prices (password-protected stores and custom headless front ends may not serve it). Amazon is the hardest because it reprices constantly and blocks aggressively.
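Since each platform needs its own scraper, it can help to guess the platform key from a product URL before registering it. The sketch below is only a heuristic: `detect_platform` is a hypothetical helper that is not part of the original code, and the `/products/` path check will also match non-Shopify sites that happen to use the same URL layout.

```python
from urllib.parse import urlparse

def detect_platform(url: str) -> str:
    """Guess the platform key for a product URL (heuristic, not exhaustive)."""
    parsed = urlparse(url)
    host = (parsed.hostname or "").lower()
    labels = host.split(".")

    # Match on a whole domain label so "amazon.co.uk" works too
    if "amazon" in labels:
        return "amazon"
    if "ebay" in labels:
        return "ebay"
    # Weak signal: Shopify product pages live under /products/<handle>
    if host.endswith(".myshopify.com") or "/products/" in parsed.path:
        return "shopify"
    return "generic"

print(detect_platform("https://www.amazon.com/dp/B09XS7JWHH"))
print(detect_platform("https://some-shopify-store.com/products/product-handle"))
```

A "generic" result has no matching scraper in this guide, so it would need a custom parser for that site.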


Part 1: The Database Layer

Build the storage foundation first — everything else writes into it.

# db.py
import sqlite3
from datetime import datetime, timezone
from pathlib import Path

DB_PATH = "price_monitor.db"

def get_conn():
    conn = sqlite3.connect(DB_PATH)
    conn.row_factory = sqlite3.Row   # Access columns by name
    return conn

def init_db():
    """Create tables if they don't exist."""
    with get_conn() as conn:
        conn.executescript("""
            CREATE TABLE IF NOT EXISTS products (
                id          INTEGER PRIMARY KEY AUTOINCREMENT,
                name        TEXT NOT NULL,
                url         TEXT NOT NULL UNIQUE,
                platform    TEXT NOT NULL,
                target_price REAL,
                alert_email TEXT,
                active      INTEGER DEFAULT 1,
                added_at    TEXT DEFAULT (datetime('now'))
            );

            CREATE TABLE IF NOT EXISTS price_history (
                id          INTEGER PRIMARY KEY AUTOINCREMENT,
                product_id  INTEGER NOT NULL REFERENCES products(id),
                price       REAL,
                currency    TEXT DEFAULT 'USD',
                in_stock    INTEGER,
                scraped_at  TEXT DEFAULT (datetime('now'))
            );

            CREATE INDEX IF NOT EXISTS idx_history_product
                ON price_history(product_id, scraped_at DESC);
        """)
    print("Database initialised.")

def add_product(name, url, platform, target_price=None, alert_email=None):
    """Register a new product to monitor."""
    with get_conn() as conn:
        try:
            conn.execute("""
                INSERT INTO products (name, url, platform, target_price, alert_email)
                VALUES (?, ?, ?, ?, ?)
            """, (name, url, platform, target_price, alert_email))
            print(f"Added: {name}")
        except sqlite3.IntegrityError:
            print(f"Already exists: {url}")

def save_price(product_id, price, currency="USD", in_stock=True):
    """Record a price observation."""
    with get_conn() as conn:
        conn.execute("""
            INSERT INTO price_history (product_id, price, currency, in_stock)
            VALUES (?, ?, ?, ?)
        """, (product_id, price, currency, int(in_stock)))

def get_active_products():
    """Return all active monitored products."""
    with get_conn() as conn:
        return conn.execute(
            "SELECT * FROM products WHERE active = 1"
        ).fetchall()

def get_price_history(product_id, days=30):
    """Return recent price history for a product."""
    with get_conn() as conn:
        return conn.execute("""
            SELECT price, currency, in_stock, scraped_at
            FROM price_history
            WHERE product_id = ?
              AND scraped_at >= datetime('now', ?)
            ORDER BY scraped_at ASC
        """, (product_id, f"-{days} days")).fetchall()

def get_lowest_price(product_id):
    """Return the all-time lowest recorded price."""
    with get_conn() as conn:
        result = conn.execute(
            "SELECT MIN(price) FROM price_history WHERE product_id = ?",
            (product_id,)
        ).fetchone()
        return result[0]

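Because every observation is stored, detecting a price change comes down to comparing the two most recent rows for a product. The following is a minimal sketch under stated assumptions: `last_price_change` is a hypothetical helper that is not in db.py, and it runs against an in-memory database that mirrors the price_history columns so it can be tried without touching price_monitor.db.

```python
import sqlite3

def last_price_change(conn: sqlite3.Connection, product_id: int):
    """Return (previous, latest) prices, or None with fewer than two observations."""
    rows = conn.execute(
        """
        SELECT price FROM price_history
        WHERE product_id = ? AND price IS NOT NULL
        ORDER BY id DESC
        LIMIT 2
        """,
        (product_id,),
    ).fetchall()
    if len(rows) < 2:
        return None
    latest, previous = rows[0][0], rows[1][0]
    return previous, latest

# Demo: an in-memory table with the same columns as price_history
conn = sqlite3.connect(":memory:")
conn.execute("""
    CREATE TABLE price_history (
        id          INTEGER PRIMARY KEY AUTOINCREMENT,
        product_id  INTEGER NOT NULL,
        price       REAL,
        currency    TEXT DEFAULT 'USD',
        in_stock    INTEGER,
        scraped_at  TEXT DEFAULT (datetime('now'))
    )
""")
for observed in (299.99, 299.99, 279.00):
    conn.execute(
        "INSERT INTO price_history (product_id, price, in_stock) VALUES (1, ?, 1)",
        (observed,),
    )

print(last_price_change(conn, 1))
```

Ordering by id rather than scraped_at keeps the result deterministic when two rows land within the same second, since datetime('now') has one-second resolution.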

Part 2: Scraping Amazon

Scraping Amazon prices with Python means pulling up to five related numbers off a product page: the current Buy Box price, the "was" price (list price), any Subscribe and Save price, per-variant prices, and the prices of offers from other sellers. The parser below covers the first two, along with the title, stock status, currency, and ASIN.

# scrapers/amazon.py
import re
import random
import asyncio
from curl_cffi.requests import AsyncSession
from bs4 import BeautifulSoup

HEADERS = {
    "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
    "Accept-Language": "en-US,en;q=0.9",
    "Accept-Encoding": "gzip, deflate, br",
    "sec-ch-ua": '"Not_A Brand";v="8", "Chromium";v="120", "Google Chrome";v="120"',
    "sec-ch-ua-mobile": "?0",
    "sec-ch-ua-platform": '"Windows"',
    "Sec-Fetch-Dest": "document",
    "Sec-Fetch-Mode": "navigate",
    "Sec-Fetch-Site": "none",
    "Upgrade-Insecure-Requests": "1",
}

def parse_amazon_price(html: str) -> dict:
    """
    Extract price data from an Amazon product page.
    Targets the most stable selectors as of June 2026.
    """
    soup = BeautifulSoup(html, "lxml")

    result = {
        "price":      None,
        "was_price":  None,
        "currency":   "USD",
        "in_stock":   False,
        "title":      None,
        "asin":       None,
    }

    # ── Title ─────────────────────────────────────────────────
    title_el = soup.select_one("#productTitle")
    if title_el:
        result["title"] = title_el.get_text(strip=True)

    # ── Current Price (Buy Box) ────────────────────────────────
    # Method 1: Standard Buy Box price — most common location
    whole = soup.select_one(".a-price-whole")
    frac  = soup.select_one(".a-price-fraction")
    if whole:
        price_str = whole.get_text(strip=True).replace(",", "").rstrip(".")
        if frac:
            price_str += "." + frac.get_text(strip=True)
        try:
            result["price"] = float(price_str)
        except ValueError:
            pass

    # Method 2: Fallback — hidden offscreen price (more reliable on some pages)
    if not result["price"]:
        offscreen = soup.select_one(".a-offscreen")
        if offscreen:
            price_text = offscreen.get_text(strip=True)
            numbers = re.findall(r"[\d,]+\.?\d*", price_text.replace(",", ""))
            if numbers:
                try:
                    result["price"] = float(numbers[0])
                except ValueError:
                    pass

    # ── Was / List Price ──────────────────────────────────────
    was_el = soup.select_one(".basisPrice .a-offscreen")
    if was_el:
        was_text = was_el.get_text(strip=True)
        numbers = re.findall(r"[\d,]+\.?\d*", was_text.replace(",", ""))
        if numbers:
            try:
                result["was_price"] = float(numbers[0])
            except ValueError:
                pass

    # ── Stock Status ──────────────────────────────────────────
    avail_el = soup.select_one("#availability span")
    if avail_el:
        avail_text = avail_el.get_text(strip=True).lower()
        result["in_stock"] = "in stock" in avail_text

    # ── Currency (detect from symbol) ────────────────────────
    price_symbol = soup.select_one(".a-price-symbol")
    if price_symbol:
        symbol = price_symbol.get_text(strip=True)
        symbol_map = {"$": "USD", "£": "GBP", "€": "EUR", "₹": "INR"}
        result["currency"] = symbol_map.get(symbol, "USD")

    # ── ASIN from the hidden #ASIN input on the page ──────────
    asin_input = soup.select_one("#ASIN")
    if asin_input:
        result["asin"] = asin_input.get("value")

    return result

async def scrape_amazon(url: str, proxy: str = None) -> dict:
    """Fetch and parse a single Amazon product page."""
    proxies = {"https": proxy} if proxy else None

    async with AsyncSession(impersonate="chrome120") as session:
        # Visit homepage first for a more natural session pattern
        await session.get(
            "https://www.amazon.com",
            headers=HEADERS,
            proxies=proxies,
            timeout=15
        )
        await asyncio.sleep(random.uniform(1.5, 3.0))

        response = await session.get(
            url,
            headers=HEADERS,
            proxies=proxies,
            timeout=20
        )

    if response.status_code != 200:
        raise Exception(f"Amazon returned {response.status_code}")

    # Detect block page
    if "Type the characters you see" in response.text or \
       "Enter the characters you see below" in response.text:
        raise Exception("CAPTCHA encountered — rotate proxy and retry")

    return parse_amazon_price(response.text)
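The parser above applies the same "strip commas, then grab the first number" step in several places. A minimal sketch of how that step could be factored into one function follows. `parse_price_text` is a hypothetical helper, not part of the original scraper, and it assumes comma thousands separators with a dot as the decimal mark, so European-style "1.299,99" strings would need different handling.

```python
import re
from typing import Optional

# One or more digits, optionally followed by a decimal part
_PRICE_RE = re.compile(r"\d+(?:\.\d+)?")

def parse_price_text(text: str) -> Optional[float]:
    """Return the first number found in a price string, or None if there is none."""
    cleaned = text.replace(",", "")        # "1,299.99" -> "1299.99"
    match = _PRICE_RE.search(cleaned)
    return float(match.group()) if match else None

print(parse_price_text("$1,299.99"))
print(parse_price_text("Currently unavailable"))
```

Returning None instead of raising keeps the calling code free of repeated try/except blocks.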

Part 3: Scraping Shopify (The Easy Way)

Shopify stores have a secret weapon: by default, the /products/[handle].json endpoint returns clean JSON with full product data, including all variant prices, so no HTML parsing is needed.

# scrapers/shopify.py
import httpx
import re
from urllib.parse import urlparse

async def scrape_shopify(url: str) -> dict:
    """
    Extract price from a Shopify store using the storefront's product JSON endpoint.
    No CSS selectors needed; raises httpx.HTTPStatusError if the store does not serve it.

    Supported URL formats:
      - https://store.com/products/my-product
      - https://store.myshopify.com/products/handle
    """
    parsed = urlparse(url)
    base   = f"{parsed.scheme}://{parsed.netloc}"

    # Extract product handle from URL path
    # URL pattern: /products/product-handle or /collections/x/products/handle
    path_match = re.search(r"/products/([^/?#]+)", parsed.path)
    if not path_match:
        raise ValueError(f"Cannot extract product handle from: {url}")

    handle   = path_match.group(1)
    json_url = f"{base}/products/{handle}.json"

    headers = {
        "User-Agent": "Mozilla/5.0 (compatible; PriceBot/1.0)",
        "Accept": "application/json",
    }

    async with httpx.AsyncClient(headers=headers, follow_redirects=True) as client:
        r = await client.get(json_url, timeout=15)
        r.raise_for_status()
        data = r.json()

    product  = data["product"]
    variants = product.get("variants", [])

    if not variants:
        return {"price": None, "in_stock": False, "title": product.get("title")}

    # Collect the prices of all available variants; the lowest one is reported
    prices = []
    for v in variants:
        if v.get("available", True):
            try:
                prices.append(float(v["price"]))
            except (KeyError, TypeError, ValueError):
                pass  # skip variants with a missing or malformed price

    lowest_price = min(prices) if prices else None

    return {
        "title":          product.get("title"),
        "price":          lowest_price,
        "was_price":      float(variants[0].get("compare_at_price") or 0) or None,
        "currency":       "USD",  # Shopify JSON doesn't include currency — check storefront
        "in_stock":       any(v.get("available", False) for v in variants),
        "variants_count": len(variants),
        "vendor":         product.get("vendor"),
        "product_type":   product.get("product_type"),
    }
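The scraper above reports the lowest available price but takes the "was" price from the first variant, and the two may belong to different variants. A minimal sketch of keeping them together is below. `cheapest_variant` is a hypothetical helper, and the sample payload is hand-written to mirror only the fields the scraper reads (title, price, compare_at_price, available); a real response contains more.

```python
from typing import Optional

def cheapest_variant(payload: dict) -> Optional[dict]:
    """Return the cheapest available variant with its own compare-at price."""
    best = None
    for v in payload.get("product", {}).get("variants", []):
        if not v.get("available", True):
            continue
        try:
            price = float(v["price"])
        except (KeyError, TypeError, ValueError):
            continue  # skip variants with a missing or malformed price
        if best is None or price < best["price"]:
            compare = v.get("compare_at_price")
            best = {
                "variant": v.get("title"),
                "price": price,
                "was_price": float(compare) if compare else None,
            }
    return best

# Hand-written sample, not a real store response
sample = {"product": {"title": "Example Tee", "variants": [
    {"title": "Small", "price": "25.00", "compare_at_price": None, "available": True},
    {"title": "Large", "price": "19.00", "compare_at_price": "30.00", "available": True},
    {"title": "XL", "price": "15.00", "compare_at_price": None, "available": False},
]}}

print(cheapest_variant(sample))
```

The sold-out XL variant is ignored even though it is the cheapest, which matches how the scraper filters on availability.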

Part 4: Scraping eBay

# scrapers/ebay.py
import httpx
import re
from bs4 import BeautifulSoup

HEADERS = {
    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) Chrome/120",
    "Accept-Language": "en-US,en;q=0.9",
    "Accept": "text/html,application/xhtml+xml,*/*;q=0.8",
}

async def scrape_ebay(url: str) -> dict:
    """Extract price from an eBay listing page."""
    async with httpx.AsyncClient(headers=HEADERS, follow_redirects=True) as client:
        r = await client.get(url, timeout=20)
        r.raise_for_status()

    soup = BeautifulSoup(r.text, "lxml")
    result = {"price": None, "was_price": None, "in_stock": True,
              "currency": "USD", "title": None}

    # Title
    title_el = soup.select_one("h1.x-item-title__mainTitle span")
    if title_el:
        result["title"] = title_el.get_text(strip=True)

    # Current price
    price_el = soup.select_one(".x-price-primary span.ux-textspans")
    if price_el:
        price_text = price_el.get_text(strip=True)
        nums = re.findall(r"[\d,]+\.?\d*", price_text.replace(",", ""))
        if nums:
            try:
                result["price"] = float(nums[0])
            except ValueError:
                pass

    # Was price (strikethrough)
    was_el = soup.select_one(".x-additional-info__original-price span")
    if was_el:
        was_text = was_el.get_text(strip=True)
        nums = re.findall(r"[\d,]+\.?\d*", was_text.replace(",", ""))
        if nums:
            try:
                result["was_price"] = float(nums[0])
            except ValueError:
                pass

    # Stock status
    qty_el = soup.select_one(".d-quantity__availability")
    if qty_el:
        qty_text = qty_el.get_text(strip=True).lower()
        result["in_stock"] = "sold out" not in qty_text and "unavailable" not in qty_text

    return result

Part 5: The Unified Monitor

# monitor.py
import asyncio
import random
from db import get_active_products, save_price, get_lowest_price, init_db
from scrapers.amazon import scrape_amazon
from scrapers.shopify import scrape_shopify
from scrapers.ebay import scrape_ebay

SCRAPER_MAP = {
    "amazon":  scrape_amazon,
    "shopify": scrape_shopify,
    "ebay":    scrape_ebay,
}

async def check_product(product) -> dict | None:
    """Scrape price for one product and save to database."""
    scraper = SCRAPER_MAP.get(product["platform"])
    if not scraper:
        print(f"  No scraper for platform: {product['platform']}")
        return None

    try:
        data = await scraper(product["url"])
        if data.get("price") is None:
            print(f"  [{product['name']}] No price found")
            return None

        save_price(
            product_id=product["id"],
            price=data["price"],
            currency=data.get("currency", "USD"),
            in_stock=data.get("in_stock", True),
        )

        print(f"  [{product['name']}] ${data['price']:.2f}"
              f"{' ⚠ OUT OF STOCK' if not data.get('in_stock') else ''}")
        return data

    except Exception as e:
        print(f"  [{product['name']}] Error: {e}")
        return None

async def run_monitor():
    """Run one monitoring cycle across all active products."""
    products = get_active_products()
    if not products:
        print("No products to monitor. Add some with add_product().")
        return

    print(f"\nMonitoring {len(products)} products...\n")
    results = {}

    for product in products:
        data = await check_product(product)
        if data:
            results[product["id"]] = data
        # Polite delay between requests, applied even when a scrape fails
        await asyncio.sleep(random.uniform(2.0, 5.0))

    return results
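check_product() gives up after a single failed attempt, while the Amazon scraper's CAPTCHA error suggests retrying. One way to bridge the two is a small retry wrapper with exponential backoff. This is a sketch under assumptions: `with_retries` is a hypothetical helper, and the attempt count and delays are illustrative defaults rather than tuned values.

```python
import asyncio
import random

async def with_retries(scraper, url, attempts=3, base_delay=2.0):
    """Call an async scraper, retrying with exponential backoff plus jitter."""
    if attempts < 1:
        raise ValueError("attempts must be at least 1")
    last_error = None
    for attempt in range(attempts):
        try:
            return await scraper(url)
        except Exception as exc:
            last_error = exc
            if attempt < attempts - 1:
                # Waits roughly base_delay, 2x, 4x, ... with random jitter on top
                delay = base_delay * (2 ** attempt) + random.uniform(0, base_delay)
                await asyncio.sleep(delay)
    raise last_error

# Demo with a fake scraper that fails twice before succeeding
calls = []

async def flaky_scraper(url):
    calls.append(url)
    if len(calls) < 3:
        raise RuntimeError("blocked")
    return {"price": 9.99}

result = asyncio.run(with_retries(flaky_scraper, "https://example.com/p", base_delay=0))
print(result, len(calls))
```

Inside check_product() this could replace the direct call, for example `data = await with_retries(scraper, product["url"])`.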

Part 6: Price-Drop Email Alerts

# alerts.py
import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
from db import get_conn, get_lowest_price

# Configure your email credentials
SMTP_HOST     = "smtp.gmail.com"
SMTP_PORT     = 587
SENDER_EMAIL  = "your_monitor@gmail.com"
SENDER_PASS   = "your_app_password"   # Use Gmail App Password, not your real password

def send_alert(to_email: str, product_name: str, current_price: float,
               target_price: float, product_url: str, currency: str = "USD"):
    """Send a price-drop email alert."""
    symbol = {"USD": "$", "GBP": "£", "EUR": "€", "INR": "₹"}.get(currency, "$")

    subject = f"🔔 Price Drop Alert: {product_name}"
    body = f"""
    Great news! The price for a product you're monitoring has dropped.

    Product : {product_name}
    Current price : {symbol}{current_price:.2f}
    Your target   : {symbol}{target_price:.2f}
    Savings       : {symbol}{target_price - current_price:.2f}

    👉 Buy now: {product_url}

    --
    Python Price Monitor
    """

    msg = MIMEMultipart()
    msg["From"]    = SENDER_EMAIL
    msg["To"]      = to_email
    msg["Subject"] = subject
    msg.attach(MIMEText(body, "plain"))

    try:
        with smtplib.SMTP(SMTP_HOST, SMTP_PORT) as server:
            server.starttls()
            server.login(SENDER_EMAIL, SENDER_PASS)
            server.send_message(msg)
        print(f"  📧 Alert sent to {to_email} for {product_name}")
    except Exception as e:
        print(f"  Alert send failed: {e}")

def check_and_alert(product, current_price: float):
    """Check if current price triggers an alert and send email if so."""
    target = product["target_price"]
    email  = product["alert_email"]

    if not target or not email:
        return  # No alert configured for this product

    if current_price <= target:
        print(f"  🎯 Target hit! {product['name']}: ${current_price:.2f} ≤ ${target:.2f}")
        send_alert(
            to_email=email,
            product_name=product["name"],
            current_price=current_price,
            target_price=target,
            product_url=product["url"],
        )

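As written, check_and_alert() sends an email on every cycle for as long as the price stays at or below the target. A minimal sketch of one way to suppress repeats is below. It assumes you store the price that last triggered an alert somewhere, for example in an extra `last_alerted_price` column; both that column and the `should_alert` helper are hypothetical and not part of the original schema.

```python
from typing import Optional

def should_alert(current_price: float,
                 target_price: Optional[float],
                 last_alerted_price: Optional[float]) -> bool:
    """Alert on the first target hit, then only when the price drops further."""
    if target_price is None or current_price > target_price:
        return False
    if last_alerted_price is None:
        return True                      # first time the target is reached
    return current_price < last_alerted_price

print(should_alert(270.0, 280.0, None))
print(should_alert(270.0, 280.0, 270.0))
print(should_alert(260.0, 280.0, 270.0))
```

To re-arm the alert after a rebound, the stored value would have to be reset to NULL once the price climbs back above the target.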

Part 7: Price Trend Reporting

# report.py
import pandas as pd
from db import get_conn, get_active_products, get_price_history

def generate_price_report(days: int = 30) -> pd.DataFrame:
    """Generate a full price trend report for all monitored products."""
    products = get_active_products()
    rows = []

    for product in products:
        history = get_price_history(product["id"], days=days)
        if not history:
            continue

        prices = [row["price"] for row in history if row["price"]]
        if not prices:
            continue

        latest   = prices[-1]
        lowest   = min(prices)
        highest  = max(prices)
        first    = prices[0]
        change   = ((latest - first) / first * 100) if first else 0

        rows.append({
            "product":        product["name"],
            "platform":       product["platform"],
            "current_price":  round(latest, 2),
            "lowest_price":   round(lowest, 2),
            "highest_price":  round(highest, 2),
            "price_change_%": round(change, 1),
            "observations":   len(prices),
            "target_price":   product["target_price"],
            "below_target":   latest <= product["target_price"] if product["target_price"] else None,
        })

    df = pd.DataFrame(rows)
    return df

def print_report():
    df = generate_price_report(days=30)
    if df.empty:
        print("No price history yet. Run the monitor first.")
        return

    print("\n══ PRICE MONITORING REPORT (Last 30 days) ══\n")
    print(df.to_string(index=False))

    below = df[df["below_target"] == True]
    if not below.empty:
        print(f"\n🎯 {len(below)} product(s) currently at or below target price:")
        for _, row in below.iterrows():
            print(f"{row['product']}: ${row['current_price']:.2f} "
                  f"(target: ${row['target_price']:.2f})")
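To make the percentage change in the report easy to check by hand, here is the same arithmetic as a small standard-library function. `summarise_prices` is a hypothetical helper used only for illustration; it takes prices in chronological order, as get_price_history() returns them.

```python
def summarise_prices(prices):
    """Summarise a chronological list of prices the way the report rows do."""
    if not prices:
        return None
    first, latest = prices[0], prices[-1]
    # Change is measured from the first observation in the window to the latest
    change_pct = (latest - first) / first * 100 if first else 0.0
    return {
        "current_price": round(latest, 2),
        "lowest_price": round(min(prices), 2),
        "highest_price": round(max(prices), 2),
        "price_change_%": round(change_pct, 1),
        "observations": len(prices),
    }

# 100 -> 95 over the window is (95 - 100) / 100 * 100 = -5.0 percent
print(summarise_prices([100.0, 90.0, 95.0]))
```

Note that the change compares only the endpoints of the window, so a dip to 90 in between shows up in lowest_price but not in the percentage.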

Part 8: Complete Setup and Scheduler

# main.py — run this file to start monitoring
import asyncio
import schedule
import time
from db import init_db, add_product, get_active_products
from monitor import check_product
from alerts import check_and_alert
from report import print_report

def setup():
    """Initialise database and add products to monitor."""
    init_db()

    # Amazon products — use full product URLs
    add_product(
        name="Sony WH-1000XM5 Headphones",
        url="https://www.amazon.com/dp/B09XS7JWHH",
        platform="amazon",
        target_price=280.00,
        alert_email="you@example.com"
    )

    # Shopify store products
    add_product(
        name="Example Shopify Product",
        url="https://some-shopify-store.com/products/product-handle",
        platform="shopify",
        target_price=50.00,
        alert_email="you@example.com"
    )

    # eBay listings
    add_product(
        name="Vintage Camera on eBay",
        url="https://www.ebay.com/itm/123456789",
        platform="ebay",
        target_price=120.00,
        alert_email="you@example.com"
    )

async def monitoring_cycle():
    """One full monitoring pass with alert checking."""
    from monitor import check_product

    products = get_active_products()
    for product in products:
        data = await check_product(product)
        if data and data.get("price"):
            check_and_alert(product, data["price"])
        # Polite delay between requests
        await asyncio.sleep(3)

    print_report()

def run_scheduled():
    """Run monitoring on a schedule."""
    asyncio.run(monitoring_cycle())
    print("Next run in 6 hours.")

if __name__ == "__main__":
    setup()

    # Run immediately on start
    asyncio.run(monitoring_cycle())

    # Then every 6 hours
    schedule.every(6).hours.do(run_scheduled)

    print("\nScheduler running. Press Ctrl+C to stop.")
    while True:
        schedule.run_pending()
        time.sleep(60)
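The loop above depends on the third-party schedule package, and an exception that escapes a cycle would stop it. If you would rather stay inside asyncio, a standard-library loop is one alternative. This is a sketch only: `run_every` is a hypothetical helper, and `max_runs` exists mainly so the loop can be exercised in a test.

```python
import asyncio

async def run_every(job, interval_seconds: float, max_runs=None):
    """Run an async job repeatedly, logging failures instead of stopping."""
    runs = 0
    while max_runs is None or runs < max_runs:
        try:
            await job()
        except Exception as exc:
            # One failed cycle should not end the whole monitor
            print(f"Cycle failed: {exc}")
        runs += 1
        if max_runs is None or runs < max_runs:
            await asyncio.sleep(interval_seconds)
    return runs

# Demo: three quick cycles, the second of which fails
counter = {"n": 0}

async def demo_job():
    counter["n"] += 1
    if counter["n"] == 2:
        raise RuntimeError("boom")

completed = asyncio.run(run_every(demo_job, 0, max_runs=3))
print(completed)
```

In main.py the equivalent call would be along the lines of `asyncio.run(run_every(monitoring_cycle, 6 * 60 * 60))`.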

Pro Tips for Reliable Price Monitoring

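Tip 1: Store raw HTML alongside prices

If your parser breaks after a site redesign, you can re-parse from stored HTML without re-scraping:

# Add to save_price(); raw_html is the page source, passed along by the scraper
# (db.py already imports datetime, timezone, and Path)
timestamp = datetime.now(timezone.utc).strftime("%Y%m%dT%H%M%SZ")
Path("html_cache").mkdir(exist_ok=True)
with open(f"html_cache/{product_id}_{timestamp}.html", "w", encoding="utf-8") as f:
    f.write(raw_html)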
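The other half of this tip is reading the cache back. The sketch below walks the cached pages for one product and feeds each to a parser. It assumes the `html_cache/{product_id}_{timestamp}.html` naming shown above; `reparse_cached` is a hypothetical helper, and any function that takes HTML and returns a dict (such as parse_amazon_price) can be passed as the parser.

```python
from pathlib import Path
from typing import Callable, Iterator, Tuple

def reparse_cached(product_id: int,
                   parser: Callable[[str], dict],
                   cache_dir: str = "html_cache") -> Iterator[Tuple[str, dict]]:
    """Yield (timestamp, parsed_result) for every cached page of one product."""
    prefix = f"{product_id}_"
    # Sorting by file name gives chronological order when timestamps
    # share one fixed-width format
    for path in sorted(Path(cache_dir).glob(f"{prefix}*.html")):
        timestamp = path.stem[len(prefix):]
        html = path.read_text(encoding="utf-8")
        yield timestamp, parser(html)
```

For example, `for ts, data in reparse_cached(1, parse_amazon_price): print(ts, data["price"])` would replay the history of product 1 through an updated parser.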

Tip 2: Track percentage drops, not just absolute prices

def is_significant_drop(old_price, new_price, threshold_pct=5.0) -> bool:
    """Return True if the price dropped by at least threshold_pct percent."""
    if not old_price or not new_price:
        return False
    drop_pct = (old_price - new_price) / old_price * 100
    return drop_pct >= threshold_pct

Tip 3: Handle dynamic pricing variations

Amazon can show different prices to different users depending on their location, login state, browsing history, and time of day. To get consistent readings, always scrape with the same headers and the same proxy location, and start each run without cookies from previous sessions.

Tip 4: Monitor variant-level prices on Shopify

Shopify products often have multiple variants (sizes, colours) at different prices. Monitor all variants to catch deals on specific options:

all_variant_prices = {
    v["title"]: float(v["price"])
    for v in product["variants"]
    if v.get("available")
}

FAQ

Q: Is scraping Amazon prices legal?
Scraping publicly displayed Amazon prices is generally treated as acceptable for personal use, as long as you follow the site's guidelines and stay away from any login-protected personal information. Don't republish or resell the scraped data.

Q: How do I avoid getting blocked by Amazon?
Use curl_cffi with impersonate="chrome120", rotate residential proxies, add 2–5 second delays between requests, and never scrape more than a few hundred products per day per IP.

Q: What if the price selector breaks?
Amazon changes its HTML structure regularly. When that happens, open the product page in Chrome DevTools, find the price element, copy its updated selector, and update parse_amazon_price(). Use data- attributes when available, since they tend to be more stable than class names.

Q: Can I monitor prices on Indian e-commerce sites like Flipkart or Meesho?
Yes. Flipkart is server-rendered and moderately protected, so curl_cffi with browser impersonation works for it. Meesho is React-rendered and needs Playwright.
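For the proxy rotation mentioned in the blocking answer, a round-robin pool is the simplest starting point. The sketch below is illustrative only: `ProxyRotator` is a hypothetical class and the proxy URLs are placeholders, not real endpoints.

```python
from itertools import cycle

class ProxyRotator:
    """Hand out proxies from a fixed pool in round-robin order."""

    def __init__(self, proxies):
        if not proxies:
            raise ValueError("at least one proxy is required")
        self._pool = cycle(list(proxies))

    def next_proxy(self) -> str:
        return next(self._pool)

# Placeholder proxy URLs; substitute the ones from your provider
rotator = ProxyRotator([
    "http://user:pass@proxy-1.example.com:8000",
    "http://user:pass@proxy-2.example.com:8000",
])

first = rotator.next_proxy()
second = rotator.next_proxy()
third = rotator.next_proxy()
print(first, second, third)
```

Each request can then pick the next proxy through the existing parameter, for example `await scrape_amazon(url, proxy=rotator.next_proxy())`.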


Summary

| Component | Tool | What it does |
| --- | --- | --- |
| Amazon scraper | curl_cffi + BeautifulSoup | TLS impersonation + price extraction |
| Shopify scraper | httpx + JSON API | Clean structured data, no CSS selectors |
| eBay scraper | httpx + BeautifulSoup | Server-rendered price extraction |
| Storage | SQLite | Price history with full trend data |
| Alerts | smtplib | Email when target price is hit |
| Reporting | pandas | Price trend summary across products |
| Scheduling | schedule | Runs every N hours automatically |
