DEV Community

NexGenData
NexGenData

Posted on • Originally published at thenextgennexus.com

Automated Price Monitoring: Track Any E-Commerce Product With Python

Table of Contents

Toggle

Automated Price Monitoring: Track Any E-Commerce Product With Python

Price volatility is a fact of life in e-commerce. Whether you’re a reseller looking for arbitrage opportunities, a consumer waiting for the perfect price drop, or a competitive analyst monitoring competitor pricing, manual price checking is tedious and inefficient. A single product across multiple retailers can fluctuate by 30-50% in weeks—and you need real-time visibility to act.

In this guide, I’ll show you how to build an automated price monitoring system in Python that tracks products across multiple e-commerce sites (Amazon, Walmart, eBay, Shopify stores), sends alerts when prices drop, and maintains a historical price database for trend analysis. This is the approach behind the Tech Stack Report, which helps e-commerce managers make data-driven inventory and pricing decisions.

Why Automated Price Monitoring Matters

Here are the real business outcomes:

  • Arbitrage profits: Buy low, sell high. Price monitoring reveals gaps between platforms and time windows for profitable reselling.
  • Competitive intelligence: Monitor competitor pricing strategies. Are they undercutting you? When do they change prices?
  • Deal identification: Automatically find price drops before they’re advertised. Get first-mover advantage.
  • Historical data: Build pricing trends to predict future price movements and optimal selling windows.
  • Inventory optimization: Know exactly when to reorder based on competitor pricing and your own margin requirements.

Architecture: Building a Scalable Price Monitoring System

The system consists of four core components:

  • Product Registry: URLs and metadata for products to monitor.
  • Price Scraper: Fetches current prices from each product URL.
  • Database: Stores historical prices and timestamps.
  • Alert Engine: Compares new prices to thresholds and sends notifications.

Step 1: Set Up Dependencies and Database


    pip install requests beautifulsoup4 selenium pandas sqlite3 schedule smtplib
Enter fullscreen mode Exit fullscreen mode

Create a SQLite database to store product URLs and price history:


    import sqlite3
    from datetime import datetime

    def initialize_database(db_name='price_monitor.db'):
        """Create tables for products and price history."""
        conn = sqlite3.connect(db_name)
        c = conn.cursor()

        # Products to monitor
        c.execute('''CREATE TABLE IF NOT EXISTS products
                     (id INTEGER PRIMARY KEY, name TEXT, url TEXT, 
                      retailer TEXT, current_price REAL, target_price REAL,
                      alert_enabled BOOLEAN, created_at TIMESTAMP)''')

        # Price history for trend analysis
        c.execute('''CREATE TABLE IF NOT EXISTS price_history
                     (id INTEGER PRIMARY KEY, product_id INTEGER, 
                      price REAL, timestamp TIMESTAMP, 
                      FOREIGN KEY(product_id) REFERENCES products(id))''')

        # Alert log
        c.execute('''CREATE TABLE IF NOT EXISTS alerts
                     (id INTEGER PRIMARY KEY, product_id INTEGER, 
                      old_price REAL, new_price REAL, 
                      alert_type TEXT, timestamp TIMESTAMP,
                      FOREIGN KEY(product_id) REFERENCES products(id))''')

        conn.commit()
        conn.close()
        print("Database initialized successfully")

    initialize_database()
Enter fullscreen mode Exit fullscreen mode

Step 2: Add Products to Monitor


    def add_product(name, url, retailer, target_price=None):
        """Add a product to the monitoring list."""
        conn = sqlite3.connect('price_monitor.db')
        c = conn.cursor()

        c.execute('''INSERT INTO products (name, url, retailer, target_price, alert_enabled, created_at)
                     VALUES (?, ?, ?, ?, ?, ?)''',
                  (name, url, retailer, target_price, True, datetime.now()))

        conn.commit()
        product_id = c.lastrowid
        conn.close()

        print(f"Added product '{name}' (ID: {product_id})")
        return product_id

    # Example: Add products to monitor
    add_product(
        name="Sony WH-CH720 Wireless Headphones",
        url="https://www.amazon.com/Sony-WH-CH720-Wireless-Headphones-Blue/dp/B0B4H5PYXQ",
        retailer="Amazon",
        target_price=79.99
    )

    add_product(
        name="Apple Magic Keyboard",
        url="https://www.walmart.com/ip/Apple-Magic-Keyboard-White/373834041",
        retailer="Walmart",
        target_price=249.00
    )

    add_product(
        name="Nintendo Switch OLED Console",
        url="https://www.bestbuy.com/site/nintendo-switch-oled-model-white/16521127.p",
        retailer="Best Buy",
        target_price=299.99
    )
