Mox Loop
Amazon Ad Position Monitoring with Open Claw + Pangolinfo SERP API

TL;DR

Stop reading your ACoS dashboard to understand competitor behavior. Build a real-time SP ad position monitor instead.

This post covers:

  • Async batch SERP capture via Pangolinfo API (98% SP coverage)
  • Tiered keyword management (A/B/C) for signal vs. noise control
  • Change detection: Top1 change, new Top3 entrant, Top3 exit, price drop
  • LLM-enriched alerts via Open Claw + Claude
  • Deduplication and Slack delivery

Prerequisites: Open Claw deployed, Pangolinfo API key configured (see earlier posts in this series). We're jumping straight into the ad monitoring implementation.

# Quick test: is your SERP API working?
import requests

headers = {"Authorization": "Bearer YOUR_KEY"}
payload = {
    "source": "amazon_search",
    "query": "wireless earbuds",
    "marketplace": "US",
    "include_sponsored": True,
    "include_organic": False,
    "output_format": "json"
}

resp = requests.post("https://api.pangolinfo.com/v1/serp",
                     headers=headers, json=payload, timeout=25)
resp.raise_for_status()
print(resp.json()["sponsored_results"][:3])

The Problem with Existing Monitoring Tools

Your Amazon ad console shows internal metrics. Helium 10 / Jungle Scout show competitor data, but with 24-48h delays and no outbound API. When a new well-funded competitor enters your Top of Search overnight, you find out two days later when your ACoS report arrives. By then they've run 48 hours of aggressive ads at a discount price, accumulating early reviews.

What we want: automated detection of competitor ad position changes within 2 hours of them occurring, categorized by severity, routed to Slack with business context.


Step 1: Async SERP Capture

50 keywords × sequential requests = ~8 minutes per cycle. With asyncio: ~45 seconds.

import asyncio
import aiohttp
from datetime import datetime, timezone
from typing import List, Dict, Optional

PANGOLINFO_API_KEY = "your_key"

async def fetch_serp(
    keyword: str,
    marketplace: str,
    session: aiohttp.ClientSession,
    semaphore: asyncio.Semaphore
) -> Dict:
    """Capture Amazon SP ad positions for one keyword"""
    async with semaphore:
        headers = {"Authorization": f"Bearer {PANGOLINFO_API_KEY}"}
        payload = {
            "source": "amazon_search",
            "query": keyword,
            "marketplace": marketplace,
            "page": 1,
            "include_sponsored": True,
            "include_organic": False,
            "output_format": "json"
        }
        captured_at = datetime.now(timezone.utc).isoformat()

        try:
            async with session.post(
                "https://api.pangolinfo.com/v1/serp",
                headers=headers, json=payload,
                timeout=aiohttp.ClientTimeout(total=25)
            ) as resp:
                resp.raise_for_status()
                data = await resp.json()

            top_ads = sorted(
                [s for s in data.get("sponsored_results", [])
                 if "top" in s.get("ad_placement", "").lower()],
                key=lambda x: x.get("ad_rank", 999)
            )[:5]

            return {
                "keyword": keyword, "marketplace": marketplace,
                "captured_at": captured_at, "success": True,
                "top_of_search": [
                    {"rank": a.get("ad_rank"), "asin": a.get("asin"),
                     "brand": a.get("brand", ""), "price": a.get("price")}
                    for a in top_ads
                ]
            }
        except Exception as e:
            return {"keyword": keyword, "marketplace": marketplace,
                    "captured_at": captured_at,
                    "success": False, "error": str(e), "top_of_search": []}


async def batch_monitor(
    keywords: List[str],
    marketplace: str = "US",
    max_concurrent: int = 8
) -> List[Dict]:
    sem = asyncio.Semaphore(max_concurrent)
    async with aiohttp.ClientSession() as session:
        tasks = [fetch_serp(kw, marketplace, session, sem) for kw in keywords]
        return await asyncio.gather(*tasks)

Step 2: Keyword Tier Configuration

Don't monitor every keyword at the same frequency with the same alert threshold. Uniform treatment generates noise that gets ignored.

KEYWORD_CONFIG = {
    "tiers": {
        "A": {  # Core product keywords — monitor every 2h
            "keywords": ["wireless earbuds", "bluetooth speaker", "usb c hub"],
            "frequency_hours": 2,
            "alert_escalation": True   # Upgrade HIGH → CRITICAL
        },
        "B": {  # Competitive category keywords — monitor every 6h
            "keywords": ["earbuds under 30 dollars", "tws earbuds 2026"],
            "frequency_hours": 6,
            "alert_escalation": False
        },
        "C": {  # Exploratory long-tail — monitor daily, log only
            "keywords": [],
            "frequency_hours": 24,
            "alert_escalation": False,
            "suppress_alerts": True    # Record but don't notify
        }
    },
    "price_drop_threshold_pct": 12,
    "dedup_window_hours": 6
}

Step 3: Change Detection Engine

from dataclasses import dataclass, field
from enum import Enum

class AlertLevel(Enum):
    CRITICAL = "CRITICAL"
    HIGH = "HIGH"
    MEDIUM = "MEDIUM"
    INFO = "INFO"

@dataclass
class AdAlert:
    keyword: str
    level: AlertLevel
    event: str          # top1_change / new_top3 / top3_exit / price_drop
    message: str
    asin: str
    timestamp: str
    llm_context: str = ""


def detect_changes(
    current: Dict,
    baseline: Optional[Dict],
    tier_config: Dict
) -> List[AdAlert]:
    """
    Compare SERP snapshots. Returns [] on first run (baseline only).
    """
    if baseline is None:
        return []

    kw = current["keyword"]
    ts = current["captured_at"]
    suppress = tier_config.get("suppress_alerts", False)
    escalate = tier_config.get("alert_escalation", False)
    threshold = KEYWORD_CONFIG["price_drop_threshold_pct"] / 100

    curr_top = [a["asin"] for a in current.get("top_of_search", [])[:3]]
    base_top = [a["asin"] for a in baseline.get("top_of_search", [])[:3]]
    curr_prices = {a["asin"]: a.get("price")
                   for a in current.get("top_of_search", [])[:3]}
    base_prices = {a["asin"]: a.get("price")
                   for a in baseline.get("top_of_search", [])[:3]}

    def resolve(base: AlertLevel) -> AlertLevel:
        if suppress:
            return AlertLevel.INFO
        if escalate and base == AlertLevel.HIGH:
            return AlertLevel.CRITICAL
        return base

    alerts = []

    # Top #1 changed
    if curr_top and base_top and curr_top[0] != base_top[0]:
        is_new = curr_top[0] not in base_top
        alerts.append(AdAlert(
            keyword=kw,
            level=resolve(AlertLevel.CRITICAL if is_new else AlertLevel.HIGH),
            event="top1_change",
            message=(f"Top of Search #1: {base_top[0]}{curr_top[0]}"
                     + (" [NEW ENTRANT]" if is_new else "")),
            asin=curr_top[0], timestamp=ts
        ))

    # New Top 3 entrant (excluding #1 already handled)
    for asin in set(curr_top) - set(base_top) - {curr_top[0] if curr_top else ""}:
        rank = next((a["rank"] for a in current["top_of_search"]
                     if a["asin"] == asin), -1)
        alerts.append(AdAlert(
            keyword=kw, level=resolve(AlertLevel.HIGH),
            event="new_top3",
            message=f"New entrant at Top 3 position #{rank}: {asin}",
            asin=asin, timestamp=ts
        ))

    # Top 3 exit
    for asin in set(base_top) - set(curr_top):
        alerts.append(AdAlert(
            keyword=kw, level=resolve(AlertLevel.MEDIUM),
            event="top3_exit",
            message=f"{asin} exited Top 3 — possible budget reduction",
            asin=asin, timestamp=ts
        ))

    # Price drop in Top 3
    for asin in set(curr_top) & set(base_top):
        old, new = base_prices.get(asin), curr_prices.get(asin)
        if old and new and old > 0 and (old - new) / old >= threshold:
            drop = (old - new) / old * 100
            alerts.append(AdAlert(
                keyword=kw, level=resolve(AlertLevel.HIGH),
                event="price_drop",
                message=f"{asin} price drop {drop:.1f}%: ${old:.2f} → ${new:.2f}",
                asin=asin, timestamp=ts
            ))

    return alerts

Step 4: LLM Context Enrichment (Open Claw Layer)

from anthropic import Anthropic

_llm = Anthropic()

def add_llm_context(alert: AdAlert, snapshot: Dict) -> AdAlert:
    """Enrich CRITICAL/HIGH alerts with Claude business analysis"""
    if alert.level not in (AlertLevel.CRITICAL, AlertLevel.HIGH):
        return alert

    top3 = "\n".join([
        f"  #{a['rank']}: {a['asin']} | {a['brand']} | ${a['price'] or 'N/A'}"
        for a in snapshot.get("top_of_search", [])[:3]
    ])

    resp = _llm.messages.create(
        model="claude-3-7-sonnet-20250219",
        max_tokens=180,
        messages=[{"role": "user", "content":
            f"Amazon ad monitoring alert — keyword: '{alert.keyword}'\n"
            f"Event: {alert.message}\n"
            f"Current Top 3:\n{top3}\n\n"
            "In 70 words or fewer: likely cause + what the PPC team should check. "
            "Direct, specific, no preamble."
        }]
    )
    alert.llm_context = resp.content[0].text
    return alert

Step 5: Slack Delivery with Deduplication

import sqlite3
import aiohttp
from datetime import timedelta

DB_PATH = "/data/ad_monitor.db"
SLACK_WEBHOOK = "https://hooks.slack.com/services/YOUR/WEBHOOK"


def init_db():
    with sqlite3.connect(DB_PATH) as conn:
        conn.executescript("""
            CREATE TABLE IF NOT EXISTS snapshots (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                keyword TEXT, marketplace TEXT,
                captured_at TEXT, data_json TEXT
            );
            CREATE TABLE IF NOT EXISTS sent_alerts (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                keyword TEXT, asin TEXT, event TEXT,
                triggered_at TEXT
            );
        """)


def is_duplicate(alert: AdAlert, window_hours: int = 6) -> bool:
    cutoff = (datetime.now(timezone.utc)
              - timedelta(hours=window_hours)).isoformat()
    with sqlite3.connect(DB_PATH) as conn:
        return bool(conn.execute(
            "SELECT 1 FROM sent_alerts WHERE keyword=? AND asin=? AND event=?"
            " AND triggered_at>? LIMIT 1",
            (alert.keyword, alert.asin, alert.event, cutoff)
        ).fetchone())


def record_alert(alert: AdAlert):
    with sqlite3.connect(DB_PATH) as conn:
        conn.execute(
            "INSERT INTO sent_alerts (keyword, asin, event, triggered_at)"
            " VALUES (?,?,?,?)",
            (alert.keyword, alert.asin, alert.event, alert.timestamp)
        )


async def send_to_slack(alert: AdAlert):
    emoji = {"CRITICAL": "🚨", "HIGH": "⚠️", "MEDIUM": "📊"}.get(
        alert.level.value, "ℹ️"
    )
    text = (
        f"{emoji} *[Ad Monitor]* `{alert.level.value}` — "
        f"`{alert.keyword}`\n"
        f"{alert.message}\n"
        f"ASIN: `{alert.asin}` | {alert.timestamp[:16]} UTC"
        + (f"\n> _{alert.llm_context}_" if alert.llm_context else "")
    )
    async with aiohttp.ClientSession() as session:
        async with session.post(
            SLACK_WEBHOOK, json={"text": text},
            timeout=aiohttp.ClientTimeout(total=10)
        ) as resp:
            resp.raise_for_status()

Step 6: Wiring It All Together

import json

async def run_monitoring_cycle(tier: str, marketplace: str = "US"):
    """One complete monitoring run for a keyword tier"""
    tier_cfg = KEYWORD_CONFIG["tiers"][tier]
    keywords = tier_cfg["keywords"]
    if not keywords:
        return

    snapshots = await batch_monitor(keywords, marketplace)

    for snap in snapshots:
        if not snap.get("success"):
            continue

        # Load baseline — the most recent stored snapshot
        # (the current one hasn't been inserted yet)
        with sqlite3.connect(DB_PATH) as conn:
            row = conn.execute(
                "SELECT data_json FROM snapshots WHERE keyword=? AND marketplace=?"
                " ORDER BY captured_at DESC LIMIT 1",
                (snap["keyword"], marketplace)
            ).fetchone()
            baseline = json.loads(row[0]) if row else None

            # Save current snapshot
            conn.execute(
                "INSERT INTO snapshots (keyword, marketplace, captured_at, data_json)"
                " VALUES (?,?,?,?)",
                (snap["keyword"], marketplace, snap["captured_at"], json.dumps(snap))
            )

        # Detect + dispatch
        alerts = detect_changes(snap, baseline, tier_cfg)
        for alert in alerts:
            if not is_duplicate(alert, KEYWORD_CONFIG["dedup_window_hours"]):
                alert = add_llm_context(alert, snap)
                await send_to_slack(alert)
                record_alert(alert)


# Entry point — call from APScheduler or cron
async def main():
    init_db()
    await run_monitoring_cycle("A")   # Tier A every 2h
    # await run_monitoring_cycle("B") # Tier B every 6h

if __name__ == "__main__":
    asyncio.run(main())
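The frequency_hours values in KEYWORD_CONFIG can drive that cron entry point directly. A minimal sketch, assuming the job fires once per hour — tiers_due is a hypothetical helper, not part of the monitor above:

```python
from typing import Dict, List

def tiers_due(hour: int, tiers: Dict[str, Dict]) -> List[str]:
    """Given the current hour (0-23), return the tiers whose
    frequency_hours divides it — intended for an hourly cron job."""
    return [name for name, cfg in sorted(tiers.items())
            if hour % cfg["frequency_hours"] == 0]

# Example with the tier frequencies from KEYWORD_CONFIG (2h / 6h / 24h):
TIERS = {"A": {"frequency_hours": 2},
         "B": {"frequency_hours": 6},
         "C": {"frequency_hours": 24}}

print(tiers_due(0, TIERS))   # ['A', 'B', 'C'] — everything is due at midnight
print(tiers_due(4, TIERS))   # ['A'] — only the 2-hourly tier
```

Inside main() you would then run `await run_monitoring_cycle(t)` for each tier returned for the current UTC hour.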

Production Notes

Database scaling: SQLite works fine up to ~100 keywords at 2h frequency. Beyond that, switch to PostgreSQL — SQLite's write lock becomes a bottleneck under concurrent writes.
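Before migrating, SQLite's WAL journal mode can buy some headroom, since readers stop blocking on the writer. A one-time pragma per database file is enough; the sketch below uses a throwaway temp file where the monitor would use DB_PATH:

```python
import os
import sqlite3
import tempfile

db_path = os.path.join(tempfile.mkdtemp(), "ad_monitor.db")

with sqlite3.connect(db_path) as conn:
    # WAL mode persists in the database file, so this runs once at init
    mode = conn.execute("PRAGMA journal_mode=WAL").fetchone()[0]
    conn.execute("PRAGMA synchronous=NORMAL")  # common pairing with WAL

print(mode)  # "wal" on filesystems that support it
```

WAL helps with read/write contention but not with multiple concurrent writers, so the PostgreSQL advice above still applies past a certain scale.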

Rate limiting: If you hit HTTP 429, add exponential backoff (start 2s, max 30s) around the session.post call in fetch_serp.
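A sketch of that delay schedule — exponential from 2s, capped at 30s, with optional jitter so concurrent workers don't retry in lockstep (the jitter is my addition, not part of the spec above):

```python
import random

def backoff_delays(base: float = 2.0, cap: float = 30.0,
                   retries: int = 5, jitter: float = 0.0):
    """Yield exponentially growing wait times: base, 2*base, ... capped at cap.
    A nonzero jitter fraction randomizes each delay by +/- that fraction."""
    for attempt in range(retries):
        delay = min(base * (2 ** attempt), cap)
        if jitter:
            delay += delay * random.uniform(-jitter, jitter)
        yield delay

print(list(backoff_delays()))  # [2.0, 4.0, 8.0, 16.0, 30.0]
```

In fetch_serp you would loop over these delays, `await asyncio.sleep(delay)` after each 429, and give up (returning the error dict) once they are exhausted.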

Monitoring the monitor: Set up a heartbeat check — if no rows appear in snapshots for the expected time window, alert your on-call channel. Silent failures in monitoring systems are the worst kind.
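A sketch of that freshness check against the snapshots table — heartbeat_ok is a hypothetical helper, and wiring its False result to your paging hook is left out:

```python
import sqlite3
from datetime import datetime, timedelta, timezone

def heartbeat_ok(db_path: str, max_age_hours: float = 3) -> bool:
    """True if at least one snapshot landed within the expected window."""
    cutoff = (datetime.now(timezone.utc)
              - timedelta(hours=max_age_hours)).isoformat()
    with sqlite3.connect(db_path) as conn:
        row = conn.execute(
            "SELECT 1 FROM snapshots WHERE captured_at > ? LIMIT 1", (cutoff,)
        ).fetchone()
    return row is not None

# Quick self-check against a throwaway database
import os, tempfile
db = os.path.join(tempfile.mkdtemp(), "hb.db")
with sqlite3.connect(db) as conn:
    conn.execute("CREATE TABLE snapshots (keyword TEXT, captured_at TEXT)")
    conn.execute("INSERT INTO snapshots VALUES (?, ?)",
                 ("wireless earbuds", datetime.now(timezone.utc).isoformat()))
print(heartbeat_ok(db))  # True — a snapshot landed just now
```

For Tier A at a 2h cadence, a 3h max_age gives one missed cycle of slack before paging.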

Data retention: Keep 90 days of snapshots. At 30 days you see monthly patterns; at 90 days you start recognizing seasonal behaviors and pre-promotion signals.
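The retention window translates to a periodic DELETE. A sketch, with prune_snapshots as a hypothetical helper you'd run from the same scheduler as the monitoring cycles:

```python
import sqlite3
from datetime import datetime, timedelta, timezone

def prune_snapshots(db_path: str, keep_days: int = 90) -> int:
    """Delete snapshots older than the retention window; returns rows removed."""
    cutoff = (datetime.now(timezone.utc)
              - timedelta(days=keep_days)).isoformat()
    with sqlite3.connect(db_path) as conn:
        cur = conn.execute(
            "DELETE FROM snapshots WHERE captured_at < ?", (cutoff,))
        return cur.rowcount

# Self-contained demo on a throwaway database
import os, tempfile
db = os.path.join(tempfile.mkdtemp(), "prune.db")
old = (datetime.now(timezone.utc) - timedelta(days=120)).isoformat()
new = datetime.now(timezone.utc).isoformat()
with sqlite3.connect(db) as conn:
    conn.execute("CREATE TABLE snapshots (keyword TEXT, captured_at TEXT)")
    conn.executemany("INSERT INTO snapshots VALUES (?, ?)",
                     [("kw", old), ("kw", new)])
print(prune_snapshots(db))  # 1 — only the 120-day-old row goes
```

The string comparison works because captured_at is stored as an ISO-8601 UTC timestamp, which sorts lexicographically.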

