Mox Loop

Amazon Top 100 Category Scraper: Why I Stopped Building My Own and What I Use Instead

TL;DR

Building your own Amazon top 100 category scraper in 2026 means you're signing up for a continuous anti-bot arms race that consumes 40% of your data engineering capacity (per Jungle Scout's own data). After going through the full cycle — DIY scraper, then headless browsers, then duct-taping together proxy pools — I switched to Pangolinfo Scrape API. Here's the honest technical breakdown of why, with all the code you need to replicate it.


Why Amazon Category Top 100 Data Matters

The Best Sellers list for any Amazon category is one of the cleanest public signals of real consumer demand. Unlike keyword search volume (which includes curiosity clicks) or ad impression data (which reflects advertiser intent), BSR reflects actual purchases.

What makes it even more valuable is the temporal dimension. A static snapshot tells you who's winning. A time-series of rank positions tells you who's accelerating — and rank velocity is where the real opportunity signal lives.

Three key analytical patterns from Top 100 data:

  1. Rank velocity — products moving from #80 → #14 in 48h can signal competitor stockouts, viral traction, or pre-holiday inventory buildup, and each of those requires a completely different response.

  2. Category concentration — if the top 10 positions are increasingly dominated by 2–3 brands, the competitive moat is deepening. If new products regularly stabilize in the top 30 within 6 weeks, the window is open.

  3. Price band migration — the median price of the Top 50 shifting over time indicates where consumer willingness-to-pay is moving. More reliable than any market research report.
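
To make patterns 1 and 3 concrete, here's a minimal pandas sketch. It assumes you already have two Top 100 snapshots of the same category loaded as DataFrames with asin, rank, and price columns; the column names are illustrative, not tied to any particular tool's schema.

import pandas as pd

def rank_velocity(older: pd.DataFrame, newer: pd.DataFrame) -> pd.DataFrame:
    """Positive velocity means the product climbed the chart between snapshots."""
    merged = older.merge(newer, on="asin", suffixes=("_old", "_new"))
    merged["velocity"] = merged["rank_old"] - merged["rank_new"]  # #80 -> #14 yields +66
    return merged.sort_values("velocity", ascending=False)

def price_band_shift(older: pd.DataFrame, newer: pd.DataFrame, top_n: int = 50) -> float:
    """How far the median price of the Top N moved between the two snapshots."""
    return (newer.nsmallest(top_n, "rank")["price"].median()
            - older.nsmallest(top_n, "rank")["price"].median())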


The Real Technical Challenges

Amazon's anti-bot system is a layered defense, not a single check:

Layer 1: IP rate limiting (~30 req/min/IP before soft block)
Layer 2: TLS fingerprint validation (your requests library ≠ Chrome)  
Layer 3: Behavioral analysis (Cookie chain, request timing patterns)
Layer 4: Dynamic CAPTCHA injection (60–80% trigger rate at high frequency)
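
If you roll your own collector anyway, at least classify the rejections so you know which layer caught you. A rough sketch; the status codes and marker strings are assumptions based on what Amazon block pages have commonly contained, not a stable contract:

import requests

def classify_block(resp: requests.Response) -> str:
    """Best-effort guess at why Amazon rejected a request (heuristics, not guarantees)."""
    body = resp.text.lower()
    if "captcha" in body or "type the characters you see" in body:
        return "captcha"      # Layer 4: CAPTCHA injection
    if resp.status_code == 503:
        return "soft_block"   # Layer 1: rate limiting / automated-access page
    if resp.status_code in (403, 429):
        return "hard_block"
    return "ok"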

Plus the parsing fragility problem: Amazon made 11 structural updates to Best Sellers pages in 2024, with 3 causing complete scraper failures. You don't get notified. Your data pipeline just silently returns None for every field.
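
The cheap defense against that silent failure mode is a field-coverage check after every parse, so a DOM change pages you instead of quietly poisoning the dataset. A minimal sketch (field list and threshold are illustrative):

REQUIRED_FIELDS = ("rank", "asin", "title", "price")

def coverage_ok(products: list[dict], min_ratio: float = 0.9) -> bool:
    """Return False (i.e. alert) when too many parsed records are missing core fields."""
    if not products:
        return False
    complete = sum(
        all(p.get(field) is not None for field in REQUIRED_FIELDS) for p in products
    )
    return complete / len(products) >= min_ratio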


Four Approaches Compared

Approach 1: DIY Python (requests + BeautifulSoup)

What works: Fast to prototype, zero infrastructure cost to start.

What breaks: TLS fingerprints expose you immediately. Layer 4 CAPTCHAs trigger 60–80% of the time even with residential proxies. Every Amazon HTML update breaks your selectors. Residential proxy costs at scale run $1,860–5,640/month for 200 categories/day.

# This looks fine in local testing...
import requests
from bs4 import BeautifulSoup

resp = requests.get(
    "https://www.amazon.com/Best-Sellers-Electronics/zgbs/electronics/",
    headers={"User-Agent": "Mozilla/5.0..."}
)
soup = BeautifulSoup(resp.text, 'html.parser')
items = soup.select('.zg-item-immersion')
# ...until Amazon updates the DOM or bans your IP

Verdict: Fine for proof of concept, breaks at production scale.

Approach 2: Playwright / Puppeteer

What works: Handles JS rendering, better TLS profile than raw requests.

What breaks: Amazon detects navigator.webdriver and related automation signals. 200–500MB of RAM per browser instance. 3–8s per page load vs ~0.5s for a static request. At 500 categories/day, the extra server capacity costs more than a well-managed proxy pool would.
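
For reference, the minimal Playwright version looks like this. Even headless Chromium can be flagged through navigator.webdriver and related signals, so treat it as a sketch for occasional rendering, not a bulk collector:

from playwright.sync_api import sync_playwright

def fetch_bestsellers_html(url: str) -> str:
    """Render a Best Sellers page in headless Chromium and return its HTML."""
    with sync_playwright() as p:
        browser = p.chromium.launch(headless=True)
        page = browser.new_page()
        page.goto(url, timeout=30_000)  # each render takes seconds and a few hundred MB of RAM
        html = page.content()
        browser.close()
    return html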

Verdict: Use for screenshots or interaction testing, not bulk data collection.

Approach 3: SaaS Tool APIs (Jungle Scout / Helium 10)

What works: No code required, clean data.

What breaks: 24–72h data freshness lag. Helium 10 API requires Diamond ($279/month minimum). Hard API request caps. You're accessing their database, not the raw market data — their collection strategy, field definitions, and category coverage are outside your control.

Verdict: Fine for individual sellers checking trends occasionally, not for building data pipelines.

Approach 4: Pangolinfo Scrape API ✅

This is what I use now. The core value prop: it's a purpose-built REST API for Amazon data with maintained parsing templates, real-time collection (not cached data), and built-in anti-bot handling.

What's genuinely different:

  • Parsing templates are updated automatically when Amazon changes structure (2–4h turnaround vs your 6–48h outage)
  • Real-time: every call hits Amazon live, not a cache
  • Field depth: includes Customer Says summaries, full subcategory breadcrumb path, SP ad slot data (98% collection rate)
  • Output: JSON, Markdown (for LLM pipelines), or raw HTML
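
For the LLM-pipeline case, the request shape mirrors the JSON calls in the pipeline below, just with a different output format. The parameter names and response key here are assumptions kept consistent with the rest of this post; verify them against the official docs before relying on them:

import requests

resp = requests.post(
    "https://api.pangolinfo.com/scrape",
    headers={"Authorization": "Bearer your_pangolinfo_api_key",
             "Content-Type": "application/json"},
    json={
        "url": "https://www.amazon.com/Best-Sellers-Electronics/zgbs/electronics/",
        "output_format": "markdown",  # assumed value; the pipeline below uses "json"
    },
    timeout=30,
)
resp.raise_for_status()
markdown_page = resp.json().get("markdown", "")  # response key is an assumption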

Complete Python Pipeline

"""
amazon_top100_pipeline.py
Production-ready Amazon category Top 100 data collection pipeline
Using Pangolinfo Scrape API: https://www.pangolinfo.com/scrape-api/
"""

import requests
import sqlite3
import schedule
import time
import pandas as pd
from datetime import datetime, timezone
from concurrent.futures import ThreadPoolExecutor, as_completed
from loguru import logger

# --- Config ---
API_KEY = "your_pangolinfo_api_key"  # https://tool.pangolinfo.com
API_URL = "https://api.pangolinfo.com/scrape"

CATEGORIES = [
    {"url": "https://www.amazon.com/Best-Sellers-Electronics/zgbs/electronics/", "marketplace": "US"},
    {"url": "https://www.amazon.com/Best-Sellers-Home-Kitchen/zgbs/kitchen/", "marketplace": "US"},
    {"url": "https://www.amazon.com/Best-Sellers-Sports-Outdoors/zgbs/sporting-goods/", "marketplace": "US"},
    {"url": "https://www.amazon.co.uk/Best-Sellers-Electronics/zgbs/electronics/", "marketplace": "UK"},
    # Add more...
]

HEADERS = {"Authorization": f"Bearer {API_KEY}", "Content-Type": "application/json"}


# --- Collection ---

def fetch_top100(url: str, marketplace: str = "US") -> list[dict]:
    """Fetch Top 100 for a single category."""
    try:
        resp = requests.post(API_URL, headers=HEADERS, json={
            "url": url,
            "marketplace": marketplace,
            "output_format": "json",
            "parse_template": "amazon_bestsellers",
            "include_fields": [
                "rank", "asin", "title", "price", "rating", "review_count",
                "brand", "is_prime", "badge", "subcategory_path",
                "image_url", "customer_says", "sp_ad_slot"
            ]
        }, timeout=30)
        resp.raise_for_status()

        products = resp.json().get("products", [])
        ts = datetime.now(timezone.utc).isoformat()
        for p in products:
            p.update({"_scraped_at": ts, "_marketplace": marketplace, "_category_url": url})

        logger.success(f"{marketplace} | {url.split('/zgbs/')[-1].rstrip('/')}{len(products)} products")
        return products
    except Exception as e:
        logger.error(f"Failed {url}: {e}")
        return []


def collect_all(categories: list[dict], workers: int = 5) -> list[dict]:
    """Parallel collection across all categories."""
    all_products = []
    with ThreadPoolExecutor(max_workers=workers) as ex:
        futures = {ex.submit(fetch_top100, c["url"], c.get("marketplace", "US")): c for c in categories}
        for f in as_completed(futures):
            all_products.extend(f.result())
    logger.info(f"Total: {len(all_products)} products from {len(categories)} categories")
    return all_products


# --- Storage ---

def init_db(path: str = "top100.db") -> sqlite3.Connection:
    conn = sqlite3.connect(path)
    conn.execute("""
        CREATE TABLE IF NOT EXISTS top100 (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            scraped_at TEXT, marketplace TEXT, category_url TEXT,
            rank INTEGER, asin TEXT, title TEXT, price REAL,
            rating REAL, review_count INTEGER, brand TEXT,
            is_prime INTEGER, badge TEXT, subcategory_path TEXT,
            image_url TEXT, customer_says TEXT, sp_ad_slot INTEGER,
            UNIQUE(scraped_at, asin, category_url)
        )
    """)
    conn.execute("CREATE INDEX IF NOT EXISTS idx_asin ON top100(asin)")
    conn.execute("CREATE INDEX IF NOT EXISTS idx_cat ON top100(category_url, scraped_at)")
    conn.commit()
    return conn


def save(conn: sqlite3.Connection, products: list[dict]) -> int:
    saved = 0
    for p in products:
        try:
            conn.execute("""
                INSERT OR IGNORE INTO top100
                (scraped_at,marketplace,category_url,rank,asin,title,price,
                 rating,review_count,brand,is_prime,badge,subcategory_path,
                 image_url,customer_says,sp_ad_slot)
                VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)
            """, (
                p.get("_scraped_at"), p.get("_marketplace"), p.get("_category_url"),
                p.get("rank"), p.get("asin"), p.get("title"), p.get("price"),
                p.get("rating"), p.get("review_count"), p.get("brand"),
                int(bool(p.get("is_prime"))), p.get("badge"),
                p.get("subcategory_path"), p.get("image_url"),
                p.get("customer_says"), p.get("sp_ad_slot")
            ))
            saved += max(cur.rowcount, 0)  # rowcount is 0 when INSERT OR IGNORE skips a duplicate
        except Exception as e:
            logger.debug(f"Skip {p.get('asin')}: {e}")
    conn.commit()
    return saved


# --- Analysis ---

def find_rising_products(conn: sqlite3.Connection, days: int = 7) -> pd.DataFrame:
    """Products with fastest rank improvement over N days."""
    df = pd.read_sql_query(f"""
        SELECT asin, title, brand, category_url,
               MAX(rank) as rank_worst, MIN(rank) as rank_best,
               MAX(rank) - MIN(rank) as improvement,
               ROUND(AVG(price), 2) as avg_price,
               MAX(review_count) as reviews
        FROM top100
        WHERE scraped_at >= datetime('now', '-{days} days')
        GROUP BY asin, category_url
        HAVING improvement >= 20
        ORDER BY improvement DESC
        LIMIT 20
    """, conn)
    return df


# --- Scheduler ---

def job():
    logger.info("=== Collection job started ===")
    products = collect_all(CATEGORIES)
    conn = init_db()
    saved = save(conn, products)
    logger.info(f"Saved {saved} records")

    rising = find_rising_products(conn)
    if not rising.empty:
        logger.info(f"\nTop Rising Products:\n{rising[['asin','title','rank_first','rank_best','improvement']].to_string()}")
    conn.close()

schedule.every(8).hours.do(job)

if __name__ == "__main__":
    job()
    while True:
        schedule.run_pending()
        time.sleep(60)
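
The pipeline above only surfaces rank velocity. The other two patterns from the start of this post map onto the same table with a couple more queries; here's a sketch that reuses the schema and imports defined above (pandas computes the median because SQLite has no built-in MEDIAN aggregate):

# Assumes the sqlite3 / pandas imports and the top100 schema from the pipeline above.

def price_band_trend(conn: sqlite3.Connection, category_url: str, top_n: int = 50) -> pd.DataFrame:
    """Median price of the Top N per collection run, to spot price band migration."""
    df = pd.read_sql_query(
        "SELECT scraped_at, price FROM top100 "
        "WHERE category_url = ? AND rank <= ? AND price IS NOT NULL",
        conn, params=(category_url, top_n),
    )
    return (df.groupby("scraped_at")["price"]
              .median()
              .reset_index(name="median_price")
              .sort_values("scraped_at"))


def brand_concentration(conn: sqlite3.Connection, category_url: str, top_n: int = 10) -> pd.DataFrame:
    """How many Top N slots each brand holds in the most recent run."""
    return pd.read_sql_query(
        "SELECT brand, COUNT(*) AS slots FROM top100 "
        "WHERE category_url = ? AND rank <= ? "
        "AND scraped_at = (SELECT MAX(scraped_at) FROM top100 WHERE category_url = ?) "
        "GROUP BY brand ORDER BY slots DESC",
        conn, params=(category_url, top_n, category_url),
    )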

Real Results: Before and After

A kitchen appliance brand I consulted with ran this pipeline for 6 months across 8 subcategories. Before: manual Best Sellers checks + Jungle Scout, 42% new product launch success rate, $68k in stranded inventory. After: 8-hour interval automated collection, rank velocity alerts, price band analysis. Result: 78% launch success rate, stranded inventory down 82%.

The key insight wasn't some magical ML model. It was simply having current data, instead of data that was 48–72 hours old, at the moment decisions needed to be made.


Wrapping Up

The Amazon top 100 category scraper problem has a clean engineering solution: stop fighting Amazon's anti-bot infrastructure yourself, and use an API built specifically for this. The math on DIY vs API is clear once you account for engineering maintenance time.

Get started: Pangolinfo Scrape API | Docs: docs.pangolinfo.com

Questions or thoughts? Drop them in the comments — happy to dig into specific implementation challenges.
