DEV Community

Mox Loop

Building an End-to-End Amazon Movers & Shakers Data Pipeline: Engineering Guide from Real-Time Crawling to Automated Alerting

Background

I've been building data infrastructure for Amazon seller tools for the past three years. The most consistently requested feature from our seller clients is early trend detection — specifically, getting notified when a product starts showing unusual rank velocity before the broader market notices.

Amazon's Movers and Shakers (MnS) list is the best public data source for this. It tracks the largest BSR gainers in each category over a rolling 24-hour window, updating hourly. A product climbing from rank #100,000 to #2,000 represents a +4,900% gain — invisible in the Best Sellers list, immediately flagged in MnS.
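The gain figures in this post are consistent with measuring the improvement relative to the new (better) rank. A quick sketch of that arithmetic — a hypothetical helper, not an official Amazon formula:

```python
def rank_gain_pct(previous_rank: int, current_rank: int) -> float:
    """Percent rank gain, measured relative to the new (better) rank."""
    return (previous_rank - current_rank) / current_rank * 100

print(rank_gain_pct(100_000, 2_000))  # → 4900.0
```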

The technical challenge: scraping MnS at scale (50+ categories, hourly) while maintaining data quality and managing Amazon's aggressive rate limiting.


Why Self-Built Scrapers Break at Scale

Our initial implementation used rotating proxies + Playwright for JavaScript rendering. It worked well for 5-10 categories but degraded rapidly at scale:

| Scale | Success Rate | P95 Latency | Monthly Cost |
| --- | --- | --- | --- |
| 10 categories/hour | 94% | 8.2s | ~$45 (proxy) |
| 50 categories/hour | 71% | 22.5s | ~$180 (proxy) |
| 200 categories/hour | 43% | 48.1s | ~$620 (proxy) |

The root causes:

  1. IP reputation decay: Amazon's ML-based bot detection spots residential proxy patterns after sustained usage
  2. Session state management: Price and availability fields require maintaining valid Amazon session cookies
  3. HTML structure drift: MnS pages have A/B test variants; a parser built on one variant breaks on another

Solution: Pangolinfo Scrape API

Pangolinfo Scrape API provides pre-built parsing templates for Amazon page types including MnS. The managed infrastructure handles proxy rotation, session management, and parser maintenance — the output is clean structured JSON regardless of which A/B variant Amazon serves.
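For a single synchronous call, the request shape looks roughly like this. It mirrors the payload used by the async collector later in this post; treat the exact parameter names (`parse_type`, `output_format`, `locale`) as what worked for my account and check the official API docs for yours:

```python
import os
import requests

API_ENDPOINT = "https://api.pangolinfo.com/scrape"

def build_mns_request(category_id: str, locale: str = "us") -> dict:
    """Payload for one MnS category page; locale picks the Amazon domain."""
    domain = "co.uk" if locale == "uk" else "com"
    return {
        "url": f"https://www.amazon.{domain}/gp/movers-and-shakers/{category_id}",
        "parse_type": "movers_shakers",
        "output_format": "json",
        "locale": locale,
    }

def fetch_mns(category_id: str, locale: str = "us") -> dict:
    """Fetch and parse one MnS category (requires PANGOLINFO_API_KEY)."""
    resp = requests.post(
        API_ENDPOINT,
        json=build_mns_request(category_id, locale),
        headers={"Authorization": f"Bearer {os.environ['PANGOLINFO_API_KEY']}"},
        timeout=45,
    )
    resp.raise_for_status()
    return resp.json()
```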

API response schema for MnS:

{
  "status": "success",
  "category_id": "1055398",
  "category_name": "Kitchen & Dining",
  "retrieved_at": "2026-04-22T09:00:00Z",
  "items": [
    {
      "rank": 1,
      "asin": "B09XK3K3T3",
      "title": "Electric Milk Frother Handheld",
      "current_rank": 47,
      "previous_rank": 1847,
      "rank_gain_pct": 3826.0,
      "rank_gain_absolute": 1800,
      "price": 24.99,
      "currency": "USD",
      "rating": 4.3,
      "review_count": 128,
      "is_prime": true,
      "badge": "Best Seller",
      "image_url": "https://m.media-amazon.com/images/...",
      "listing_url": "https://www.amazon.com/dp/B09XK3K3T3"
    }
  ]
}
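Downstream code stays honest if you normalize response items into a typed structure early. A minimal sketch — field names follow the schema above, but which fields are truly required on every item is my assumption:

```python
from dataclasses import dataclass
from typing import Optional

@dataclass
class MnsItem:
    asin: str
    current_rank: int
    previous_rank: int
    rank_gain_pct: float
    price: Optional[float] = None
    rating: Optional[float] = None
    review_count: Optional[int] = None

    @classmethod
    def from_api(cls, raw: dict) -> "MnsItem":
        """Fail fast (KeyError) if a core field is missing from the response."""
        return cls(
            asin=raw["asin"],
            current_rank=raw["current_rank"],
            previous_rank=raw["previous_rank"],
            rank_gain_pct=raw["rank_gain_pct"],
            price=raw.get("price"),
            rating=raw.get("rating"),
            review_count=raw.get("review_count"),
        )
```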

Full Implementation

Async Multi-Category Collector

"""
mns_async_collector.py

Async implementation for high-frequency, multi-category MnS data collection.
Reduces collection time for 50 categories from ~110s (sequential) to ~12s (async).

Requirements: pip install aiohttp python-dotenv
"""

import asyncio
import aiohttp
import os
import json
import logging
from datetime import datetime, timezone
from typing import List, Dict, Optional

logger = logging.getLogger(__name__)

API_KEY = os.environ["PANGOLINFO_API_KEY"]
API_ENDPOINT = "https://api.pangolinfo.com/scrape"

# Semaphore: limit concurrent requests to avoid overwhelming the API
MAX_CONCURRENT = 10


async def fetch_category(
    session: aiohttp.ClientSession,
    sem: asyncio.Semaphore,
    category_id: str,
    locale: str = "us"
) -> Optional[Dict]:
    """Async fetch for a single MnS category."""
    url = (
        f"https://www.amazon.{'co.uk' if locale == 'uk' else 'com'}"
        f"/gp/movers-and-shakers/{category_id}"
    )
    payload = {
        "url": url,
        "parse_type": "movers_shakers",
        "output_format": "json",
        "locale": locale
    }
    headers = {
        "Authorization": f"Bearer {API_KEY}",
        "Content-Type": "application/json"
    }

    async with sem:
        try:
            async with session.post(
                API_ENDPOINT, json=payload, headers=headers, timeout=aiohttp.ClientTimeout(total=45)
            ) as resp:
                resp.raise_for_status()
                data = await resp.json()
                return {"category_id": category_id, "items": data.get("items", [])}
        except aiohttp.ClientResponseError as e:
            logger.error(f"Category {category_id}: HTTP {e.status}")
        except asyncio.TimeoutError:
            logger.error(f"Category {category_id}: Timeout")
        except Exception as e:
            logger.error(f"Category {category_id}: {e}")
        return None


async def collect_all_categories(
    category_ids: List[str],
    locale: str = "us"
) -> List[Dict]:
    """
    Collect MnS data for all specified categories concurrently.
    Returns list of category result dicts (may contain None for failed categories).
    """
    sem = asyncio.Semaphore(MAX_CONCURRENT)
    connector = aiohttp.TCPConnector(limit=MAX_CONCURRENT)

    async with aiohttp.ClientSession(connector=connector) as session:
        tasks = [
            fetch_category(session, sem, cat_id, locale)
            for cat_id in category_ids
        ]
        results = await asyncio.gather(*tasks, return_exceptions=False)

    return [r for r in results if r is not None]


# --- Spike Detection & Scoring ---

def compute_opportunity_score(item: Dict) -> float:
    """
    Composite opportunity score [0-100].
    Higher = stronger gain + lower competition barrier.
    """
    gain = item.get("rank_gain_pct", 0)
    # Missing review counts default high so unknown competition earns no bonus
    reviews = item.get("review_count") or 9999
    rating = item.get("rating") or 0

    gain_score = min(50.0, gain / 200)

    if reviews < 50:
        comp_score = 30.0
    elif reviews < 200:
        comp_score = 20.0
    elif reviews < 500:
        comp_score = 8.0
    else:
        comp_score = 0.0

    rating_score = max(0.0, (rating - 3.0) / 2.0 * 20) if rating >= 3.0 else 0.0

    return round(gain_score + comp_score + rating_score, 2)


def extract_alerts(
    results: List[Dict],
    gain_threshold: float = 1000.0,
    min_score: float = 40.0
) -> List[Dict]:
    """
    Extract and score alert items across all collected categories.
    Returns sorted list of high-opportunity items.
    """
    alerts = []
    for result in results:
        cat_id = result["category_id"]
        for item in result["items"]:
            if item.get("rank_gain_pct", 0) < gain_threshold:
                continue
            score = compute_opportunity_score(item)
            if score < min_score:
                continue
            alerts.append({
                "score": score,
                "category_id": cat_id,
                "asin": item["asin"],
                "title": item.get("title", "")[:70],
                "gain_pct": item["rank_gain_pct"],
                "current_rank": item.get("current_rank"),
                "review_count": item.get("review_count"),
                "price": item.get("price"),
                "rating": item.get("rating"),
                "listing_url": item.get("listing_url"),
                "detected_at": datetime.now(timezone.utc).isoformat()
            })

    return sorted(alerts, key=lambda x: x["score"], reverse=True)


# --- Main Orchestration ---

WATCH_CATEGORIES = [
    "1055398",   # Kitchen & Dining
    "3375251",   # Sports & Outdoors
    "1063498",   # Home Storage
    "172282",    # Electronics
    "284507",    # Tools & Home Improvement
]

async def run_pipeline(interval_minutes: int = 30):
    print(f"[MnS Pipeline] Monitoring {len(WATCH_CATEGORIES)} categories "
          f"every {interval_minutes} min")

    while True:
        start = datetime.now(timezone.utc)
        print(f"\n[{start.strftime('%Y-%m-%d %H:%M UTC')}] Starting collection cycle...")

        results = await collect_all_categories(WATCH_CATEGORIES)
        total_items = sum(len(r["items"]) for r in results)
        print(f"  Collected: {total_items} items across {len(results)} categories")

        alerts = extract_alerts(results)
        print(f"  Alerts: {len(alerts)} high-opportunity items detected")

        for alert in alerts[:5]:
            print(
                f"  🎯 Score {alert['score']} | ASIN {alert['asin']} | "
                f"+{alert['gain_pct']:.0f}% | BSR #{alert['current_rank']} | "
                f"Reviews: {alert['review_count']} | {alert['title']}"
            )

        # TODO: Integrate downstream:
        # await send_slack_notification(alerts)
        # await write_to_postgres(results)
        # await trigger_erp_reorder_check(alerts)

        elapsed = (datetime.now(timezone.utc) - start).total_seconds()
        sleep_time = max(0, interval_minutes * 60 - elapsed)
        print(f"  Cycle completed in {elapsed:.1f}s. Next run in {sleep_time:.0f}s.")
        await asyncio.sleep(sleep_time)


if __name__ == "__main__":
    asyncio.run(run_pipeline(interval_minutes=30))
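To sanity-check the scoring weights, here is the sample Kitchen & Dining item from the schema above pushed through the same formula (reproduced standalone so it runs in isolation):

```python
def compute_opportunity_score(item: dict) -> float:
    # Same formula as in mns_async_collector.py, reproduced for a standalone check.
    gain = item.get("rank_gain_pct", 0)
    reviews = item.get("review_count") or 9999
    rating = item.get("rating") or 0
    gain_score = min(50.0, gain / 200)          # 3826 / 200 -> 19.13
    if reviews < 50:
        comp_score = 30.0
    elif reviews < 200:
        comp_score = 20.0                       # 128 reviews -> 20.0
    elif reviews < 500:
        comp_score = 8.0
    else:
        comp_score = 0.0
    rating_score = (rating - 3.0) / 2.0 * 20 if rating >= 3.0 else 0.0  # 4.3 -> 13.0
    return round(gain_score + comp_score + rating_score, 2)

sample = {"rank_gain_pct": 3826.0, "review_count": 128, "rating": 4.3}
print(compute_opportunity_score(sample))  # → 52.13
```

A modest gainer with low review count can outscore a huge gainer in a saturated niche, which is exactly the trade-off the weights are meant to encode.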

Production Deployment Notes

Environment setup:

# .env
PANGOLINFO_API_KEY=your_key_here

# Run with systemd or Docker. The slim image has no dependencies preinstalled
# and no code baked in, so mount the project directory and install at start:
docker run -d \
  --env-file .env \
  --restart unless-stopped \
  -v "$PWD":/app -w /app \
  python:3.11-slim \
  sh -c "pip install aiohttp python-dotenv && python mns_async_collector.py"
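If you prefer systemd over Docker, a unit file along these lines works — paths and the unit name are assumptions for illustration:

```ini
# /etc/systemd/system/mns-pipeline.service  (path is an assumption)
[Unit]
Description=Amazon MnS collection pipeline
After=network-online.target

[Service]
WorkingDirectory=/opt/mns
EnvironmentFile=/opt/mns/.env
ExecStart=/usr/bin/python3 mns_async_collector.py
Restart=always
RestartSec=30

[Install]
WantedBy=multi-user.target
```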

Redis deduplication (optional but recommended):

import redis

r = redis.Redis(host="localhost", port=6379, db=0)

def is_new_alert(asin: str, category_id: str, ttl_seconds: int = 7200) -> bool:
    """Returns True if this alert hasn't been seen in the last `ttl_seconds`.

    SET NX EX is atomic, so two workers can't both claim the same alert
    (a separate exists() check followed by setex() has a race window).
    """
    key = f"mns_alert:{category_id}:{asin}"
    return bool(r.set(key, "1", nx=True, ex=ttl_seconds))
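For the `send_slack_notification` TODO in the collector, a minimal sketch — it assumes an incoming-webhook URL in a `SLACK_WEBHOOK_URL` environment variable, and the alert dict shape matches `extract_alerts` above:

```python
import os
import aiohttp

def format_alert_line(alert: dict) -> str:
    """Render one alert dict from extract_alerts() as a Slack text line."""
    return (
        f"Score {alert['score']} | ASIN {alert['asin']} | "
        f"+{alert['gain_pct']:.0f}% | BSR #{alert['current_rank']} | {alert['title']}"
    )

async def send_slack_notification(alerts: list) -> None:
    """POST the top alerts to a Slack incoming webhook (URL is an assumption)."""
    if not alerts:
        return
    text = "\n".join(format_alert_line(a) for a in alerts[:5])
    async with aiohttp.ClientSession() as session:
        async with session.post(
            os.environ["SLACK_WEBHOOK_URL"], json={"text": text}
        ) as resp:
            resp.raise_for_status()
```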

Rate limit handling:
The Pangolinfo API handles Amazon-side rate limits internally, so the only quota you manage is the Pangolinfo API's own request allowance, which is generous for standard tiers. Monitoring 50 categories every 30 minutes works out to 50 × 48 = 2,400 requests/day — well within free-tier limits for initial testing.


Found this useful? Drop a ❤️ or comment with questions. Happy to discuss async optimization strategies or downstream integration patterns.
