Introduction
Tracking prices across multiple e-commerce platforms is one of the most practical applications of web scraping. Whether you want to find the best deals for personal purchases, monitor competitor pricing for your business, or build a price comparison service, a cross-platform price tracker is an incredibly useful tool.
In this tutorial, I'll show you how to build a price tracker that monitors products on Amazon, eBay, and Walmart — storing historical price data in SQLite so you can spot trends and get alerts when prices drop.
What We're Building
Our price tracker will:
- Scrape product prices from Amazon, eBay, and Walmart
- Store price history in a local SQLite database
- Detect price drops and alert you
- Run on a schedule to track prices over time
The Scraping Challenges Per Platform
Each platform has its own quirks:
| Platform | Difficulty | Main Challenge |
|---|---|---|
| Amazon | Hard | Aggressive bot detection, dynamic content |
| eBay | Medium | Auction vs. Buy-It-Now pricing, less anti-bot |
| Walmart | Hard | Heavy JavaScript rendering, Akamai protection |
Project Setup
pip install requests beautifulsoup4 lxml schedule
Let's start with the database and core structure:
import sqlite3
from datetime import datetime
from dataclasses import dataclass
@dataclass
class PriceRecord:
    """One scraped price observation for a product on a single platform."""
    product_id: str        # platform-native ID (ASIN, eBay item ID, Walmart path segment)
    platform: str          # 'amazon' | 'ebay' | 'walmart' — set by each scraper
    name: str              # product title as shown on the page
    price: float           # numeric price with currency symbol stripped
    currency: str = 'USD'  # the scrapers in this file only ever emit USD
    url: str = ''          # product page URL the price was scraped from
class PriceDatabase:
def __init__(self, db_path: str = 'price_tracker.db'):
self.conn = sqlite3.connect(db_path)
self._create_tables()
def _create_tables(self):
self.conn.execute('''
CREATE TABLE IF NOT EXISTS products (
id TEXT,
platform TEXT,
name TEXT,
url TEXT,
PRIMARY KEY (id, platform)
)
''')
self.conn.execute('''
CREATE TABLE IF NOT EXISTS prices (
id INTEGER PRIMARY KEY AUTOINCREMENT,
product_id TEXT,
platform TEXT,
price REAL,
currency TEXT DEFAULT 'USD',
recorded_at TEXT DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (product_id, platform)
REFERENCES products(id, platform)
)
''')
self.conn.commit()
def add_product(self, record: PriceRecord):
self.conn.execute(
'INSERT OR REPLACE INTO products VALUES (?, ?, ?, ?)',
(record.product_id, record.platform, record.name, record.url)
)
self.conn.execute(
'INSERT INTO prices (product_id, platform, price, currency) '
'VALUES (?, ?, ?, ?)',
(record.product_id, record.platform,
record.price, record.currency)
)
self.conn.commit()
def get_price_history(self, product_id: str, platform: str) -> list:
cursor = self.conn.execute(
'SELECT price, recorded_at FROM prices '
'WHERE product_id = ? AND platform = ? '
'ORDER BY recorded_at DESC LIMIT 30',
(product_id, platform)
)
return cursor.fetchall()
def get_price_drop(self, product_id: str, platform: str) -> float | None:
"""Returns percentage drop from previous price, or None."""
history = self.get_price_history(product_id, platform)
if len(history) < 2:
return None
current, previous = history[0][0], history[1][0]
if previous > 0 and current < previous:
return round((1 - current / previous) * 100, 1)
return None
Platform Scrapers
The Base Scraper
import requests
from bs4 import BeautifulSoup
import time
import random
class BaseScraper:
    """Shared HTTP plumbing for all platform scrapers: one persistent
    session, User-Agent rotation, round-robin proxy cycling, and polite
    randomized delays between requests."""

    def __init__(self, proxies: list[str] | None = None):
        self.session = requests.Session()
        self.proxies = proxies or []
        self.proxy_index = 0
        # Desktop Chrome UAs for Windows, macOS, and Linux.
        self.user_agents = [
            'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 '
            '(KHTML, like Gecko) Chrome/124.0.0.0 Safari/537.36',
            'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 '
            '(KHTML, like Gecko) Chrome/124.0.0.0 Safari/537.36',
            'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 '
            '(KHTML, like Gecko) Chrome/124.0.0.0 Safari/537.36',
        ]

    def _get_headers(self) -> dict:
        """Build request headers with a randomly chosen User-Agent."""
        headers = {
            'User-Agent': random.choice(self.user_agents),
            'Accept-Language': 'en-US,en;q=0.9',
            'Accept': 'text/html,application/xhtml+xml',
        }
        return headers

    def _get_proxy(self) -> dict | None:
        """Return the next proxy mapping in round-robin order, or None."""
        if not self.proxies:
            return None
        chosen = self.proxies[self.proxy_index % len(self.proxies)]
        self.proxy_index += 1
        return {scheme: f'http://{chosen}' for scheme in ('http', 'https')}

    def fetch(self, url: str) -> BeautifulSoup | None:
        """GET *url* and return parsed soup; None on error or non-200 status."""
        try:
            resp = self.session.get(
                url,
                headers=self._get_headers(),
                proxies=self._get_proxy(),
                timeout=15
            )
        except requests.RequestException as e:
            print(f'Error fetching {url}: {e}')
            return None
        if resp.status_code != 200:
            print(f'Status {resp.status_code} for {url}')
            return None
        return BeautifulSoup(resp.text, 'lxml')

    def delay(self):
        """Sleep a random 2-5 seconds to space requests out."""
        time.sleep(random.uniform(2, 5))
