Mox Loop
Amazon Niche Research Data Analysis: Building a Six-Metric Pipeline with Python and Pangolinfo Scrape API

TL;DR — Off-the-shelf Amazon research tools hide their decision logic behind a single "opportunity score." This post walks through building your own six-metric niche research pipeline in Python, using ASIN-level data from Pangolinfo Scrape API. The code runs async for cost efficiency and needs only a short hardening pass (covered at the end) before production use.

Why this exists

I've worked on data tooling for several Amazon-native brands. The recurring frustration was always the same — the team would buy a SaaS tool, get an opportunity score for a category, and have no way to interrogate why the score was what it was. When the inevitable wrong sourcing call happened, there was no decision log to learn from.

The fix is to drop one layer below the SaaS tools, pull ASIN-level data directly, and build your own decision framework in code. Amazon niche research data analysis done at this layer is reproducible, transparent, and version-controllable. Once the pipeline exists, every subsequent category analysis costs near-zero engineering time.

The framework

Six independent metrics, each tied to a specific data source. Cross-reference rather than average them.

Metric                        Question it answers                     Data source
BSR concentration (CR10)      Is the market locked?                   Category bestsellers
Review barrier (P25)          How many reviews to stand up?           Category bestsellers
New-listing velocity          Does the algorithm welcome newcomers?   Weekly category snapshots
Price-band coverage           Is the target price crowded?            Product detail
Sponsored placement density   How much organic traffic is left?       SERP with SP detection
Negative-review clustering    What's the unmet need?                  Reviews + NLP
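To make "cross-reference rather than average" concrete, here's a minimal sketch. The metric names match the table, but the thresholds and the `gate_check` helper are illustrative placeholders, not the scoring logic used later in this post:

```python
# Each metric acts as an independent veto gate. Averaging would let one
# strong metric mask a fatal one; cross-referencing tells you exactly
# which gate failed. Thresholds below are placeholders for illustration.

def gate_check(metrics: dict) -> dict:
    gates = {
        "market_not_locked": metrics["cr10"] < 0.70,
        "review_barrier_ok": metrics["review_p25"] < 1500,
        "organic_traffic_left": metrics["sp_density"] < 0.45,
    }
    gates["all_pass"] = all(gates.values())
    return gates

result = gate_check({"cr10": 0.48, "review_p25": 412, "sp_density": 0.31})
# result["all_pass"] is True; a failing category would show which gate tripped
```

The payoff over a blended score: when a category fails, the output names the failing gate instead of shaving a few opaque points off a composite number.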

Picking the data source

I tested several Amazon data APIs. The key constraints are:

  1. Full-category ASIN coverage — not just visible top 100
  2. Reliable Sponsored placement detection — most generic scrapers miss 20–40% of SP slots
  3. Complete review text returned — including full Customer Says keyword set, not truncated top 6

Pangolinfo Scrape API hit all three (98%+ SP detection rate is best-in-class), so the rest of this post uses it. The patterns transfer to any equivalent API.

Architecture

┌──────────────────────────────────────────────┐
│ Pangolinfo Scrape API                        │
│  /amazon/category/bestsellers                │
│  /amazon/serp                                │
│  /amazon/reviews                             │
└─────────────────┬────────────────────────────┘
                  │ JSON
                  ▼
┌──────────────────────────────────────────────┐
│ async data fetcher (aiohttp)                 │
│  with semaphore concurrency control          │
└─────────────────┬────────────────────────────┘
                  │
                  ▼
┌──────────────────────────────────────────────┐
│ pandas/numpy metric calculators              │
│  cr10, p25, velocity, price-band, sp density │
└─────────────────┬────────────────────────────┘
                  │
                  ▼
┌──────────────────────────────────────────────┐
│ NLP clustering (jieba/yake → BERT in prod)   │
└─────────────────┬────────────────────────────┘
                  │
                  ▼
┌──────────────────────────────────────────────┐
│ NicheReport dataclass + opportunity scoring  │
└──────────────────────────────────────────────┘

Code — async client

import asyncio
import aiohttp
import pandas as pd
import numpy as np
from dataclasses import dataclass
from typing import List, Dict, Optional

@dataclass
class NicheReport:
    node_id: str
    cr10: float
    review_p25: float
    price_band_competitors: int
    breakthrough_price: Optional[float]
    sp_density: float
    pain_points: List[tuple]
    opportunity_score: int

class PangolinAsyncClient:
    """
    Pangolinfo Amazon Scrape API async wrapper (aiohttp + semaphore).
    Targets the synchronous v1/scrape endpoint. Business type is
    selected via the parserName field — switch to /api/v1/scrape/async
    only when you scrape millions of pages per day.
    Full docs: https://docs.pangolinfo.com/en-api-reference/universalApi/universalApi

    parserName cheatsheet (the `content` field varies by parser):
      amzProductDetail      → ASIN (reviews are nested in this response)
      amzKeyword            → search keyword (SERP, includes SP placements)
      amzProductOfCategory  → category Browse Node ID
      amzBestSellers        → category keyword for best-sellers chart
      amzNewReleases        → category keyword for new-releases chart
      amzProductOfSeller    → seller/store ID
    """
    BASE_URL = "https://scrapeapi.pangolinfo.com/api/v1/scrape"
    SITE_MAP = {"US": "www.amazon.com", "DE": "www.amazon.de",
                "UK": "www.amazon.co.uk", "JP": "www.amazon.co.jp"}

    def __init__(self, api_key: str, concurrency: int = 8):
        self.api_key = api_key
        self.semaphore = asyncio.Semaphore(concurrency)

    async def _post(self, session, parser_name, content, marketplace="US", zipcode="10041"):
        payload = {
            "parserName": parser_name,
            "content": content,
            "site": self.SITE_MAP[marketplace],
            "format": "json",
            "bizContext": {"zipcode": zipcode},
        }
        async with self.semaphore:
            headers = {
                "Authorization": f"Bearer {self.api_key}",
                "Content-Type": "application/json",
            }
            async with session.post(self.BASE_URL, json=payload, headers=headers) as resp:
                resp.raise_for_status()
                data = await resp.json()
                return data["data"]["json"]

    async def fetch_category_listings(self, session, node_id):
        return await self._post(session, "amzProductOfCategory", node_id)

    async def fetch_keyword_serp(self, session, keyword):
        return await self._post(session, "amzKeyword", keyword)

    async def fetch_product_detail(self, session, asin):
        """Returns the full product detail. The `reviews` array is nested in the
        response — filter client-side by `star` to get negative reviews only."""
        return await self._post(session, "amzProductDetail", asin)

Code — metric computation

def compute_cr10(top100):
    df = pd.DataFrame(top100).sort_values("estimated_monthly_sales", ascending=False)
    top10 = df.head(10)["estimated_monthly_sales"].sum()
    total = df["estimated_monthly_sales"].sum()
    return top10 / total if total else 0.0

def compute_review_p25(top50):
    reviews = [item["review_count"] for item in top50 if item.get("review_count")]
    return float(np.percentile(reviews, 25)) if reviews else 0.0

def compute_price_band(top200, target_price, band=5):
    df = pd.DataFrame(top200)
    in_band = df[(df["price"] >= target_price - band) & (df["price"] <= target_price + band)]
    # "winners" = ASINs already ranking inside the top 100; filter after
    # slicing, otherwise we'd grab the first 100 low-review rows from all of top200
    top100 = df.head(100)
    low_review_winners = top100[top100["review_count"] < 500]
    breakthrough = float(low_review_winners["price"].median()) if len(low_review_winners) else None
    return len(in_band), breakthrough

def compute_sp_density(serp):
    top48 = serp[:48]
    sp = sum(1 for item in top48 if item.get("is_sponsored"))
    return sp / len(top48) if top48 else 0.0

def cluster_pain_points(reviews_text, top_k=5):
    """MVP: keyword extraction. Production: swap for BERT/sentence-transformers."""
    import yake
    extractor = yake.KeywordExtractor(top=top_k, n=2)
    text = " ".join(reviews_text)
    return extractor.extract_keywords(text)

Code — the orchestration

async def evaluate_niche(client, node_id, target_price, head_keyword):
    async with aiohttp.ClientSession() as session:
        listings, serp = await asyncio.gather(
            client.fetch_category_listings(session, node_id),
            client.fetch_keyword_serp(session, head_keyword),
        )
        top200 = listings[:200]

        # Reviews are nested inside amzProductDetail responses
        detail_tasks = [client.fetch_product_detail(session, item["asin"]) for item in top200[:30]]
        all_details = await asyncio.gather(*detail_tasks)

        flat_reviews = []
        for detail in all_details:
            results = detail[0].get("data", {}).get("results", [{}])[0]
            for review in results.get("reviews", []):
                star_value = float(str(review.get("star", "5")).split()[0])  # handles "4.0 out of 5 stars" and bare numbers
                if star_value <= 3:
                    flat_reviews.append(review.get("content", ""))

    cr10 = compute_cr10(top200[:100])
    review_p25 = compute_review_p25(top200[:50])
    band_count, breakthrough = compute_price_band(top200, target_price)
    sp_density = compute_sp_density(serp)
    pain_points = cluster_pain_points(flat_reviews)
    score = score_opportunity(cr10, review_p25, sp_density, band_count)

    return NicheReport(
        node_id=node_id,
        cr10=cr10,
        review_p25=review_p25,
        price_band_competitors=band_count,
        breakthrough_price=breakthrough,
        sp_density=sp_density,
        pain_points=pain_points,
        opportunity_score=score,
    )

def score_opportunity(cr10, review_p25, sp_density, band_count):
    score = 100
    score -= 30 if cr10 > 0.70 else (10 if cr10 > 0.55 else 0)
    score -= 25 if review_p25 > 1500 else (10 if review_p25 > 800 else 0)
    score -= 20 if sp_density > 0.45 else (10 if sp_density > 0.35 else 0)
    score -= 15 if band_count > 30 else 0
    return max(0, score)

Running it

import asyncio

async def main():
    client = PangolinAsyncClient(api_key="your_key", concurrency=8)
    report = await evaluate_niche(
        client,
        node_id="2251606011",   # Coffee Filters
        target_price=18.99,
        head_keyword="coffee filters",
    )
    print(f"Opportunity score: {report.opportunity_score}/100")
    print(f"CR10: {report.cr10:.1%}")
    print(f"Review barrier (P25): {report.review_p25:.0f}")
    print(f"Competitors in target price band: {report.price_band_competitors}")
    anchor = f"${report.breakthrough_price:.2f}" if report.breakthrough_price is not None else "n/a"
    print(f"Breakthrough price anchor: {anchor}")
    print(f"SP density: {report.sp_density:.1%}")
    print(f"Top pain points: {report.pain_points}")

asyncio.run(main())

Sample output for a representative category:

Opportunity score: 65/100
CR10: 58.3%
Review barrier (P25): 912
Competitors in target price band: 34
Breakthrough price anchor: $16.50
SP density: 31.2%
Top pain points: [('bleach taste', 0.041), ('cup size fit', 0.038), ('packaging fragile', 0.029), ...]

Production hardening

A few things to add before this hits production:

Caching — BSR doesn't need minute-level refresh. Redis-cache the bestseller responses with a 24-hour TTL.

import redis, json
r = redis.Redis()

async def fetch_category_cached(client, session, node_id, ttl=86400):
    key = f"cat:{node_id}:{pd.Timestamp.now().date()}"
    cached = r.get(key)
    if cached:
        return json.loads(cached)
    data = await client.fetch_category_listings(session, node_id)
    r.setex(key, ttl, json.dumps(data))
    return data

Rate limiting — token bucket on the client side to stay under API quota:

from aiolimiter import AsyncLimiter
limiter = AsyncLimiter(max_rate=10, time_period=1)  # 10 req/s

async def rate_limited_post(client, session, parser_name, content):
    # acquire a token from the bucket before delegating to the client's POST helper
    async with limiter:
        return await client._post(session, parser_name, content)

NLP upgrade path — yake or jieba is fine for MVP but misses semantic clusters ("hard to clean" and "won't come off in dishwasher" should cluster together). For production, swap in sentence-transformers with HDBSCAN clustering, or call an LLM for thematic summarization.

New-listing velocity — needs historical snapshots, which means scheduling a weekly job that diffs current top-200 ASINs against a stored ledger. Use Celery + Redis for the scheduling.
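The diff itself is simple once the ledger exists. A minimal sketch, assuming snapshots are persisted as plain lists of ASIN strings (the ASINs below are made up; the storage layer, whether Redis, S3, or SQLite, is up to you):

```python
# Compute new-listing velocity as the share of this week's top-200 ASINs
# that were absent from last week's snapshot. Only the diff logic is shown;
# the weekly job that persists snapshots lives in the scheduler.

def new_listing_velocity(current_asins, previous_asins):
    current, previous = set(current_asins), set(previous_asins)
    if not current:
        return 0.0
    new_entrants = current - previous
    return len(new_entrants) / len(current)

last_week = ["B0AAAA0001", "B0AAAA0002", "B0AAAA0003", "B0AAAA0004"]
this_week = ["B0AAAA0001", "B0AAAA0002", "B0BBBB0001", "B0BBBB0002"]
velocity = new_listing_velocity(this_week, last_week)  # 0.5: half the top slots are new
```

A rising velocity week over week suggests the algorithm is still rotating newcomers in; a velocity pinned near zero is a locked shelf.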

Why this beats SaaS tools

  • Cross-metric filters work natively. "ASINs with under 500 reviews that still rank in top 100" is one DataFrame query, not a CSV export.
  • Refresh frequency under your control. Run it hourly during a launch monitoring window if you need to.
  • Decision logic is in code. Future-you (or your team) can always answer "why did we score this category at 65?"
  • Marginal cost per category approaches zero. Once the pipeline runs, scanning 100 candidates per week costs only API calls and compute.
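The first bullet really is a one-line query once the listings are in a DataFrame. A toy example (the three rows are made up; column names follow the fields used in the metric functions above):

```python
import pandas as pd

# Toy stand-in for pd.DataFrame(top200[:100]) from the orchestration step.
df = pd.DataFrame([
    {"asin": "B0AAAA0001", "review_count": 320, "price": 17.99},
    {"asin": "B0AAAA0002", "review_count": 4100, "price": 21.49},
    {"asin": "B0AAAA0003", "review_count": 180, "price": 15.99},
])

# "ASINs with under 500 reviews that still rank in top 100" is a single
# boolean-mask query; no CSV export round-trip through a SaaS dashboard.
low_review_winners = df[df["review_count"] < 500]
```

Any cross-metric question the framework raises (low-review winners inside the target price band, say) composes the same way by chaining masks.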

If you don't want to self-build, AMZ Data Tracker ships the same six-metric framework as a hosted dashboard with daily refresh and built-in NLP. The trade-off is less customization but a 30-minute setup instead of 4–6 weeks.

Closing

The compounding payoff of this kind of pipeline is what makes it worth building. Every category you evaluate is logged, every decision is reproducible, every threshold can be tuned based on actual outcomes. After six months of running it you have a calibrated model that's specific to your supply chain and risk tolerance — something no off-the-shelf tool can give you.

Code repository structure if you want to extend this — break it into clients/, metrics/, scoring/, pipelines/, and tasks/. Drop the async client into clients/, the metric functions into metrics/, and use a Celery beat schedule in tasks/ for the recurring jobs.
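As a sketch, that layout might look like the tree below (file names are suggestions, not an existing published repo):

```text
amazon-niche-pipeline/
├── clients/
│   └── pangolin.py      # PangolinAsyncClient
├── metrics/
│   └── compute.py       # cr10, p25, price band, sp density
├── scoring/
│   └── opportunity.py   # score_opportunity + tunable thresholds
├── pipelines/
│   └── evaluate.py      # evaluate_niche orchestration
└── tasks/
    └── schedule.py      # Celery beat: weekly snapshot job
```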

Happy to answer questions in the comments — particularly interested in hearing how others have approached the new-listing velocity tracking, since the snapshot diffing is the trickiest part of the pipeline to operationalize cleanly.
