TL;DR
Stop relying on SaaS tools with 24-72 hour data lag to find winning Amazon products. Build a real-time discovery pipeline with the Pangolinfo Scrape API instead — faster data, fully customizable filters, no information homogenization problem. Here's the complete implementation.
The Problem With Standard Product Research Tools
Every conventional Amazon research tool — Helium 10, Jungle Scout, and their peers — runs on a crawl-and-cache model. Amazon data is scraped at intervals (typically once every 24-48 hours), processed, stored in a centralized database, and served to all subscribers through a shared interface.
That means:
- Data you see is 24-72 hours old before you can act on it
- Every subscriber is looking at the same data at the same time
- There's no structural advantage to using the same tool as everyone else
In a 2026 Amazon marketplace where category competition compounds rapidly, "discovering" an opportunity that ten thousand other tool subscribers are also seeing is not a discovery. It's a race to second place.
The fix: find winning products on Amazon via API — query Amazon's public data directly, in real time, on your own cadence.
What You'll Build
A three-layer automated product discovery pipeline:
Layer 1: Category bestseller scanning (real-time BSR + review data)
↓
Layer 2: Keyword competition analysis (ad density, organic competition)
↓
Layer 3: Negative review mining (competitor product gap identification)
↓
Output: Ranked shortlist of blue ocean candidates with opportunity scores
Stack: Python 3.11+, Pangolinfo Scrape API, pandas, requests
Setup
pip install requests pandas python-dotenv
# config.py
import os
from dotenv import load_dotenv
# Read key=value pairs from a local .env file into the process environment.
load_dotenv()
# Pangolinfo credentials and endpoints. API_KEY is None if the env var is
# unset — requests will then fail with an auth error rather than at import time.
API_KEY = os.getenv("PANGOLINFO_API_KEY")
BASE_URL = "https://api.pangolinfo.com/v1/scrape"
REVIEWS_URL = "https://api.pangolinfo.com/v1/reviews"
HEADERS = {"Authorization": f"Bearer {API_KEY}"}
Get your API key at tool.pangolinfo.com.
Layer 1: Real-Time Bestseller Scanner
# scanner.py
import requests
import time
from config import BASE_URL, HEADERS
def scan_bestsellers(
    category_node: str,
    marketplace: str = "US",
    bsr_range: tuple = (20, 500),
    max_reviews: int = 500
) -> list[dict]:
    """
    Fetch a category node's bestseller list in real time and filter it.

    Args:
        category_node: Amazon category path, e.g. 'kitchen/coffee-makers'
        marketplace: Amazon marketplace code
        bsr_range: (min_bsr, max_bsr) window a product must fall inside
        max_reviews: Review-count ceiling (lower = less entrenched competition)

    Returns:
        One dict per product that passes both the BSR and review filters.
    """
    request_body = {
        "url": f"https://www.amazon.com/best-sellers/{category_node}",
        "parse_type": "bestsellers",
        "marketplace": marketplace,
        "include_sponsored": True,
        "output_format": "json"
    }
    response = requests.post(BASE_URL, json=request_body, headers=HEADERS, timeout=30)
    response.raise_for_status()
    products = response.json().get("products", [])

    min_bsr, max_bsr = bsr_range
    shortlist = []
    for product in products:
        # Missing fields default high so incomplete records are filtered out.
        rank = product.get("bsr_rank", 99999)
        n_reviews = product.get("review_count", 99999)
        if rank < min_bsr or rank > max_bsr or n_reviews > max_reviews:
            continue
        shortlist.append({
            "asin": product["asin"],
            "title": product.get("title", ""),
            "bsr_rank": rank,
            "review_count": n_reviews,
            "rating": product.get("rating", 0.0),
            "price": product.get("price", 0.0),
            "bsr_7d_change": product.get("bsr_7d_change"),  # positive = improving rank
            "source_category": category_node
        })
    return shortlist
def scan_multiple_categories(
    categories: list[str],
    delay_between: float = 0.5,
    **filter_kwargs
) -> list[dict]:
    """
    Scan several category nodes, pausing between requests.

    A request failure on one category is logged and skipped so a single
    bad node never aborts the whole scan.
    """
    collected: list[dict] = []
    for category in categories:
        try:
            found = scan_bestsellers(category, **filter_kwargs)
        except requests.RequestException as exc:
            print(f" [{category}] → Error: {exc}")
        else:
            collected.extend(found)
            print(f" [{category}] → {len(found)} candidates")
        # Simple client-side rate limiting between category requests.
        time.sleep(delay_between)
    return collected
