TL;DR
Stop reading your ACoS dashboard to understand competitor behavior. Build a real-time SP ad position monitor instead.
This post covers:
- Async batch SERP capture via Pangolinfo API (98% SP coverage)
- Tiered keyword management (A/B/C) for signal vs. noise control
- Change detection: Top1 change, new Top3 entrant, Top3 exit, price drop
- LLM-enriched alerts via Open Claw + Claude
- Deduplication and Slack delivery
Prerequisites: Open Claw deployed, Pangolinfo API key configured (see earlier posts in this series). We're jumping straight into the ad monitoring implementation.
# Quick test: is your SERP API working?
import requests

headers = {"Authorization": "Bearer YOUR_KEY"}
payload = {
    "source": "amazon_search",
    "query": "wireless earbuds",
    "marketplace": "US",
    "include_sponsored": True,
    "include_organic": False,
    "output_format": "json"
}
# requests applies no timeout by default, so a stalled connection would hang
# this check forever; bound it explicitly.
resp = requests.post("https://api.pangolinfo.com/v1/serp",
                     headers=headers, json=payload, timeout=30)
# Surface auth / rate-limit / server errors as an HTTPError instead of a
# confusing KeyError on the line below.
resp.raise_for_status()
print(resp.json()["sponsored_results"][:3])
The Problem with Existing Monitoring Tools
Your Amazon ad console shows internal metrics. Helium 10 / Jungle Scout show competitor data, but with 24-48h delays and no outbound API. When a new well-funded competitor enters your Top of Search overnight, you find out two days later when your ACoS report arrives. By then they've run 48 hours of aggressive ads at a discounted price, accumulating early reviews.
What we want: automated detection of competitor ad position changes within 2 hours of them occurring, categorized by severity, routed to Slack with business context.
Step 1: Async SERP Capture
50 keywords × sequential requests = ~8 minutes per cycle. With asyncio: ~45 seconds.
import asyncio
import aiohttp
from datetime import datetime, timezone
from typing import List, Dict, Optional
# Bearer token sent in the Authorization header of every SERP request made by
# fetch_serp. The value here is a placeholder.
# NOTE(review): load the real key from an environment variable or secret
# store rather than committing it to source.
PANGOLINFO_API_KEY = "your_key"
async def fetch_serp(
    keyword: str,
    marketplace: str,
    session: aiohttp.ClientSession,
    semaphore: asyncio.Semaphore
) -> Dict:
    """Capture Amazon SP Top-of-Search ad positions for one keyword.

    Posts one search request to the SERP endpoint, keeps the sponsored
    results whose ``ad_placement`` contains "top" (case-insensitive), sorts
    them by ``ad_rank`` and returns the first five.

    Args:
        keyword: Search term to query.
        marketplace: Marketplace code forwarded to the API (e.g. "US").
        session: Shared aiohttp session used for the request.
        semaphore: Limits how many requests run concurrently.

    Returns:
        A dict with ``keyword``, ``marketplace``, ``captured_at`` (UTC ISO
        timestamp), ``success`` and ``top_of_search`` (list of
        ``rank``/``asin``/``brand``/``price`` dicts). On any failure
        ``success`` is False, ``error`` holds the exception text and
        ``top_of_search`` is empty; the function never raises.
    """
    async with semaphore:
        headers = {"Authorization": f"Bearer {PANGOLINFO_API_KEY}"}
        payload = {
            "source": "amazon_search",
            "query": keyword,
            "marketplace": marketplace,
            "page": 1,
            "include_sponsored": True,
            "include_organic": False,
            "output_format": "json"
        }
        captured_at = datetime.now(timezone.utc).isoformat()
        try:
            async with session.post(
                "https://api.pangolinfo.com/v1/serp",
                headers=headers, json=payload,
                timeout=aiohttp.ClientTimeout(total=25)
            ) as resp:
                resp.raise_for_status()
                data = await resp.json()

            # `or` fallbacks: a JSON null for these fields must be treated
            # like a missing field, not abort the whole capture.
            top_ads = sorted(
                (s for s in (data.get("sponsored_results") or [])
                 if "top" in (s.get("ad_placement") or "").lower()),
                # Ads without a rank sort last instead of raising on
                # None-vs-int comparison.
                key=lambda ad: 999 if ad.get("ad_rank") is None else ad["ad_rank"]
            )[:5]
            return {
                "keyword": keyword, "marketplace": marketplace,
                "captured_at": captured_at, "success": True,
                "top_of_search": [
                    {"rank": a.get("ad_rank"), "asin": a.get("asin"),
                     "brand": a.get("brand", ""), "price": a.get("price")}
                    for a in top_ads
                ]
            }
        except Exception as e:
            # Deliberately broad: batch_monitor gathers these coroutines
            # without return_exceptions, so one failing keyword must be
            # reported as a failed record rather than abort the whole batch.
            return {"keyword": keyword, "marketplace": marketplace,
                    "captured_at": captured_at,
                    "success": False, "error": str(e), "top_of_search": []}
