TL;DR — Off-the-shelf Amazon research tools hide their decision logic behind a single "opportunity score." This post walks through building your own six-metric niche research pipeline in Python, using ASIN-level data from the Pangolinfo Scrape API. The code runs async for throughput, and a production-hardening checklist at the end covers caching, rate limiting, and scheduling.
## Why this exists
I've worked on data tooling for several Amazon-native brands. The recurring frustration was always the same — the team would buy a SaaS tool, get an opportunity score for a category, and have no way to interrogate why the score was what it was. When the inevitable wrong sourcing call happened, there was no decision log to learn from.
The fix is to drop one layer below the SaaS tools, pull ASIN-level data directly, and build your own decision framework in code. Niche research done at this layer is reproducible, transparent, and version-controllable. Once the pipeline exists, every subsequent category analysis costs near-zero engineering time.
## The framework
Six independent metrics, each tied to a specific data source. Cross-reference rather than average them.
| Metric | Question it answers | Data source |
|---|---|---|
| BSR concentration (CR10) | Is the market locked? | Category bestsellers |
| Review barrier (P25) | How many reviews to get a foothold? | Category bestsellers |
| New-listing velocity | Does the algorithm welcome newcomers? | Weekly category snapshots |
| Price-band coverage | Is the target price crowded? | Product detail |
| Sponsored placement density | How much organic traffic is left? | SERP with SP detection |
| Negative-review clustering | What's the unmet need? | Reviews + NLP |
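"Cross-reference rather than average" can be as simple as treating each metric as an independent gate. A minimal sketch of the idea, covering the four quantitative gates (new-listing velocity needs history and negative-review clustering informs product design rather than go/no-go); the thresholds mirror the severe tier of the scoring function later in the post, and the `cross_reference` name and metric values are illustrative:

```python
# Each metric is a pass/fail gate; a niche is worth a closer look only
# when no gate trips. Thresholds are illustrative, not calibrated.
def cross_reference(metrics: dict) -> list:
    """Return the red flags that tripped; an empty list means all gates pass."""
    gates = {
        "market locked (CR10 > 70%)": metrics["cr10"] > 0.70,
        "review barrier too high (P25 > 1500)": metrics["review_p25"] > 1500,
        "price band crowded (> 30 competitors)": metrics["price_band_competitors"] > 30,
        "organic traffic squeezed (SP > 45%)": metrics["sp_density"] > 0.45,
    }
    return [flag for flag, tripped in gates.items() if tripped]


flags = cross_reference({"cr10": 0.48, "review_p25": 412,
                         "price_band_competitors": 14, "sp_density": 0.31})
print(flags)  # no gate trips for these values — empty list
```

The point of the gate structure is that a single terrible metric kills the niche even when the other five look great, which an averaged score would hide.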
## Picking the data source
I tested several Amazon data APIs. The key constraints are:
- Full-category ASIN coverage — not just visible top 100
- Reliable Sponsored placement detection — most generic scrapers miss 20–40% of SP slots
- Complete review text returned — including full Customer Says keyword set, not truncated top 6
Pangolinfo Scrape API hit all three (98%+ SP detection rate is best-in-class), so the rest of this post uses it. The patterns transfer to any equivalent API.
## Architecture

```
┌──────────────────────────────────────────────┐
│            Pangolinfo Scrape API             │
│  /amazon/category/bestsellers                │
│  /amazon/serp                                │
│  /amazon/reviews                             │
└─────────────────┬────────────────────────────┘
                  │ JSON
                  ▼
┌──────────────────────────────────────────────┐
│         async data fetcher (aiohttp)         │
│      with semaphore concurrency control      │
└─────────────────┬────────────────────────────┘
                  │
                  ▼
┌──────────────────────────────────────────────┐
│       pandas/numpy metric calculators        │
│ cr10, p25, velocity, price-band, sp density  │
└─────────────────┬────────────────────────────┘
                  │
                  ▼
┌──────────────────────────────────────────────┐
│  NLP clustering (jieba/yake → BERT in prod)  │
└─────────────────┬────────────────────────────┘
                  │
                  ▼
┌──────────────────────────────────────────────┐
│ NicheReport dataclass + opportunity scoring  │
└──────────────────────────────────────────────┘
```
## Code — async client

```python
import asyncio
from dataclasses import dataclass
from typing import Dict, List, Optional

import aiohttp
import numpy as np
import pandas as pd


@dataclass
class NicheReport:
    node_id: str
    cr10: float
    review_p25: float
    price_band_competitors: int
    breakthrough_price: Optional[float]
    sp_density: float
    pain_points: List[tuple]
    opportunity_score: int


class PangolinAsyncClient:
    """
    Pangolinfo Amazon Scrape API async wrapper (aiohttp + semaphore).

    Targets the synchronous v1/scrape endpoint. Business type is
    selected via the parserName field — switch to /api/v1/scrape/async
    only when you scrape millions of pages per day.
    Full docs: https://docs.pangolinfo.com/en-api-reference/universalApi/universalApi

    parserName cheatsheet (the `content` field varies by parser):
        amzProductDetail     → ASIN (reviews are nested in this response)
        amzKeyword           → search keyword (SERP, includes SP placements)
        amzProductOfCategory → category Browse Node ID
        amzBestSellers       → category keyword for best-sellers chart
        amzNewReleases       → category keyword for new-releases chart
        amzProductOfSeller   → seller/store ID
    """

    BASE_URL = "https://scrapeapi.pangolinfo.com/api/v1/scrape"
    SITE_MAP = {"US": "www.amazon.com", "DE": "www.amazon.de",
                "UK": "www.amazon.co.uk", "JP": "www.amazon.co.jp"}

    def __init__(self, api_key: str, concurrency: int = 8):
        self.api_key = api_key
        self.semaphore = asyncio.Semaphore(concurrency)

    async def _post(self, session, parser_name, content, marketplace="US", zipcode="10041"):
        payload = {
            "parserName": parser_name,
            "content": content,
            "site": self.SITE_MAP[marketplace],
            "format": "json",
            "bizContext": {"zipcode": zipcode},
        }
        headers = {
            "Authorization": f"Bearer {self.api_key}",
            "Content-Type": "application/json",
        }
        async with self.semaphore:
            async with session.post(self.BASE_URL, json=payload, headers=headers) as resp:
                resp.raise_for_status()
                data = await resp.json()
                return data["data"]["json"]

    async def fetch_category_listings(self, session, node_id):
        return await self._post(session, "amzProductOfCategory", node_id)

    async def fetch_keyword_serp(self, session, keyword):
        return await self._post(session, "amzKeyword", keyword)

    async def fetch_product_detail(self, session, asin):
        """Returns the full product detail. The `reviews` array is nested in the
        response — filter client-side by `star` to get negative reviews only."""
        return await self._post(session, "amzProductDetail", asin)
```
## Code — metric computation

```python
def compute_cr10(top100):
    """Share of estimated category sales captured by the top 10 ASINs."""
    df = pd.DataFrame(top100).sort_values("estimated_monthly_sales", ascending=False)
    top10 = df.head(10)["estimated_monthly_sales"].sum()
    total = df["estimated_monthly_sales"].sum()
    return top10 / total if total else 0.0


def compute_review_p25(top50):
    """25th-percentile review count — the practical entry barrier."""
    reviews = [item["review_count"] for item in top50 if item.get("review_count")]
    return float(np.percentile(reviews, 25)) if reviews else 0.0


def compute_price_band(top200, target_price, band=5):
    """Competitor count within ±band of the target price, plus the median
    price of low-review listings that still rank in the top 100 — the price
    at which newcomers are currently breaking through."""
    df = pd.DataFrame(top200)
    in_band = df[(df["price"] >= target_price - band) & (df["price"] <= target_price + band)]
    top100 = df.head(100)  # restrict to top-100 ranks first, then filter
    low_review_winners = top100[top100["review_count"] < 500]
    breakthrough = float(low_review_winners["price"].median()) if len(low_review_winners) else None
    return len(in_band), breakthrough


def compute_sp_density(serp):
    """Share of Sponsored placements in the first 48 SERP slots."""
    top48 = serp[:48]
    sp = sum(1 for item in top48 if item.get("is_sponsored"))
    return sp / len(top48) if top48 else 0.0


def cluster_pain_points(reviews_text, top_k=5):
    """MVP: keyword extraction. Production: swap for BERT/sentence-transformers."""
    import yake  # lazy import — optional dependency for the NLP step

    extractor = yake.KeywordExtractor(top=top_k, n=2)
    text = " ".join(reviews_text)
    return extractor.extract_keywords(text)
```
## Code — the orchestration

```python
async def evaluate_niche(client, node_id, target_price, head_keyword):
    async with aiohttp.ClientSession() as session:
        listings, serp = await asyncio.gather(
            client.fetch_category_listings(session, node_id),
            client.fetch_keyword_serp(session, head_keyword),
        )
        top200 = listings[:200]

        # Reviews are nested inside amzProductDetail responses
        detail_tasks = [client.fetch_product_detail(session, item["asin"])
                        for item in top200[:30]]
        all_details = await asyncio.gather(*detail_tasks)

    flat_reviews = []
    for detail in all_details:
        results = detail[0].get("data", {}).get("results", [{}])[0]
        for review in results.get("reviews", []):
            star_value = float(review.get("star", "5").split()[0])  # "4.0 out of 5 stars"
            if star_value <= 3:
                flat_reviews.append(review.get("content", ""))

    cr10 = compute_cr10(top200[:100])
    review_p25 = compute_review_p25(top200[:50])
    band_count, breakthrough = compute_price_band(top200, target_price)
    sp_density = compute_sp_density(serp)
    pain_points = cluster_pain_points(flat_reviews)
    score = score_opportunity(cr10, review_p25, sp_density, band_count)

    return NicheReport(
        node_id=node_id,
        cr10=cr10,
        review_p25=review_p25,
        price_band_competitors=band_count,
        breakthrough_price=breakthrough,
        sp_density=sp_density,
        pain_points=pain_points,
        opportunity_score=score,
    )


def score_opportunity(cr10, review_p25, sp_density, band_count):
    score = 100
    score -= 30 if cr10 > 0.70 else (10 if cr10 > 0.55 else 0)
    score -= 25 if review_p25 > 1500 else (10 if review_p25 > 800 else 0)
    score -= 20 if sp_density > 0.45 else (10 if sp_density > 0.35 else 0)
    score -= 15 if band_count > 30 else 0
    return max(0, score)
```
## Running it

```python
async def main():
    client = PangolinAsyncClient(api_key="your_key", concurrency=8)
    report = await evaluate_niche(
        client,
        node_id="2251606011",  # Coffee Filters
        target_price=18.99,
        head_keyword="coffee filters",
    )
    print(f"Opportunity score: {report.opportunity_score}/100")
    print(f"CR10: {report.cr10:.1%}")
    print(f"Review barrier (P25): {report.review_p25:.0f}")
    print(f"Competitors in target price band: {report.price_band_competitors}")
    print(f"Breakthrough price anchor: ${report.breakthrough_price}")
    print(f"SP density: {report.sp_density:.1%}")
    print(f"Top pain points: {report.pain_points}")


asyncio.run(main())
```
Sample output for a representative category:

```
Opportunity score: 65/100
CR10: 58.3%
Review barrier (P25): 912
Competitors in target price band: 34
Breakthrough price anchor: $16.50
SP density: 31.2%
Top pain points: [('bleach taste', 0.041), ('cup size fit', 0.038), ('packaging fragile', 0.029), ...]
```

(Check the arithmetic against `score_opportunity`: CR10 above 0.55 costs 10, P25 above 800 costs 10, a band count above 30 costs 15 — hence 65.)
## Production hardening

A few things to add before this hits production:

**Caching** — BSR doesn't need minute-level refresh. Redis-cache the bestseller responses with a 24-hour TTL.
```python
import json

import redis

r = redis.Redis()


async def fetch_category_cached(client, session, node_id, ttl=86400):
    key = f"cat:{node_id}:{pd.Timestamp.now().date()}"
    # Note: redis-py calls here are synchronous and will block the event
    # loop briefly — fine for a cron-style job, use redis.asyncio on a hot path.
    cached = r.get(key)
    if cached:
        return json.loads(cached)
    data = await client.fetch_category_listings(session, node_id)
    r.setex(key, ttl, json.dumps(data))
    return data
```
**Rate limiting** — token bucket on the client side to stay under API quota:

```python
from aiolimiter import AsyncLimiter

limiter = AsyncLimiter(max_rate=10, time_period=1)  # 10 req/s


async def rate_limited_post(client, session, *args, **kwargs):
    async with limiter:
        return await client._post(session, *args, **kwargs)
```
**NLP upgrade path** — yake or jieba is fine for MVP but misses semantic clusters ("hard to clean" and "won't come off in dishwasher" should cluster together). For production, swap in sentence-transformers with HDBSCAN clustering, or call an LLM for thematic summarization.
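One way to structure that upgrade, sketched under assumptions: the embedding function is injected, so the MVP can pass any vectorizer and production can pass a sentence-transformers `model.encode`. The greedy cosine grouping below is a deliberately simple stand-in for HDBSCAN, and `cluster_reviews` and its `threshold` are illustrative names, not an established API:

```python
import numpy as np


def cluster_reviews(texts, embed_fn, threshold=0.80):
    """Group reviews whose embeddings are cosine-similar to a cluster anchor.

    embed_fn: callable mapping a string to a vector. In production, swap this
    greedy grouping for hdbscan on the embedding matrix.
    """
    # Normalize embeddings so a dot product is cosine similarity
    vecs = [np.asarray(embed_fn(t), dtype=float) for t in texts]
    vecs = [v / (np.linalg.norm(v) or 1.0) for v in vecs]

    clusters = []  # list of (anchor_vector, member_texts)
    for text, v in zip(texts, vecs):
        for anchor, members in clusters:
            if float(anchor @ v) >= threshold:
                members.append(text)
                break
        else:
            clusters.append((v, [text]))
    return [members for _, members in clusters]
```

The injection point matters more than the clustering algorithm: callers like `cluster_pain_points` never change when you move from keyword bags to transformer embeddings.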
**New-listing velocity** — needs historical snapshots, which means scheduling a weekly job that diffs the current top-200 ASINs against a stored ledger. Use Celery + Redis for the scheduling.
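The diff itself is small once the snapshots exist. A sketch, assuming the weekly job hands you this week's and last week's top-200 ASIN lists (storage and function name are illustrative; the ledger would live in Redis or Postgres):

```python
def new_listing_velocity(current_asins, previous_asins):
    """Fraction of the current top-N ASINs absent from the last snapshot.

    High velocity means the algorithm is rotating newcomers into the
    rankings; near-zero means the leaderboard is frozen.
    """
    current, previous = set(current_asins), set(previous_asins)
    if not current:
        return 0.0
    return len(current - previous) / len(current)
```

The hard operational part the post alludes to is not this function but guaranteeing the weekly snapshot actually ran: a missed week silently doubles the apparent velocity, so the job should record its own run timestamps and refuse to diff across a gap.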
## Why this beats SaaS tools
- Cross-metric filters work natively. "ASINs with under 500 reviews that still rank in top 100" is one DataFrame query, not a CSV export.
- Refresh frequency under your control. Run it hourly during a launch monitoring window if you need to.
- Decision logic is in code. Future-you (or your team) can always answer "why did we score this category at 65?"
- Marginal cost per category approaches zero. Once the pipeline runs, scanning 100 candidates per week costs only API calls and compute.
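The cross-metric filter from the first bullet really is one line. A sketch with toy data — the `rank` column name is illustrative (whatever field your listings carry for bestseller position), `review_count` is the field used throughout the post:

```python
import pandas as pd

# Toy stand-in for the top-200 listings DataFrame from the pipeline
df = pd.DataFrame({
    "asin": ["B01", "B02", "B03"],
    "rank": [12, 87, 150],
    "review_count": [230, 1800, 90],
})

# "ASINs with under 500 reviews that still rank in top 100"
low_review_winners = df[(df["review_count"] < 500) & (df["rank"] <= 100)]
print(low_review_winners["asin"].tolist())  # only B01 passes both conditions
```

With a SaaS dashboard the same question is an export, a spreadsheet, and two manual filters; here it composes with every other metric in the same session.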
If you don't want to self-build, AMZ Data Tracker ships the same six-metric framework as a hosted dashboard with daily refresh and built-in NLP. The trade-off is less customization but a 30-minute setup instead of 4–6 weeks.
## Closing
The compounding payoff of this kind of pipeline is what makes it worth building. Every category you evaluate is logged, every decision is reproducible, every threshold can be tuned based on actual outcomes. After six months of running it you have a calibrated model that's specific to your supply chain and risk tolerance — something no off-the-shelf tool can give you.
Code repository structure if you want to extend this — break it into clients/, metrics/, scoring/, pipelines/, and tasks/. Drop the async client into clients/, the metric functions into metrics/, and use a Celery beat schedule in tasks/ for the recurring jobs.
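One way that layout could look (directory names from the paragraph above; the per-directory glosses are my own):

```
niche-pipeline/
├── clients/     # PangolinAsyncClient and any other API wrappers
├── metrics/     # cr10, p25, price-band, sp-density calculators
├── scoring/     # score_opportunity and its threshold config
├── pipelines/   # evaluate_niche-style orchestration
└── tasks/       # Celery beat schedule for weekly snapshot jobs
```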
Happy to answer questions in the comments — particularly interested in hearing how others have approached the new-listing velocity tracking, since the snapshot diffing is the trickiest part of the pipeline to operationalize cleanly.