DEV Community

Cover image for Building a Real-Time Amazon Sales Tracker with Scraper API: Complete Python Guide
Mox Loop
Mox Loop

Posted on

Building a Real-Time Amazon Sales Tracker with Scraper API: Complete Python Guide

TL;DR

Amazon doesn't expose sales data via any official API. Every "sales tracker tool" estimates from BSR (Best Seller Rank) signals. This tutorial shows you how to:

  1. Understand the BSR-to-sales estimation model
  2. Query real-time ASIN data via Pangolinfo Scrape API
  3. Build async batch queries for large ASIN sets
  4. Set up a daily monitoring pipeline with change alerts

Why BSR-Based Estimation Works (And Where It Breaks)

Amazon updates BSR frequently — hourly in hot categories, daily in slower ones. Sales intelligence tools build regression models that map BSR position to approximate unit volume. The models are category-specific because a BSR of 500 in Electronics represents very different volume than BSR 500 in Collectibles.

Accuracy in practice: ±25-30% in major US categories, ±100%+ in niche subcategories. Freshness: SaaS tools typically snapshot every 1-7 days. If you need real-time data or more than a few thousand queries per day, you need direct API access.


Setup

pip install aiohttp python-dotenv schedule
Enter fullscreen mode Exit fullscreen mode
# .env
PANGOLINFO_API_KEY=your_api_key_here
Enter fullscreen mode Exit fullscreen mode

Core Module: Single ASIN Query

# amazon_tracker.py
import os
import requests
from datetime import datetime
from dataclasses import dataclass
from typing import Optional
from dotenv import load_dotenv

load_dotenv()  # pull PANGOLINFO_API_KEY from a local .env into the environment

API_KEY = os.getenv("PANGOLINFO_API_KEY")
API_URL = "https://api.pangolinfo.com/v1/amazon/product"

# BSR → monthly sales reference table (US, major categories)
# Source: industry benchmarks — calibrate per-category for production use
# Keys are BSR upper bounds; values are estimated monthly unit sales at or
# below that rank. Looked up by bsr_to_sales() via ascending threshold scan.
BSR_SALES_REF = {
    100: 12000,
    500: 4000,
    1000: 2200,
    3000: 900,
    5000: 600,
    10000: 300,
    30000: 80,
    100000: 20,
}

@dataclass
class ASINSnapshot:
    """One point-in-time observation of a single ASIN on one marketplace.

    Produced by query_asin / query_asin_async. On a failed fetch only the
    identifying fields, timestamp, and `error` are populated; everything
    else stays None, so callers should check `error` first.
    """
    asin: str                        # Amazon product identifier
    marketplace: str                 # e.g. "US"
    timestamp: str                   # UTC ISO-8601 capture time
    main_bsr: Optional[int]          # rank in the main category, if reported
    category: Optional[str]          # main category name
    sub_bsr: Optional[int]           # rank in the first sub-category, if any
    sub_category: Optional[str]      # sub-category name
    estimated_sales: Optional[int]   # monthly units estimated from main_bsr
    price: Optional[float]           # current listed price
    review_count: Optional[int]      # total review count
    availability: Optional[str]      # availability string from the API
    buybox_seller: Optional[str]     # seller currently winning the Buy Box
    error: Optional[str] = None      # set to the exception text on failure

def bsr_to_sales(bsr: int, ref: Optional[dict] = None) -> int:
    """Map a Best Seller Rank to an estimated monthly unit-sales figure.

    Scans the reference table's thresholds in ascending order and returns
    the sales estimate attached to the first threshold >= `bsr`.

    Args:
        bsr: Main-category Best Seller Rank (1 = best selling).
        ref: Optional BSR -> sales table; defaults to BSR_SALES_REF.
             Pass a per-category calibrated table for production use.

    Returns:
        Estimated monthly unit sales; ranks deeper than the largest
        threshold fall back to a long-tail floor of 5 units/month.
    """
    table = BSR_SALES_REF if ref is None else ref
    for threshold in sorted(table):
        if bsr <= threshold:
            return table[threshold]
    return 5  # long-tail floor for ranks past the deepest threshold