Amazon Scraper
class AmazonScraper(BaseScraper):
    """Scrapes a product title and price from an Amazon product page."""

    # Amazon A/B-tests its markup; try selectors newest-first.
    PRICE_SELECTORS = [
        'span.a-price span.a-offscreen',
        '#priceblock_ourprice',
        '#priceblock_dealprice',
    ]

    @staticmethod
    def _extract_asin(url: str) -> str:
        """Pull the ASIN out of a /dp/ or /gp/product/ URL ('' if absent)."""
        for marker in ('/dp/', '/gp/product/'):
            if marker in url:
                return url.split(marker)[1].split('/')[0].split('?')[0]
        return ''

    def get_price(self, url: str) -> PriceRecord | None:
        """Fetch *url* and return a PriceRecord, or None if anything fails."""
        soup = self.fetch(url)
        if not soup:
            return None
        title_el = soup.select_one('#productTitle')
        title = title_el.text.strip() if title_el else 'Unknown'
        # Amazon uses multiple price selectors
        price = None
        for selector in self.PRICE_SELECTORS:
            el = soup.select_one(selector)
            if el:
                price_text = el.text.strip().replace('$', '').replace(',', '')
                try:
                    price = float(price_text)
                    break
                except ValueError:
                    continue
        if price is None:
            return None
        asin = self._extract_asin(url)
        if not asin:
            # Without an ASIN the record would be stored under product_id=''
            # and collide with other products on the (id, platform) primary
            # key — skip it instead.
            return None
        return PriceRecord(
            product_id=asin, platform='amazon',
            name=title, price=price, url=url
        )
eBay Scraper
class EbayScraper(BaseScraper):
    """Scrapes title and current/Buy-It-Now price from an eBay listing."""

    def get_price(self, url: str) -> PriceRecord | None:
        """Fetch *url* and return a PriceRecord, or None if anything fails."""
        soup = self.fetch(url)
        if not soup:
            return None
        title_el = soup.select_one('h1.x-item-title__mainTitle span')
        title = title_el.text.strip() if title_el else 'Unknown'
        # eBay's current markup uses the 'ux-textspans' class; the original
        # 'ux-textspandit' selector was a typo that never matched, forcing
        # every lookup onto the itemprop fallback.
        price_el = soup.select_one('div.x-price-primary span.ux-textspans')
        if not price_el:
            price_el = soup.select_one('[itemprop="price"]')
        if not price_el:
            return None
        price_text = price_el.text.strip()
        price_text = price_text.replace('US $', '').replace('$', '').replace(',', '')
        try:
            price = float(price_text)
        except ValueError:
            return None
        # Extract item ID from URL
        item_id = ''
        if '/itm/' in url:
            item_id = url.split('/itm/')[1].split('?')[0].split('/')[0]
        if not item_id:
            # An empty ID would collide with other products on the
            # (id, platform) primary key in the database — skip it.
            return None
        return PriceRecord(
            product_id=item_id, platform='ebay',
            name=title, price=price, url=url
        )
