DEV Community

Cover image for Build an Amazon Competitor Price Tracker with AI-Powered Alerts in Under 2 Hours
Mox Loop
Mox Loop

Posted on

Build an Amazon Competitor Price Tracker with AI-Powered Alerts in Under 2 Hours

TL;DR

  • Amazon price wars are won by sellers with faster information, not deeper pockets
  • Pangolinfo Scrape API delivers structured ASIN data (BuyBox + all offers) on demand
  • OpenClaw AI Agent adds contextual analysis to raw price change detection
  • Lark/Slack webhook integration delivers alerts in <3 minutes from price change
  • Full Python implementation: 4 files, ~200 lines, deployable in under 2 hours
  • No-code alternative: AMZ Data Tracker — same result, zero Python

The Problem This Solves

Every 10 minutes your competitors can adjust their Amazon pricing. Your monitoring refresh rate determines how long they can operate at a new price point before you know about it.

Daily refresh = 10-24 hour blind spot. During that window:

  • They capture BuyBox impressions at a lower price
  • Your ads drive traffic to a listing where they convert better
  • You don't know this is happening

The fix: hourly or sub-hourly competitor price polling with immediate push notifications to the team channel where decisions actually happen.


Prerequisites

pip install openclaw requests apscheduler python-dotenv aiohttp
Enter fullscreen mode Exit fullscreen mode

Required credentials:

  • Pangolinfo API key → tool.pangolinfo.com
  • Lark bot webhook URL (Group Settings → Bots → Custom Bot)
  • Slack webhook URL (optional)
  • OpenClaw configured with an LLM key

Project Structure

amazon-price-tracker/
├── .env                    # credentials
├── config.py               # ASIN list + thresholds
├── price_collector.py      # Pangolinfo API wrapper
├── analyzer.py             # OpenClaw change detection
├── notifier.py             # Lark + Slack dispatch
└── main.py                 # scheduler + orchestration
Enter fullscreen mode Exit fullscreen mode

Part 1: Data Collection with Pangolinfo Scrape API

The Pangolinfo API returns a clean, parsed JSON structure for any Amazon ASIN—no HTML parsing, no proxy management, no anti-bot workarounds.

# price_collector.py
import requests
from datetime import datetime
from typing import Optional, List
import asyncio, aiohttp

PANGOLINFO_URL = "https://api.pangolinfo.com/v1/amazon/product"

def fetch_sync(asin: str, api_key: str, marketplace: str = "US") -> Optional[dict]:
    """Synchronous single-ASIN fetch."""
    resp = requests.post(
        PANGOLINFO_URL,
        headers={"Authorization": f"Bearer {api_key}"},
        json={
            "asin": asin,
            "marketplace": marketplace,
            "parse": True,           # Structured JSON, not raw HTML
            "include_offers": True,  # All competing seller offers
            "include_buybox": True   # BuyBox owner + price
        },
        timeout=30
    )
    resp.raise_for_status()
    d = resp.json()

    return {
        "asin": asin,
        "collected_at": datetime.utcnow().isoformat() + "Z",
        "marketplace": marketplace,
        "buybox_price": d.get("buybox", {}).get("price"),
        "buybox_seller_name": d.get("buybox", {}).get("seller_name"),
        "buybox_seller_id": d.get("buybox", {}).get("seller_id"),
        "buybox_is_fba": d.get("buybox", {}).get("is_fba"),
        "all_offers": d.get("offers", []),
        "product_title": d.get("title", "")[:80],
        "in_stock": d.get("availability") == "In Stock"
    }


async def _fetch_async(session, asin, api_key, semaphore):
    """Async single-ASIN fetch with semaphore rate limiting."""
    async with semaphore:
        async with session.post(
            PANGOLINFO_URL,
            headers={"Authorization": f"Bearer {api_key}"},
            json={"asin": asin, "parse": True, "include_offers": True, "include_buybox": True},
            timeout=aiohttp.ClientTimeout(total=30)
        ) as r:
            d = await r.json()
            return {
                "asin": asin, "collected_at": datetime.utcnow().isoformat() + "Z",
                "buybox_price": d.get("buybox", {}).get("price"),
                "buybox_seller_name": d.get("buybox", {}).get("seller_name"),
                "buybox_seller_id": d.get("buybox", {}).get("seller_id"),
                "all_offers": d.get("offers", []),
                "product_title": d.get("title", "")[:80]
            }


