Mox Loop

Building an Amazon Keyword Ranking Monitor with Scrape API

TL;DR

Build an automated Amazon keyword ranking monitor in 5 steps:

  1. Sign up for Pangolinfo Scrape API
  2. Write a Python script to search keywords and find ASIN rankings
  3. Store data in PostgreSQL for historical tracking
  4. Set up automated scheduling with cron
  5. Configure alerts for significant ranking changes

Time to build: 2-3 hours

Monthly cost: ~$70 (API) + $10 (hosting)

Time saved: 40+ hours/month


The Problem

If you're selling on Amazon, you know that keyword rankings = visibility = sales. But manually checking rankings is:

  • ⏰ Time-consuming: 2+ hours daily for 30 keywords
  • πŸ“‰ Reactive: You discover drops days later
  • πŸ“Š No historical data: Can't analyze trends
  • 🚫 Doesn't scale: More products = impossible workload

Let's build a better solution.


Prerequisites

Before we start, you'll need:

  • Python 3.8+ installed
  • Basic Python knowledge
  • PostgreSQL (or any database)
  • A Pangolinfo account (sign up here)
  • 2-3 hours of time

Step 1: Understanding the Scrape API

Pangolinfo's Scrape API provides structured Amazon data without dealing with:

  • Anti-bot measures
  • HTML parsing
  • Proxy rotation
  • CAPTCHA solving

API Basics

  • Authentication: Get a token via email/password
  • Endpoint: https://scrapeapi.pangolinfo.com/api/v1/scrape
  • Parser: amzKeyword for search results
  • Cost: 1 credit per request

Quick Test

# 1. Authenticate
curl -X POST https://scrapeapi.pangolinfo.com/api/v1/auth \
  -H 'Content-Type: application/json' \
  -d '{"email": "your@email.com", "password": "your_password"}'

# Response: {"code":0, "data":"your_token_here"}

# 2. Search a keyword
curl -X POST https://scrapeapi.pangolinfo.com/api/v1/scrape \
  -H 'Content-Type: application/json' \
  -H 'Authorization: Bearer YOUR_TOKEN' \
  -d '{
    "url": "https://www.amazon.com/s?k=wireless+earbuds",
    "parserName": "amzKeyword",
    "format": "json",
    "bizContext": {"zipcode": "10041"}
  }'
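
The response body for the amzKeyword parser comes back roughly in this shape (inferred from the parsing code in Step 2 — treat the exact field names as assumptions and check a live response):

{
  "code": 0,
  "data": {
    "json": [
      {
        "data": {
          "results": [
            {
              "asin": "B08XYZ123",
              "title": "Wireless Earbuds ...",
              "price": "$29.99",
              "star": "4.5",
              "rating": 12345,
              "image": "https://..."
            }
          ]
        }
      }
    ]
  }
}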

Step 2: Building the Core Monitor

Project Structure

keyword-monitor/
β”œβ”€β”€ config.py          # Configuration
β”œβ”€β”€ api_client.py      # API wrapper
β”œβ”€β”€ monitor.py         # Main monitoring logic
β”œβ”€β”€ database.py        # Database operations
β”œβ”€β”€ alerts.py          # Alert system
β”œβ”€β”€ requirements.txt   # Dependencies
└── .env              # Secrets (gitignored)

Install Dependencies

pip install requests psycopg2-binary python-dotenv pandas

Configuration (config.py)

import os
from dotenv import load_dotenv

load_dotenv()

class Config:
    # API Configuration
    PANGOLIN_EMAIL = os.getenv('PANGOLIN_EMAIL')
    PANGOLIN_PASSWORD = os.getenv('PANGOLIN_PASSWORD')
    API_BASE_URL = 'https://scrapeapi.pangolinfo.com'

    # Database Configuration
    DB_HOST = os.getenv('DB_HOST', 'localhost')
    DB_PORT = int(os.getenv('DB_PORT', 5432))
    DB_NAME = os.getenv('DB_NAME', 'keyword_monitor')
    DB_USER = os.getenv('DB_USER', 'postgres')
    DB_PASSWORD = os.getenv('DB_PASSWORD')

    # Monitoring Configuration
    MAX_PAGES = 3  # Search up to 3 pages
    ALERT_THRESHOLD = 5  # Alert if rank changes by 5+
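
config.py reads these values from a .env file in the project root. A minimal sketch with placeholder values (SLACK_WEBHOOK_URL is used by the alerts wired up in Step 4):

# .env  (gitignored — never commit real credentials)
PANGOLIN_EMAIL=your@email.com
PANGOLIN_PASSWORD=your_password
DB_HOST=localhost
DB_PORT=5432
DB_NAME=keyword_monitor
DB_USER=postgres
DB_PASSWORD=your_db_password
SLACK_WEBHOOK_URL=https://hooks.slack.com/services/...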

API Client (api_client.py)

import requests
import logging
from typing import Dict, Optional
from urllib.parse import quote_plus

logger = logging.getLogger(__name__)

class ScrapeAPIClient:
    """Wrapper for Pangolinfo Scrape API"""

    def __init__(self, email: str, password: str):
        self.base_url = "https://scrapeapi.pangolinfo.com"
        self.email = email
        self.password = password
        self.token = None

    def authenticate(self) -> bool:
        """Authenticate and get access token"""
        url = f"{self.base_url}/api/v1/auth"
        payload = {
            "email": self.email,
            "password": self.password
        }

        try:
            response = requests.post(url, json=payload, timeout=10)
            response.raise_for_status()
            result = response.json()

            if result['code'] == 0:
                self.token = result['data']
                logger.info("βœ“ Authentication successful")
                return True
            else:
                logger.error(f"βœ— Authentication failed: {result['message']}")
                return False
        except Exception as e:
            logger.error(f"βœ— Authentication error: {str(e)}")
            return False

    def search_keyword(
        self, 
        keyword: str, 
        marketplace: str = "com",
        zipcode: str = "10041",
        page: int = 1
    ) -> Optional[Dict]:
        """
        Search for a keyword on Amazon

        Args:
            keyword: Search term
            marketplace: Amazon domain (com, co.uk, de, etc.)
            zipcode: Delivery location
            page: Page number

        Returns:
            Parsed search results or None if failed
        """
        url = f"{self.base_url}/api/v1/scrape"

        # Build Amazon search URL (URL-encode the keyword so multi-word searches are valid)
        amazon_url = f"https://www.amazon.{marketplace}/s?k={quote_plus(keyword)}&page={page}"

        payload = {
            "url": amazon_url,
            "parserName": "amzKeyword",
            "format": "json",
            "bizContext": {
                "zipcode": zipcode
            }
        }

        headers = {
            "Content-Type": "application/json",
            "Authorization": f"Bearer {self.token}"
        }

        try:
            response = requests.post(url, json=payload, headers=headers, timeout=30)
            response.raise_for_status()
            result = response.json()

            if result['code'] == 0:
                logger.info(f"βœ“ Searched '{keyword}' page {page}")
                return result['data']
            else:
                logger.error(f"βœ— Search failed: {result['message']}")
                return None
        except Exception as e:
            logger.error(f"βœ— Search error: {str(e)}")
            return None

    def find_asin_rank(
        self,
        keyword: str,
        target_asin: str,
        marketplace: str = "com",
        max_pages: int = 3
    ) -> Optional[Dict]:
        """
        Find the ranking position of an ASIN for a keyword

        Args:
            keyword: Search keyword
            target_asin: ASIN to find
            marketplace: Amazon marketplace
            max_pages: Maximum pages to search

        Returns:
            Ranking information or None if not found
        """
        for page in range(1, max_pages + 1):
            data = self.search_keyword(keyword, marketplace, page=page)

            if not data or 'json' not in data:
                continue

            # Parse results (response shape: data['json'][0]['data']['results'])
            try:
                results = data['json'][0]['data']['results']
            except (KeyError, IndexError, TypeError):
                logger.warning(f"Unexpected response shape for '{keyword}' page {page}")
                continue

            for index, product in enumerate(results):
                if product.get('asin') == target_asin:
                    # Calculate absolute rank (48 products per page)
                    rank = (page - 1) * 48 + index + 1

                    logger.info(f"βœ“ Found {target_asin} at rank {rank}")

                    return {
                        'keyword': keyword,
                        'asin': target_asin,
                        'rank': rank,
                        'page': page,
                        'position': index + 1,
                        'title': product.get('title', ''),
                        'price': product.get('price', ''),
                        'rating': product.get('star', ''),    # parser's 'star' field = star rating
                        'reviews': product.get('rating', 0),  # parser's 'rating' field = review count
                        'image': product.get('image', '')
                    }

        logger.warning(f"βœ— {target_asin} not found in top {max_pages * 48} results")
        return None
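
A quick sanity check of the client before wiring up the database (reusing the placeholder ASIN from later in the post):

from config import Config
from api_client import ScrapeAPIClient

config = Config()
client = ScrapeAPIClient(config.PANGOLIN_EMAIL, config.PANGOLIN_PASSWORD)

if client.authenticate():
    ranking = client.find_asin_rank('wireless earbuds', 'B08XYZ123')
    print(ranking or 'Not found in the first 3 pages')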

Database Setup (database.py)

import psycopg2
from psycopg2.extras import execute_values
from typing import List, Dict
import logging

logger = logging.getLogger(__name__)

class Database:
    """PostgreSQL database handler"""

    def __init__(self, config):
        self.config = config
        self.conn = None

    def connect(self):
        """Connect to database"""
        try:
            self.conn = psycopg2.connect(
                host=self.config.DB_HOST,
                port=self.config.DB_PORT,
                database=self.config.DB_NAME,
                user=self.config.DB_USER,
                password=self.config.DB_PASSWORD
            )
            logger.info("βœ“ Database connected")
        except Exception as e:
            logger.error(f"βœ— Database connection failed: {str(e)}")
            raise

    def create_tables(self):
        """Create necessary tables"""
        create_table_sql = """
        CREATE TABLE IF NOT EXISTS keyword_rankings (
            id SERIAL PRIMARY KEY,
            keyword VARCHAR(255) NOT NULL,
            asin VARCHAR(20) NOT NULL,
            marketplace VARCHAR(10) NOT NULL DEFAULT 'com',
            rank INTEGER,
            page INTEGER,
            position INTEGER,
            title TEXT,
            price VARCHAR(50),
            rating DECIMAL(3,2),
            reviews INTEGER,
            image TEXT,
            timestamp TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
        );

        CREATE INDEX IF NOT EXISTS idx_keyword_asin 
            ON keyword_rankings(keyword, asin);
        CREATE INDEX IF NOT EXISTS idx_timestamp 
            ON keyword_rankings(timestamp);
        """

        cursor = self.conn.cursor()
        cursor.execute(create_table_sql)
        self.conn.commit()
        cursor.close()
        logger.info("βœ“ Tables created")

    def save_ranking(self, ranking: Dict):
        """Save a single ranking record"""
        insert_sql = """
        INSERT INTO keyword_rankings 
        (keyword, asin, marketplace, rank, page, position, 
         title, price, rating, reviews, image)
        VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
        """

        cursor = self.conn.cursor()
        cursor.execute(insert_sql, (
            ranking['keyword'],
            ranking['asin'],
            ranking.get('marketplace', 'com'),
            ranking.get('rank'),
            ranking.get('page'),
            ranking.get('position'),
            ranking.get('title'),
            ranking.get('price'),
            ranking.get('rating'),
            ranking.get('reviews'),
            ranking.get('image')
        ))
        self.conn.commit()
        cursor.close()
        logger.info(f"βœ“ Saved ranking: {ranking['keyword']} - {ranking['asin']}")

    def get_latest_rankings(self) -> List[Dict]:
        """Get the most recent rankings for all keyword-ASIN pairs"""
        query = """
        SELECT DISTINCT ON (keyword, asin)
            keyword, asin, rank, timestamp
        FROM keyword_rankings
        ORDER BY keyword, asin, timestamp DESC
        """

        cursor = self.conn.cursor()
        cursor.execute(query)
        results = cursor.fetchall()
        cursor.close()

        return [
            {
                'keyword': r[0],
                'asin': r[1],
                'rank': r[2],
                'timestamp': r[3]
            }
            for r in results
        ]

Main Monitor (monitor.py)

import logging
import time
from typing import List, Dict
from config import Config
from api_client import ScrapeAPIClient
from database import Database

logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)

class KeywordMonitor:
    """Main monitoring orchestrator"""

    def __init__(self):
        self.config = Config()
        self.api = ScrapeAPIClient(
            self.config.PANGOLIN_EMAIL,
            self.config.PANGOLIN_PASSWORD
        )
        self.db = Database(self.config)

    def setup(self):
        """Initialize system"""
        logger.info("πŸš€ Starting Keyword Monitor Setup")

        # Authenticate API
        if not self.api.authenticate():
            raise Exception("API authentication failed")

        # Connect database
        self.db.connect()
        self.db.create_tables()

        logger.info("βœ“ Setup complete")

    def monitor_keywords(self, keywords: List[Dict]):
        """
        Monitor a list of keyword-ASIN pairs

        Args:
            keywords: List of dicts with 'keyword', 'asin', 'marketplace'
        """
        logger.info(f"πŸ“Š Monitoring {len(keywords)} keyword-ASIN pairs")

        results = []

        for item in keywords:
            keyword = item['keyword']
            asin = item['asin']
            marketplace = item.get('marketplace', 'com')

            logger.info(f"πŸ” Checking: {keyword} - {asin}")

            # Find ranking
            ranking = self.api.find_asin_rank(
                keyword, 
                asin, 
                marketplace,
                self.config.MAX_PAGES
            )

            if ranking:
                # Save to database
                self.db.save_ranking(ranking)
                results.append(ranking)
            else:
                # Save as "not found"
                self.db.save_ranking({
                    'keyword': keyword,
                    'asin': asin,
                    'marketplace': marketplace,
                    'rank': None
                })

            # Rate limiting (2 requests per second)
            time.sleep(0.5)

        logger.info(f"βœ“ Monitoring complete: {len(results)}/{len(keywords)} found")
        return results

def main():
    """Main entry point"""
    monitor = KeywordMonitor()
    monitor.setup()

    # Define keywords to monitor
    keywords = [
        {
            'keyword': 'wireless earbuds',
            'asin': 'B08XYZ123',
            'marketplace': 'com'
        },
        {
            'keyword': 'bluetooth speaker',
            'asin': 'B09ABC456',
            'marketplace': 'com'
        },
        {
            'keyword': 'phone case',
            'asin': 'B07DEF789',
            'marketplace': 'com'
        }
    ]

    # Run monitoring
    results = monitor.monitor_keywords(keywords)

    # Print summary
    print("\nπŸ“ˆ Monitoring Results:")
    print("-" * 60)
    for r in results:
        print(f"{r['keyword']:30} | Rank: {r['rank']:3} | Page: {r['page']}")
    print("-" * 60)

if __name__ == "__main__":
    main()

Step 3: Setting Up Automation

Create Cron Job

# Edit crontab
crontab -e

# Add this line to run every 6 hours (create the logs/ directory first: mkdir -p logs)
0 */6 * * * cd /path/to/keyword-monitor && /usr/bin/python3 monitor.py >> logs/monitor.log 2>&1

Alternative: systemd Timer (Linux)

# /etc/systemd/system/keyword-monitor.service
[Unit]
Description=Amazon Keyword Ranking Monitor

[Service]
Type=oneshot
User=your_user
WorkingDirectory=/path/to/keyword-monitor
ExecStart=/usr/bin/python3 monitor.py

# /etc/systemd/system/keyword-monitor.timer
[Unit]
Description=Run Keyword Monitor every 6 hours

[Timer]
OnBootSec=5min
OnUnitActiveSec=6h

[Install]
WantedBy=timers.target

Enable and start:

sudo systemctl enable keyword-monitor.timer
sudo systemctl start keyword-monitor.timer
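
Verify the timer is registered and see when it fires next:

systemctl list-timers keyword-monitor.timer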

Step 4: Adding Alerts

Slack Integration (alerts.py)

import requests
import logging
from typing import List, Dict

from database import Database  # needed for the type hint in detect_changes below

logger = logging.getLogger(__name__)

class AlertService:
    """Send alerts for ranking changes"""

    def __init__(self, webhook_url: str):
        self.webhook_url = webhook_url

    def send_slack_alert(self, changes: List[Dict]):
        """Send Slack notification for ranking changes"""

        if not changes:
            return

        # Build message
        message = {
            "text": "🚨 *Amazon Keyword Ranking Changes*",
            "blocks": [
                {
                    "type": "header",
                    "text": {
                        "type": "plain_text",
                        "text": "🚨 Ranking Changes Detected"
                    }
                }
            ]
        }

        for change in changes[:10]:  # Max 10 changes
            emoji = "πŸ“ˆ" if change['change'] > 0 else "πŸ“‰"

            message["blocks"].append({
                "type": "section",
                "text": {
                    "type": "mrkdwn",
                    "text": f"{emoji} *{change['keyword']}* (ASIN: {change['asin']})\n"
                            f"Current: #{change['current_rank']} | "
                            f"Previous: #{change['previous_rank']} | "
                            f"Change: {change['change']:+d}"
                }
            })

        # Send to Slack
        try:
            response = requests.post(self.webhook_url, json=message, timeout=10)
            response.raise_for_status()
            logger.info("βœ“ Slack alert sent")
        except Exception as e:
            logger.error(f"βœ— Slack alert failed: {str(e)}")

def detect_changes(db: Database, threshold: int = 5) -> List[Dict]:
    """Detect significant ranking changes"""

    query = """
    WITH latest AS (
        SELECT DISTINCT ON (keyword, asin)
            keyword, asin, rank, timestamp
        FROM keyword_rankings
        ORDER BY keyword, asin, timestamp DESC
    ),
    previous AS (
        SELECT DISTINCT ON (keyword, asin)
            keyword, asin, rank, timestamp
        FROM keyword_rankings
        WHERE timestamp < (SELECT MAX(timestamp) FROM keyword_rankings)
        ORDER BY keyword, asin, timestamp DESC
    )
    SELECT 
        l.keyword,
        l.asin,
        l.rank AS current_rank,
        p.rank AS previous_rank,
        (p.rank - l.rank) AS change
    FROM latest l
    JOIN previous p ON l.keyword = p.keyword AND l.asin = p.asin
    WHERE ABS(p.rank - l.rank) >= %s
    """

    cursor = db.conn.cursor()
    cursor.execute(query, (threshold,))
    results = cursor.fetchall()
    cursor.close()

    return [
        {
            'keyword': r[0],
            'asin': r[1],
            'current_rank': r[2],
            'previous_rank': r[3],
            'change': r[4]
        }
        for r in results
    ]

Update monitor.py to include alerts:

import os

from alerts import AlertService, detect_changes

# In main():
# After monitoring, check for changes
changes = detect_changes(monitor.db, threshold=5)

if changes:
    alert = AlertService(os.getenv('SLACK_WEBHOOK_URL'))
    alert.send_slack_alert(changes)

Step 5: Visualization Dashboard

Simple Flask Dashboard

# dashboard.py
from flask import Flask, render_template
import pandas as pd
from database import Database
from config import Config

app = Flask(__name__)
db = Database(Config())
db.connect()

@app.route('/')
def index():
    """Main dashboard"""

    # Get latest rankings
    query = """
    SELECT DISTINCT ON (keyword, asin)
        keyword, asin, rank, timestamp
    FROM keyword_rankings
    ORDER BY keyword, asin, timestamp DESC
    """

    df = pd.read_sql_query(query, db.conn)

    return render_template('dashboard.html', rankings=df.to_dict('records'))

@app.route('/history/<keyword>/<asin>')
def history(keyword, asin):
    """Ranking history for a keyword-ASIN pair"""

    query = """
    SELECT rank, timestamp
    FROM keyword_rankings
    WHERE keyword = %s AND asin = %s
    ORDER BY timestamp DESC
    LIMIT 100
    """

    df = pd.read_sql_query(query, db.conn, params=(keyword, asin))

    return render_template('history.html', 
                         keyword=keyword, 
                         asin=asin,
                         data=df.to_dict('records'))

if __name__ == '__main__':
    app.run(debug=True, port=5000)

Testing

Unit Tests

# test_monitor.py
import unittest
from api_client import ScrapeAPIClient
from config import Config

class TestScrapeAPI(unittest.TestCase):

    def setUp(self):
        config = Config()
        self.client = ScrapeAPIClient(
            config.PANGOLIN_EMAIL,
            config.PANGOLIN_PASSWORD
        )

    def test_authentication(self):
        """Test API authentication"""
        result = self.client.authenticate()
        self.assertTrue(result)
        self.assertIsNotNone(self.client.token)

    def test_keyword_search(self):
        """Test keyword search"""
        self.client.authenticate()
        result = self.client.search_keyword('wireless earbuds')
        self.assertIsNotNone(result)
        self.assertIn('json', result)

    def test_find_asin_rank(self):
        """Test ASIN ranking"""
        self.client.authenticate()
        result = self.client.find_asin_rank(
            'wireless earbuds',
            'B08XYZ123'
        )
        # May or may not find (depends on actual ranking)
        self.assertIsInstance(result, (dict, type(None)))

if __name__ == '__main__':
    unittest.main()

Run tests:

python -m pytest test_monitor.py -v

Deployment

Docker Setup

# Dockerfile
FROM python:3.9-slim

WORKDIR /app

# Install dependencies
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# Copy application
COPY . .

# Run monitor
CMD ["python", "monitor.py"]

# docker-compose.yml
version: '3.8'

services:
  monitor:
    build: .
    environment:
      - PANGOLIN_EMAIL=${PANGOLIN_EMAIL}
      - PANGOLIN_PASSWORD=${PANGOLIN_PASSWORD}
      - DB_HOST=postgres
      - DB_NAME=keyword_monitor
      - DB_USER=postgres
      - DB_PASSWORD=${DB_PASSWORD}
    depends_on:
      - postgres

  postgres:
    image: postgres:14-alpine
    environment:
      - POSTGRES_DB=keyword_monitor
      - POSTGRES_PASSWORD=${DB_PASSWORD}
    volumes:
      - postgres_data:/var/lib/postgresql/data
    ports:
      - "5432:5432"

volumes:
  postgres_data:

Deploy:

docker-compose up -d
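
Tail the monitor's output to confirm it ran cleanly:

docker-compose logs -f monitor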

Cost Analysis

Monthly Costs

Item                                  Cost
Scrape API (80 keywords, 4x/day)      ~$70
DigitalOcean Droplet (1GB)            $10
PostgreSQL (managed, optional)        $15
Total                                 $80-95/month
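
Rough credit math behind the API line item: 80 keywords × 4 checks/day × up to 3 pages per check is about 28,800 requests/month at 1 credit each — usually fewer in practice, since the search stops as soon as the target ASIN is found.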

ROI Calculation

  • Time saved: 40 hours/month (@ $50/hr = $2,000)
  • Cost: $95/month
  • ROI: 2,005%

Troubleshooting

Common Issues

Issue: API authentication fails

Solution: Check email/password, ensure account has credits

Issue: ASIN not found

Solution: Increase MAX_PAGES, verify ASIN is correct

Issue: Database connection error

Solution: Check PostgreSQL is running, verify credentials

Issue: Rate limiting

Solution: Add delays between requests (time.sleep(0.5))
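
For production use, a retry wrapper with exponential backoff handles transient 429/5xx responses more gracefully than a fixed sleep. A minimal sketch — the retried status codes and delays here are assumptions, not documented API behavior:

import time
import requests

def post_with_backoff(url, max_retries=4, base_delay=1.0, **kwargs):
    """POST with exponential backoff on rate-limit and server errors."""
    for attempt in range(max_retries):
        response = requests.post(url, **kwargs)
        if response.status_code not in (429, 500, 502, 503):
            return response
        # Back off 1s, 2s, 4s, ... before retrying
        time.sleep(base_delay * (2 ** attempt))
    return response  # still failing after max_retries; caller checks status

Swap this in for the direct requests.post calls in api_client.py if you start seeing rate-limit errors.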


Next Steps

Enhancements

  1. Multi-marketplace support: Monitor US, UK, DE, FR simultaneously
  2. Competitor tracking: Track top 10 competitors for each keyword
  3. Trend analysis: ML models to predict ranking changes
  4. Mobile app: React Native app for on-the-go monitoring
  5. Advanced alerts: Telegram, SMS, email integration


Conclusion

You now have a production-ready Amazon keyword ranking monitor that:

βœ… Runs automatically every 6 hours

βœ… Stores historical data for trend analysis

βœ… Sends alerts for significant changes

βœ… Costs less than $100/month

βœ… Saves 40+ hours/month

The best part? It's completely customizable. Add features, integrate with your existing tools, and scale as your business grows.

Ready to build yours? Sign up for Pangolinfo Scrape API and start monitoring in minutes.


Questions?

Drop your questions in the comments! I'll answer every one.

Found this helpful? Give it a ❀️ and share with fellow Amazon sellers!

#python #amazon #automation #api #tutorial

Top comments (1)

OnlineProxy

If you're trying to run this in production, you'll want to tighten things up a bit. First off, add some exponential backoff — the code doesn't handle 429 rate-limit errors, which'll bite you eventually. If you're monitoring 500+ keywords, swap out the database for TimescaleDB to keep your indexes from getting bloated.

You'll also want hysteresis-based alerts: make sure a change shows up twice in a row before you freak out, because Amazon's platform is naturally noisy and you'll get hammered with false alarms otherwise.

Here's the real kicker though — Amazon doesn't actually stick products in fixed 48-per-page slots anymore; it throws in sponsored listings and dynamic content all over the place, so the (page - 1) * 48 + index rank calculation is gonna be off.

Once you start scaling past 100 keywords, think about tiered monitoring (hit the top 20 keywords every hour, check the rest daily) and throw Redis caching in there to avoid repeating the same API calls within 5-minute windows.