agenthustler

How to Build a Price Tracker with Python in 2026: Amazon, eBay, and Walmart

Introduction

Tracking prices across multiple e-commerce platforms is one of the most practical applications of web scraping. Whether you want to find the best deals for personal purchases, monitor competitor pricing for your business, or build a price comparison service, a cross-platform price tracker is an incredibly useful tool.

In this tutorial, I'll show you how to build a price tracker that monitors products on Amazon, eBay, and Walmart — storing historical price data in SQLite so you can spot trends and get alerts when prices drop.

What We're Building

Our price tracker will:

  • Scrape product prices from Amazon, eBay, and Walmart
  • Store price history in a local SQLite database
  • Detect price drops and alert you
  • Run on a schedule to track prices over time

The Scraping Challenges Per Platform

Each platform has its own quirks:

Platform   Difficulty   Main Challenge
Amazon     Hard         Aggressive bot detection, dynamic content
eBay       Medium       Auction vs. Buy-It-Now pricing, less anti-bot
Walmart    Hard         Heavy JavaScript rendering, Akamai protection

Project Setup

pip install requests beautifulsoup4 lxml schedule

Let's start with the database and core structure:

import sqlite3
from datetime import datetime
from dataclasses import dataclass

@dataclass
class PriceRecord:
    product_id: str
    platform: str
    name: str
    price: float
    currency: str = 'USD'
    url: str = ''

class PriceDatabase:
    def __init__(self, db_path: str = 'price_tracker.db'):
        self.conn = sqlite3.connect(db_path)
        self._create_tables()

    def _create_tables(self):
        self.conn.execute('''
            CREATE TABLE IF NOT EXISTS products (
                id TEXT,
                platform TEXT,
                name TEXT,
                url TEXT,
                PRIMARY KEY (id, platform)
            )
        ''')
        self.conn.execute('''
            CREATE TABLE IF NOT EXISTS prices (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                product_id TEXT,
                platform TEXT,
                price REAL,
                currency TEXT DEFAULT 'USD',
                recorded_at TEXT DEFAULT CURRENT_TIMESTAMP,
                FOREIGN KEY (product_id, platform)
                    REFERENCES products(id, platform)
            )
        ''')
        self.conn.commit()

    def add_product(self, record: PriceRecord):
        self.conn.execute(
            'INSERT OR REPLACE INTO products VALUES (?, ?, ?, ?)',
            (record.product_id, record.platform, record.name, record.url)
        )
        self.conn.execute(
            'INSERT INTO prices (product_id, platform, price, currency) '
            'VALUES (?, ?, ?, ?)',
            (record.product_id, record.platform,
             record.price, record.currency)
        )
        self.conn.commit()

    def get_price_history(self, product_id: str, platform: str) -> list:
        cursor = self.conn.execute(
            'SELECT price, recorded_at FROM prices '
            'WHERE product_id = ? AND platform = ? '
            'ORDER BY recorded_at DESC LIMIT 30',
            (product_id, platform)
        )
        return cursor.fetchall()

    def get_price_drop(self, product_id: str, platform: str) -> float | None:
        """Returns percentage drop from previous price, or None."""
        history = self.get_price_history(product_id, platform)
        if len(history) < 2:
            return None
        current, previous = history[0][0], history[1][0]
        if previous > 0 and current < previous:
            return round((1 - current / previous) * 100, 1)
        return None
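
Quick sanity check: you can exercise the price-drop logic against an in-memory database before wiring up any scrapers. The product ID below is a made-up placeholder, and I'm using a trimmed-down copy of the prices table, ordered by id instead of recorded_at so two inserts in the same second can't tie:

```python
import sqlite3

conn = sqlite3.connect(':memory:')
conn.execute('''
    CREATE TABLE prices (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        product_id TEXT,
        platform TEXT,
        price REAL,
        recorded_at TEXT DEFAULT CURRENT_TIMESTAMP
    )
''')

# Two observations for a made-up product, older first
conn.execute("INSERT INTO prices (product_id, platform, price) "
             "VALUES ('B0TEST', 'amazon', 99.99)")
conn.execute("INSERT INTO prices (product_id, platform, price) "
             "VALUES ('B0TEST', 'amazon', 89.99)")

# Newest first, same as get_price_history
rows = conn.execute(
    "SELECT price FROM prices WHERE product_id = 'B0TEST' ORDER BY id DESC"
).fetchall()
current, previous = rows[0][0], rows[1][0]
drop = round((1 - current / previous) * 100, 1)  # 10.0
```

One caveat worth knowing: the real get_price_history orders by recorded_at, and CURRENT_TIMESTAMP has one-second resolution, so two checks in the same second could come back in either order.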

Platform Scrapers

The Base Scraper

import requests
from bs4 import BeautifulSoup
import time
import random

class BaseScraper:
    def __init__(self, proxies: list[str] | None = None):
        self.session = requests.Session()
        self.proxies = proxies or []
        self.proxy_index = 0
        self.user_agents = [
            'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 '
            '(KHTML, like Gecko) Chrome/124.0.0.0 Safari/537.36',
            'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 '
            '(KHTML, like Gecko) Chrome/124.0.0.0 Safari/537.36',
            'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 '
            '(KHTML, like Gecko) Chrome/124.0.0.0 Safari/537.36',
        ]

    def _get_headers(self) -> dict:
        return {
            'User-Agent': random.choice(self.user_agents),
            'Accept-Language': 'en-US,en;q=0.9',
            'Accept': 'text/html,application/xhtml+xml',
        }

    def _get_proxy(self) -> dict | None:
        if not self.proxies:
            return None
        proxy = self.proxies[self.proxy_index % len(self.proxies)]
        self.proxy_index += 1
        return {'http': f'http://{proxy}', 'https': f'http://{proxy}'}

    def fetch(self, url: str) -> BeautifulSoup | None:
        try:
            response = self.session.get(
                url,
                headers=self._get_headers(),
                proxies=self._get_proxy(),
                timeout=15
            )
            if response.status_code == 200:
                return BeautifulSoup(response.text, 'lxml')
            print(f'Status {response.status_code} for {url}')
        except requests.RequestException as e:
            print(f'Error fetching {url}: {e}')
        return None

    def delay(self):
        time.sleep(random.uniform(2, 5))
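
A note on the round-robin in _get_proxy: if you want the same rotation outside the class, itertools.cycle expresses it neatly. The hosts below are placeholders, not real endpoints:

```python
from itertools import cycle

# Placeholder hosts: swap in your real proxy endpoints
_pool = cycle(['proxy1.example.com:8080', 'proxy2.example.com:8080'])

def next_proxy() -> dict:
    """Return a requests-style proxies dict, advancing round-robin."""
    host = next(_pool)
    return {'http': f'http://{host}', 'https': f'http://{host}'}

first = next_proxy()
next_proxy()
third = next_proxy()  # wrapped back around to the first host
```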

Amazon Scraper

class AmazonScraper(BaseScraper):
    def get_price(self, url: str) -> PriceRecord | None:
        soup = self.fetch(url)
        if not soup:
            return None

        title_el = soup.select_one('#productTitle')
        title = title_el.text.strip() if title_el else 'Unknown'

        # Amazon uses multiple price selectors
        price = None
        for selector in [
            'span.a-price span.a-offscreen',
            '#priceblock_ourprice',
            '#priceblock_dealprice',
        ]:
            el = soup.select_one(selector)
            if el:
                price_text = el.text.strip().replace('$', '').replace(',', '')
                try:
                    price = float(price_text)
                    break
                except ValueError:
                    continue

        if price is None:
            return None

        # Extract ASIN from URL
        asin = ''
        if '/dp/' in url:
            asin = url.split('/dp/')[1].split('/')[0].split('?')[0]

        return PriceRecord(
            product_id=asin, platform='amazon',
            name=title, price=price, url=url
        )

eBay Scraper

class EbayScraper(BaseScraper):
    def get_price(self, url: str) -> PriceRecord | None:
        soup = self.fetch(url)
        if not soup:
            return None

        title_el = soup.select_one('h1.x-item-title__mainTitle span')
        title = title_el.text.strip() if title_el else 'Unknown'

        price_el = soup.select_one('div.x-price-primary span.ux-textspans')
        if not price_el:
            price_el = soup.select_one('[itemprop="price"]')

        if not price_el:
            return None

        price_text = price_el.text.strip()
        price_text = price_text.replace('US $', '').replace('$', '').replace(',', '')
        try:
            price = float(price_text)
        except ValueError:
            return None

        # Extract item ID from URL
        item_id = ''
        if '/itm/' in url:
            item_id = url.split('/itm/')[1].split('?')[0].split('/')[0]

        return PriceRecord(
            product_id=item_id, platform='ebay',
            name=title, price=price, url=url
        )

Walmart Scraper

class WalmartScraper(BaseScraper):
    def get_price(self, url: str) -> PriceRecord | None:
        soup = self.fetch(url)
        if not soup:
            return None

        title_el = soup.select_one('h1[itemprop="name"]')
        title = title_el.text.strip() if title_el else 'Unknown'

        price_el = soup.select_one('[itemprop="price"]')
        if not price_el:
            price_el = soup.select_one('span.price-characteristic')

        if not price_el:
            return None

        price_text = price_el.get('content', '') or price_el.text.strip()
        price_text = price_text.replace('$', '').replace(',', '')
        try:
            price = float(price_text)
        except ValueError:
            return None

        # Extract product ID
        product_id = url.rstrip('/').split('/')[-1]

        return PriceRecord(
            product_id=product_id, platform='walmart',
            name=title, price=price, url=url
        )

Dealing with Proxy Rotation

Proxy rotation is non-negotiable for any multi-platform scraper. Without it, you'll likely get blocked within minutes on Amazon and Walmart.

You have two options:

Option 1: Self-managed proxies

Buy residential proxies from a provider and rotate them yourself:

proxy_list = [
    'user:pass@residential1.example.com:8080',
    'user:pass@residential2.example.com:8080',
    'user:pass@residential3.example.com:8080',
]

amazon = AmazonScraper(proxies=proxy_list)
ebay = EbayScraper(proxies=proxy_list)
walmart = WalmartScraper(proxies=proxy_list)

Option 2: Use a proxy management service

Services like ScrapeOps handle proxy rotation, CAPTCHA solving, and request optimization for you. Instead of managing a pool of proxies yourself, you route requests through their API:

SCRAPEOPS_KEY = 'your_api_key'

def scrape_via_scrapeops(url: str) -> str:
    response = requests.get(
        'https://proxy.scrapeops.io/v1/',
        params={
            'api_key': SCRAPEOPS_KEY,
            'url': url,
            'render_js': 'true',
        },
        timeout=60
    )
    return response.text

ScrapeOps is particularly good for multi-platform scraping because they optimize proxy selection per target site. Their dashboard also gives you success rate analytics so you can see which sites are causing issues.

Putting It All Together: The Price Tracker

import schedule

class PriceTracker:
    def __init__(self):
        self.db = PriceDatabase()
        self.scrapers = {
            'amazon': AmazonScraper(),
            'ebay': EbayScraper(),
            'walmart': WalmartScraper(),
        }
        self.watchlist = []

    def add_to_watchlist(self, url: str, platform: str):
        self.watchlist.append({'url': url, 'platform': platform})

    def check_prices(self):
        """Check all watched products and record prices."""
        print(f'\n--- Price Check: {datetime.now().isoformat()} ---')

        for item in self.watchlist:
            platform = item['platform']
            url = item['url']
            scraper = self.scrapers.get(platform)

            if not scraper:
                print(f'No scraper for {platform}')
                continue

            record = scraper.get_price(url)
            if record:
                self.db.add_product(record)
                drop = self.db.get_price_drop(
                    record.product_id, platform
                )
                status = f' (DOWN {drop}%!)' if drop else ''
                print(
                    f'  [{platform}] {record.name[:50]}: '
                    f'${record.price}{status}'
                )
            else:
                print(f'  [{platform}] Failed to scrape {url}')

            scraper.delay()

    def run(self, interval_hours: int = 6):
        """Run the tracker on a schedule."""
        self.check_prices()  # Initial check
        schedule.every(interval_hours).hours.do(self.check_prices)

        print(f'\nTracker running (every {interval_hours}h). Ctrl+C to stop.')
        while True:
            schedule.run_pending()
            time.sleep(60)


# Example usage
if __name__ == '__main__':
    tracker = PriceTracker()

    # Add products to watch
    tracker.add_to_watchlist(
        'https://www.amazon.com/dp/B0EXAMPLE', 'amazon'
    )
    tracker.add_to_watchlist(
        'https://www.ebay.com/itm/123456789', 'ebay'
    )
    tracker.add_to_watchlist(
        'https://www.walmart.com/ip/product-name/987654321', 'walmart'
    )

    # Run every 6 hours
    tracker.run(interval_hours=6)

Adding Price Drop Alerts

Let's add email notifications when prices drop significantly:

import smtplib
from email.mime.text import MIMEText

def send_price_alert(
    product_name: str, platform: str,
    old_price: float, new_price: float, url: str
):
    drop_pct = round((1 - new_price / old_price) * 100, 1)
    subject = f'Price Drop Alert: {product_name} (-{drop_pct}%)'
    body = (
        f'{product_name}\n'
        f'Platform: {platform}\n'
        f'Old price: ${old_price:.2f}\n'
        f'New price: ${new_price:.2f}\n'
        f'Drop: {drop_pct}%\n'
        f'Link: {url}'
    )

    msg = MIMEText(body)
    msg['Subject'] = subject
    msg['From'] = 'tracker@yourdomain.com'
    msg['To'] = 'you@yourdomain.com'

    with smtplib.SMTP('smtp.yourdomain.com', 587) as server:
        server.starttls()
        server.login('tracker@yourdomain.com', 'your_password')
        server.send_message(msg)
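
You probably don't want an email for every one-cent fluctuation. A simple gate decides when send_price_alert is worth calling; the 5% default threshold is an arbitrary choice of mine, so tune it per product:

```python
def should_alert(old_price: float, new_price: float,
                 threshold_pct: float = 5.0) -> bool:
    """True when the price dropped by at least threshold_pct percent."""
    if old_price <= 0 or new_price >= old_price:
        return False
    return (1 - new_price / old_price) * 100 >= threshold_pct

should_alert(100.00, 94.00)  # True  (6% drop)
should_alert(100.00, 97.00)  # False (only 3%)
```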

Querying Your Price Data

Once you've been tracking for a while, you can run queries to find the best deals:

def find_lowest_prices(db_path: str = 'price_tracker.db'):
    """Find the lowest recorded price for each product."""
    conn = sqlite3.connect(db_path)
    cursor = conn.execute('''
        SELECT p.name, p.platform, MIN(pr.price) as lowest,
               pr.recorded_at
        FROM products p
        JOIN prices pr ON p.id = pr.product_id
            AND p.platform = pr.platform
        GROUP BY p.id, p.platform
        ORDER BY p.name
    ''')

    for row in cursor.fetchall():
        print(f'{row[0]} ({row[1]}): ${row[2]:.2f} on {row[3]}')
    conn.close()

def price_trend(product_id: str, platform: str,
                db_path: str = 'price_tracker.db'):
    """Show price trend for a specific product."""
    conn = sqlite3.connect(db_path)
    cursor = conn.execute(
        'SELECT price, recorded_at FROM prices '
        'WHERE product_id = ? AND platform = ? '
        'ORDER BY recorded_at',
        (product_id, platform)
    )

    print(f'Price history for {product_id} on {platform}:')
    for price, date in cursor.fetchall():
        bar = '#' * int(price / 5)
        print(f'  {date}: ${price:.2f} {bar}')
    conn.close()
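
Raw price history is spiky, since flash sales and coupons create one-day dips. If you want to see the underlying trend, a trailing moving average (my own helper, not required by the tracker) smooths the series before you chart it:

```python
def moving_average(prices: list[float], window: int = 7) -> list[float]:
    """Trailing moving average; uses a shorter window at the start."""
    out = []
    for i in range(len(prices)):
        chunk = prices[max(0, i - window + 1): i + 1]
        out.append(round(sum(chunk) / len(chunk), 2))
    return out

moving_average([10.0, 20.0, 30.0], window=2)  # [10.0, 15.0, 25.0]
```

Feed it the prices column from price_trend's query (remember that query returns oldest first, which is the order you want here).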

Tips for Production

  1. Rotate User-Agents: Keep a list of 10+ current browser UAs and randomize
  2. Respect robots.txt: Check each platform's terms of service
  3. Add exponential backoff: When you get blocked, wait longer before retrying
  4. Use a managed proxy service: ScrapeOps or similar services save you from the headache of proxy management
  5. Monitor success rates: Track how often your scrapes succeed per platform
  6. Run at varied times: Don't hit the same sites at exactly the same time every day
  7. Store raw HTML: Keep the original pages so you can re-parse if your selectors break
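
Tip 3 deserves a sketch. Here's exponential backoff that wraps any fetch function; the delays and retry count are starting points, not gospel:

```python
import random
import time

def fetch_with_backoff(fetch_fn, url: str, max_retries: int = 4,
                       base_delay: float = 2.0):
    """Retry fetch_fn(url), doubling the wait (plus jitter) after each failure."""
    for attempt in range(max_retries):
        result = fetch_fn(url)
        if result is not None:
            return result
        wait = base_delay * (2 ** attempt) + random.uniform(0, base_delay)
        time.sleep(wait)
    return None
```

With the BaseScraper from earlier, you'd call it as fetch_with_backoff(scraper.fetch, url).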

Conclusion

Building a cross-platform price tracker is a great project that combines web scraping, data storage, and automation. The biggest challenge isn't writing the scraping code — it's maintaining reliable access to these platforms over time.

Start with one platform, get it working reliably, then expand. Use proxy rotation from the start (even if you're tempted to skip it during development), and consider a managed service like ScrapeOps if you want to focus on the data rather than the infrastructure.

The full source code for this project is available in the examples above — just combine the classes into a single file and you're ready to go.


Building something cool with price data? Share it in the comments!
