Background
I've been building data infrastructure for Amazon seller tools for the past three years. The most consistently requested feature from our seller clients is early trend detection — specifically, getting notified when a product starts showing unusual rank velocity before the broader market notices.
Amazon's Movers and Shakers (MnS) list is the best public data source for this. It tracks the largest BSR gainers in each category over a rolling 24-hour window, updating hourly. A product climbing from rank #100,000 to #2,000 represents a +4,900% gain — invisible in the Best Sellers list, immediately flagged in MnS.
The technical challenge: scraping MnS at scale (50+ categories, hourly) while maintaining data quality and managing Amazon's aggressive rate limiting.
Why Self-Built Scrapers Break at Scale
Our initial implementation used rotating proxies + Playwright for JavaScript rendering. It worked well for 5-10 categories but degraded rapidly at scale:
| Scale | Success Rate | P95 Latency | Monthly Cost |
|---|---|---|---|
| 10 categories/hour | 94% | 8.2s | ~$45 (proxy) |
| 50 categories/hour | 71% | 22.5s | ~$180 (proxy) |
| 200 categories/hour | 43% | 48.1s | ~$620 (proxy) |
The root causes:
- IP reputation decay: Amazon's ML-based bot detection spots residential proxy patterns after sustained usage
- Session state management: Price and availability fields require maintaining valid Amazon session cookies
- HTML structure drift: MnS pages have A/B test variants; a parser built on one variant breaks on another
Solution: Pangolinfo Scrape API
Pangolinfo Scrape API provides pre-built parsing templates for Amazon page types including MnS. The managed infrastructure handles proxy rotation, session management, and parser maintenance — the output is clean structured JSON regardless of which A/B variant Amazon serves.
API response schema for MnS:
{
"status": "success",
"category_id": "1055398",
"category_name": "Kitchen & Dining",
"retrieved_at": "2026-04-22T09:00:00Z",
"items": [
{
"rank": 1,
"asin": "B09XK3K3T3",
"title": "Electric Milk Frother Handheld",
"current_rank": 47,
"previous_rank": 1847,
"rank_gain_pct": 3826.0,
"rank_gain_absolute": 1800,
"price": 24.99,
"currency": "USD",
"rating": 4.3,
"review_count": 128,
"is_prime": true,
"badge": "Best Seller",
"image_url": "https://m.media-amazon.com/images/...",
"listing_url": "https://www.amazon.com/dp/B09XK3K3T3"
}
]
}
Full Implementation
Async Multi-Category Collector
"""
mns_async_collector.py
Async implementation for high-frequency, multi-category MnS data collection.
Reduces collection time for 50 categories from ~110s (sequential) to ~12s (async).
Requirements: pip install aiohttp python-dotenv
"""
import asyncio
import aiohttp
import os
import json
import logging
from datetime import datetime, timezone
from typing import List, Dict, Optional
logger = logging.getLogger(__name__)
API_KEY = os.environ["PANGOLINFO_API_KEY"]
API_ENDPOINT = "https://api.pangolinfo.com/scrape"
# Semaphore: limit concurrent requests to avoid overwhelming the API
MAX_CONCURRENT = 10
async def fetch_category(
session: aiohttp.ClientSession,
sem: asyncio.Semaphore,
category_id: str,
locale: str = "us"
) -> Optional[Dict]:
"""Async fetch for a single MnS category."""
url = (
f"https://www.amazon.{'co.uk' if locale == 'uk' else 'com'}"
f"/gp/movers-and-shakers/{category_id}"
)
payload = {
"url": url,
"parse_type": "movers_shakers",
"output_format": "json",
"locale": locale
}
headers = {
"Authorization": f"Bearer {API_KEY}",
"Content-Type": "application/json"
}
async with sem:
try:
async with session.post(
API_ENDPOINT, json=payload, headers=headers, timeout=aiohttp.ClientTimeout(total=45)
) as resp:
resp.raise_for_status()
data = await resp.json()
return {"category_id": category_id, "items": data.get("items", [])}
except aiohttp.ClientResponseError as e:
logger.error(f"Category {category_id}: HTTP {e.status}")
except asyncio.TimeoutError:
logger.error(f"Category {category_id}: Timeout")
except Exception as e:
logger.error(f"Category {category_id}: {e}")
return None
async def collect_all_categories(
category_ids: List[str],
locale: str = "us"
) -> List[Dict]:
"""
Collect MnS data for all specified categories concurrently.
Returns list of category result dicts (may contain None for failed categories).
"""
sem = asyncio.Semaphore(MAX_CONCURRENT)
connector = aiohttp.TCPConnector(limit=MAX_CONCURRENT)
async with aiohttp.ClientSession(connector=connector) as session:
tasks = [
fetch_category(session, sem, cat_id, locale)
for cat_id in category_ids
]
results = await asyncio.gather(*tasks, return_exceptions=False)
return [r for r in results if r is not None]
# --- Spike Detection & Scoring ---
def compute_opportunity_score(item: Dict) -> float:
"""
Composite opportunity score [0-100].
Higher = stronger gain + lower competition barrier.
"""
gain = item.get("rank_gain_pct", 0)
reviews = item.get("review_count") or 9999
rating = item.get("rating") or 0
gain_score = min(50.0, gain / 200)
if reviews < 50:
comp_score = 30.0
elif reviews < 200:
comp_score = 20.0
elif reviews < 500:
comp_score = 8.0
else:
comp_score = 0.0
rating_score = max(0.0, (rating - 3.0) / 2.0 * 20) if rating >= 3.0 else 0.0
return round(gain_score + comp_score + rating_score, 2)
def extract_alerts(
results: List[Dict],
gain_threshold: float = 1000.0,
min_score: float = 40.0
) -> List[Dict]:
"""
Extract and score alert items across all collected categories.
Returns sorted list of high-opportunity items.
"""
alerts = []
for result in results:
cat_id = result["category_id"]
for item in result["items"]:
if item.get("rank_gain_pct", 0) < gain_threshold:
continue
score = compute_opportunity_score(item)
if score < min_score:
continue
alerts.append({
"score": score,
"category_id": cat_id,
"asin": item["asin"],
"title": item.get("title", "")[:70],
"gain_pct": item["rank_gain_pct"],
"current_rank": item.get("current_rank"),
"review_count": item.get("review_count"),
"price": item.get("price"),
"rating": item.get("rating"),
"listing_url": item.get("listing_url"),
"detected_at": datetime.now(timezone.utc).isoformat()
})
return sorted(alerts, key=lambda x: x["score"], reverse=True)
# --- Main Orchestration ---
WATCH_CATEGORIES = [
"1055398", # Kitchen & Dining
"3375251", # Sports & Outdoors
"1063498", # Home Storage
"172282", # Electronics
"284507", # Tools & Home Improvement
]
async def run_pipeline(interval_minutes: int = 30):
print(f"[MnS Pipeline] Monitoring {len(WATCH_CATEGORIES)} categories "
f"every {interval_minutes} min")
while True:
start = datetime.now(timezone.utc)
print(f"\n[{start.strftime('%Y-%m-%d %H:%M UTC')}] Starting collection cycle...")
results = await collect_all_categories(WATCH_CATEGORIES)
total_items = sum(len(r["items"]) for r in results)
print(f" Collected: {total_items} items across {len(results)} categories")
alerts = extract_alerts(results)
print(f" Alerts: {len(alerts)} high-opportunity items detected")
for alert in alerts[:5]:
print(
f" 🎯 Score {alert['score']} | ASIN {alert['asin']} | "
f"+{alert['gain_pct']:.0f}% | BSR #{alert['current_rank']} | "
f"Reviews: {alert['review_count']} | {alert['title']}"
)
# TODO: Integrate downstream:
# await send_slack_notification(alerts)
# await write_to_postgres(results)
# await trigger_erp_reorder_check(alerts)
elapsed = (datetime.now(timezone.utc) - start).total_seconds()
sleep_time = max(0, interval_minutes * 60 - elapsed)
print(f" Cycle completed in {elapsed:.1f}s. Next run in {sleep_time:.0f}s.")
await asyncio.sleep(sleep_time)
if __name__ == "__main__":
asyncio.run(run_pipeline(interval_minutes=30))
Production Deployment Notes
Environment setup:
# .env
PANGOLINFO_API_KEY=your_key_here
# Run with systemd or Docker
docker run -d \
--env-file .env \
--restart unless-stopped \
python:3.11-slim \
python main.py
Redis deduplication (optional but recommended):
import redis
r = redis.Redis(host="localhost", port=6379, db=0)
def is_new_alert(asin: str, category_id: str, ttl_seconds: int = 7200) -> bool:
"""Returns True if this alert hasn't been seen in the last `ttl_seconds`."""
key = f"mns_alert:{category_id}:{asin}"
if r.exists(key):
return False
r.setex(key, ttl_seconds, "1")
return True
Rate limit handling:
The Pangolinfo API handles Amazon-side rate limits internally. The only rate limit to manage is the Pangolinfo API's own request quota, which is generous for standard tiers. For 50 categories × every 30 minutes = 2,400 requests/day — well within free tier limits for initial testing.
References & Further Reading
- Pangolinfo Scrape API Documentation — Full field reference for MnS and other Amazon page types
- AMZ Data Tracker — No-code visual dashboard for the same MnS monitoring use case
- Amazon MnS base URL:
https://www.amazon.com/gp/movers-and-shakers/
Found this useful? Drop a ❤️ or comment with questions. Happy to discuss async optimization strategies or downstream integration patterns.
Top comments (0)