Mox Loop
Find Winning Products on Amazon via API: A Complete Technical Guide (2026)

TL;DR

Stop relying on SaaS tools with 24-72 hour data lag to find winning Amazon products. Build a real-time discovery pipeline with the Pangolinfo Scrape API instead — faster data, fully customizable filters, and none of the homogenized-data problem that comes from sharing a tool's cache with every other subscriber. Here's the complete implementation.


The Problem With Standard Product Research Tools

Every conventional Amazon research tool — Helium 10, Jungle Scout, and their peers — runs on a crawl-and-cache model. Amazon data is scraped at intervals (typically once every 24-48 hours), processed, stored in a centralized database, and served to all subscribers through a shared interface.

That means:

  • Data you see is 24-72 hours old before you can act on it
  • Every subscriber is looking at the same data at the same time
  • There's no structural advantage to using the same tool as everyone else

In a 2026 Amazon marketplace where category competition compounds rapidly, "discovering" an opportunity that ten thousand other tool subscribers are also seeing is not a discovery. It's a race to second place.

The fix: find winning products on Amazon via API — query Amazon's public data directly, in real time, on your own cadence.


What You'll Build

A three-layer automated product discovery pipeline:

Layer 1: Category bestseller scanning (real-time BSR + review data)
    ↓
Layer 2: Keyword competition analysis (ad density, organic competition)
    ↓
Layer 3: Negative review mining (competitor product gap identification)
    ↓
Output: Ranked shortlist of blue ocean candidates with opportunity scores

Stack: Python 3.11+, Pangolinfo Scrape API, pandas, requests


Setup

pip install requests pandas python-dotenv

# config.py
import os
from dotenv import load_dotenv

load_dotenv()

API_KEY = os.getenv("PANGOLINFO_API_KEY")
BASE_URL = "https://api.pangolinfo.com/v1/scrape"
REVIEWS_URL = "https://api.pangolinfo.com/v1/reviews"
HEADERS = {"Authorization": f"Bearer {API_KEY}"}

Get your API key at tool.pangolinfo.com.


Layer 1: Real-Time Bestseller Scanner

# scanner.py
import requests
import time
from config import BASE_URL, HEADERS


def scan_bestsellers(
    category_node: str,
    marketplace: str = "US",
    bsr_range: tuple = (20, 500),
    max_reviews: int = 500
) -> list[dict]:
    """
    Scan a category node's bestseller list in real time.
    Returns filtered candidates meeting BSR range and review ceiling.

    Args:
        category_node: Amazon category path, e.g. 'kitchen/coffee-makers'
        marketplace: Amazon marketplace code
        bsr_range: Tuple of (min_bsr, max_bsr) for filtering
        max_reviews: Maximum review count to include (lower = less competition)
    """
    payload = {
        "url": f"https://www.amazon.com/best-sellers/{category_node}",
        "parse_type": "bestsellers",
        "marketplace": marketplace,
        "include_sponsored": True,
        "output_format": "json"
    }

    resp = requests.post(BASE_URL, json=payload, headers=HEADERS, timeout=30)
    resp.raise_for_status()
    data = resp.json()

    candidates = []
    for p in data.get("products", []):
        bsr = p.get("bsr_rank", 99999)
        reviews = p.get("review_count", 99999)

        if bsr_range[0] <= bsr <= bsr_range[1] and reviews <= max_reviews:
            candidates.append({
                "asin": p["asin"],
                "title": p.get("title", ""),
                "bsr_rank": bsr,
                "review_count": reviews,
                "rating": p.get("rating", 0.0),
                "price": p.get("price", 0.0),
                "bsr_7d_change": p.get("bsr_7d_change"),  # positive = improving rank
                "source_category": category_node
            })

    return candidates


def scan_multiple_categories(
    categories: list[str],
    delay_between: float = 0.5,
    **filter_kwargs
) -> list[dict]:
    """Scan multiple category nodes with rate limiting."""
    all_candidates = []
    for cat in categories:
        try:
            results = scan_bestsellers(cat, **filter_kwargs)
            all_candidates.extend(results)
            print(f"  [{cat}] → {len(results)} candidates")
        except requests.RequestException as e:
            print(f"  [{cat}] → Error: {e}")
        time.sleep(delay_between)
    return all_candidates
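Before burning API credits, the filtering step in `scan_bestsellers` can be exercised offline. This is a minimal sketch with invented sample products (the ASINs and numbers are made up); the predicate mirrors the BSR-window and review-ceiling logic above.

```python
# Offline sanity check for the Layer 1 filter (no API call needed).
# Sample products are invented; the predicate mirrors scan_bestsellers.

def filter_candidates(products, bsr_range=(20, 500), max_reviews=500):
    """Keep products inside the BSR window with a review count under the ceiling."""
    return [
        p for p in products
        if bsr_range[0] <= p.get("bsr_rank", 99999) <= bsr_range[1]
        and p.get("review_count", 99999) <= max_reviews
    ]

sample = [
    {"asin": "B0EXAMPLE1", "bsr_rank": 12,  "review_count": 80},    # BSR below window (too popular)
    {"asin": "B0EXAMPLE2", "bsr_rank": 150, "review_count": 320},   # passes both filters
    {"asin": "B0EXAMPLE3", "bsr_rank": 450, "review_count": 2100},  # too many reviews
]

print([p["asin"] for p in filter_candidates(sample)])  # ['B0EXAMPLE2']
```

Note that a product missing `bsr_rank` or `review_count` defaults to 99999 and is filtered out, matching the defensive defaults in the scanner.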

Layer 2: Keyword Competition Analyzer

# competition.py
import requests
from config import BASE_URL, HEADERS


def analyze_keyword(keyword: str, marketplace: str = "US") -> dict:
    """
    Fetch keyword search results page and calculate competition metrics.

    Key metric: ad_density = sponsored_listings / total_results
    Lower density = lower cost-per-click competition = easier organic entry
    """
    payload = {
        "keyword": keyword,
        "parse_type": "search_results",
        "marketplace": marketplace,
        "include_ads": True,
        "output_format": "json"
    }

    resp = requests.post(BASE_URL, json=payload, headers=HEADERS, timeout=30)
    resp.raise_for_status()
    data = resp.json()

    total = data.get("total_results", 1)
    sponsored = data.get("sponsored_count", 0)
    results = data.get("results", [])

    return {
        "keyword": keyword,
        "total_listings": total,
        "sponsored_count": sponsored,
        "ad_density": round(sponsored / max(total, 1), 3),
        "avg_price_top10": round(
            sum(r.get("price", 0) for r in results[:10]) / max(len(results[:10]), 1), 2
        ),
        "top_asins": [r["asin"] for r in results[:5]]
    }


def batch_keyword_analysis(keywords: list[str]) -> dict[str, dict]:
    """Analyze multiple keywords and return aggregated competition data."""
    results = {}
    for kw in keywords:
        try:
            results[kw] = analyze_keyword(kw)
        except Exception as e:
            print(f"  Keyword '{kw}' failed: {e}")
    return results
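The `ad_density` metric is the heart of this layer, so it's worth seeing in isolation. The mocked response below uses the same field names as the parser output above, but the values are invented:

```python
# ad_density from analyze_keyword, computed on a mocked search response.
# Field names follow the code above; the values are made up.

def ad_density(total_results: int, sponsored_count: int) -> float:
    """Sponsored share of the results page; max() guards against division by zero."""
    return round(sponsored_count / max(total_results, 1), 3)

mock_response = {"total_results": 48, "sponsored_count": 12}
print(ad_density(mock_response["total_results"], mock_response["sponsored_count"]))  # 0.25
```

A density of 0.25 means a quarter of the visible listings are paid placements — a moderately contested page. Keywords below roughly 0.15 are the ones worth shortlisting for organic entry.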

Layer 3: Negative Review Miner

# review_miner.py
import requests
from collections import Counter
from config import REVIEWS_URL, HEADERS

# Complaint taxonomy for Amazon product categories
COMPLAINT_TAXONOMY = {
    "durability":     ["broke", "broken", "stopped working", "defective", "cracked", "damaged"],
    "ease_of_use":    ["hard to use", "complicated", "confusing", "difficult", "not intuitive"],
    "missing_feature":["wish it had", "should include", "missing", "lacks", "no option for", "would be better if"],
    "size_fit":       ["too small", "too large", "doesn't fit", "wrong size", "bulky"],
    "value":          ["overpriced", "not worth", "too expensive", "cheap quality", "waste of money"],
    "reliability":    ["inconsistent", "unreliable", "sometimes works", "random", "hit or miss"]
}


def mine_competitor_gaps(asin: str, max_pages: int = 5) -> dict:
    """
    Extract and categorize negative reviews for a competitor ASIN.
    Returns complaint category counts — higher count = bigger unmet need.
    """
    payload = {
        "asin": asin,
        "filter_star_rating": "three_star_and_below",
        "sort_by": "helpful",
        "max_pages": max_pages,
        "output_format": "json"
    }

    resp = requests.post(REVIEWS_URL, json=payload, headers=HEADERS, timeout=60)
    resp.raise_for_status()
    reviews = resp.json().get("reviews", [])

    complaint_counts = Counter()
    for review in reviews:
        text = (review.get("title", "") + " " + review.get("body", "")).lower()
        for category, keywords in COMPLAINT_TAXONOMY.items():
            if any(kw in text for kw in keywords):
                complaint_counts[category] += 1

    return {
        "asin": asin,
        "total_negative_reviews_analyzed": len(reviews),
        "complaint_breakdown": dict(complaint_counts.most_common()),
        "primary_gap": complaint_counts.most_common(1)[0][0] if complaint_counts else None
    }
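One subtlety of the matching loop above: each complaint category is counted at most once per review, no matter how many of its trigger phrases appear in the text. Here's that behavior on two invented reviews against a trimmed-down taxonomy:

```python
# How the taxonomy matching in mine_competitor_gaps classifies reviews.
# Each category increments at most once per review. Sample reviews are invented.
from collections import Counter

TAXONOMY = {
    "durability": ["broke", "broken", "stopped working"],
    "value": ["overpriced", "not worth", "waste of money"],
}

reviews = [
    {"title": "Broke in a week", "body": "It just stopped working. Not worth the money."},
    {"title": "Overpriced", "body": "Fine build, but not worth it."},
]

counts = Counter()
for review in reviews:
    text = (review.get("title", "") + " " + review.get("body", "")).lower()
    for category, phrases in TAXONOMY.items():
        if any(p in text for p in phrases):
            counts[category] += 1  # one increment per category per review

print(dict(counts))  # {'durability': 1, 'value': 2}
```

The first review trips both "broke" and "stopped working" but adds only one durability count, so `primary_gap` reflects how many *customers* complained, not how many times they repeated themselves.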

Putting It All Together

# main.py
import pandas as pd
from scanner import scan_multiple_categories
from competition import batch_keyword_analysis
from review_miner import mine_competitor_gaps


def opportunity_score(product: dict, avg_ad_density: float) -> float:
    """
    Composite opportunity score (0-100):
      Market traction (BSR position): 40 pts
      Competition headroom (review count): 30 pts
      Growth momentum (BSR trend): 20 pts
      Ad market efficiency (density): 10 pts
    """
    bsr = product.get("bsr_rank", 9999)
    reviews = product.get("review_count", 9999)
    trend = product.get("bsr_7d_change") or 0

    market   = 40 * max(0, 1 - abs(bsr - 175) / 475)
    compete  = 30 * max(0, (500 - reviews) / 500)
    momentum = 20 * min(1.0, max(0, trend / 200))
    ad_eff   = 10 * (1 - avg_ad_density)

    return round(market + compete + momentum + ad_eff, 2)


def run_pipeline(categories: list[str], keywords: list[str]) -> pd.DataFrame:
    print("Phase 1 — Scanning categories...")
    candidates = scan_multiple_categories(
        categories, bsr_range=(30, 400), max_reviews=400
    )
    print(f"  Total candidates: {len(candidates)}")

    print("\nPhase 2 — Analyzing keyword competition...")
    kw_data = batch_keyword_analysis(keywords)
    avg_density = (
        sum(v["ad_density"] for v in kw_data.values()) / len(kw_data)
        if kw_data else 0.0
    )  # guard: every keyword request may have failed
    print(f"  Average ad density: {avg_density:.1%}")

    print("\nPhase 3 — Scoring and filtering...")
    for c in candidates:
        c["opportunity_score"] = opportunity_score(c, avg_density)

    shortlist = sorted(
        [c for c in candidates if c["opportunity_score"] >= 55],
        key=lambda x: x["opportunity_score"],
        reverse=True
    )

    # Phase 4: Review mining on top 5
    print(f"\nPhase 4 — Mining competitor reviews ({min(5, len(shortlist))} ASINs)...")
    for item in shortlist[:5]:
        gap = mine_competitor_gaps(item["asin"])
        item["primary_product_gap"] = gap["primary_gap"]
        item["gap_count"] = gap["total_negative_reviews_analyzed"]

    return pd.DataFrame(shortlist[:20])


if __name__ == "__main__":
    df = run_pipeline(
        categories=["kitchen/pour-over-coffee", "kitchen/coffee-makers"],
        keywords=["pour over coffee maker", "compact cold brew coffee maker"]
    )
    print("\n🏆 Top 10 Blue Ocean Candidates:")
    print(df[["asin", "bsr_rank", "review_count", "opportunity_score", "primary_product_gap"]].head(10).to_string())
    df.to_csv("discovery_results.csv", index=False)
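To make the scoring weights concrete, here is `opportunity_score` worked by hand for one hypothetical candidate (all numbers invented), with each component's contribution visible:

```python
# opportunity_score applied by hand to a hypothetical candidate,
# so each weight's contribution is visible. All inputs are invented.

bsr, reviews, trend, avg_ad_density = 100, 200, 100, 0.30

market   = 40 * max(0, 1 - abs(bsr - 175) / 475)   # distance from the BSR sweet spot (~33.68)
compete  = 30 * max(0, (500 - reviews) / 500)      # review-count headroom (18.0)
momentum = 20 * min(1.0, max(0, trend / 200))      # 7-day BSR improvement, capped (10.0)
ad_eff   = 10 * (1 - avg_ad_density)               # cheaper ad market = more points (7.0)

score = round(market + compete + momentum + ad_eff, 2)
print(score)  # 68.68
```

A score of 68.68 clears the pipeline's shortlist threshold of 55, so this hypothetical product would proceed to Phase 4 review mining.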

Performance Tips

  • Rate limiting: Add time.sleep(0.5) between API calls in production
  • Caching: Cache keyword competition results daily — they change slowly
  • Retry logic: Wrap all API calls in exponential backoff (retry up to 3x)
  • Scheduling: Use APScheduler or Celery for automated daily runs
  • Storage: Write results to PostgreSQL with timestamps for trend tracking
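The retry tip above can be implemented in a few lines. This is a minimal sketch; the wrapper name and parameters (`with_backoff`, `attempts`, `base_delay`) are illustrative, not part of the article's pipeline code:

```python
# Minimal exponential-backoff wrapper for the retry tip above.
# Names (with_backoff, attempts, base_delay) are illustrative.
import time

def with_backoff(fn, attempts=3, base_delay=0.5):
    """Call fn(); on exception, sleep base_delay * 2^n, then retry up to `attempts` times."""
    for n in range(attempts):
        try:
            return fn()
        except Exception:
            if n == attempts - 1:
                raise  # out of retries: surface the last error
            time.sleep(base_delay * (2 ** n))

# Usage sketch:
# candidates = with_backoff(lambda: scan_bestsellers("kitchen/coffee-makers"))
```

In production you would likely catch only `requests.RequestException` rather than bare `Exception`, so that genuine bugs still fail fast.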

Scaling Beyond the Script

Once you've validated the pipeline logic, AMZ Data Tracker provides a no-code visual layer on top of the same Pangolinfo data infrastructure — useful for sharing category monitoring views with non-technical teammates.

For AI-driven automation, Pangolinfo also provides an Amazon Scraper Skill compatible with MCP-protocol AI agents, enabling natural-language product research queries without manual pipeline management.


Questions or implementation issues? Drop them in the comments — happy to help debug.