async def batch_monitor(
    keywords: List[str],
    marketplace: str = "US",
    max_concurrent: int = 8
) -> List[Dict]:
    """Fetch SERP snapshots for every keyword concurrently.

    All requests share a single aiohttp session, and a semaphore keeps at
    most ``max_concurrent`` of them in flight at once.

    Args:
        keywords: Search terms to capture.
        marketplace: Marketplace code passed through to fetch_serp.
        max_concurrent: Upper bound on simultaneous requests.

    Returns:
        One fetch_serp result dict per keyword, as returned by
        ``asyncio.gather``.
    """
    limiter = asyncio.Semaphore(max_concurrent)
    async with aiohttp.ClientSession() as http:
        pending = []
        for term in keywords:
            pending.append(fetch_serp(term, marketplace, http, limiter))
        return await asyncio.gather(*pending)
Step 2: Keyword Tier Configuration
Don't monitor every keyword at the same frequency and with the same alert threshold. Uniform monitoring generates noise that gets ignored.
# Central monitoring configuration.
#   tiers: per-tier keyword lists plus alerting behavior; a tier's dict is
#          passed to detect_changes as `tier_config`.
#   price_drop_threshold_pct: minimum percentage price decrease (divided by
#          100 in detect_changes) that triggers a price_drop alert.
#   dedup_window_hours: look-back window passed to is_duplicate.
# NOTE(review): `frequency_hours` is not read by any code in this file; the
# external scheduler (cron / APScheduler) presumably has to honor it.
KEYWORD_CONFIG = {
    "tiers": {
        "A": {  # Core product keywords — monitor every 2h
            "keywords": ["wireless earbuds", "bluetooth speaker", "usb c hub"],
            "frequency_hours": 2,
            "alert_escalation": True  # Upgrade HIGH → CRITICAL
        },
        "B": {  # Competitive category keywords — monitor every 6h
            "keywords": ["earbuds under 30 dollars", "tws earbuds 2026"],
            "frequency_hours": 6,
            "alert_escalation": False
        },
        "C": {  # Exploratory long-tail — monitor daily, log only
            "keywords": [],
            "frequency_hours": 24,
            "alert_escalation": False,
            # Intended: record but don't notify; detect_changes downgrades
            # every alert of such a tier to INFO.
            "suppress_alerts": True
        }
    },
    "price_drop_threshold_pct": 12,
    "dedup_window_hours": 6
}
Step 3: Change Detection Engine
from dataclasses import dataclass, field
from enum import Enum
class AlertLevel(Enum):
    """Severity of an ad-position alert, from most to least urgent."""
    CRITICAL = "CRITICAL"
    HIGH = "HIGH"
    MEDIUM = "MEDIUM"
    INFO = "INFO"  # level assigned when a tier has suppress_alerts set
@dataclass
class AdAlert:
    """One detected change in the Top-of-Search ad positions of a keyword."""
    keyword: str  # search keyword whose SERP changed
    level: AlertLevel  # severity after tier escalation / suppression
    event: str  # top1_change / new_top3 / top3_exit / price_drop
    message: str  # human-readable description of the change
    asin: str  # ASIN the event is about
    timestamp: str  # captured_at of the snapshot that produced the alert
    llm_context: str = ""  # optional LLM analysis; empty unless enriched
