TL;DR
Build an automated Amazon keyword ranking monitor in 5 steps:
- Sign up for Pangolinfo Scrape API
- Write a Python script to search keywords and find ASIN rankings
- Store data in PostgreSQL for historical tracking
- Set up automated scheduling with cron
- Configure alerts for significant ranking changes
Time to build: 2-3 hours
Monthly cost: ~$70 (API) + $10 (hosting)
Time saved: 40+ hours/month
The Problem
If you're selling on Amazon, you know that keyword rankings = visibility = sales. But manually checking rankings is:
- Time-consuming: 2+ hours daily for 30 keywords
- Reactive: you discover drops days later
- No historical data: can't analyze trends
- Doesn't scale: more products = impossible workload
Let's build a better solution.
Prerequisites
Before we start, you'll need:
- Python 3.8+ installed
- Basic Python knowledge
- PostgreSQL (or any database)
- A Pangolinfo account (sign up here)
- 2-3 hours of time
Step 1: Understanding the Scrape API
Pangolinfo's Scrape API provides structured Amazon data without dealing with:
- Anti-bot measures
- HTML parsing
- Proxy rotation
- CAPTCHA solving
API Basics
Authentication: Get a token via email/password
Endpoint: https://scrapeapi.pangolinfo.com/api/v1/scrape
Parser: amzKeyword for search results
Cost: 1 credit per request
Quick Test
# 1. Authenticate
curl -X POST https://scrapeapi.pangolinfo.com/api/v1/auth \
-H 'Content-Type: application/json' \
-d '{"email": "your@email.com", "password": "your_password"}'
# Response: {"code":0, "data":"your_token_here"}
# 2. Search a keyword
curl -X POST https://scrapeapi.pangolinfo.com/api/v1/scrape \
-H 'Content-Type: application/json' \
-H 'Authorization: Bearer YOUR_TOKEN' \
-d '{
"url": "https://www.amazon.com/s?k=wireless+earbuds",
"parserName": "amzKeyword",
"format": "json",
"bizContext": {"zipcode": "10041"}
}'
Step 2: Building the Core Monitor
Project Structure
keyword-monitor/
├── config.py          # Configuration
├── api_client.py      # API wrapper
├── monitor.py         # Main monitoring logic
├── database.py        # Database operations
├── alerts.py          # Alert system
├── requirements.txt   # Dependencies
└── .env               # Secrets (gitignored)
Install Dependencies
pip install requests psycopg2-binary python-dotenv pandas
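If you'd rather pin these in the requirements.txt from the project structure above, something like this works; the version floors are illustrative, not requirements of the API:

requests>=2.31
psycopg2-binary>=2.9
python-dotenv>=1.0
pandas>=2.0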
Configuration (config.py)
import os
from dotenv import load_dotenv
load_dotenv()
class Config:
# API Configuration
PANGOLIN_EMAIL = os.getenv('PANGOLIN_EMAIL')
PANGOLIN_PASSWORD = os.getenv('PANGOLIN_PASSWORD')
API_BASE_URL = 'https://scrapeapi.pangolinfo.com'
# Database Configuration
DB_HOST = os.getenv('DB_HOST', 'localhost')
DB_PORT = int(os.getenv('DB_PORT', 5432))
DB_NAME = os.getenv('DB_NAME', 'keyword_monitor')
DB_USER = os.getenv('DB_USER', 'postgres')
DB_PASSWORD = os.getenv('DB_PASSWORD')
# Monitoring Configuration
MAX_PAGES = 3 # Search up to 3 pages
ALERT_THRESHOLD = 5 # Alert if rank changes by 5+
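For reference, a matching .env file (gitignored, as noted in the project structure) might look like this. Every value below is a placeholder, and SLACK_WEBHOOK_URL is only needed once you reach Step 4:

PANGOLIN_EMAIL=your@email.com
PANGOLIN_PASSWORD=your_password
DB_HOST=localhost
DB_PORT=5432
DB_NAME=keyword_monitor
DB_USER=postgres
DB_PASSWORD=your_db_password
SLACK_WEBHOOK_URL=https://hooks.slack.com/services/XXX/YYY/ZZZ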
API Client (api_client.py)
import requests
import logging
from typing import Dict, List, Optional
logger = logging.getLogger(__name__)
class ScrapeAPIClient:
"""Wrapper for Pangolinfo Scrape API"""
def __init__(self, email: str, password: str):
self.base_url = "https://scrapeapi.pangolinfo.com"
self.email = email
self.password = password
self.token = None
def authenticate(self) -> bool:
"""Authenticate and get access token"""
url = f"{self.base_url}/api/v1/auth"
payload = {
"email": self.email,
"password": self.password
}
try:
response = requests.post(url, json=payload, timeout=10)
response.raise_for_status()
result = response.json()
if result['code'] == 0:
self.token = result['data']
logger.info("β Authentication successful")
return True
else:
logger.error(f"β Authentication failed: {result['message']}")
return False
except Exception as e:
logger.error(f"β Authentication error: {str(e)}")
return False
def search_keyword(
self,
keyword: str,
marketplace: str = "com",
zipcode: str = "10041",
page: int = 1
) -> Optional[Dict]:
"""
Search for a keyword on Amazon
Args:
keyword: Search term
marketplace: Amazon domain (com, co.uk, de, etc.)
zipcode: Delivery location
page: Page number
Returns:
Parsed search results or None if failed
"""
url = f"{self.base_url}/api/v1/scrape"
# Build Amazon search URL
amazon_url = f"https://www.amazon.{marketplace}/s?k={keyword}&page={page}"
payload = {
"url": amazon_url,
"parserName": "amzKeyword",
"format": "json",
"bizContext": {
"zipcode": zipcode
}
}
headers = {
"Content-Type": "application/json",
"Authorization": f"Bearer {self.token}"
}
try:
response = requests.post(url, json=payload, headers=headers, timeout=30)
response.raise_for_status()
result = response.json()
if result['code'] == 0:
logger.info(f"β Searched '{keyword}' page {page}")
return result['data']
else:
logger.error(f"β Search failed: {result['message']}")
return None
except Exception as e:
logger.error(f"β Search error: {str(e)}")
return None
def find_asin_rank(
self,
keyword: str,
target_asin: str,
marketplace: str = "com",
max_pages: int = 3
) -> Optional[Dict]:
"""
Find the ranking position of an ASIN for a keyword
Args:
keyword: Search keyword
target_asin: ASIN to find
marketplace: Amazon marketplace
max_pages: Maximum pages to search
Returns:
Ranking information or None if not found
"""
for page in range(1, max_pages + 1):
data = self.search_keyword(keyword, marketplace, page=page)
if not data or 'json' not in data:
continue
# Parse results
results = data['json'][0]['data']['results']
for index, product in enumerate(results):
if product.get('asin') == target_asin:
# Calculate absolute rank (48 products per page)
rank = (page - 1) * 48 + index + 1
logger.info(f"β Found {target_asin} at rank {rank}")
return {
'keyword': keyword,
'asin': target_asin,
'rank': rank,
'page': page,
'position': index + 1,
'title': product.get('title', ''),
'price': product.get('price', ''),
'rating': product.get('star', ''),
'reviews': product.get('rating', 0),
'image': product.get('image', '')
}
logger.warning(f"β {target_asin} not found in top {max_pages * 48} results")
return None
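Before wiring in the database, it's worth sanity-checking the client on its own. A minimal smoke test, assuming you swap in an ASIN you actually sell:

# quick_check.py - hypothetical standalone test, not part of the monitor itself
from config import Config
from api_client import ScrapeAPIClient

config = Config()
client = ScrapeAPIClient(config.PANGOLIN_EMAIL, config.PANGOLIN_PASSWORD)

if client.authenticate():
    # Replace with one of your own ASINs for this keyword
    result = client.find_asin_rank("wireless earbuds", "B08XYZ123", max_pages=2)
    if result:
        print(f"Rank {result['rank']} (page {result['page']}, position {result['position']})")
    else:
        print("ASIN not found in the first 2 pages")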
Database Setup (database.py)
import psycopg2
from psycopg2.extras import execute_values
from typing import List, Dict
import logging
logger = logging.getLogger(__name__)
class Database:
"""PostgreSQL database handler"""
def __init__(self, config):
self.config = config
self.conn = None
def connect(self):
"""Connect to database"""
try:
self.conn = psycopg2.connect(
host=self.config.DB_HOST,
port=self.config.DB_PORT,
database=self.config.DB_NAME,
user=self.config.DB_USER,
password=self.config.DB_PASSWORD
)
logger.info("β Database connected")
except Exception as e:
logger.error(f"β Database connection failed: {str(e)}")
raise
def create_tables(self):
"""Create necessary tables"""
create_table_sql = """
CREATE TABLE IF NOT EXISTS keyword_rankings (
id SERIAL PRIMARY KEY,
keyword VARCHAR(255) NOT NULL,
asin VARCHAR(20) NOT NULL,
marketplace VARCHAR(10) NOT NULL DEFAULT 'com',
rank INTEGER,
page INTEGER,
position INTEGER,
title TEXT,
price VARCHAR(50),
rating DECIMAL(3,2),
reviews INTEGER,
image TEXT,
timestamp TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
CREATE INDEX IF NOT EXISTS idx_keyword_asin
ON keyword_rankings(keyword, asin);
CREATE INDEX IF NOT EXISTS idx_timestamp
ON keyword_rankings(timestamp);
"""
cursor = self.conn.cursor()
cursor.execute(create_table_sql)
self.conn.commit()
cursor.close()
logger.info("β Tables created")
def save_ranking(self, ranking: Dict):
"""Save a single ranking record"""
insert_sql = """
INSERT INTO keyword_rankings
(keyword, asin, marketplace, rank, page, position,
title, price, rating, reviews, image)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
"""
cursor = self.conn.cursor()
cursor.execute(insert_sql, (
ranking['keyword'],
ranking['asin'],
ranking.get('marketplace', 'com'),
ranking.get('rank'),
ranking.get('page'),
ranking.get('position'),
ranking.get('title'),
ranking.get('price'),
ranking.get('rating'),
ranking.get('reviews'),
ranking.get('image')
))
self.conn.commit()
cursor.close()
logger.info(f"β Saved ranking: {ranking['keyword']} - {ranking['asin']}")
def get_latest_rankings(self) -> List[Dict]:
"""Get the most recent rankings for all keyword-ASIN pairs"""
query = """
SELECT DISTINCT ON (keyword, asin)
keyword, asin, rank, timestamp
FROM keyword_rankings
ORDER BY keyword, asin, timestamp DESC
"""
cursor = self.conn.cursor()
cursor.execute(query)
results = cursor.fetchall()
cursor.close()
return [
{
'keyword': r[0],
'asin': r[1],
'rank': r[2],
'timestamp': r[3]
}
for r in results
]
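If you later want the full history for one keyword-ASIN pair (for trend charts or the dashboard in Step 5), a helper along these lines can be added to the Database class. This method is my addition, not part of the original code:

    def get_ranking_history(self, keyword: str, asin: str, limit: int = 100) -> List[Dict]:
        """Return recent ranking snapshots for one keyword-ASIN pair, newest first."""
        query = """
            SELECT rank, page, position, timestamp
            FROM keyword_rankings
            WHERE keyword = %s AND asin = %s
            ORDER BY timestamp DESC
            LIMIT %s
        """
        cursor = self.conn.cursor()
        cursor.execute(query, (keyword, asin, limit))
        rows = cursor.fetchall()
        cursor.close()
        return [
            {'rank': r[0], 'page': r[1], 'position': r[2], 'timestamp': r[3]}
            for r in rows
        ]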
Main Monitor (monitor.py)
import logging
import time
from datetime import datetime
from typing import List, Dict
from config import Config
from api_client import ScrapeAPIClient
from database import Database
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
class KeywordMonitor:
"""Main monitoring orchestrator"""
def __init__(self):
self.config = Config()
self.api = ScrapeAPIClient(
self.config.PANGOLIN_EMAIL,
self.config.PANGOLIN_PASSWORD
)
self.db = Database(self.config)
def setup(self):
"""Initialize system"""
logger.info("π Starting Keyword Monitor Setup")
# Authenticate API
if not self.api.authenticate():
raise Exception("API authentication failed")
# Connect database
self.db.connect()
self.db.create_tables()
logger.info("β Setup complete")
def monitor_keywords(self, keywords: List[Dict]):
"""
Monitor a list of keyword-ASIN pairs
Args:
keywords: List of dicts with 'keyword', 'asin', 'marketplace'
"""
logger.info(f"π Monitoring {len(keywords)} keyword-ASIN pairs")
results = []
for item in keywords:
keyword = item['keyword']
asin = item['asin']
marketplace = item.get('marketplace', 'com')
logger.info(f"π Checking: {keyword} - {asin}")
# Find ranking
ranking = self.api.find_asin_rank(
keyword,
asin,
marketplace,
self.config.MAX_PAGES
)
if ranking:
# Save to database
self.db.save_ranking(ranking)
results.append(ranking)
else:
# Save as "not found"
self.db.save_ranking({
'keyword': keyword,
'asin': asin,
'marketplace': marketplace,
'rank': None
})
# Rate limiting (2 requests per second)
time.sleep(0.5)
logger.info(f"β Monitoring complete: {len(results)}/{len(keywords)} found")
return results
def main():
"""Main entry point"""
monitor = KeywordMonitor()
monitor.setup()
# Define keywords to monitor
keywords = [
{
'keyword': 'wireless earbuds',
'asin': 'B08XYZ123',
'marketplace': 'com'
},
{
'keyword': 'bluetooth speaker',
'asin': 'B09ABC456',
'marketplace': 'com'
},
{
'keyword': 'phone case',
'asin': 'B07DEF789',
'marketplace': 'com'
}
]
# Run monitoring
results = monitor.monitor_keywords(keywords)
# Print summary
print("\nπ Monitoring Results:")
print("-" * 60)
for r in results:
print(f"{r['keyword']:30} | Rank: {r['rank']:3} | Page: {r['page']}")
print("-" * 60)
if __name__ == "__main__":
main()
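Hard-coding the keyword list in main() is fine for a first test, but in practice you'll want to load it from a file. A small sketch, assuming a keywords.json you create yourself (the filename and format are my choice, not something the project defines):

import json

def load_keywords(path: str = "keywords.json"):
    """Load keyword-ASIN pairs from a JSON file, e.g.
    [{"keyword": "wireless earbuds", "asin": "B08XYZ123", "marketplace": "com"}]
    """
    with open(path, "r", encoding="utf-8") as f:
        return json.load(f)

# In main(): keywords = load_keywords()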
Step 3: Setting Up Automation
Create Cron Job
# Edit crontab
crontab -e
# Add this line to run every 6 hours
0 */6 * * * cd /path/to/keyword-monitor && /usr/bin/python3 monitor.py >> logs/monitor.log 2>&1
Alternative: systemd Timer (Linux)
# /etc/systemd/system/keyword-monitor.service
[Unit]
Description=Amazon Keyword Ranking Monitor
[Service]
Type=oneshot
User=your_user
WorkingDirectory=/path/to/keyword-monitor
ExecStart=/usr/bin/python3 monitor.py
# /etc/systemd/system/keyword-monitor.timer
[Unit]
Description=Run Keyword Monitor every 6 hours
[Timer]
OnBootSec=5min
OnUnitActiveSec=6h
[Install]
WantedBy=timers.target
Enable and start:
sudo systemctl enable keyword-monitor.timer
sudo systemctl start keyword-monitor.timer
Step 4: Adding Alerts
Slack Integration (alerts.py)
import requests
import logging
from typing import List, Dict

from database import Database
logger = logging.getLogger(__name__)
class AlertService:
"""Send alerts for ranking changes"""
def __init__(self, webhook_url: str):
self.webhook_url = webhook_url
def send_slack_alert(self, changes: List[Dict]):
"""Send Slack notification for ranking changes"""
if not changes:
return
# Build message
message = {
"text": "π¨ *Amazon Keyword Ranking Changes*",
"blocks": [
{
"type": "header",
"text": {
"type": "plain_text",
"text": "π¨ Ranking Changes Detected"
}
}
]
}
for change in changes[:10]: # Max 10 changes
emoji = "π" if change['change'] > 0 else "π"
message["blocks"].append({
"type": "section",
"text": {
"type": "mrkdwn",
"text": f"{emoji} *{change['keyword']}* (ASIN: {change['asin']})\n"
f"Current: #{change['current_rank']} | "
f"Previous: #{change['previous_rank']} | "
f"Change: {change['change']:+d}"
}
})
# Send to Slack
try:
response = requests.post(self.webhook_url, json=message, timeout=10)
response.raise_for_status()
logger.info("β Slack alert sent")
except Exception as e:
logger.error(f"β Slack alert failed: {str(e)}")
def detect_changes(db: Database, threshold: int = 5) -> List[Dict]:
"""Detect significant ranking changes"""
query = """
WITH latest AS (
SELECT DISTINCT ON (keyword, asin)
keyword, asin, rank, timestamp
FROM keyword_rankings
ORDER BY keyword, asin, timestamp DESC
),
previous AS (
SELECT DISTINCT ON (keyword, asin)
keyword, asin, rank, timestamp
FROM keyword_rankings
WHERE timestamp < (SELECT MAX(timestamp) FROM keyword_rankings)
ORDER BY keyword, asin, timestamp DESC
)
SELECT
l.keyword,
l.asin,
l.rank AS current_rank,
p.rank AS previous_rank,
(p.rank - l.rank) AS change
FROM latest l
JOIN previous p ON l.keyword = p.keyword AND l.asin = p.asin
WHERE ABS(p.rank - l.rank) >= %s
"""
cursor = db.conn.cursor()
cursor.execute(query, (threshold,))
results = cursor.fetchall()
cursor.close()
return [
{
'keyword': r[0],
'asin': r[1],
'current_rank': r[2],
'previous_rank': r[3],
'change': r[4]
}
for r in results
]
Update monitor.py to include alerts:
import os

from alerts import AlertService, detect_changes
# In main():
# After monitoring, check for changes
changes = detect_changes(monitor.db, threshold=5)
if changes:
alert = AlertService(os.getenv('SLACK_WEBHOOK_URL'))
alert.send_slack_alert(changes)
Step 5: Visualization Dashboard
Simple Flask Dashboard
# dashboard.py
from flask import Flask, render_template
import pandas as pd
from database import Database
from config import Config
app = Flask(__name__)
db = Database(Config())
db.connect()
@app.route('/')
def index():
"""Main dashboard"""
# Get latest rankings
query = """
SELECT DISTINCT ON (keyword, asin)
keyword, asin, rank, timestamp
FROM keyword_rankings
ORDER BY keyword, asin, timestamp DESC
"""
df = pd.read_sql_query(query, db.conn)
return render_template('dashboard.html', rankings=df.to_dict('records'))
@app.route('/history/<keyword>/<asin>')
def history(keyword, asin):
"""Ranking history for a keyword-ASIN pair"""
query = """
SELECT rank, timestamp
FROM keyword_rankings
WHERE keyword = %s AND asin = %s
ORDER BY timestamp DESC
LIMIT 100
"""
df = pd.read_sql_query(query, db.conn, params=(keyword, asin))
return render_template('history.html',
keyword=keyword,
asin=asin,
data=df.to_dict('records'))
if __name__ == '__main__':
app.run(debug=True, port=5000)
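render_template() expects a templates/ folder next to dashboard.py, which the article doesn't show. A bare-bones templates/dashboard.html you could start from (field names assume the query above; styling left to you):

<!-- templates/dashboard.html - minimal sketch -->
<!DOCTYPE html>
<html>
<head><title>Keyword Rankings</title></head>
<body>
  <h1>Latest Keyword Rankings</h1>
  <table border="1" cellpadding="6">
    <tr><th>Keyword</th><th>ASIN</th><th>Rank</th><th>Checked at</th></tr>
    {% for r in rankings %}
    <tr>
      <td>{{ r.keyword }}</td>
      <td><a href="/history/{{ r.keyword }}/{{ r.asin }}">{{ r.asin }}</a></td>
      <td>{{ r.rank }}</td>
      <td>{{ r.timestamp }}</td>
    </tr>
    {% endfor %}
  </table>
</body>
</html>

You'd need a similar history.html for the /history route.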
Testing
Unit Tests
# test_monitor.py
import unittest
from api_client import ScrapeAPIClient
from config import Config
class TestScrapeAPI(unittest.TestCase):
def setUp(self):
config = Config()
self.client = ScrapeAPIClient(
config.PANGOLIN_EMAIL,
config.PANGOLIN_PASSWORD
)
def test_authentication(self):
"""Test API authentication"""
result = self.client.authenticate()
self.assertTrue(result)
self.assertIsNotNone(self.client.token)
def test_keyword_search(self):
"""Test keyword search"""
self.client.authenticate()
result = self.client.search_keyword('wireless earbuds')
self.assertIsNotNone(result)
self.assertIn('json', result)
def test_find_asin_rank(self):
"""Test ASIN ranking"""
self.client.authenticate()
result = self.client.find_asin_rank(
'wireless earbuds',
'B08XYZ123'
)
# May or may not find (depends on actual ranking)
self.assertIsInstance(result, (dict, type(None)))
if __name__ == '__main__':
unittest.main()
Run tests:
python -m pytest test_monitor.py -v
Deployment
Docker Setup
# Dockerfile
FROM python:3.9-slim
WORKDIR /app
# Install dependencies
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
# Copy application
COPY . .
# Run monitor
CMD ["python", "monitor.py"]
# docker-compose.yml
version: '3.8'
services:
monitor:
build: .
environment:
- PANGOLIN_EMAIL=${PANGOLIN_EMAIL}
- PANGOLIN_PASSWORD=${PANGOLIN_PASSWORD}
- DB_HOST=postgres
- DB_NAME=keyword_monitor
- DB_USER=postgres
- DB_PASSWORD=${DB_PASSWORD}
depends_on:
- postgres
postgres:
image: postgres:14-alpine
environment:
- POSTGRES_DB=keyword_monitor
- POSTGRES_PASSWORD=${DB_PASSWORD}
volumes:
- postgres_data:/var/lib/postgresql/data
ports:
- "5432:5432"
volumes:
postgres_data:
Deploy:
docker-compose up -d
Cost Analysis
Monthly Costs
| Item | Cost |
|---|---|
| Scrape API (80 keywords, 4x/day) | ~$70 |
| DigitalOcean Droplet (1GB) | $10 |
| PostgreSQL (managed, optional) | $15 |
| Total | $80-95/month |
ROI Calculation
- Time saved: 40 hours/month (@ $50/hr = $2,000)
- Cost: $95/month
- ROI: 2,005%
Troubleshooting
Common Issues
Issue: API authentication fails
Solution: Check email/password, ensure account has credits
Issue: ASIN not found
Solution: Increase MAX_PAGES, verify ASIN is correct
Issue: Database connection error
Solution: Check PostgreSQL is running, verify credentials
Issue: Rate limiting
Solution: Add delays between requests (time.sleep(0.5))
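For the rate-limiting case specifically, a plain retry with exponential backoff usually does the job. This helper is a generic sketch (not part of the article's code) that you could wrap around the requests.post() calls in api_client.py:

import time
import requests

def post_with_backoff(url, max_retries=4, **kwargs):
    """POST with retries: backs off exponentially on HTTP 429 or transient errors."""
    for attempt in range(max_retries):
        try:
            response = requests.post(url, **kwargs)
            if response.status_code != 429:
                response.raise_for_status()
                return response
        except requests.RequestException:
            if attempt == max_retries - 1:
                raise
        # Wait 1s, 2s, 4s, ... before the next attempt
        time.sleep(2 ** attempt)
    raise RuntimeError(f"Still rate-limited after {max_retries} attempts")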
Next Steps
Enhancements
- Multi-marketplace support: Monitor US, UK, DE, FR simultaneously
- Competitor tracking: Track top 10 competitors for each keyword
- Trend analysis: ML models to predict ranking changes
- Mobile app: React Native app for on-the-go monitoring
- Advanced alerts: Telegram, SMS, email integration
Conclusion
You now have a production-ready Amazon keyword ranking monitor that:
✅ Runs automatically every 6 hours
✅ Stores historical data for trend analysis
✅ Sends alerts for significant changes
✅ Costs less than $100/month
✅ Saves 40+ hours/month
The best part? It's completely customizable. Add features, integrate with your existing tools, and scale as your business grows.
Ready to build yours? Sign up for Pangolinfo Scrape API and start monitoring in minutes.
Questions?
Drop your questions in the comments! I'll answer every one.
Found this helpful? Give it a ❤️ and share with fellow Amazon sellers!
Top comments (1)
If you're trying to run this in production, you gotta tighten things up a bit. First off, add some exponential backoff; the code doesn't handle 429 rate-limit errors, and that'll bite you eventually. If you're monitoring 500+ keywords, swap the database for TimescaleDB to keep your indexes from getting bloated. You'll also want hysteresis-based alerts: make sure a change shows up twice in a row before you freak out, because Amazon's results are naturally noisy and you'll get hammered with false alarms otherwise. Here's the real kicker, though: Amazon doesn't stick products in fixed 48-per-page slots anymore. Sponsored listings and dynamic content get mixed in all over the place, so the (page - 1) * 48 + index rank calculation is gonna be off. Once you scale past 100 keywords, think about tiered monitoring (hit the top 20 keywords every hour, check the rest daily) and throw in Redis caching so you're not repeating the same API calls within a 5-minute window.