Layer 2: Keyword Competition Analyzer
# competition.py
import requests
from config import BASE_URL, HEADERS
def analyze_keyword(keyword: str, marketplace: str = "US") -> dict:
    """
    Pull the search results page for a keyword and derive competition metrics.

    Headline metric: ad_density = sponsored_listings / total_results.
    Lower density = lower cost-per-click competition = easier organic entry.
    """
    request_body = {
        "keyword": keyword,
        "parse_type": "search_results",
        "marketplace": marketplace,
        "include_ads": True,
        "output_format": "json"
    }
    response = requests.post(BASE_URL, json=request_body, headers=HEADERS, timeout=30)
    response.raise_for_status()
    page = response.json()

    total_listings = page.get("total_results", 1)
    sponsored_listings = page.get("sponsored_count", 0)
    listings = page.get("results", [])
    top_ten = listings[:10]
    # max(..., 1) guards the division when the page returns no results.
    avg_price = sum(item.get("price", 0) for item in top_ten) / max(len(top_ten), 1)

    return {
        "keyword": keyword,
        "total_listings": total_listings,
        "sponsored_count": sponsored_listings,
        "ad_density": round(sponsored_listings / max(total_listings, 1), 3),
        "avg_price_top10": round(avg_price, 2),
        "top_asins": [item["asin"] for item in listings[:5]]
    }
def batch_keyword_analysis(keywords: list[str]) -> dict[str, dict]:
    """Run analyze_keyword over each keyword; failed keywords are logged and omitted."""
    analyzed: dict[str, dict] = {}
    for keyword in keywords:
        try:
            analyzed[keyword] = analyze_keyword(keyword)
        except Exception as exc:  # best-effort batch: one bad keyword must not stop the rest
            print(f" Keyword '{keyword}' failed: {exc}")
    return analyzed
Layer 3: Negative Review Miner
# review_miner.py
import requests
from collections import Counter
from config import REVIEWS_URL, HEADERS
# Complaint taxonomy for Amazon product categories.
# Each key is a complaint theme; each value is a list of lowercase substrings
# matched against a review's lowercased title+body in mine_competitor_gaps.
COMPLAINT_TAXONOMY = {
    "durability": ["broke", "broken", "stopped working", "defective", "cracked", "damaged"],
    "ease_of_use": ["hard to use", "complicated", "confusing", "difficult", "not intuitive"],
    "missing_feature":["wish it had", "should include", "missing", "lacks", "no option for", "would be better if"],
    "size_fit": ["too small", "too large", "doesn't fit", "wrong size", "bulky"],
    "value": ["overpriced", "not worth", "too expensive", "cheap quality", "waste of money"],
    "reliability": ["inconsistent", "unreliable", "sometimes works", "random", "hit or miss"]
}
def mine_competitor_gaps(asin: str, max_pages: int = 5) -> dict:
    """
    Categorize a competitor ASIN's negative reviews by complaint theme.

    Pulls up to max_pages of low-star reviews (sorted by helpfulness) and
    tallies how many reviews mention each COMPLAINT_TAXONOMY theme —
    a higher count signals a bigger unmet need.
    """
    request_body = {
        "asin": asin,
        "filter_star_rating": "three_star_and_below",
        "sort_by": "helpful",
        "max_pages": max_pages,
        "output_format": "json"
    }
    response = requests.post(REVIEWS_URL, json=request_body, headers=HEADERS, timeout=60)
    response.raise_for_status()
    negative_reviews = response.json().get("reviews", [])

    tally = Counter()
    for item in negative_reviews:
        full_text = (item.get("title", "") + " " + item.get("body", "")).lower()
        for theme, markers in COMPLAINT_TAXONOMY.items():
            # Each theme counts at most once per review, however many markers match.
            if any(marker in full_text for marker in markers):
                tally[theme] += 1

    top_theme = tally.most_common(1)[0][0] if tally else None
    return {
        "asin": asin,
        "total_negative_reviews_analyzed": len(negative_reviews),
        "complaint_breakdown": dict(tally.most_common()),
        "primary_gap": top_theme
    }