Enter fullscreen mode Exit fullscreen mode

Step 3: Build the Price Scraper

The scraper needs to handle different website structures. We’ll use BeautifulSoup for simple HTML parsing and Selenium for JavaScript-heavy sites.


    import requests
    from bs4 import BeautifulSoup
    from selenium import webdriver
    from selenium.webdriver.common.by import By
    import re

    class PriceScraper:
        def __init__(self):
            self.headers = {
                'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
            }

        def scrape_amazon(self, url):
            """Extract price from Amazon product page."""
            try:
                response = requests.get(url, headers=self.headers, timeout=10)
                soup = BeautifulSoup(response.content, 'html.parser')

                # Amazon stores price in multiple places; try main price first
                price_element = soup.find('span', {'class': 'a-price-whole'})
                if price_element:
                    price_text = price_element.get_text(strip=True)
                    price = float(re.sub(r'[^0-9.]', '', price_text))
                    return price

                return None
            except Exception as e:
                print(f"Error scraping Amazon: {e}")
                return None

        def scrape_walmart(self, url):
            """Extract price from Walmart product page."""
            try:
                response = requests.get(url, headers=self.headers, timeout=10)
                soup = BeautifulSoup(response.content, 'html.parser')

                # Walmart uses different selectors
                price_element = soup.find('div', {'data-testid': 'current-price'})
                if price_element:
                    price_text = price_element.get_text(strip=True)
                    price = float(re.sub(r'[^0-9.]', '', price_text))
                    return price

                return None
            except Exception as e:
                print(f"Error scraping Walmart: {e}")
                return None

        def scrape_bestbuy(self, url):
            """Extract price from Best Buy product page."""
            try:
                response = requests.get(url, headers=self.headers, timeout=10)
                soup = BeautifulSoup(response.content, 'html.parser')

                price_element = soup.find('div', {'class': 'priceView'})
                if price_element:
                    price_text = price_element.get_text(strip=True)
                    price = float(re.sub(r'[^0-9.]', '', price_text))
                    return price

                return None
            except Exception as e:
                print(f"Error scraping Best Buy: {e}")
                return None

        def scrape_ebay(self, url):
            """Extract price from eBay listing."""
            try:
                response = requests.get(url, headers=self.headers, timeout=10)
                soup = BeautifulSoup(response.content, 'html.parser')

                # eBay has various price formats
                price_element = soup.find('span', {'id': 'prcIsum'})
                if price_element:
                    price_text = price_element.get_text(strip=True)
                    price = float(re.sub(r'[^0-9.]', '', price_text))
                    return price

                return None
            except Exception as e:
                print(f"Error scraping eBay: {e}")
                return None

        def get_price(self, url, retailer):
            """Route to appropriate scraper based on retailer."""
            if 'amazon' in url:
                return self.scrape_amazon(url)
            elif 'walmart' in url:
                return self.scrape_walmart(url)
            elif 'bestbuy' in url:
                return self.scrape_bestbuy(url)
            elif 'ebay' in url:
                return self.scrape_ebay(url)
            else:
                # Generic scraper for Shopify and other stores
                return self.scrape_generic(url)

        def scrape_generic(self, url):
            """Generic price extraction for non-major retailers."""
            try:
                response = requests.get(url, headers=self.headers, timeout=10)
                soup = BeautifulSoup(response.content, 'html.parser')

                # Look for common price patterns
                price_patterns = [
                    soup.find('span', {'class': re.compile('price', re.I)}),
                    soup.find('div', {'class': re.compile('price', re.I)}),
                    soup.find('p', {'class': re.compile('price', re.I)}),
                ]

                for element in price_patterns:
                    if element:
                        price_text = element.get_text(strip=True)
                        # Extract first float found
                        match = re.search(r'\$?([0-9]+\.?[0-9]*)', price_text)
                        if match:
                            return float(match.group(1))

                return None
            except Exception as e:
                print(f"Error scraping generic URL: {e}")
                return None

    # Test the scraper
    scraper = PriceScraper()
    price = scraper.get_price('https://www.amazon.com/Sony-WH-CH720-Wireless-Headphones-Blue/dp/B0B4H5PYXQ', 'Amazon')
    print(f"Current price: ${price}")