def query_asin(asin: str, marketplace: str = "US") -> ASINSnapshot:
    """Fetch a single ASIN's current data from the Pangolinfo product endpoint.

    Always returns an ASINSnapshot: transport, HTTP, and parsing failures
    are captured in the snapshot's `error` field instead of being raised,
    so batch callers can keep iterating.

    Args:
        asin: Amazon product identifier to query.
        marketplace: Marketplace code, e.g. "US".

    Returns:
        A populated ASINSnapshot, or one with `error` set on failure.
    """
    headers = {"Authorization": f"Bearer {API_KEY}", "Content-Type": "application/json"}
    payload = {"asin": asin, "marketplace": marketplace}

    # Pre-build an all-None snapshot so every code path returns the same shape.
    # NOTE(review): utcnow() is deprecated in Python 3.12+; consider
    # datetime.now(timezone.utc) — that changes the ISO string format, so
    # confirm downstream consumers (e.g. recorded_at ordering) first.
    snapshot = ASINSnapshot(
        asin=asin, marketplace=marketplace,
        timestamp=datetime.utcnow().isoformat(),
        main_bsr=None, category=None, sub_bsr=None, sub_category=None,
        estimated_sales=None, price=None, review_count=None,
        availability=None, buybox_seller=None
    )

    try:
        resp = requests.post(API_URL, headers=headers, json=payload, timeout=30)
        resp.raise_for_status()
        data = resp.json()

        # `or []` guards against the key being present but null in the JSON;
        # .get's default only covers a *missing* key.
        bsr_list = data.get("best_sellers_rank") or []
        if bsr_list:
            snapshot.main_bsr = bsr_list[0].get("rank")
            snapshot.category = bsr_list[0].get("category")
            if len(bsr_list) > 1:
                snapshot.sub_bsr = bsr_list[1].get("rank")
                snapshot.sub_category = bsr_list[1].get("category")
            if snapshot.main_bsr:
                snapshot.estimated_sales = bsr_to_sales(snapshot.main_bsr)

        snapshot.price = data.get("price")
        snapshot.review_count = data.get("review_count")
        snapshot.availability = data.get("availability")
        # Same null-guard: a JSON null buybox_winner would make
        # .get("buybox_winner", {}) return None and crash on .get().
        snapshot.buybox_seller = (data.get("buybox_winner") or {}).get("seller_name")

    except Exception as e:
        # Deliberate best-effort: record the failure and return the shell.
        snapshot.error = str(e)

    return snapshot
Enter fullscreen mode Exit fullscreen mode

Async Batch Queries (10x faster for large ASIN sets)

# async_batch.py
import asyncio
import aiohttp
from typing import List
from amazon_tracker import BSR_SALES_REF, bsr_to_sales, ASINSnapshot
from datetime import datetime
import os

# The key is available here because importing amazon_tracker (above)
# already ran load_dotenv() as a module side effect.
API_KEY = os.getenv("PANGOLINFO_API_KEY")
API_URL = "https://api.pangolinfo.com/v1/amazon/product"
CONCURRENCY = 10  # adjust based on your API rate limit

async def query_asin_async(
    session: aiohttp.ClientSession, asin: str, marketplace: str = "US"
) -> ASINSnapshot:
    """Async counterpart of amazon_tracker.query_asin.

    Reuses the caller's session (which carries auth headers) and never
    raises: failures land in the returned snapshot's `error` field.

    Args:
        session: Shared aiohttp session with Authorization header set.
        asin: Amazon product identifier to query.
        marketplace: Marketplace code, e.g. "US".

    Returns:
        A populated ASINSnapshot, or one with `error` set on failure.
    """
    payload = {"asin": asin, "marketplace": marketplace}
    snapshot = ASINSnapshot(
        asin=asin, marketplace=marketplace,
        timestamp=datetime.utcnow().isoformat(),
        main_bsr=None, category=None, sub_bsr=None, sub_category=None,
        estimated_sales=None, price=None, review_count=None,
        availability=None, buybox_seller=None
    )

    try:
        async with session.post(API_URL, json=payload) as resp:
            resp.raise_for_status()
            data = await resp.json()

            # `or []` also covers a JSON-null best_sellers_rank value.
            bsr_list = data.get("best_sellers_rank") or []
            if bsr_list:
                snapshot.main_bsr = bsr_list[0].get("rank")
                snapshot.category = bsr_list[0].get("category")
                # Parity with the sync client: also capture the first
                # sub-category rank when the API returns one.
                if len(bsr_list) > 1:
                    snapshot.sub_bsr = bsr_list[1].get("rank")
                    snapshot.sub_category = bsr_list[1].get("category")
                if snapshot.main_bsr:
                    snapshot.estimated_sales = bsr_to_sales(snapshot.main_bsr)

            snapshot.price = data.get("price")
            snapshot.review_count = data.get("review_count")
            snapshot.availability = data.get("availability")
            # `or {}` guards against a JSON-null buybox_winner.
            snapshot.buybox_seller = (data.get("buybox_winner") or {}).get("seller_name")
    except Exception as e:
        # Best-effort: one bad ASIN must not sink the whole batch.
        snapshot.error = str(e)

    return snapshot

