DEV Community

Cover image for Amazon Multi-Marketplace Data Analysis: Breaking Down the Silos With a Unified API Pipeline
Mox Loop
Mox Loop

Posted on

Amazon Multi-Marketplace Data Analysis: Breaking Down the Silos With a Unified API Pipeline

TL;DR: Amazon stores data separately for every marketplace. Most tools don't actually solve this. Here's a working Python implementation that concurrently pulls normalized BSR data from US, UK, DE, and JP Amazon using Pangolinfo's Scrape API, with currency normalization and competition density analysis built in.


Tags

#api #python #ecommerce #dataengineering #amazon


The Problem in One Sentence

Every Amazon marketplace is an isolated data silo, and genuine Amazon Multi-Marketplace Data Analysis requires solving this at the collection layer—not the reporting layer.

Background

Running Amazon stores across multiple regions is common. Having actual data pipelines that aggregate, normalize, and compare those stores simultaneously is rare. The reason isn't complexity of analysis—it's complexity of collection:

  • Each marketplace has independent ASIN namespaces
  • Category tree node IDs don't map across marketplaces
  • Anti-bot detection behavior varies by region
  • Price data is in different currencies
  • Most seller tools are single-marketplace by design

This post shows a complete working solution using Pangolinfo's Scrape API, which handles multi-marketplace collection with a unified output schema—meaning the JSON structure from a US request and a JP request is identical, which makes building cross-marketplace pipelines significantly less painful.


What We're Building

A concurrent multi-marketplace BSR collector that:

  1. Calls 4 Amazon marketplaces simultaneously (US, UK, DE, JP)
  2. Returns normalized product data with USD price conversion
  3. Generates a competition density matrix (avg Top 20 review count per market)
  4. Identifies lowest-competition markets as expansion opportunities

Setup

pip install aiohttp python-dotenv
Enter fullscreen mode Exit fullscreen mode
# .env
PANGOLINFO_API_KEY=your_key_here
Enter fullscreen mode Exit fullscreen mode

Pangolinfo API docs: https://docs.pangolinfo.com


Core Implementation

"""
amazon_cross_marketplace.py

Multi-marketplace BSR analysis using Pangolinfo Scrape API.
Collects Top 20 bestsellers from US/UK/DE/JP simultaneously,
normalizes prices to USD, and outputs a competition density report.

Requirements: aiohttp, python-dotenv
API docs: https://docs.pangolinfo.com
"""

import asyncio
import aiohttp
import json
import os
import sys
from datetime import datetime, timezone
from dataclasses import dataclass, field, asdict
from statistics import mean, median
from typing import Optional
from dotenv import load_dotenv

load_dotenv()

# ─── Config ────────────────────────────────────────────────────────────────────

API_KEY: str = os.getenv("PANGOLINFO_API_KEY", "")
if not API_KEY:
    print("[ERROR] PANGOLINFO_API_KEY not set. Add to .env or environment.", file=sys.stderr)
    sys.exit(1)

HEADERS: dict = {
    "Authorization": f"Bearer {API_KEY}",
    "Content-Type": "application/json"
}

# FX rates: local currency → USD (update from live FX API in production)
MARKETPLACE_CONFIG: dict = {
    "US": {"domain": "amazon.com",     "currency": "USD", "fx_to_usd": 1.000},
    "UK": {"domain": "amazon.co.uk",   "currency": "GBP", "fx_to_usd": 1.270},
    "DE": {"domain": "amazon.de",      "currency": "EUR", "fx_to_usd": 1.082},
    "JP": {"domain": "amazon.co.jp",   "currency": "JPY", "fx_to_usd": 0.0067},
    # Add more markets as needed:
    # "CA": {"domain": "amazon.ca",   "currency": "CAD", "fx_to_usd": 0.730},
    # "AU": {"domain": "amazon.com.au","currency": "AUD", "fx_to_usd": 0.630},
}

# Category nodes for "Kitchen & Dining" (replace with your target category)
CATEGORY_NODES: dict = {
    "US": "289913",
    "UK": "11052591031",
    "DE": "3167641",
    "JP": "2277721051",
}

# Global concurrency limiter: prevents overwhelming the API endpoint
CONCURRENCY_LIMIT = asyncio.Semaphore(4)


# ─── Data Models ───────────────────────────────────────────────────────────────

@dataclass
class ProductSnapshot:
    marketplace: str
    asin: str
    title: str
    bsr_rank: int
    price_local: float
    currency: str
    price_usd: float
    rating: float
    review_count: int
    fetched_at: str


@dataclass
class MarketplaceResult:
    marketplace: str
    success: bool
    products: list[ProductSnapshot] = field(default_factory=list)
    error: Optional[str] = None
    latency_ms: int = 0


# ─── Collection Layer ───────────────────────────────────────────────────────────