def detect_changes(
    current: Dict,
    baseline: Optional[Dict],
    tier_config: Dict
) -> List[AdAlert]:
    """Compare two SERP snapshots and report changes in the Top 3 ad slots.

    Four events are detected: a different ASIN at position #1
    (``top1_change``), an ASIN that is new to the Top 3 (``new_top3``), an
    ASIN that left the Top 3 (``top3_exit``) and a price decrease of at
    least ``KEYWORD_CONFIG["price_drop_threshold_pct"]`` percent for an ASIN
    present in both snapshots (``price_drop``).

    Args:
        current: Latest snapshot, in the shape returned by fetch_serp.
        baseline: Previous snapshot, or None on the first run.
        tier_config: Tier settings; ``suppress_alerts`` forces every alert
            to INFO and ``alert_escalation`` upgrades HIGH to CRITICAL.

    Returns:
        Alerts ordered by event type and, within a type, by ad position.
        Empty when ``baseline`` is None (first run only records a baseline).
    """
    if baseline is None:
        return []

    kw = current["keyword"]
    ts = current["captured_at"]
    suppress = tier_config.get("suppress_alerts", False)
    escalate = tier_config.get("alert_escalation", False)
    threshold = KEYWORD_CONFIG["price_drop_threshold_pct"] / 100

    curr_ads = current.get("top_of_search", [])[:3]
    base_ads = baseline.get("top_of_search", [])[:3]
    curr_top = [a["asin"] for a in curr_ads]
    base_top = [a["asin"] for a in base_ads]
    curr_set, base_set = set(curr_top), set(base_top)
    curr_prices = {a["asin"]: a.get("price") for a in curr_ads}
    base_prices = {a["asin"]: a.get("price") for a in base_ads}

    def resolve(base: AlertLevel) -> AlertLevel:
        if suppress:
            return AlertLevel.INFO
        if escalate and base == AlertLevel.HIGH:
            return AlertLevel.CRITICAL
        return base

    alerts = []

    # Top #1 changed
    top1_reported = None  # ASIN already covered by a top1_change alert
    if curr_top and base_top and curr_top[0] != base_top[0]:
        is_new = curr_top[0] not in base_set
        alerts.append(AdAlert(
            keyword=kw,
            level=resolve(AlertLevel.CRITICAL if is_new else AlertLevel.HIGH),
            event="top1_change",
            message=(f"Top of Search #1: {base_top[0]} → {curr_top[0]}"
                     + (" [NEW ENTRANT]" if is_new else "")),
            asin=curr_top[0], timestamp=ts
        ))
        top1_reported = curr_top[0]

    # New Top 3 entrant. The #1 ASIN is skipped only when a top1_change alert
    # was actually emitted for it; otherwise (e.g. the baseline had no ads) a
    # brand-new #1 would go completely unreported.
    # Iterating the ranked list (not a set) keeps alert order deterministic.
    reported = set()
    for ad in curr_ads:
        asin = ad["asin"]
        if asin in base_set or asin == top1_reported or asin in reported:
            continue
        reported.add(asin)
        alerts.append(AdAlert(
            keyword=kw, level=resolve(AlertLevel.HIGH),
            event="new_top3",
            message=f"New entrant at Top 3 position #{ad['rank']}: {asin}",
            asin=asin, timestamp=ts
        ))

    # Top 3 exit (dict.fromkeys de-duplicates while preserving rank order)
    for asin in dict.fromkeys(base_top):
        if asin in curr_set:
            continue
        alerts.append(AdAlert(
            keyword=kw, level=resolve(AlertLevel.MEDIUM),
            event="top3_exit",
            message=f"{asin} exited Top 3 — possible budget reduction",
            asin=asin, timestamp=ts
        ))

    # Price drop in Top 3 (only ASINs present in both snapshots)
    for asin in dict.fromkeys(curr_top):
        if asin not in base_set:
            continue
        old, new = base_prices.get(asin), curr_prices.get(asin)
        if old and new and old > 0 and (old - new) / old >= threshold:
            drop = (old - new) / old * 100
            alerts.append(AdAlert(
                keyword=kw, level=resolve(AlertLevel.HIGH),
                event="price_drop",
                message=f"{asin} price drop {drop:.1f}%: ${old:.2f} → ${new:.2f}",
                asin=asin, timestamp=ts
            ))

    return alerts