Walmart Scraper
class WalmartScraper(BaseScraper):
    """Scrapes title and price from a Walmart product page."""

    def get_price(self, url: str) -> PriceRecord | None:
        """Fetch *url* and return a PriceRecord, or None if anything fails."""
        soup = self.fetch(url)
        if not soup:
            return None
        heading = soup.select_one('h1[itemprop="name"]')
        title = heading.text.strip() if heading else 'Unknown'
        # Prefer the microdata price node; fall back to the legacy selector.
        node = (soup.select_one('[itemprop="price"]')
                or soup.select_one('span.price-characteristic'))
        if not node:
            return None
        # The microdata node carries the price in its `content` attribute;
        # the legacy node carries it as text.
        raw = node.get('content', '') or node.text.strip()
        cleaned = raw.replace('$', '').replace(',', '')
        try:
            amount = float(cleaned)
        except ValueError:
            return None
        # The trailing URL path segment doubles as the product ID.
        item_id = url.rstrip('/').split('/')[-1]
        return PriceRecord(
            product_id=item_id, platform='walmart',
            name=title, price=amount, url=url
        )
Dealing with Proxy Rotation
Proxy rotation is non-negotiable for any multi-platform scraper. Without it, you'll get blocked within minutes on Amazon and Walmart.
You have two options:
Option 1: Self-managed proxies
Buy residential proxies from a provider and rotate them yourself:
proxy_list = [
'user:pass@residential1.example.com:8080',
'user:pass@residential2.example.com:8080',
'user:pass@residential3.example.com:8080',
]
amazon = AmazonScraper(proxies=proxy_list)
ebay = EbayScraper(proxies=proxy_list)
walmart = WalmartScraper(proxies=proxy_list)
Option 2: Use a proxy management service
Services like ScrapeOps handle proxy rotation, CAPTCHA solving, and request optimization for you. Instead of managing a pool of proxies yourself, you route requests through their API:
SCRAPEOPS_KEY = 'your_api_key'

def scrape_via_scrapeops(url: str) -> str:
    """Fetch *url* through the ScrapeOps proxy API and return the raw HTML."""
    query = {
        'api_key': SCRAPEOPS_KEY,
        'url': url,
        'render_js': 'true',
    }
    resp = requests.get(
        'https://proxy.scrapeops.io/v1/',
        params=query,
        timeout=60
    )
    return resp.text
ScrapeOps is particularly good for multi-platform scraping because they optimize proxy selection per target site. Their dashboard also gives you success rate analytics so you can see which sites are causing issues.
Putting It All Together: The Price Tracker
import schedule
class PriceTracker:
    """Ties together the per-platform scrapers, the price database, and a
    simple polling schedule over a user-defined watchlist."""

    def __init__(self):
        self.db = PriceDatabase()
        self.scrapers = {
            'amazon': AmazonScraper(),
            'ebay': EbayScraper(),
            'walmart': WalmartScraper(),
        }
        self.watchlist = []

    def add_to_watchlist(self, url: str, platform: str):
        """Register a product URL to be checked on every pass."""
        self.watchlist.append({'url': url, 'platform': platform})

    def check_prices(self):
        """Check all watched products and record prices."""
        print(f'\n--- Price Check: {datetime.now().isoformat()} ---')
        for entry in self.watchlist:
            platform, url = entry['platform'], entry['url']
            scraper = self.scrapers.get(platform)
            if not scraper:
                print(f'No scraper for {platform}')
                continue
            record = scraper.get_price(url)
            if not record:
                print(f' [{platform}] Failed to scrape {url}')
            else:
                self.db.add_product(record)
                drop = self.db.get_price_drop(record.product_id, platform)
                status = f' (DOWN {drop}%!)' if drop else ''
                print(
                    f' [{platform}] {record.name[:50]}: '
                    f'${record.price}{status}'
                )
            scraper.delay()

    def run(self, interval_hours: int = 6):
        """Run the tracker on a schedule."""
        self.check_prices()  # Initial check
        schedule.every(interval_hours).hours.do(self.check_prices)
        print(f'\nTracker running (every {interval_hours}h). Ctrl+C to stop.')
        while True:
            schedule.run_pending()
            time.sleep(60)