Putting It All Together
# main.py
import pandas as pd
from scanner import scan_multiple_categories
from competition import batch_keyword_analysis
from review_miner import mine_competitor_gaps
def opportunity_score(product: dict, avg_ad_density: float) -> float:
    """
    Composite opportunity score on a 0-100 scale.

    Weighting:
        Market traction (BSR position)      — 40 pts, peaks at BSR 175
        Competition headroom (review count) — 30 pts, full credit at 0 reviews
        Growth momentum (7-day BSR trend)   — 20 pts, capped at +200
        Ad market efficiency (density)      — 10 pts, rewards low ad density
    """
    rank = product.get("bsr_rank", 9999)
    n_reviews = product.get("review_count", 9999)
    rank_trend = product.get("bsr_7d_change") or 0  # None → no momentum credit

    # Distance from the BSR sweet spot (175), fading to zero at ±475.
    traction = 40 * max(0, 1 - abs(rank - 175) / 475)
    headroom = 30 * max(0, (500 - n_reviews) / 500)
    growth = 20 * min(1.0, max(0, rank_trend / 200))
    ad_efficiency = 10 * (1 - avg_ad_density)
    return round(traction + headroom + growth + ad_efficiency, 2)
def run_pipeline(categories: list[str], keywords: list[str]) -> pd.DataFrame:
    """
    Run the three-layer discovery pipeline and return a scored shortlist.

    Args:
        categories: Amazon category node paths to scan (Layer 1).
        keywords: Search keywords used to gauge ad competition (Layer 2).

    Returns:
        DataFrame of up to 20 candidates scoring >= 55, best first. The top 5
        additionally carry review-mining columns (primary_product_gap, gap_count).
    """
    print("Phase 1 — Scanning categories...")
    candidates = scan_multiple_categories(
        categories, bsr_range=(30, 400), max_reviews=400
    )
    print(f" Total candidates: {len(candidates)}")

    print("\nPhase 2 — Analyzing keyword competition...")
    kw_data = batch_keyword_analysis(keywords)
    if kw_data:
        avg_density = sum(v["ad_density"] for v in kw_data.values()) / len(kw_data)
    else:
        # Every keyword lookup failed. Assume a fully saturated ad market
        # (density 1.0) so the 10-pt ad-efficiency bonus is withheld,
        # instead of crashing with ZeroDivisionError.
        avg_density = 1.0
    print(f" Average ad density: {avg_density:.1%}")

    print("\nPhase 3 — Scoring and filtering...")
    for c in candidates:
        c["opportunity_score"] = opportunity_score(c, avg_density)
    shortlist = sorted(
        [c for c in candidates if c["opportunity_score"] >= 55],
        key=lambda x: x["opportunity_score"],
        reverse=True
    )

    # Phase 4: Review mining on top 5
    print(f"\nPhase 4 — Mining competitor reviews ({min(5, len(shortlist))} ASINs)...")
    for item in shortlist[:5]:
        gap = mine_competitor_gaps(item["asin"])
        item["primary_product_gap"] = gap["primary_gap"]
        item["gap_count"] = gap["total_negative_reviews_analyzed"]
    return pd.DataFrame(shortlist[:20])
if __name__ == "__main__":
df = run_pipeline(
categories=["kitchen/pour-over-coffee", "kitchen/coffee-makers"],
keywords=["pour over coffee maker", "compact cold brew coffee maker"]
)
print("\n🏆 Top 10 Blue Ocean Candidates:")
print(df[["asin", "bsr_rank", "review_count", "opportunity_score", "primary_product_gap"]].head(10).to_string())
df.to_csv("discovery_results.csv", index=False)
Performance Tips
- Rate limiting: Add `time.sleep(0.5)` between API calls in production
- Caching: Cache keyword competition results daily — they change slowly
- Retry logic: Wrap all API calls in exponential backoff (retry up to 3x)
- Scheduling: Use APScheduler or Celery for automated daily runs
- Storage: Write results to PostgreSQL with timestamps for trend tracking
Scaling Beyond the Script
Once you've validated the pipeline logic, AMZ Data Tracker provides a no-code visual layer on top of the same Pangolinfo data infrastructure — useful for sharing category monitoring views with non-technical teammates.
For AI-driven automation, Pangolinfo also provides an Amazon Scraper Skill compatible with MCP-protocol AI agents, enabling natural-language product research queries without manual pipeline management.
Resources
Questions or implementation issues? Drop them in the comments — happy to help debug.
Top comments (0)