Step 4: LLM Context Enrichment (Open Claw Layer)
from anthropic import Anthropic
# Module-level Anthropic client used by add_llm_context; constructed once at
# import time.
# NOTE(review): no API key is passed here, so the client presumably reads it
# from the environment — confirm before deploying.
_llm = Anthropic()
def add_llm_context(alert: AdAlert, snapshot: Dict) -> AdAlert:
    """Enrich CRITICAL/HIGH alerts with a short Claude business analysis.

    Alerts of any other level are returned untouched. Enrichment is
    best-effort: if the LLM call fails or returns no text, the failure is
    logged and the alert is returned without context so that delivery of
    the alert itself is never blocked.

    Args:
        alert: Alert to enrich; its ``llm_context`` is set in place.
        snapshot: Current snapshot; its first three ``top_of_search``
            entries are included in the prompt.

    Returns:
        The same ``alert`` object.
    """
    if alert.level not in (AlertLevel.CRITICAL, AlertLevel.HIGH):
        return alert

    # Function-scope imports keep this block self-contained; both modules
    # are already available wherever `from anthropic import Anthropic` works.
    import logging

    import anthropic

    top3 = "\n".join([
        f" #{a['rank']}: {a['asin']} | {a['brand']} | ${a['price'] or 'N/A'}"
        for a in snapshot.get("top_of_search", [])[:3]
    ])
    try:
        resp = _llm.messages.create(
            model="claude-3-7-sonnet-20250219",
            max_tokens=180,
            messages=[{"role": "user", "content":
                f"Amazon ad monitoring alert — keyword: '{alert.keyword}'\n"
                f"Event: {alert.message}\n"
                f"Current Top 3:\n{top3}\n\n"
                "In 70 words or fewer: likely cause + what the PPC team should check. "
                "Direct, specific, no preamble."
            }]
        )
        alert.llm_context = resp.content[0].text
    except (anthropic.APIError, IndexError, AttributeError) as exc:
        # APIError covers connection, timeout, rate-limit and status errors;
        # IndexError/AttributeError cover an empty or non-text response.
        logging.getLogger(__name__).warning(
            "LLM enrichment failed for %s / %s: %s",
            alert.keyword, alert.event, exc
        )
    return alert
Step 5: Slack Delivery with Deduplication
import sqlite3
import aiohttp
from datetime import timedelta
# SQLite file holding the `snapshots` and `sent_alerts` tables.
DB_PATH = "/data/ad_monitor.db"
# Slack incoming-webhook URL that send_to_slack posts to (placeholder value).
SLACK_WEBHOOK = "https://hooks.slack.com/services/YOUR/WEBHOOK"
def init_db():
    """Create the ``snapshots`` and ``sent_alerts`` tables if missing."""
    conn = sqlite3.connect(DB_PATH)
    try:
        # `with conn` only commits or rolls back; it does not close the
        # connection, hence the explicit close in `finally`.
        with conn:
            conn.executescript("""
                CREATE TABLE IF NOT EXISTS snapshots (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    keyword TEXT, marketplace TEXT,
                    captured_at TEXT, data_json TEXT
                );
                CREATE TABLE IF NOT EXISTS sent_alerts (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    keyword TEXT, asin TEXT, event TEXT,
                    triggered_at TEXT
                );
            """)
    finally:
        conn.close()
def is_duplicate(alert: AdAlert, window_hours: int = 6) -> bool:
    """Return True if an equivalent alert was recorded within the window.

    Two alerts are equivalent when keyword, ASIN and event all match.

    Args:
        alert: Candidate alert.
        window_hours: How far back to look in ``sent_alerts``.
    """
    # Timezone-aware UTC so the cutoff has the same "+00:00" ISO format as
    # the stored snapshot timestamps it is compared against as text.
    cutoff = (
        datetime.now(timezone.utc) - timedelta(hours=window_hours)
    ).isoformat()
    conn = sqlite3.connect(DB_PATH)
    try:
        row = conn.execute(
            "SELECT 1 FROM sent_alerts WHERE keyword=? AND asin=? AND event=?"
            " AND triggered_at>? LIMIT 1",
            (alert.keyword, alert.asin, alert.event, cutoff)
        ).fetchone()
    finally:
        # The sqlite3 context manager would not close the connection.
        conn.close()
    return row is not None
def record_alert(alert: AdAlert):
    """Persist an alert in ``sent_alerts`` so later runs can de-duplicate it.

    The alert's own ``timestamp`` is stored as ``triggered_at``.
    """
    conn = sqlite3.connect(DB_PATH)
    try:
        # `with conn` commits on success / rolls back on error but leaves
        # the connection open, so it is closed explicitly below.
        with conn:
            conn.execute(
                "INSERT INTO sent_alerts (keyword, asin, event, triggered_at)"
                " VALUES (?,?,?,?)",
                (alert.keyword, alert.asin, alert.event, alert.timestamp)
            )
    finally:
        conn.close()