async def fetch_bsr(
    session: aiohttp.ClientSession,
    marketplace: str,
    category_node: str,
    max_retries: int = 3
) -> MarketplaceResult:
    """
    Async BSR collection for a single marketplace.

    Handles:
    - Rate limiting (429) with exponential backoff
    - Timeout errors with retry
    - Response normalization to consistent schema
    """
    config = MARKETPLACE_CONFIG[marketplace]
    start_time = datetime.now(timezone.utc)

    payload = {
        "marketplace": config["domain"],
        "type": "bestsellers",
        "category_id": category_node,
        "limit": 20
    }

    for attempt in range(max_retries):
        try:
            async with CONCURRENCY_LIMIT:
                async with session.post(
                    "https://api.pangolinfo.com/v1/amazon/category/bestsellers",
                    headers=HEADERS,
                    json=payload,
                    timeout=aiohttp.ClientTimeout(total=30)
                ) as resp:

                    if resp.status == 429:
                        wait = (2 ** attempt) + 1
                        print(f"  [{marketplace}] Rate limited, retrying in {wait}s...")
                        await asyncio.sleep(wait)
                        continue

                    resp.raise_for_status()
                    data = await resp.json()

        except aiohttp.ClientResponseError as e:
            if attempt < max_retries - 1:
                await asyncio.sleep(2 ** attempt)
                continue
            latency = _calc_latency(start_time)
            return MarketplaceResult(marketplace, False, error=f"HTTP {e.status}: {e.message}", latency_ms=latency)

        except asyncio.TimeoutError:
            if attempt < max_retries - 1:
                await asyncio.sleep(2 ** attempt)
                continue
            latency = _calc_latency(start_time)
            return MarketplaceResult(marketplace, False, error="Request timed out", latency_ms=latency)

        except Exception as e:
            latency = _calc_latency(start_time)
            return MarketplaceResult(marketplace, False, error=str(e), latency_ms=latency)

        # Success — normalize products
        fx = config["fx_to_usd"]
        currency = config["currency"]
        ts = datetime.now(timezone.utc).isoformat()

        products = [
            ProductSnapshot(
                marketplace=marketplace,
                asin=p.get("asin", ""),
                title=(p.get("title") or "")[:120],
                bsr_rank=int(p.get("rank") or 0),
                price_local=float(p.get("price") or 0),
                currency=currency,
                price_usd=round(float(p.get("price") or 0) * fx, 2),
                rating=float(p.get("rating") or 0),
                review_count=int(p.get("review_count") or 0),
                fetched_at=ts
            )
            for p in data.get("products", [])
        ]

        latency = _calc_latency(start_time)
        return MarketplaceResult(marketplace, True, products=products, latency_ms=latency)

    return MarketplaceResult(marketplace, False, error="Max retries reached")


def _calc_latency(start: datetime) -> int:
    return int((datetime.now(timezone.utc) - start).total_seconds() * 1000)


async def collect_all_markets(category_map: dict) -> list[MarketplaceResult]:
    """Run all marketplace collections concurrently."""
    async with aiohttp.ClientSession() as session:
        tasks = [
            fetch_bsr(session, market, cat_id)
            for market, cat_id in category_map.items()
            if market in MARKETPLACE_CONFIG
        ]
        return await asyncio.gather(*tasks)


# ─── Analysis Layer ─────────────────────────────────────────────────────────────

def compute_competition_metrics(products: list[ProductSnapshot]) -> dict:
    """Compute review-based competition density metrics for a market."""
    if not products:
        return {}

    reviews = [p.review_count for p in products]
    prices = [p.price_usd for p in products if p.price_usd > 0]

    avg_rev = mean(reviews)

    if avg_rev > 1500:
        barrier = "🔴 HIGH — significant review moat"
    elif avg_rev > 400:
        barrier = "🟡 MEDIUM — viable with strong launch strategy"
    else:
        barrier = "🟢 LOW — early-mover opportunity"

    return {
        "product_count": len(products),
        "avg_review_count": round(avg_rev),
        "median_review_count": round(median(reviews)),
        "min_review_count": min(reviews),
        "max_review_count": max(reviews),
        "avg_price_usd": round(mean(prices), 2) if prices else 0,
        "barrier_assessment": barrier,
        # Opportunity score: higher is better
        "opportunity_score": max(0, min(100, 100 - round(avg_rev / 20)))
    }


