TL;DR: Amazon stores data separately for every marketplace. Most tools don't actually solve this. Here's a working Python implementation that concurrently pulls normalized BSR data from US, UK, DE, and JP Amazon using Pangolinfo's Scrape API, with currency normalization and competition density analysis built in.
Tags
#api #python #ecommerce #dataengineering #amazon
The Problem in One Sentence
Every Amazon marketplace is an isolated data silo, and genuine Amazon Multi-Marketplace Data Analysis requires solving this at the collection layer—not the reporting layer.
Background
Running Amazon stores across multiple regions is common. Having actual data pipelines that aggregate, normalize, and compare those stores simultaneously is rare. The reason isn't complexity of analysis—it's complexity of collection:
- Each marketplace has independent ASIN namespaces
- Category tree node IDs don't map across marketplaces
- Anti-bot detection behavior varies by region
- Price data is in different currencies
- Most seller tools are single-marketplace by design
This post shows a complete working solution using Pangolinfo's Scrape API, which handles multi-marketplace collection with a unified output schema: the JSON structure of a US response and a JP response is identical. That one property makes cross-marketplace pipelines significantly less painful to build.
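To make "unified schema" concrete, here's a rough sketch of what one product entry looks like on either side. The field names mirror what the normalization step in the collector below reads (`asin`, `title`, `rank`, `price`, `rating`, `review_count`); the values are invented, and the authoritative schema is in the official docs.

```python
# Assumed shape of one product entry, identical across marketplaces.
# Field names are taken from the parsing code later in this post;
# verify the exact schema against the Pangolinfo docs.
us_product = {"asin": "B0EXAMPLE1", "title": "Chef's Knife 8 inch", "rank": 1,
              "price": 29.99, "rating": 4.7, "review_count": 1532}
jp_product = {"asin": "B0EXAMPLE2", "title": "包丁 8インチ", "rank": 1,
              "price": 4380, "rating": 4.5, "review_count": 210}

# Same keys on both sides -> one parser handles every marketplace.
assert us_product.keys() == jp_product.keys()
```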
What We're Building
A concurrent multi-marketplace BSR collector that:
- Calls 4 Amazon marketplaces simultaneously (US, UK, DE, JP)
- Returns normalized product data with USD price conversion
- Generates a competition density matrix (avg Top 20 review count per market)
- Identifies lowest-competition markets as expansion opportunities
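The "opportunity" ranking boils down to one clamped formula, the same one implemented in `compute_competition_metrics` further down: fewer incumbent reviews means a higher score. As a standalone sketch:

```python
def opportunity_score(avg_reviews: float) -> int:
    """Clamp 100 - (avg Top-20 reviews / 20) into the 0..100 range.
    Lower review density in the current Top 20 -> higher score."""
    return max(0, min(100, 100 - round(avg_reviews / 20)))

print(opportunity_score(312))   # 84  — a low-density market
print(opportunity_score(3840))  # 0   — a saturated market
```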
Setup
pip install aiohttp python-dotenv
# .env
PANGOLINFO_API_KEY=your_key_here
Pangolinfo API docs: https://docs.pangolinfo.com
Core Implementation
"""
amazon_cross_marketplace.py
Multi-marketplace BSR analysis using Pangolinfo Scrape API.
Collects Top 20 bestsellers from US/UK/DE/JP simultaneously,
normalizes prices to USD, and outputs a competition density report.
Requirements: aiohttp, python-dotenv
API docs: https://docs.pangolinfo.com
"""
import asyncio
import aiohttp
import json
import os
import sys
from datetime import datetime, timezone
from dataclasses import dataclass, field, asdict
from statistics import mean, median
from typing import Optional
from dotenv import load_dotenv
load_dotenv()
# ─── Config ────────────────────────────────────────────────────────────────────
API_KEY: str = os.getenv("PANGOLINFO_API_KEY", "")
if not API_KEY:
print("[ERROR] PANGOLINFO_API_KEY not set. Add to .env or environment.", file=sys.stderr)
sys.exit(1)
HEADERS: dict = {
"Authorization": f"Bearer {API_KEY}",
"Content-Type": "application/json"
}
# FX rates: local currency → USD (update from live FX API in production)
MARKETPLACE_CONFIG: dict = {
"US": {"domain": "amazon.com", "currency": "USD", "fx_to_usd": 1.000},
"UK": {"domain": "amazon.co.uk", "currency": "GBP", "fx_to_usd": 1.270},
"DE": {"domain": "amazon.de", "currency": "EUR", "fx_to_usd": 1.082},
"JP": {"domain": "amazon.co.jp", "currency": "JPY", "fx_to_usd": 0.0067},
# Add more markets as needed:
# "CA": {"domain": "amazon.ca", "currency": "CAD", "fx_to_usd": 0.730},
# "AU": {"domain": "amazon.com.au","currency": "AUD", "fx_to_usd": 0.630},
}
# Category nodes for "Kitchen & Dining" (replace with your target category)
CATEGORY_NODES: dict = {
"US": "289913",
"UK": "11052591031",
"DE": "3167641",
"JP": "2277721051",
}
# Global concurrency limiter: prevents overwhelming the API endpoint
CONCURRENCY_LIMIT = asyncio.Semaphore(4)
# ─── Data Models ───────────────────────────────────────────────────────────────
@dataclass
class ProductSnapshot:
marketplace: str
asin: str
title: str
bsr_rank: int
price_local: float
currency: str
price_usd: float
rating: float
review_count: int
fetched_at: str
@dataclass
class MarketplaceResult:
marketplace: str
success: bool
products: list[ProductSnapshot] = field(default_factory=list)
error: Optional[str] = None
latency_ms: int = 0
# ─── Collection Layer ───────────────────────────────────────────────────────────
async def fetch_bsr(
session: aiohttp.ClientSession,
marketplace: str,
category_node: str,
max_retries: int = 3
) -> MarketplaceResult:
"""
Async BSR collection for a single marketplace.
Handles:
- Rate limiting (429) with exponential backoff
- Timeout errors with retry
- Response normalization to consistent schema
"""
config = MARKETPLACE_CONFIG[marketplace]
start_time = datetime.now(timezone.utc)
payload = {
"marketplace": config["domain"],
"type": "bestsellers",
"category_id": category_node,
"limit": 20
}
for attempt in range(max_retries):
try:
async with CONCURRENCY_LIMIT:
async with session.post(
"https://api.pangolinfo.com/v1/amazon/category/bestsellers",
headers=HEADERS,
json=payload,
timeout=aiohttp.ClientTimeout(total=30)
) as resp:
if resp.status == 429:
wait = (2 ** attempt) + 1
print(f" [{marketplace}] Rate limited, retrying in {wait}s...")
await asyncio.sleep(wait)
continue
resp.raise_for_status()
data = await resp.json()
except aiohttp.ClientResponseError as e:
if attempt < max_retries - 1:
await asyncio.sleep(2 ** attempt)
continue
latency = _calc_latency(start_time)
return MarketplaceResult(marketplace, False, error=f"HTTP {e.status}: {e.message}", latency_ms=latency)
except asyncio.TimeoutError:
if attempt < max_retries - 1:
await asyncio.sleep(2 ** attempt)
continue
latency = _calc_latency(start_time)
return MarketplaceResult(marketplace, False, error="Request timed out", latency_ms=latency)
except Exception as e:
latency = _calc_latency(start_time)
return MarketplaceResult(marketplace, False, error=str(e), latency_ms=latency)
# Success — normalize products
fx = config["fx_to_usd"]
currency = config["currency"]
ts = datetime.now(timezone.utc).isoformat()
products = [
ProductSnapshot(
marketplace=marketplace,
asin=p.get("asin", ""),
title=(p.get("title") or "")[:120],
bsr_rank=int(p.get("rank") or 0),
price_local=float(p.get("price") or 0),
currency=currency,
price_usd=round(float(p.get("price") or 0) * fx, 2),
rating=float(p.get("rating") or 0),
review_count=int(p.get("review_count") or 0),
fetched_at=ts
)
for p in data.get("products", [])
]
latency = _calc_latency(start_time)
return MarketplaceResult(marketplace, True, products=products, latency_ms=latency)
    return MarketplaceResult(marketplace, False, error="Max retries reached", latency_ms=_calc_latency(start_time))
def _calc_latency(start: datetime) -> int:
return int((datetime.now(timezone.utc) - start).total_seconds() * 1000)
async def collect_all_markets(category_map: dict) -> list[MarketplaceResult]:
"""Run all marketplace collections concurrently."""
async with aiohttp.ClientSession() as session:
tasks = [
fetch_bsr(session, market, cat_id)
for market, cat_id in category_map.items()
if market in MARKETPLACE_CONFIG
]
return await asyncio.gather(*tasks)
# ─── Analysis Layer ─────────────────────────────────────────────────────────────
def compute_competition_metrics(products: list[ProductSnapshot]) -> dict:
"""Compute review-based competition density metrics for a market."""
if not products:
return {}
reviews = [p.review_count for p in products]
prices = [p.price_usd for p in products if p.price_usd > 0]
avg_rev = mean(reviews)
if avg_rev > 1500:
barrier = "🔴 HIGH — significant review moat"
elif avg_rev > 400:
barrier = "🟡 MEDIUM — viable with strong launch strategy"
else:
barrier = "🟢 LOW — early-mover opportunity"
return {
"product_count": len(products),
"avg_review_count": round(avg_rev),
"median_review_count": round(median(reviews)),
"min_review_count": min(reviews),
"max_review_count": max(reviews),
"avg_price_usd": round(mean(prices), 2) if prices else 0,
"barrier_assessment": barrier,
# Opportunity score: higher is better
"opportunity_score": max(0, min(100, 100 - round(avg_rev / 20)))
}
def print_analysis_report(results: list[MarketplaceResult]) -> None:
"""Pretty-print cross-marketplace competition analysis."""
market_metrics = []
for r in results:
if r.success and r.products:
metrics = compute_competition_metrics(r.products)
market_metrics.append((r.marketplace, metrics, r.latency_ms))
# Sort by opportunity score (highest first)
market_metrics.sort(key=lambda x: x[1].get("opportunity_score", 0), reverse=True)
print("\n" + "═" * 65)
print(" Amazon Multi-Marketplace Competition Analysis")
print("═" * 65)
for i, (market, metrics, latency_ms) in enumerate(market_metrics, 1):
rank_emoji = "🏆" if i == 1 else f" #{i}"
print(f"\n{rank_emoji} {market} Marketplace (fetched in {latency_ms}ms)")
print(f" Top 20 avg reviews: {metrics['avg_review_count']:,}")
print(f" Top 20 median reviews: {metrics['median_review_count']:,}")
print(f" Review range: {metrics['min_review_count']:,} – {metrics['max_review_count']:,}")
print(f" Avg price (USD): ${metrics['avg_price_usd']:.2f}")
print(f" Opportunity score: {metrics['opportunity_score']}/100")
print(f" Barrier assessment: {metrics['barrier_assessment']}")
# Report failures
failed = [r for r in results if not r.success]
if failed:
print(f"\n ⚠ Failed markets: {', '.join(r.marketplace for r in failed)}")
for r in failed:
print(f" {r.marketplace}: {r.error}")
print("\n" + "═" * 65)
# ─── Entry Point ─────────────────────────────────────────────────────────────────
async def main():
print("Collecting BSR data from all target marketplaces...")
results = await collect_all_markets(CATEGORY_NODES)
# Summary stats
success = sum(1 for r in results if r.success)
total_products = sum(len(r.products) for r in results)
print(f"Collection complete: {success}/{len(results)} markets, {total_products} products total")
# Analysis report
print_analysis_report(results)
# Persist full data as JSON
output = {
"collected_at": datetime.now(timezone.utc).isoformat(),
"markets": {
r.marketplace: {
"success": r.success,
"product_count": len(r.products),
"products": [asdict(p) for p in r.products],
"error": r.error
}
for r in results
}
}
    filename = f"bsr_snapshot_{datetime.now(timezone.utc).strftime('%Y%m%d_%H%M')}.json"
    with open(filename, "w", encoding="utf-8") as f:
        json.dump(output, f, ensure_ascii=False, indent=2)
    print(f"\nFull dataset saved to: {filename}")
if __name__ == "__main__":
asyncio.run(main())
Sample Output
Collecting BSR data from all target marketplaces...
Collection complete: 4/4 markets, 80 products total
═════════════════════════════════════════════════════════════════
Amazon Multi-Marketplace Competition Analysis
═════════════════════════════════════════════════════════════════
🏆 DE Marketplace (fetched in 847ms)
Top 20 avg reviews: 312
Top 20 median reviews: 248
Review range: 45 – 1,240
Avg price (USD): $38.42
Opportunity score: 84/100
Barrier assessment: 🟢 LOW — early-mover opportunity
#2 JP Marketplace (fetched in 923ms)
Top 20 avg reviews: 486
Top 20 median reviews: 390
Review range: 120 – 2,100
Avg price (USD): $31.18
Opportunity score: 76/100
Barrier assessment: 🟡 MEDIUM — viable with strong launch strategy
#3 UK Marketplace (fetched in 761ms)
Top 20 avg reviews: 1,240
Top 20 median reviews: 980
Review range: 380 – 4,200
Avg price (USD): $42.80
Opportunity score: 38/100
Barrier assessment: 🟡 MEDIUM — viable with strong launch strategy
#4 US Marketplace (fetched in 694ms)
Top 20 avg reviews: 3,840
Top 20 median reviews: 3,100
Review range: 890 – 9,200
Avg price (USD): $36.95
Opportunity score: 0/100
Barrier assessment: 🔴 HIGH — significant review moat
═════════════════════════════════════════════════════════════════
Full dataset saved to: bsr_snapshot_20260224_0923.json
In this example, DE and JP show dramatically lower competition density than the US for the same product category—information that's completely invisible when analyzing markets individually.
Extending This
Add more marketplaces: The MARKETPLACE_CONFIG and CATEGORY_NODES dicts are the only things that need updating. The rest of the pipeline is marketplace-agnostic.
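For example, enabling Canada is two dict entries. The FX rate below comes from the commented `CA` line already in the config; the category node ID is a placeholder you must look up for your target category.

```python
# Sketch: a new market only needs entries in the two config dicts.
MARKETPLACE_CONFIG = {"US": {"domain": "amazon.com", "currency": "USD", "fx_to_usd": 1.000}}
CATEGORY_NODES = {"US": "289913"}

# FX rate from the commented example in the config; the node ID is a
# placeholder — look up the real browse node for your category.
MARKETPLACE_CONFIG["CA"] = {"domain": "amazon.ca", "currency": "CAD", "fx_to_usd": 0.730}
CATEGORY_NODES["CA"] = "REPLACE_WITH_CA_NODE_ID"
```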
Add product detail collection: Change "type": "bestsellers" to "type": "product_detail" and pass "asin": "B08XY12345" to track specific competitor ASINs across markets.
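The payload for that would look roughly like this. It's inferred from the bestsellers payload in `fetch_bsr` plus the two field changes above; confirm the exact field names and endpoint path against the API docs before relying on it.

```python
# Hypothetical product-detail payload, inferred from the bestsellers
# payload used in fetch_bsr — verify field names against the API docs.
detail_payload = {
    "marketplace": "amazon.com",
    "type": "product_detail",
    "asin": "B08XY12345",
}
```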
Connect to a database: Replace the JSON file write with SQLAlchemy inserts into PostgreSQL (or TimescaleDB for time-series BSR tracking).
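A minimal sketch of the persistence shape, using stdlib `sqlite3` so it runs anywhere; in production you'd swap this for SQLAlchemy inserts into PostgreSQL or TimescaleDB. The composite primary key lets repeated collection runs accumulate a BSR time series per ASIN.

```python
import sqlite3

# In-memory DB for illustration; point this at a file (or swap the whole
# thing for SQLAlchemy + PostgreSQL) in a real pipeline.
conn = sqlite3.connect(":memory:")
conn.execute("""
    CREATE TABLE IF NOT EXISTS bsr_snapshots (
        marketplace  TEXT,
        asin         TEXT,
        bsr_rank     INTEGER,
        price_usd    REAL,
        review_count INTEGER,
        fetched_at   TEXT,
        PRIMARY KEY (marketplace, asin, fetched_at)
    )
""")

# One row per ProductSnapshot (values here are made up).
rows = [("DE", "B0EXAMPLE1", 1, 38.42, 312, "2026-02-24T09:23:00+00:00")]
conn.executemany(
    "INSERT OR REPLACE INTO bsr_snapshots VALUES (?, ?, ?, ?, ?, ?)", rows
)
conn.commit()
print(conn.execute("SELECT COUNT(*) FROM bsr_snapshots").fetchone()[0])  # 1
```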
Add to a scheduler: Wrap main() in an APScheduler periodic job for continuous monitoring:
from apscheduler.schedulers.asyncio import AsyncIOScheduler

scheduler = AsyncIOScheduler()
# Pass the coroutine function directly; wrapping it in asyncio.run() would
# raise RuntimeError, since the scheduler's event loop is already running.
scheduler.add_job(main, "interval", hours=4)
scheduler.start()
asyncio.get_event_loop().run_forever()
Resources
- Pangolinfo Scrape API docs
- AMZ Data Tracker (no-code alternative)
- Free trial console
If this was useful, drop a ❤️. Working on a follow-up covering competitor ASIN tracking across markets and automated price elasticity analysis—follow to catch that one.