async def batch_query(asins: List[str], marketplace: str = "US") -> List[ASINSnapshot]:
    """Query many ASINs concurrently, bounded by the CONCURRENCY semaphore.

    Args:
        asins: ASINs to fetch.
        marketplace: Marketplace code applied to every query.

    Returns:
        Snapshots in the same order as `asins` (failed fetches carry
        `error` rather than being dropped).
    """
    headers = {"Authorization": f"Bearer {API_KEY}", "Content-Type": "application/json"}
    semaphore = asyncio.Semaphore(CONCURRENCY)
    # Match the sync client's 30 s cap. Without this aiohttp applies its
    # 5-minute default total timeout and one stalled request can hold a
    # semaphore slot for the whole batch.
    timeout = aiohttp.ClientTimeout(total=30)

    async def bounded(asin: str) -> ASINSnapshot:
        async with semaphore:
            return await query_asin_async(session, asin, marketplace)

    async with aiohttp.ClientSession(headers=headers, timeout=timeout) as session:
        return await asyncio.gather(*[bounded(asin) for asin in asins])

# Usage
if __name__ == "__main__":
    import json
    from dataclasses import asdict

    test_asins = ["B08N5WRWNW", "B07XJ8C8F5", "B09G9FPHY6"]
    results = asyncio.run(batch_query(test_asins))

    for r in results:
        if not r.error:
            print(f"{r.asin}: BSR={r.main_bsr}, est_sales={r.estimated_sales}, price={r.price}")

    with open("results.json", "w") as f:
        json.dump([asdict(r) for r in results], f, indent=2)
Enter fullscreen mode Exit fullscreen mode

Daily Monitoring Pipeline with Change Detection

# monitor.py
import sqlite3
import schedule
import time
import asyncio
import requests
from dataclasses import asdict
from async_batch import batch_query
from datetime import datetime

DB_PATH = "asin_monitor.db"  # SQLite file holding snapshot history
WATCH_LIST = ["B08N5WRWNW", "B07XJ8C8F5", "B09G9FPHY6"]  # ASINs to monitor
FEISHU_WEBHOOK = "https://open.feishu.cn/open-apis/bot/v2/hook/YOUR_TOKEN"
BSR_ALERT_THRESHOLD = 0.30  # alert on >30% BSR change

def init_db(db_path=None):
    """Create the snapshots table and its lookup index if they don't exist.

    Args:
        db_path: Optional SQLite file path; defaults to DB_PATH. The
            parameter keeps the default behavior while allowing tests to
            point at a throwaway database.
    """
    with sqlite3.connect(db_path or DB_PATH) as conn:
        conn.execute("""
            CREATE TABLE IF NOT EXISTS snapshots (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                asin TEXT, main_bsr INTEGER, estimated_sales INTEGER,
                price REAL, review_count INTEGER, availability TEXT,
                recorded_at TEXT
            )
        """)
        # check_alerts() fetches the latest rows per ASIN ordered by
        # recorded_at; this index keeps that query fast as history grows.
        conn.execute(
            "CREATE INDEX IF NOT EXISTS idx_snapshots_asin_time "
            "ON snapshots (asin, recorded_at)"
        )