def print_analysis_report(results: list[MarketplaceResult]) -> None:
    """Pretty-print cross-marketplace competition analysis."""

    market_metrics = []
    for r in results:
        if r.success and r.products:
            metrics = compute_competition_metrics(r.products)
            market_metrics.append((r.marketplace, metrics, r.latency_ms))

    # Sort by opportunity score (highest first)
    market_metrics.sort(key=lambda x: x[1].get("opportunity_score", 0), reverse=True)

    print("\n" + "" * 65)
    print("   Amazon Multi-Marketplace Competition Analysis")
    print("" * 65)

    for i, (market, metrics, latency_ms) in enumerate(market_metrics, 1):
        rank_emoji = "🏆" if i == 1 else f"  #{i}"
        print(f"\n{rank_emoji} {market} Marketplace  (fetched in {latency_ms}ms)")
        print(f"     Top 20 avg reviews:    {metrics['avg_review_count']:,}")
        print(f"     Top 20 median reviews: {metrics['median_review_count']:,}")
        print(f"     Review range:          {metrics['min_review_count']:,}{metrics['max_review_count']:,}")
        print(f"     Avg price (USD):       ${metrics['avg_price_usd']:.2f}")
        print(f"     Opportunity score:     {metrics['opportunity_score']}/100")
        print(f"     Barrier assessment:    {metrics['barrier_assessment']}")

    # Report failures
    failed = [r for r in results if not r.success]
    if failed:
        print(f"\n  ⚠ Failed markets: {', '.join(r.marketplace for r in failed)}")
        for r in failed:
            print(f"    {r.marketplace}: {r.error}")

    print("\n" + "" * 65)


# ─── Entry Point ─────────────────────────────────────────────────────────────────

async def main():
    print("Collecting BSR data from all target marketplaces...")

    results = await collect_all_markets(CATEGORY_NODES)

    # Summary stats
    success = sum(1 for r in results if r.success)
    total_products = sum(len(r.products) for r in results)
    print(f"Collection complete: {success}/{len(results)} markets, {total_products} products total")

    # Analysis report
    print_analysis_report(results)

    # Persist full data as JSON
    output = {
        "collected_at": datetime.now(timezone.utc).isoformat(),
        "markets": {
            r.marketplace: {
                "success": r.success,
                "product_count": len(r.products),
                "products": [asdict(p) for p in r.products],
                "error": r.error
            }
            for r in results
        }
    }

    filename = f"bsr_snapshot_{datetime.now().strftime('%Y%m%d_%H%M')}.json"
    with open(filename, "w", encoding="utf-8") as f:
        json.dump(output, f, ensure_ascii=False, indent=2)
    print(f"\nFull dataset saved to: {filename}")


if __name__ == "__main__":
    asyncio.run(main())
Enter fullscreen mode Exit fullscreen mode

Sample Output

Collecting BSR data from all target marketplaces...
Collection complete: 4/4 markets, 80 products total

═════════════════════════════════════════════════════════════════
   Amazon Multi-Marketplace Competition Analysis
═════════════════════════════════════════════════════════════════

🏆 DE Marketplace  (fetched in 847ms)
     Top 20 avg reviews:    312
     Top 20 median reviews: 248
     Review range:          45 – 1,240
     Avg price (USD):       $38.42
     Opportunity score:     84/100
     Barrier assessment:    🟢 LOW — early-mover opportunity

  #2 JP Marketplace  (fetched in 923ms)
     Top 20 avg reviews:    486
     Top 20 median reviews: 390
     Review range:          120 – 2,100
     Avg price (USD):       $31.18
     Opportunity score:     76/100
     Barrier assessment:    🟡 MEDIUM — viable with strong launch strategy

  #3 UK Marketplace  (fetched in 761ms)
     Top 20 avg reviews:    1,240
     Top 20 median reviews: 980
     Review range:          380 – 4,200
     Avg price (USD):       $42.80
     Opportunity score:     38/100
     Barrier assessment:    🟡 MEDIUM — viable with strong launch strategy

  #4 US Marketplace  (fetched in 694ms)
     Top 20 avg reviews:    3,840
     Top 20 median reviews: 3,100
     Review range:          890 – 9,200
     Avg price (USD):       $36.95
     Opportunity score:     0/100
     Barrier assessment:    🔴 HIGH — significant review moat

═════════════════════════════════════════════════════════════════

Full dataset saved to: bsr_snapshot_20260224_0923.json
Enter fullscreen mode Exit fullscreen mode

In this example, DE and JP show dramatically lower competition density than the US for the same product category—information that's completely invisible when analyzing markets individually.


Extending This

Add more marketplaces: The MARKETPLACE_CONFIG and CATEGORY_NODES dicts are the only things that need updating. The rest of the pipeline is marketplace-agnostic.

Add product detail collection: Change "type": "bestsellers" to "type": "product_detail" and pass "asin": "B08XY12345" to track specific competitor ASINs across markets.

Connect to a database: Replace the JSON file write with SQLAlchemy inserts into PostgreSQL (or TimescaleDB for time-series BSR tracking).

Add to a scheduler: Wrap main() in an APScheduler periodic job for continuous monitoring:

from apscheduler.schedulers.asyncio import AsyncIOScheduler

scheduler = AsyncIOScheduler()
scheduler.add_job(lambda: asyncio.run(main()), "interval", hours=4)
scheduler.start()
Enter fullscreen mode Exit fullscreen mode

Resources


If this was useful, drop a ❤️. Working on a follow-up covering competitor ASIN tracking across markets and automated price elasticity analysis—follow to catch that one.

Top comments (0)