def fetch_batch(asin_list: List[str], api_key: str, concurrency: int = 8) -> List[dict]:
    """Async batch collection with configurable concurrency."""

    async def _run():
        sem = asyncio.Semaphore(concurrency)
        async with aiohttp.ClientSession() as session:
            tasks = [_fetch_async(session, asin, api_key, sem) for asin in asin_list]
            results = await asyncio.gather(*tasks, return_exceptions=True)
        return [r for r in results if isinstance(r, dict)]

    return asyncio.run(_run())
Enter fullscreen mode Exit fullscreen mode

What the API returns:

{
  "asin": "B0D6BFMNN5",
  "buybox_price": 79.99,
  "buybox_seller_name": "TechMart Direct",
  "buybox_seller_id": "A1XYZ123ABC",
  "buybox_is_fba": true,
  "all_offers": [
    {"seller_name": "My Store", "price": 89.99, "is_fba": true},
    {"seller_name": "TechMart Direct", "price": 79.99, "is_fba": true}
  ],
  "in_stock": true
}
Enter fullscreen mode Exit fullscreen mode

Part 2: OpenClaw Agent — Change Detection + AI Analysis

# analyzer.py  
from openclaw import Agent, Task
from typing import Optional
import time

class CompetitorPriceAnalyzer:
    """
    Detects competitor price changes and generates AI-powered competitive analysis.

    Key behaviors:
    - Only alerts on competitor drops (not your own pricing changes)
    - Configurable percentage threshold to filter noise
    - Alert deduplication with cooldown period
    - Historical change log fed to AI for pattern context
    """

    def __init__(self, 
                 threshold_pct: float = 5.0,
                 my_seller_id: str = "",
                 cooldown_minutes: int = 120):
        self.threshold = threshold_pct
        self.my_id = my_seller_id
        self.cooldown = cooldown_minutes * 60
        self._baseline = {}      # {asin: {"price": float, "log": list}}
        self._last_alerted = {}  # {asin: timestamp} for dedup

        self.agent = Agent(
            name="CompetitorAnalyst",
            role="Amazon Competitive Pricing Analyst",
            goal="Rapidly assess competitor pricing threats and recommend tactical responses",
            backstory=(
                "Senior Amazon marketplace expert with deep knowledge of pricing dynamics, "
                "seller behavior patterns, and competitive response strategies across multiple categories."
            )
        )

    def analyze(self, snapshot: dict) -> Optional[dict]:
        asin = snapshot["asin"]
        current_price = snapshot.get("buybox_price")

        if current_price is None:
            return None

        # Initialize baseline on first collection
        if asin not in self._baseline:
            self._baseline[asin] = {"price": current_price, "log": []}
            print(f"[INIT] {asin}: baseline ${current_price}")
            return None

        baseline = self._baseline[asin]
        prev_price = baseline["price"]

        if prev_price <= 0:
            baseline["price"] = current_price
            return None

        change_pct = (current_price - prev_price) / prev_price * 100
        is_competitor = snapshot.get("buybox_seller_id") != self.my_id

        # Check alert conditions
        if change_pct <= -self.threshold and is_competitor:

            # Deduplication cooldown check
            last_alert = self._last_alerted.get(asin, 0)
            if time.time() - last_alert < self.cooldown:
                print(f"[COOLDOWN] {asin} — suppressed (cooling down)")
                baseline["price"] = current_price
                return None

            # Generate AI competitive analysis
            analysis = self.agent.execute_task(Task(
                description=f"""
                Amazon competitor pricing event detected. Provide concise competitive analysis (max 3 sentences):

                ASIN: {asin}  
                Product: {snapshot.get('product_title', 'Unknown')}
                Competitor: {snapshot.get('buybox_seller_name')} (FBA: {snapshot.get('buybox_is_fba')})
                Price change: ${prev_price:.2f} → ${current_price:.2f} ({abs(change_pct):.1f}% drop)
                Historical events for this ASIN: {len(baseline['log'])} previous price changes logged
                Stock status: {'In Stock' if snapshot.get('in_stock') else 'Low/Out'}
                """,
                expected_output=(
                    "3-sentence max analysis: (1) threat level assessment, "
                    "(2) likely seller motivation if discernible, "
                    "(3) recommended immediate response action"
                )
            ))

            # Update tracking state
            baseline["log"].append({
                "from": prev_price, "to": current_price, 
                "change_pct": change_pct,
                "ts": snapshot["collected_at"]
            })
            baseline["price"] = current_price
            self._last_alerted[asin] = time.time()

            return {
                "priority": "HIGH" if abs(change_pct) >= 10 else "MEDIUM",
                "asin": asin,
                "title": snapshot.get("product_title", ""),
                "competitor_seller": snapshot.get("buybox_seller_name"),
                "competitor_is_fba": snapshot.get("buybox_is_fba"),
                "price_from": prev_price,
                "price_to": current_price,
                "change_pct": change_pct,
                "total_offers": len(snapshot.get("all_offers", [])),
                "ai_analysis": analysis,
                "detected_at": snapshot["collected_at"]
            }

        baseline["price"] = current_price
        return None