Enter fullscreen mode Exit fullscreen mode

Step 4: Update Prices and Store History


    def update_price(product_id, new_price):
        """Update product price and record history."""
        conn = sqlite3.connect('price_monitor.db')
        c = conn.cursor()

        # Get current price for comparison
        c.execute('SELECT current_price FROM products WHERE id = ?', (product_id,))
        result = c.fetchone()
        old_price = result[0] if result else None

        # Update current price
        c.execute('UPDATE products SET current_price = ? WHERE id = ?', (new_price, product_id))

        # Record in price history
        c.execute('INSERT INTO price_history (product_id, price, timestamp) VALUES (?, ?, ?)',
                  (product_id, new_price, datetime.now()))

        conn.commit()
        conn.close()

        return old_price

    def check_all_prices():
        """Check prices for all monitored products."""
        conn = sqlite3.connect('price_monitor.db')
        c = conn.cursor()

        c.execute('SELECT id, name, url, retailer, current_price, target_price FROM products WHERE alert_enabled = 1')
        products = c.fetchall()
        conn.close()

        scraper = PriceScraper()
        alerts = []

        for product in products:
            product_id, name, url, retailer, old_price, target_price = product

            # Scrape new price
            new_price = scraper.get_price(url, retailer)

            if new_price is None:
                print(f"Failed to scrape {name}")
                continue

            # Update database
            update_price(product_id, new_price)

            # Check for price drop
            if old_price and new_price < old_price:
                percent_drop = ((old_price - new_price) / old_price) * 100
                alerts.append({
                    'product_id': product_id,
                    'name': name,
                    'old_price': old_price,
                    'new_price': new_price,
                    'percent_drop': percent_drop,
                    'alert_type': 'PRICE_DROP'
                })
                print(f"Alert: {name} dropped from ${old_price:.2f} to ${new_price:.2f} ({percent_drop:.1f}%)")

            # Check for target price hit
            if target_price and new_price <= target_price:
                alerts.append({
                    'product_id': product_id,
                    'name': name,
                    'old_price': old_price,
                    'new_price': new_price,
                    'alert_type': 'TARGET_REACHED'
                })
                print(f"Alert: {name} hit target price ${target_price:.2f}!")

        return alerts

    # Run check
    alerts = check_all_prices()
    print(f"\nFound {len(alerts)} alerts")
Enter fullscreen mode Exit fullscreen mode

Step 5: Send Notifications


    import smtplib
    from email.mime.text import MIMEText
    from email.mime.multipart import MIMEMultipart

    class AlertNotifier:
        def __init__(self, email_address, email_password):
            self.email_address = email_address
            self.email_password = email_password

        def send_email_alert(self, recipient_email, alerts):
            """Send price alert via email."""
            try:
                # Create email
                message = MIMEMultipart('alternative')
                message['Subject'] = f"Price Alert: {len(alerts)} product(s) updated"
                message['From'] = self.email_address
                message['To'] = recipient_email

                # Build HTML email
                html_body = "

    ## Price Alert Summary

    "
                html_body += ""
                html_body += "Product| Old Price| New Price| Change| Type  
    ---|---|---|---|---  
    "

                for alert in alerts:
                    old_price = alert.get('old_price', 'N/A')
                    new_price = alert.get('new_price', 'N/A')
                    percent_drop = alert.get('percent_drop', 0)

                    html_body += f""
                    html_body += f"{alert['name']}"
                    html_body += f"| ${old_price:.2f}"
                    html_body += f"| ${new_price:.2f}"
                    html_body += f"| {percent_drop:.1f}%"
                    html_body += f"| {alert['alert_type']}"
                    html_body += f"  
    "

                html_body += ""

                message.attach(MIMEText(html_body, 'html'))

                # Send email
                server = smtplib.SMTP_SSL('smtp.gmail.com', 465)
                server.login(self.email_address, self.email_password)
                server.sendmail(self.email_address, recipient_email, message.as_string())
                server.quit()

                print(f"Alert email sent to {recipient_email}")

            except Exception as e:
                print(f"Error sending email: {e}")

        def send_webhook_alert(self, webhook_url, alerts):
            """Send alert via webhook (Slack, Discord, etc)."""
            import json
            try:
                payload = {
                    'text': f'Price Alert: {len(alerts)} product(s) updated',
                    'attachments': [
                        {
                            'title': alert['name'],
                            'text': f"${alert['old_price']:.2f} → ${alert['new_price']:.2f}",
                            'color': 'danger' if alert['alert_type'] == 'PRICE_DROP' else 'warning'
                        }
                        for alert in alerts
                    ]
                }

                response = requests.post(webhook_url, json=payload)
                print(f"Webhook alert sent (status: {response.status_code})")

            except Exception as e:
                print(f"Error sending webhook: {e}")

    # Example usage
    notifier = AlertNotifier('your_email@gmail.com', 'your_app_password')
    alerts = check_all_prices()
    if alerts:
        notifier.send_email_alert('recipient@example.com', alerts)
        notifier.send_webhook_alert('https://hooks.slack.com/services/YOUR/WEBHOOK/URL', alerts)