def save_snapshots(snapshots, db_path=None):
    """Persist successful snapshots; rows with `error` set are skipped.

    Args:
        snapshots: Iterable of ASINSnapshot-like objects.
        db_path: Optional SQLite file path; defaults to DB_PATH.
    """
    rows = [
        (s.asin, s.main_bsr, s.estimated_sales, s.price,
         s.review_count, s.availability, s.timestamp)
        for s in snapshots if not s.error
    ]
    if not rows:
        return
    with sqlite3.connect(db_path or DB_PATH) as conn:
        # Name the columns explicitly so the INSERT survives future schema
        # additions, and batch everything in one executemany round trip.
        conn.executemany(
            "INSERT INTO snapshots "
            "(asin, main_bsr, estimated_sales, price, review_count, availability, recorded_at) "
            "VALUES (?,?,?,?,?,?,?)",
            rows,
        )

def check_alerts(snapshots):
    """Compare each snapshot's BSR to its previous reading; alert on big moves.

    NOTE(review): assumes save_snapshots() has already persisted the current
    batch — the `OFFSET 1` skips the row just written so the comparison is
    against the *previous* run's value. The call order in daily_job()
    (save, then check) is load-bearing; confirm before reordering.
    """
    with sqlite3.connect(DB_PATH) as conn:
        for s in snapshots:
            # Skip snapshots with no usable rank (failed fetch or no BSR).
            if s.error or not s.main_bsr:
                continue
            # Second-newest row for this ASIN; newest is this run's insert.
            prev = conn.execute(
                "SELECT main_bsr FROM snapshots WHERE asin=? ORDER BY recorded_at DESC LIMIT 1 OFFSET 1",
                (s.asin,)
            ).fetchone()
            if prev and prev[0]:
                # Relative change: negative means the rank number went down,
                # i.e. the product climbed the chart.
                change = (s.main_bsr - prev[0]) / prev[0]
                if abs(change) > BSR_ALERT_THRESHOLD:
                    direction = "improved" if change < 0 else "dropped"
                    msg = f"BSR Alert: {s.asin} rank {direction} {change:+.1%} | {prev[0]} -> {s.main_bsr}"
                    print(f"[ALERT] {msg}")
                    send_feishu(msg)

def send_feishu(message: str):
    """Best-effort push of `message` to the configured Feishu webhook.

    Delivery failures are deliberately swallowed: alert transport must
    never crash the monitoring loop.
    """
    body = {"msg_type": "text", "content": {"text": message}}
    try:
        requests.post(FEISHU_WEBHOOK, json=body, timeout=5)
    except Exception:
        pass  # intentional: fire-and-forget notification

def daily_job():
    """One monitoring cycle: fetch the watch list, persist, then alert."""
    print(f"[{datetime.now().isoformat()}] Running daily BSR snapshot...")
    snapshots = asyncio.run(batch_query(WATCH_LIST, marketplace="US"))
    # Persist first: check_alerts compares against the previous run's row.
    save_snapshots(snapshots)
    check_alerts(snapshots)
    success = len([s for s in snapshots if not s.error])
    print(f"Done: {success}/{len(snapshots)} succeeded")

# Guard the scheduler so importing monitor.py (e.g. from tests or another
# module) does not block forever in the polling loop; running the file as
# a script behaves exactly as before.
if __name__ == "__main__":
    init_db()
    schedule.every().day.at("09:00").do(daily_job)

    print("Monitor started. Running daily at 09:00.")
    while True:
        schedule.run_pending()
        time.sleep(60)  # coarse poll; schedule's granularity is minutes
Enter fullscreen mode Exit fullscreen mode

Key Design Decisions

Why not scrape Amazon directly? Rate limiting, IP blocking, CAPTCHA challenges, and JavaScript rendering make direct scraping expensive to maintain reliably at scale. Pangolinfo Scrape API handles all of that transparently.

BSR-to-sales model accuracy? The reference table in this code is a simplified version for illustration. For production, calibrate per category using a sample of ASINs where you have verified sales data.

Rate limits? The CONCURRENCY = 10 semaphore in the async module keeps you well within typical API limits. Adjust based on your tier.


Resources

Questions or issues? Drop them in the comments.

Top comments (0)