Enter fullscreen mode Exit fullscreen mode

Part 3: Notification Dispatch

# notifier.py
import requests
import logging
import json
from datetime import datetime
from pathlib import Path

logger = logging.getLogger(__name__)

class AlertNotifier:

    def __init__(self, lark_webhook: str = "", slack_webhook: str = "",
                 log_dir: str = "./alert_logs"):
        self.lark = lark_webhook
        self.slack = slack_webhook
        Path(log_dir).mkdir(parents=True, exist_ok=True)
        self.log_dir = Path(log_dir)

    def dispatch(self, event: dict):
        results = {}

        if self.lark:
            results["lark"] = self._send_lark(event)
        if self.slack:
            results["slack"] = self._send_slack(event)

        # Persist to local log
        log_file = self.log_dir / f"alerts_{datetime.utcnow().strftime('%Y%m%d')}.jsonl"
        with open(log_file, "a") as f:
            f.write(json.dumps({"event": event, "results": results}) + "\n")

        status = " | ".join(f"{k}: {'' if v else ''}" for k, v in results.items())
        print(f"[DISPATCHED] {event['asin']} -{abs(event['change_pct']):.1f}% | {status}")

    def _send_lark(self, e: dict) -> bool:
        color = "red" if e["priority"] == "HIGH" else "orange"
        fba_badge = "🟢 FBA" if e.get("competitor_is_fba") else "⚪ FBM"
        payload = {
            "msg_type": "interactive",
            "card": {
                "header": {
                    "title": {"content": f"🚨 Competitor Price Drop | {e['asin']}", "tag": "plain_text"},
                    "template": color
                },
                "elements": [
                    {"tag": "div", "text": {"tag": "larkmd", "content": (
                        f"**Product:** {e['title']}\n"
                        f"**Competitor:** {e['competitor_seller']} {fba_badge}\n"
                        f"**Price:** ${e['price_from']:.2f} → **${e['price_to']:.2f}** ({abs(e['change_pct']):.1f}% drop)\n"
                        f"**Active Sellers:** {e['total_offers']}\n"
                        f"**AI Analysis:** {e['ai_analysis']}"
                    )}},
                    {"tag": "action", "actions": [
                        {"tag": "button", "text": {"content": "View Price History", "tag": "plain_text"},
                         "type": "primary", "url": f"https://tool.pangolinfo.com/tracking/{e['asin']}"},
                        {"tag": "button", "text": {"content": "Amazon Listing", "tag": "plain_text"},
                         "type": "default", "url": f"https://www.amazon.com/dp/{e['asin']}"}
                    ]}
                ]
            }
        }
        resp = requests.post(self.lark, json=payload, timeout=10)
        return resp.status_code == 200

    def _send_slack(self, e: dict) -> bool:
        emoji = "🔴" if e["priority"] == "HIGH" else "🟡"
        payload = {
            "blocks": [
                {"type": "header", "text": {"type": "plain_text",
                 "text": f"{emoji} Amazon Competitor Price Drop"}},
                {"type": "section", "fields": [
                    {"type": "mrkdwn", "text": f"*ASIN:*\n`{e['asin']}`"},
                    {"type": "mrkdwn", "text": f"*Drop:*\n${e['price_from']:.2f} → ${e['price_to']:.2f} ({abs(e['change_pct']):.1f}%)"},
                    {"type": "mrkdwn", "text": f"*Competitor:*\n{e['competitor_seller']}"},
                    {"type": "mrkdwn", "text": f"*Offer Count:*\n{e['total_offers']} active sellers"}
                ]},
                {"type": "section", "text": {"type": "mrkdwn", "text": f"*AI:* {e['ai_analysis']}"}},
                {"type": "actions", "elements": [{
                    "type": "button", "text": {"type": "plain_text", "text": "View Price History"},
                    "url": f"https://tool.pangolinfo.com/tracking/{e['asin']}"
                }]}
            ]
        }
        resp = requests.post(self.slack, json=payload, timeout=10)
        return resp.status_code == 200