# Example usage
if __name__ == '__main__':
    tracker = PriceTracker()
    # Products to watch, as (url, platform) pairs.
    watch_items = [
        ('https://www.amazon.com/dp/B0EXAMPLE', 'amazon'),
        ('https://www.ebay.com/itm/123456789', 'ebay'),
        ('https://www.walmart.com/ip/product-name/987654321', 'walmart'),
    ]
    for product_url, platform_name in watch_items:
        tracker.add_to_watchlist(product_url, platform_name)
    # Run every 6 hours
    tracker.run(interval_hours=6)
Adding Price Drop Alerts
Let's add email notifications when prices drop significantly:
import smtplib
from email.mime.text import MIMEText
def send_price_alert(
product_name: str, platform: str,
old_price: float, new_price: float, url: str
):
drop_pct = round((1 - new_price / old_price) * 100, 1)
subject = f'Price Drop Alert: {product_name} (-{drop_pct}%)'
body = (
f'{product_name}\n'
f'Platform: {platform}\n'
f'Old price: ${old_price:.2f}\n'
f'New price: ${new_price:.2f}\n'
f'Drop: {drop_pct}%\n'
f'Link: {url}'
)
msg = MIMEText(body)
msg['Subject'] = subject
msg['From'] = 'tracker@yourdomain.com'
msg['To'] = 'you@yourdomain.com'
with smtplib.SMTP('smtp.yourdomain.com', 587) as server:
server.starttls()
server.login('tracker@yourdomain.com', 'your_password')
server.send_message(msg)
Querying Your Price Data
Once you've been tracking for a while, you can run queries to find the best deals:
def find_lowest_prices(db_path: str = 'price_tracker.db') -> list:
    """Find the lowest recorded price for each product.

    Prints one line per product and returns the rows as
    (name, platform, lowest_price, recorded_at) tuples so callers can use
    the data programmatically (previously the results were only printed).
    """
    conn = sqlite3.connect(db_path)
    try:
        cursor = conn.execute('''
            SELECT p.name, p.platform, MIN(pr.price) as lowest,
                   pr.recorded_at
            FROM products p
            JOIN prices pr ON p.id = pr.product_id
                AND p.platform = pr.platform
            GROUP BY p.id, p.platform
            ORDER BY p.name
        ''')
        # SQLite fills the bare recorded_at column from the same row that
        # produced MIN(pr.price), i.e. the date of the lowest price.
        rows = cursor.fetchall()
    finally:
        # Close even if the query raises (e.g. missing tables) — the
        # original leaked the connection on error.
        conn.close()
    for row in rows:
        print(f'{row[0]} ({row[1]}): ${row[2]:.2f} on {row[3]}')
    return rows
def price_trend(product_id: str, platform: str,
                db_path: str = 'price_tracker.db') -> list:
    """Show price trend for a specific product.

    Prints an ASCII bar chart (one '#' per $5) and returns the
    (price, recorded_at) rows in chronological order so callers can use
    the data programmatically (previously the results were only printed).
    """
    conn = sqlite3.connect(db_path)
    try:
        cursor = conn.execute(
            'SELECT price, recorded_at FROM prices '
            'WHERE product_id = ? AND platform = ? '
            'ORDER BY recorded_at',
            (product_id, platform)
        )
        rows = cursor.fetchall()
    finally:
        # Close even if the query raises — the original leaked the
        # connection on error.
        conn.close()
    print(f'Price history for {product_id} on {platform}:')
    for price, date in rows:
        bar = '#' * int(price / 5)
        print(f' {date}: ${price:.2f} {bar}')
    return rows
Tips for Production
- Rotate User-Agents: Keep a list of 10+ current browser UAs and randomize
- Respect robots.txt: Check each platform's terms of service
- Add exponential backoff: When you get blocked, wait longer before retrying
- Use a managed proxy service: ScrapeOps or similar services save you from the headache of proxy management
- Monitor success rates: Track how often your scrapes succeed per platform
- Run at varied times: Don't hit the same sites at exactly the same time every day
- Store raw HTML: Keep the original pages so you can re-parse if your selectors break
Conclusion
Building a cross-platform price tracker is a great project that combines web scraping, data storage, and automation. The biggest challenge isn't writing the scraping code — it's maintaining reliable access to these platforms over time.
Start with one platform, get it working reliably, then expand. Use proxy rotation from the start (even if you're tempted to skip it during development), and consider a managed service like ScrapeOps if you want to focus on the data rather than the infrastructure.
The full source code for this project is available in the examples above — just combine the classes into a single file and you're ready to go.
Building something cool with price data? Share it in the comments!
Top comments (0)