Enter fullscreen mode Exit fullscreen mode

Step 6: Schedule Automated Monitoring


    import schedule
    import time
    from threading import Thread

    def run_price_check():
        """Scheduled task to check prices."""
        print(f"[{datetime.now()}] Running price check...")
        alerts = check_all_prices()
        if alerts:
            notifier.send_email_alert('your_email@gmail.com', alerts)

    def schedule_monitoring():
        """Set up automated price monitoring."""
        # Check every 6 hours
        schedule.every(6).hours.do(run_price_check)

        # Run at specific times
        schedule.every().day.at("09:00").do(run_price_check)  # 9 AM
        schedule.every().day.at("17:00").do(run_price_check)  # 5 PM
        schedule.every().day.at("21:00").do(run_price_check)  # 9 PM

        # Keep scheduler running in background
        while True:
            schedule.run_pending()
            time.sleep(60)

    # Start in background thread
    scheduler_thread = Thread(target=schedule_monitoring, daemon=True)
    scheduler_thread.start()

    print("Price monitoring started. Running in background...")
    while True:
        time.sleep(1)
Enter fullscreen mode Exit fullscreen mode

Analyzing Price Trends


    import pandas as pd
    from datetime import datetime, timedelta

    def analyze_price_trends(product_id, days=30):
        """Analyze price trends over time."""
        conn = sqlite3.connect('price_monitor.db')
        query = f'''SELECT price, timestamp FROM price_history 
                   WHERE product_id = ? AND timestamp > datetime('now', '-{days} days')
                   ORDER BY timestamp'''
        df = pd.read_sql_query(query, conn, params=[product_id])
        conn.close()

        if df.empty:
            return None

        # Calculate statistics
        stats = {
            'min_price': df['price'].min(),
            'max_price': df['price'].max(),
            'avg_price': df['price'].mean(),
            'current_price': df['price'].iloc[-1],
            'price_change': df['price'].iloc[-1] - df['price'].iloc[0],
            'percent_change': ((df['price'].iloc[-1] - df['price'].iloc[0]) / df['price'].iloc[0] * 100)
        }

        return stats

    # Example
    stats = analyze_price_trends(product_id=1, days=30)
    print(f"30-Day Price Analysis:")
    print(f"  Min: ${stats['min_price']:.2f}")
    print(f"  Max: ${stats['max_price']:.2f}")
    print(f"  Avg: ${stats['avg_price']:.2f}")
    print(f"  Change: {stats['percent_change']:.1f}%")
Enter fullscreen mode Exit fullscreen mode

Real-World Applications

  • Amazon resellers: Monitor wholesale prices (Alibaba, AliExpress) vs Amazon selling prices for arbitrage.
  • Dropshippers: Track supplier costs to ensure profit margins stay healthy.
  • Deal aggregators: Build your own deal site by monitoring and alerting on price drops.
  • Affiliate marketers: Promote products when prices are lowest (higher conversion rate).
  • Price optimization: Use historical data to inform dynamic pricing strategies.

Accelerate with the Tech Stack Report

The code above is solid, but maintaining scrapers for dozens of e-commerce sites requires constant updates. The Tech Stack Report ($9) provides:

  • Pre-built price monitoring dashboard (updates hourly)
  • Support for 50+ retailers (Amazon, Walmart, eBay, Shopify, and more)
  • Slack/Discord/email integration
  • Historical price charts and trend analysis
  • CSV export for analysis in Excel or Python

Get the Tech Stack Report →


About the Author

The Next Gen Nexus covers AI agents, automation, and web data — practical guides for developers, analysts, and businesses working with data at scale.


Try Apify free — get $5 in platform credit (no credit card required) and run this scraper plus 30,000+ others. Sign up here →

See also: New — Walmart Scraper — Products, Prices, Ratings & Stock

Top comments (0)