Enter fullscreen mode Exit fullscreen mode

Part 4: Scheduler Orchestration

# main.py
import os, logging
from dotenv import load_dotenv
from apscheduler.schedulers.blocking import BlockingScheduler
from price_collector import fetch_batch
from analyzer import CompetitorPriceAnalyzer
from notifier import AlertNotifier

load_dotenv()
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s")

# ---- Configuration ----
MONITOR_ASINS = [
    "B0D6BFMNN5",     # Replace with your tracked competitor ASINs
    "B08K2S9X7Q",
]
POLL_INTERVAL    = 10     # minutes
ALERT_THRESHOLD  = 5.0    # % drop to trigger alert
COOLDOWN_MINUTES = 120    # min gap between repeat alerts for same ASIN
# -----------------------

analyzer = CompetitorPriceAnalyzer(
    threshold_pct=ALERT_THRESHOLD,
    my_seller_id=os.getenv("MY_SELLER_ID", ""),
    cooldown_minutes=COOLDOWN_MINUTES
)
notifier = AlertNotifier(
    lark_webhook=os.getenv("LARK_WEBHOOK_URL", ""),
    slack_webhook=os.getenv("SLACK_WEBHOOK_URL", "")
)

def run_cycle():
    logging.info(f"Starting collection cycle for {len(MONITOR_ASINS)} ASINs")
    snapshots = fetch_batch(MONITOR_ASINS, os.getenv("PANGOLINFO_API_KEY"))
    alerts = 0
    for snap in snapshots:
        event = analyzer.analyze(snap)
        if event:
            notifier.dispatch(event)
            alerts += 1
    logging.info(f"Cycle complete: {len(snapshots)} collected, {alerts} alerts")

if __name__ == "__main__":
    logging.info("🚀 Amazon Competitor Price Tracker starting")
    run_cycle()   # immediate first run

    scheduler = BlockingScheduler()
    scheduler.add_job(run_cycle, 'interval', minutes=POLL_INTERVAL, id='price_monitor')
    scheduler.start()
Enter fullscreen mode Exit fullscreen mode

Production Checklist

  • [ ] .env file configured with all credentials
  • [ ] ASINs restricted to high-priority competitors only (don't over-monitor)
  • [ ] Alert threshold calibrated (test with 5.0%, adjust based on noise level)
  • [ ] Cooldown period set to avoid alert fatigue (120 minutes is a reasonable start)
  • [ ] Health check configured: alert if collect cycle fails 3+ consecutive times
  • [ ] Log rotation for ./alert_logs/ to prevent unbounded disk usage
  • [ ] Team response SOP documented before first alert fires

Performance Reference

Metric Value
Single ASIN API latency 800ms—2s
20-ASIN batch (concurrency=8) 4—7s
Price change → Lark notification < 3 min
Storage per ASIN/day (alert log) ~2—5KB

Resources

Top comments (0)