async def send_to_slack(alert: AdAlert):
    """Post one alert to the Slack webhook.

    The message contains the level, keyword, alert message, ASIN, the
    timestamp truncated to minutes and, when present, the LLM context as a
    quoted line.

    Raises:
        aiohttp.ClientResponseError: If Slack answers with an HTTP error
            status. Raising lets the caller avoid recording an alert that
            was never delivered.
    """
    emoji = {"CRITICAL": "🚨", "HIGH": "⚠️", "MEDIUM": "📊"}.get(
        alert.level.value, "ℹ️"
    )
    text = (
        f"{emoji} *[Ad Monitor]* `{alert.level.value}` — "
        f"`{alert.keyword}`\n"
        f"{alert.message}\n"
        f"ASIN: `{alert.asin}` | {alert.timestamp[:16]} UTC"
        + (f"\n> _{alert.llm_context}_" if alert.llm_context else "")
    )
    async with aiohttp.ClientSession() as session:
        # Context-manage the response so it is released, and check the
        # status: a rejected webhook call must not look like a delivery.
        async with session.post(
            SLACK_WEBHOOK, json={"text": text},
            timeout=aiohttp.ClientTimeout(total=10)
        ) as resp:
            resp.raise_for_status()
Step 6: Wiring It All Together
import json
async def run_monitoring_cycle(tier: str, marketplace: str = "US"):
    """Run one complete monitoring pass for a keyword tier.

    For every keyword of the tier: capture the current SERP, load the most
    recent stored snapshot as baseline, store the new snapshot, detect
    changes and dispatch the resulting alerts. Failed captures are skipped
    and are not stored.

    Args:
        tier: Key into ``KEYWORD_CONFIG["tiers"]`` (e.g. "A").
        marketplace: Marketplace code passed to the SERP capture.
    """
    tier_cfg = KEYWORD_CONFIG["tiers"][tier]
    keywords = tier_cfg["keywords"]
    if not keywords:
        return

    # Tiers flagged suppress_alerts are logged to sent_alerts but must not
    # reach Slack.
    notify = not tier_cfg.get("suppress_alerts", False)

    snapshots = await batch_monitor(keywords, marketplace)
    for snap in snapshots:
        if not snap.get("success"):
            continue

        conn = sqlite3.connect(DB_PATH)
        try:
            with conn:
                # Load baseline. The current snapshot has not been inserted
                # yet, so the newest stored row IS the previous cycle; an
                # OFFSET here would skip it and compare against stale data.
                row = conn.execute(
                    "SELECT data_json FROM snapshots WHERE keyword=? AND marketplace=?"
                    " ORDER BY captured_at DESC LIMIT 1",
                    (snap["keyword"], marketplace)
                ).fetchone()
                baseline = json.loads(row[0]) if row else None
                # Save current snapshot
                conn.execute(
                    "INSERT INTO snapshots (keyword, marketplace, captured_at, data_json)"
                    " VALUES (?,?,?,?)",
                    (snap["keyword"], marketplace, snap["captured_at"], json.dumps(snap))
                )
        finally:
            # `with conn` commits but does not close the connection.
            conn.close()

        # Detect + dispatch
        alerts = detect_changes(snap, baseline, tier_cfg)
        for alert in alerts:
            if is_duplicate(alert, KEYWORD_CONFIG["dedup_window_hours"]):
                continue
            if notify:
                alert = add_llm_context(alert, snap)
                await send_to_slack(alert)
            record_alert(alert)
# Entry point — call from APScheduler or cron
async def main():
    """Initialize the database, then run one monitoring cycle for tier A."""
    init_db()
    await run_monitoring_cycle("A")  # Tier A every 2h
    # await run_monitoring_cycle("B")  # Tier B every 6h


# Run a single cycle when executed as a script; repetition is left to the
# external scheduler.
if __name__ == "__main__":
    asyncio.run(main())
Production Notes
Database scaling: SQLite works fine up to ~100 keywords at 2h frequency. Beyond that, switch to PostgreSQL — SQLite's write lock becomes a bottleneck under concurrent writes.
Rate limiting: If you hit HTTP 429, add exponential backoff (start 2s, max 30s) around the session.post call in fetch_serp.
Monitoring the monitor: Set up a heartbeat check — if no rows appear in snapshots for the expected time window, alert your on-call channel. Silent failures in monitoring systems are the worst kind.
Data retention: Keep 90 days of snapshots. At 30 days you see monthly patterns; at 90 days you start recognizing seasonal behaviors and pre-promotion signals.
Further Reading
- Pangolinfo SERP API Docs
- Pangolinfo Scrape API — the data foundation
- Tags: #python #ecommerce #api #automation #amazon
Top comments (0)