Table of Contents
Toggle
-
Automated Price Monitoring: Track Any E-Commerce Product With Python
- Why Automated Price Monitoring Matters
- Architecture: Building a Scalable Price Monitoring System
- Step 1: Set Up Dependencies and Database
- Step 2: Add Products to Monitor
- Step 3: Build the Price Scraper
- Step 4: Update Prices and Store History
- Step 5: Send Notifications
- Price Alert Summary
- Step 6: Schedule Automated Monitoring
- Analyzing Price Trends
- Real-World Applications
- Accelerate with the Tech Stack Report
Automated Price Monitoring: Track Any E-Commerce Product With Python
Price volatility is a fact of life in e-commerce. Whether you’re a reseller looking for arbitrage opportunities, a consumer waiting for the perfect price drop, or a competitive analyst monitoring competitor pricing, manual price checking is tedious and inefficient. A single product across multiple retailers can fluctuate by 30-50% in weeks—and you need real-time visibility to act.
In this guide, I’ll show you how to build an automated price monitoring system in Python that tracks products across multiple e-commerce sites (Amazon, Walmart, eBay, Shopify stores), sends alerts when prices drop, and maintains a historical price database for trend analysis. This is the approach behind the Tech Stack Report, which helps e-commerce managers make data-driven inventory and pricing decisions.
Why Automated Price Monitoring Matters
Here are the real business outcomes:
- Arbitrage profits: Buy low, sell high. Price monitoring reveals gaps between platforms and time windows for profitable reselling.
- Competitive intelligence: Monitor competitor pricing strategies. Are they undercutting you? When do they change prices?
- Deal identification: Automatically find price drops before they’re advertised. Get first-mover advantage.
- Historical data: Build pricing trends to predict future price movements and optimal selling windows.
- Inventory optimization: Know exactly when to reorder based on competitor pricing and your own margin requirements.
Architecture: Building a Scalable Price Monitoring System
The system consists of four core components:
- Product Registry: URLs and metadata for products to monitor.
- Price Scraper: Fetches current prices from each product URL.
- Database: Stores historical prices and timestamps.
- Alert Engine: Compares new prices to thresholds and sends notifications.
Step 1: Set Up Dependencies and Database
pip install requests beautifulsoup4 selenium pandas sqlite3 schedule smtplib
Create a SQLite database to store product URLs and price history:
import sqlite3
from datetime import datetime
def initialize_database(db_name='price_monitor.db'):
"""Create tables for products and price history."""
conn = sqlite3.connect(db_name)
c = conn.cursor()
# Products to monitor
c.execute('''CREATE TABLE IF NOT EXISTS products
(id INTEGER PRIMARY KEY, name TEXT, url TEXT,
retailer TEXT, current_price REAL, target_price REAL,
alert_enabled BOOLEAN, created_at TIMESTAMP)''')
# Price history for trend analysis
c.execute('''CREATE TABLE IF NOT EXISTS price_history
(id INTEGER PRIMARY KEY, product_id INTEGER,
price REAL, timestamp TIMESTAMP,
FOREIGN KEY(product_id) REFERENCES products(id))''')
# Alert log
c.execute('''CREATE TABLE IF NOT EXISTS alerts
(id INTEGER PRIMARY KEY, product_id INTEGER,
old_price REAL, new_price REAL,
alert_type TEXT, timestamp TIMESTAMP,
FOREIGN KEY(product_id) REFERENCES products(id))''')
conn.commit()
conn.close()
print("Database initialized successfully")
initialize_database()
Step 2: Add Products to Monitor
def add_product(name, url, retailer, target_price=None):
"""Add a product to the monitoring list."""
conn = sqlite3.connect('price_monitor.db')
c = conn.cursor()
c.execute('''INSERT INTO products (name, url, retailer, target_price, alert_enabled, created_at)
VALUES (?, ?, ?, ?, ?, ?)''',
(name, url, retailer, target_price, True, datetime.now()))
conn.commit()
product_id = c.lastrowid
conn.close()
print(f"Added product '{name}' (ID: {product_id})")
return product_id
# Example: Add products to monitor
add_product(
name="Sony WH-CH720 Wireless Headphones",
url="https://www.amazon.com/Sony-WH-CH720-Wireless-Headphones-Blue/dp/B0B4H5PYXQ",
retailer="Amazon",
target_price=79.99
)
add_product(
name="Apple Magic Keyboard",
url="https://www.walmart.com/ip/Apple-Magic-Keyboard-White/373834041",
retailer="Walmart",
target_price=249.00
)
add_product(
name="Nintendo Switch OLED Console",
url="https://www.bestbuy.com/site/nintendo-switch-oled-model-white/16521127.p",
retailer="Best Buy",
target_price=299.99
)
Step 3: Build the Price Scraper
The scraper needs to handle different website structures. We’ll use BeautifulSoup for simple HTML parsing and Selenium for JavaScript-heavy sites.
import requests
from bs4 import BeautifulSoup
from selenium import webdriver
from selenium.webdriver.common.by import By
import re
class PriceScraper:
def __init__(self):
self.headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
}
def scrape_amazon(self, url):
"""Extract price from Amazon product page."""
try:
response = requests.get(url, headers=self.headers, timeout=10)
soup = BeautifulSoup(response.content, 'html.parser')
# Amazon stores price in multiple places; try main price first
price_element = soup.find('span', {'class': 'a-price-whole'})
if price_element:
price_text = price_element.get_text(strip=True)
price = float(re.sub(r'[^0-9.]', '', price_text))
return price
return None
except Exception as e:
print(f"Error scraping Amazon: {e}")
return None
def scrape_walmart(self, url):
"""Extract price from Walmart product page."""
try:
response = requests.get(url, headers=self.headers, timeout=10)
soup = BeautifulSoup(response.content, 'html.parser')
# Walmart uses different selectors
price_element = soup.find('div', {'data-testid': 'current-price'})
if price_element:
price_text = price_element.get_text(strip=True)
price = float(re.sub(r'[^0-9.]', '', price_text))
return price
return None
except Exception as e:
print(f"Error scraping Walmart: {e}")
return None
def scrape_bestbuy(self, url):
"""Extract price from Best Buy product page."""
try:
response = requests.get(url, headers=self.headers, timeout=10)
soup = BeautifulSoup(response.content, 'html.parser')
price_element = soup.find('div', {'class': 'priceView'})
if price_element:
price_text = price_element.get_text(strip=True)
price = float(re.sub(r'[^0-9.]', '', price_text))
return price
return None
except Exception as e:
print(f"Error scraping Best Buy: {e}")
return None
def scrape_ebay(self, url):
"""Extract price from eBay listing."""
try:
response = requests.get(url, headers=self.headers, timeout=10)
soup = BeautifulSoup(response.content, 'html.parser')
# eBay has various price formats
price_element = soup.find('span', {'id': 'prcIsum'})
if price_element:
price_text = price_element.get_text(strip=True)
price = float(re.sub(r'[^0-9.]', '', price_text))
return price
return None
except Exception as e:
print(f"Error scraping eBay: {e}")
return None
def get_price(self, url, retailer):
"""Route to appropriate scraper based on retailer."""
if 'amazon' in url:
return self.scrape_amazon(url)
elif 'walmart' in url:
return self.scrape_walmart(url)
elif 'bestbuy' in url:
return self.scrape_bestbuy(url)
elif 'ebay' in url:
return self.scrape_ebay(url)
else:
# Generic scraper for Shopify and other stores
return self.scrape_generic(url)
def scrape_generic(self, url):
"""Generic price extraction for non-major retailers."""
try:
response = requests.get(url, headers=self.headers, timeout=10)
soup = BeautifulSoup(response.content, 'html.parser')
# Look for common price patterns
price_patterns = [
soup.find('span', {'class': re.compile('price', re.I)}),
soup.find('div', {'class': re.compile('price', re.I)}),
soup.find('p', {'class': re.compile('price', re.I)}),
]
for element in price_patterns:
if element:
price_text = element.get_text(strip=True)
# Extract first float found
match = re.search(r'\$?([0-9]+\.?[0-9]*)', price_text)
if match:
return float(match.group(1))
return None
except Exception as e:
print(f"Error scraping generic URL: {e}")
return None
# Test the scraper
scraper = PriceScraper()
price = scraper.get_price('https://www.amazon.com/Sony-WH-CH720-Wireless-Headphones-Blue/dp/B0B4H5PYXQ', 'Amazon')
print(f"Current price: ${price}")
Step 4: Update Prices and Store History
def update_price(product_id, new_price):
"""Update product price and record history."""
conn = sqlite3.connect('price_monitor.db')
c = conn.cursor()
# Get current price for comparison
c.execute('SELECT current_price FROM products WHERE id = ?', (product_id,))
result = c.fetchone()
old_price = result[0] if result else None
# Update current price
c.execute('UPDATE products SET current_price = ? WHERE id = ?', (new_price, product_id))
# Record in price history
c.execute('INSERT INTO price_history (product_id, price, timestamp) VALUES (?, ?, ?)',
(product_id, new_price, datetime.now()))
conn.commit()
conn.close()
return old_price
def check_all_prices():
"""Check prices for all monitored products."""
conn = sqlite3.connect('price_monitor.db')
c = conn.cursor()
c.execute('SELECT id, name, url, retailer, current_price, target_price FROM products WHERE alert_enabled = 1')
products = c.fetchall()
conn.close()
scraper = PriceScraper()
alerts = []
for product in products:
product_id, name, url, retailer, old_price, target_price = product
# Scrape new price
new_price = scraper.get_price(url, retailer)
if new_price is None:
print(f"Failed to scrape {name}")
continue
# Update database
update_price(product_id, new_price)
# Check for price drop
if old_price and new_price < old_price:
percent_drop = ((old_price - new_price) / old_price) * 100
alerts.append({
'product_id': product_id,
'name': name,
'old_price': old_price,
'new_price': new_price,
'percent_drop': percent_drop,
'alert_type': 'PRICE_DROP'
})
print(f"Alert: {name} dropped from ${old_price:.2f} to ${new_price:.2f} ({percent_drop:.1f}%)")
# Check for target price hit
if target_price and new_price <= target_price:
alerts.append({
'product_id': product_id,
'name': name,
'old_price': old_price,
'new_price': new_price,
'alert_type': 'TARGET_REACHED'
})
print(f"Alert: {name} hit target price ${target_price:.2f}!")
return alerts
# Run check
alerts = check_all_prices()
print(f"\nFound {len(alerts)} alerts")
Step 5: Send Notifications
import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
class AlertNotifier:
def __init__(self, email_address, email_password):
self.email_address = email_address
self.email_password = email_password
def send_email_alert(self, recipient_email, alerts):
"""Send price alert via email."""
try:
# Create email
message = MIMEMultipart('alternative')
message['Subject'] = f"Price Alert: {len(alerts)} product(s) updated"
message['From'] = self.email_address
message['To'] = recipient_email
# Build HTML email
html_body = "
## Price Alert Summary
"
html_body += ""
html_body += "Product| Old Price| New Price| Change| Type
---|---|---|---|---
"
for alert in alerts:
old_price = alert.get('old_price', 'N/A')
new_price = alert.get('new_price', 'N/A')
percent_drop = alert.get('percent_drop', 0)
html_body += f""
html_body += f"{alert['name']}"
html_body += f"| ${old_price:.2f}"
html_body += f"| ${new_price:.2f}"
html_body += f"| {percent_drop:.1f}%"
html_body += f"| {alert['alert_type']}"
html_body += f"
"
html_body += ""
message.attach(MIMEText(html_body, 'html'))
# Send email
server = smtplib.SMTP_SSL('smtp.gmail.com', 465)
server.login(self.email_address, self.email_password)
server.sendmail(self.email_address, recipient_email, message.as_string())
server.quit()
print(f"Alert email sent to {recipient_email}")
except Exception as e:
print(f"Error sending email: {e}")
def send_webhook_alert(self, webhook_url, alerts):
"""Send alert via webhook (Slack, Discord, etc)."""
import json
try:
payload = {
'text': f'Price Alert: {len(alerts)} product(s) updated',
'attachments': [
{
'title': alert['name'],
'text': f"${alert['old_price']:.2f} → ${alert['new_price']:.2f}",
'color': 'danger' if alert['alert_type'] == 'PRICE_DROP' else 'warning'
}
for alert in alerts
]
}
response = requests.post(webhook_url, json=payload)
print(f"Webhook alert sent (status: {response.status_code})")
except Exception as e:
print(f"Error sending webhook: {e}")
# Example usage
notifier = AlertNotifier('your_email@gmail.com', 'your_app_password')
alerts = check_all_prices()
if alerts:
notifier.send_email_alert('recipient@example.com', alerts)
notifier.send_webhook_alert('https://hooks.slack.com/services/YOUR/WEBHOOK/URL', alerts)
Step 6: Schedule Automated Monitoring
import schedule
import time
from threading import Thread
def run_price_check():
"""Scheduled task to check prices."""
print(f"[{datetime.now()}] Running price check...")
alerts = check_all_prices()
if alerts:
notifier.send_email_alert('your_email@gmail.com', alerts)
def schedule_monitoring():
"""Set up automated price monitoring."""
# Check every 6 hours
schedule.every(6).hours.do(run_price_check)
# Run at specific times
schedule.every().day.at("09:00").do(run_price_check) # 9 AM
schedule.every().day.at("17:00").do(run_price_check) # 5 PM
schedule.every().day.at("21:00").do(run_price_check) # 9 PM
# Keep scheduler running in background
while True:
schedule.run_pending()
time.sleep(60)
# Start in background thread
scheduler_thread = Thread(target=schedule_monitoring, daemon=True)
scheduler_thread.start()
print("Price monitoring started. Running in background...")
while True:
time.sleep(1)
Analyzing Price Trends
import pandas as pd
from datetime import datetime, timedelta
def analyze_price_trends(product_id, days=30):
"""Analyze price trends over time."""
conn = sqlite3.connect('price_monitor.db')
query = f'''SELECT price, timestamp FROM price_history
WHERE product_id = ? AND timestamp > datetime('now', '-{days} days')
ORDER BY timestamp'''
df = pd.read_sql_query(query, conn, params=[product_id])
conn.close()
if df.empty:
return None
# Calculate statistics
stats = {
'min_price': df['price'].min(),
'max_price': df['price'].max(),
'avg_price': df['price'].mean(),
'current_price': df['price'].iloc[-1],
'price_change': df['price'].iloc[-1] - df['price'].iloc[0],
'percent_change': ((df['price'].iloc[-1] - df['price'].iloc[0]) / df['price'].iloc[0] * 100)
}
return stats
# Example
stats = analyze_price_trends(product_id=1, days=30)
print(f"30-Day Price Analysis:")
print(f" Min: ${stats['min_price']:.2f}")
print(f" Max: ${stats['max_price']:.2f}")
print(f" Avg: ${stats['avg_price']:.2f}")
print(f" Change: {stats['percent_change']:.1f}%")
Real-World Applications
- Amazon resellers: Monitor wholesale prices (Alibaba, AliExpress) vs Amazon selling prices for arbitrage.
- Dropshippers: Track supplier costs to ensure profit margins stay healthy.
- Deal aggregators: Build your own deal site by monitoring and alerting on price drops.
- Affiliate marketers: Promote products when prices are lowest (higher conversion rate).
- Price optimization: Use historical data to inform dynamic pricing strategies.
Accelerate with the Tech Stack Report
The code above is solid, but maintaining scrapers for dozens of e-commerce sites requires constant updates. The Tech Stack Report ($9) provides:
- Pre-built price monitoring dashboard (updates hourly)
- Support for 50+ retailers (Amazon, Walmart, eBay, Shopify, and more)
- Slack/Discord/email integration
- Historical price charts and trend analysis
- CSV export for analysis in Excel or Python
Get the Tech Stack Report →
About the Author
The Next Gen Nexus covers AI agents, automation, and web data — practical guides for developers, analysts, and businesses working with data at scale.
Try Apify free — get $5 in platform credit (no credit card required) and run this scraper plus 30,000+ others. Sign up here →
See also: New — Walmart Scraper — Products, Prices, Ratings & Stock
Top comments (0)