Why Price Monitoring Matters
The e-commerce landscape changes by the minute. Manual price tracking is not only tedious; it's impossible at scale. Automated price monitoring changes the game.
Key benefits:
- Track competitor pricing strategies in real-time
- Identify pricing errors and arbitrage opportunities instantly
- Analyze market trends and seasonal patterns automatically
- Receive alerts when prices drop below thresholds
- Scale monitoring across thousands of products effortlessly
But here's the catch: modern websites don't make it easy. They employ sophisticated anti-bot measures, serve content dynamically, and constantly change their HTML structure. That's why we need more than just a simple scraping script—we need a robust pipeline.
Prerequisites
Before we dive in, make sure you have:
- Python 3.8+ installed with pip
- Basic understanding of HTML/CSS selectors
- Familiarity with HTTP requests and responses
- Experience with Python classes and decorators
- A PostgreSQL or SQLite database (we'll use SQLite for simplicity)
Setting Up Your Scraping Environment
Let's start by creating a well-structured project and installing the necessary tools.
Project Structure
price-monitor/
├── scrapers/
│ ├── __init__.py
│ ├── base_scraper.py
│ ├── amazon_scraper.py
│ └── walmart_scraper.py
├── pipeline/
│ ├── __init__.py
│ ├── cleaner.py
│ ├── storage.py
│ └── notifier.py
├── utils/
│ ├── __init__.py
│ ├── proxy_manager.py
│ └── user_agents.py
├── config.py
├── requirements.txt
└── main.py
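The tree also lists a config.py and a few utility modules that we won't build out step by step. As a rough sketch (every name and default below is a placeholder, adjust it to your setup), config.py might centralize environment-driven settings like these:
# config.py (illustrative sketch; names and defaults are placeholders)
import os
from dotenv import load_dotenv
load_dotenv()
# Database connection string; defaults to a local SQLite file
DATABASE_URL = os.getenv('DATABASE_URL', 'sqlite:///price_monitor.db')
# Request behavior shared by the scrapers
REQUEST_TIMEOUT = int(os.getenv('REQUEST_TIMEOUT', '10'))
MAX_RETRIES = int(os.getenv('MAX_RETRIES', '3'))
MIN_DELAY_SECONDS = float(os.getenv('MIN_DELAY_SECONDS', '1'))
MAX_DELAY_SECONDS = float(os.getenv('MAX_DELAY_SECONDS', '3'))
# Where price-drop alerts should go
ALERT_EMAIL = os.getenv('ALERT_EMAIL', '')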
Installing Dependencies
# Create virtual environment
python -m venv venv
source venv/bin/activate # On Windows: venv\Scripts\activate
# Install required packages
pip install scrapy beautifulsoup4 requests selenium pandas sqlalchemy
pip install python-dotenv fake-useragent scrapy-rotating-proxies schedule
pip install lxml html5lib cloudscraper undetected-chromedriver
Create your requirements.txt:
scrapy==2.11.0
beautifulsoup4==4.12.2
requests==2.31.0
selenium==4.15.0
pandas==2.1.3
sqlalchemy==2.0.23
python-dotenv==1.0.0
fake-useragent==1.4.0
cloudscraper==1.2.71
lxml==4.9.3
html5lib==1.1
schedule==1.2.0
Building the Base Scraper Class
Every good scraping system starts with a solid foundation. Let's create a base scraper class that handles common functionality:
# scrapers/base_scraper.py
import time
import random
import logging
from abc import ABC, abstractmethod
from typing import Dict, List, Optional
import requests
from fake_useragent import UserAgent
from bs4 import BeautifulSoup
import cloudscraper
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class BaseScraper(ABC):
"""Abstract base class for all scrapers"""
def __init__(self, use_cloudscraper: bool = False):
"""
Initialize base scraper with session management
Args:
use_cloudscraper: Use cloudscraper for Cloudflare bypass
"""
self.ua = UserAgent()
if use_cloudscraper:
self.session = cloudscraper.create_scraper()
else:
self.session = requests.Session()
# Set default headers
self.session.headers.update({
'User-Agent': self.ua.random,
'Accept-Language': 'en-US,en;q=0.9',
'Accept-Encoding': 'gzip, deflate, br',
'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
'Connection': 'keep-alive',
'Upgrade-Insecure-Requests': '1'
})
def get_page(self, url: str, **kwargs) -> Optional[str]:
"""
Fetch page content with retry logic and rate limiting
Args:
url: URL to scrape
**kwargs: Additional requests parameters
Returns:
HTML content or None if failed
"""
max_retries = 3
retry_delay = 5
for attempt in range(max_retries):
try:
# Random delay between requests (1-3 seconds)
time.sleep(random.uniform(1, 3))
# Rotate user agent for each request
self.session.headers['User-Agent'] = self.ua.random
response = self.session.get(url, timeout=10, **kwargs)
response.raise_for_status()
logger.info(f"Successfully fetched: {url}")
return response.text
except requests.exceptions.RequestException as e:
logger.warning(f"Attempt {attempt + 1} failed for {url}: {str(e)}")
if attempt < max_retries - 1:
time.sleep(retry_delay * (attempt + 1))
else:
logger.error(f"Failed to fetch {url} after {max_retries} attempts")
return None
def parse_html(self, html: str) -> BeautifulSoup:
"""Parse HTML content with BeautifulSoup"""
return BeautifulSoup(html, 'lxml')
@abstractmethod
def extract_product_data(self, soup: BeautifulSoup, url: str) -> Dict:
"""
Extract product data from parsed HTML
Must be implemented by child classes
"""
pass
@abstractmethod
def normalize_price(self, price_string: str) -> Optional[float]:
"""
Convert price string to float
Must be implemented by child classes
"""
pass
def scrape(self, url: str) -> Optional[Dict]:
"""
Main scraping method
Args:
url: Product URL to scrape
Returns:
Extracted product data or None if failed
"""
html = self.get_page(url)
if not html:
return None
soup = self.parse_html(html)
try:
data = self.extract_product_data(soup, url)
data['timestamp'] = time.time()
data['url'] = url
return data
except Exception as e:
logger.error(f"Failed to extract data from {url}: {str(e)}")
return None
Pro Tip: Always back off between retries. The get_page method above scales its delay linearly (retry_delay * (attempt + 1)); exponential backoff with jitter is even gentler on servers and improves your success rate on flaky sites, as sketched below.
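If you want to swap in true exponential backoff, a minimal helper (not part of the class above) could look like this:
# Illustrative helper: exponential backoff with jitter (not in base_scraper.py)
import random
def backoff_delay(attempt: int, base: float = 5.0, cap: float = 60.0) -> float:
    """Delay that doubles each attempt, capped, with +/-50% jitter."""
    delay = min(cap, base * (2 ** attempt))
    return delay * random.uniform(0.5, 1.5)
# In get_page's except block you would then call: time.sleep(backoff_delay(attempt))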
Implementing Platform-Specific Scrapers
Now let's create scrapers for specific e-commerce platforms. Each site has unique HTML structures and anti-bot measures.
Amazon Scraper
# scrapers/amazon_scraper.py
import re
from typing import Dict, Optional
from bs4 import BeautifulSoup
from .base_scraper import BaseScraper
class AmazonScraper(BaseScraper):
"""Scraper specifically for Amazon products"""
def __init__(self):
        # Amazon's anti-bot checks are aggressive; cloudscraper's browser-like requests help
super().__init__(use_cloudscraper=True)
# Amazon-specific headers
self.session.headers.update({
'Host': 'www.amazon.com',
'Referer': 'https://www.amazon.com/',
})
def extract_product_data(self, soup: BeautifulSoup, url: str) -> Dict:
"""Extract product information from Amazon page"""
data = {
'platform': 'amazon',
'product_id': self._extract_asin(url),
'title': None,
'price': None,
'availability': None,
'rating': None,
'review_count': None,
'image_url': None
}
# Extract title
title_elem = soup.find('span', {'id': 'productTitle'})
if title_elem:
data['title'] = title_elem.text.strip()
# Extract price - Amazon has multiple price selectors
price_selectors = [
'span.a-price-whole',
'span#priceblock_dealprice',
'span#priceblock_ourprice',
'span.a-price.a-text-price.a-size-medium.apexPriceToPay',
'span.a-price-range'
]
for selector in price_selectors:
price_elem = soup.select_one(selector)
if price_elem:
price_text = price_elem.text.strip()
data['price'] = self.normalize_price(price_text)
if data['price']:
break
# Extract availability
availability_elem = soup.find('div', {'id': 'availability'})
if availability_elem:
availability_text = availability_elem.text.strip()
data['availability'] = 'in_stock' if 'in stock' in availability_text.lower() else 'out_of_stock'
# Extract rating
rating_elem = soup.find('span', {'class': 'a-icon-alt'})
if rating_elem:
rating_match = re.search(r'(\d+\.?\d*) out of', rating_elem.text)
if rating_match:
data['rating'] = float(rating_match.group(1))
# Extract review count
review_elem = soup.find('span', {'id': 'acrCustomerReviewText'})
if review_elem:
review_match = re.search(r'(\d+(?:,\d+)*)', review_elem.text)
if review_match:
data['review_count'] = int(review_match.group(1).replace(',', ''))
# Extract main image
image_elem = soup.find('img', {'id': 'landingImage'})
if image_elem and 'src' in image_elem.attrs:
data['image_url'] = image_elem['src']
return data
def normalize_price(self, price_string: str) -> Optional[float]:
"""Convert Amazon price string to float"""
if not price_string:
return None
# Remove currency symbols and clean the string
price_cleaned = re.sub(r'[^\d.,]', '', price_string)
# Handle price ranges (take the lower price)
if '-' in price_cleaned:
price_cleaned = price_cleaned.split('-')[0].strip()
# Convert to float
try:
# Remove thousands separator and convert
price_cleaned = price_cleaned.replace(',', '')
return float(price_cleaned)
except ValueError:
return None
def _extract_asin(self, url: str) -> Optional[str]:
"""Extract ASIN from Amazon URL"""
asin_match = re.search(r'/dp/([A-Z0-9]{10})', url)
if asin_match:
return asin_match.group(1)
return None
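With the abstract methods implemented, a one-off scrape takes a few lines. The ASIN in this snippet is a placeholder, not a real product:
# Quick usage check (placeholder ASIN; substitute a real product URL)
from scrapers.amazon_scraper import AmazonScraper
scraper = AmazonScraper()
result = scraper.scrape('https://www.amazon.com/dp/B000000000')
if result:
    print(result['title'], result['price'])
else:
    print('Scrape failed; check the logs for details')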
Walmart Scraper
# scrapers/walmart_scraper.py
import json
import re
from typing import Dict, Optional
from bs4 import BeautifulSoup
from .base_scraper import BaseScraper
class WalmartScraper(BaseScraper):
"""Scraper for Walmart products"""
def __init__(self):
super().__init__(use_cloudscraper=False)
# Walmart-specific headers
self.session.headers.update({
'Host': 'www.walmart.com',
'Referer': 'https://www.walmart.com/',
})
def extract_product_data(self, soup: BeautifulSoup, url: str) -> Dict:
"""Extract product information from Walmart page"""
data = {
'platform': 'walmart',
'product_id': self._extract_product_id(url),
'title': None,
'price': None,
'availability': None,
'rating': None,
'review_count': None,
'image_url': None
}
# Walmart often stores data in JSON-LD scripts
json_ld = soup.find('script', {'type': 'application/ld+json'})
if json_ld:
try:
product_data = json.loads(json_ld.string)
# Handle different JSON-LD structures
if isinstance(product_data, list):
product_data = product_data[0]
if 'name' in product_data:
data['title'] = product_data['name']
if 'offers' in product_data:
offers = product_data['offers']
if 'price' in offers:
data['price'] = float(offers['price'])
if 'availability' in offers:
data['availability'] = 'in_stock' if 'InStock' in offers['availability'] else 'out_of_stock'
if 'aggregateRating' in product_data:
rating = product_data['aggregateRating']
if 'ratingValue' in rating:
data['rating'] = float(rating['ratingValue'])
if 'reviewCount' in rating:
data['review_count'] = int(rating['reviewCount'])
if 'image' in product_data:
data['image_url'] = product_data['image']
except (json.JSONDecodeError, KeyError) as e:
# Fall back to HTML parsing if JSON-LD fails
pass
# Fallback HTML parsing
if not data['title']:
title_elem = soup.find('h1', {'itemprop': 'name'})
if title_elem:
data['title'] = title_elem.text.strip()
if not data['price']:
price_elem = soup.find('span', {'itemprop': 'price'})
if price_elem:
data['price'] = self.normalize_price(price_elem.text)
return data
def normalize_price(self, price_string: str) -> Optional[float]:
"""Convert Walmart price string to float"""
if not price_string:
return None
# Extract numeric value
price_match = re.search(r'[\d,]+\.?\d*', price_string)
if price_match:
price_cleaned = price_match.group().replace(',', '')
try:
return float(price_cleaned)
except ValueError:
return None
return None
def _extract_product_id(self, url: str) -> Optional[str]:
"""Extract product ID from Walmart URL"""
id_match = re.search(r'/(\d+)(?:\?|$)', url)
if id_match:
return id_match.group(1)
return None
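Because both classes share the BaseScraper interface, dispatching on the product URL is straightforward. The select_scraper helper below is a sketch and not one of the modules in the project tree:
# Hypothetical dispatch helper; not part of the project tree above
from urllib.parse import urlparse
from scrapers.amazon_scraper import AmazonScraper
from scrapers.walmart_scraper import WalmartScraper
SCRAPERS = {
    'www.amazon.com': AmazonScraper,
    'www.walmart.com': WalmartScraper,
}
def select_scraper(url: str):
    """Return a scraper instance matching the URL's hostname, or None."""
    host = urlparse(url).netloc.lower()
    scraper_cls = SCRAPERS.get(host)
    return scraper_cls() if scraper_cls else None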
⚠️ Warning: Always check a website's robots.txt and terms of service before scraping. Respect rate limits and consider reaching out to the website owner for API access if available.
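Python's standard library makes the robots.txt part of that advice easy to automate; a minimal check might look like this:
# Minimal robots.txt check using only the standard library
from urllib.parse import urlparse
from urllib.robotparser import RobotFileParser
def is_allowed(url: str, user_agent: str = '*') -> bool:
    """Return True if the site's robots.txt permits fetching this URL."""
    parsed = urlparse(url)
    parser = RobotFileParser()
    parser.set_url(f"{parsed.scheme}://{parsed.netloc}/robots.txt")
    try:
        parser.read()
    except Exception:
        # If robots.txt can't be fetched, err on the side of caution
        return False
    return parser.can_fetch(user_agent, url)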
Handling Dynamic Content with Selenium
Some websites load prices dynamically with JavaScript. For these cases, we need Selenium:
# scrapers/dynamic_scraper.py
import re
import logging
from typing import Dict, Optional
from bs4 import BeautifulSoup
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.common.exceptions import TimeoutException
import undetected_chromedriver as uc
from .base_scraper import BaseScraper
logger = logging.getLogger(__name__)
class DynamicScraper(BaseScraper):
"""Scraper for JavaScript-heavy websites using Selenium"""
def __init__(self, headless: bool = True):
super().__init__()
self.headless = headless
self.driver = None
def _setup_driver(self):
"""Configure and create Chrome driver with anti-detection measures"""
options = uc.ChromeOptions()
if self.headless:
options.add_argument('--headless')
# Anti-detection configurations
options.add_argument('--disable-blink-features=AutomationControlled')
options.add_argument('--disable-dev-shm-usage')
options.add_argument('--no-sandbox')
options.add_argument('--disable-gpu')
options.add_argument(f'user-agent={self.ua.random}')
# Disable images for faster loading
prefs = {"profile.managed_default_content_settings.images": 2}
options.add_experimental_option("prefs", prefs)
# Use undetected Chrome driver to bypass detection
self.driver = uc.Chrome(options=options)
# Execute script to remove webdriver property
self.driver.execute_script("Object.defineProperty(navigator, 'webdriver', {get: () => undefined})")
def get_page(self, url: str, wait_selector: str = None, wait_time: int = 10) -> Optional[str]:
"""
Fetch page content using Selenium
Args:
url: URL to scrape
wait_selector: CSS selector to wait for before getting page source
wait_time: Maximum time to wait for selector
Returns:
HTML content or None if failed
"""
if not self.driver:
self._setup_driver()
try:
self.driver.get(url)
# Wait for specific element if selector provided
if wait_selector:
wait = WebDriverWait(self.driver, wait_time)
wait.until(EC.presence_of_element_located((By.CSS_SELECTOR, wait_selector)))
# Get page source after JavaScript execution
return self.driver.page_source
except TimeoutException:
logger.error(f"Timeout waiting for selector {wait_selector} on {url}")
return None
except Exception as e:
logger.error(f"Error fetching {url} with Selenium: {str(e)}")
return None
def close(self):
"""Clean up driver resources"""
if self.driver:
self.driver.quit()
self.driver = None
def __del__(self):
"""Ensure driver is closed on deletion"""
self.close()
# Example usage for a dynamic price site
class BestBuyScraper(DynamicScraper):
"""Scraper for Best Buy using Selenium for dynamic content"""
def extract_product_data(self, soup: BeautifulSoup, url: str) -> Dict:
"""Extract product data from Best Buy page"""
data = {
'platform': 'bestbuy',
'product_id': self._extract_sku(url),
'title': None,
'price': None,
'availability': None,
'rating': None,
'review_count': None,
'image_url': None
}
# Wait for price to load dynamically
html = self.get_page(url, wait_selector='div.pricing-price__regular-price', wait_time=15)
if not html:
return data
soup = self.parse_html(html)
# Extract title
title_elem = soup.find('h1', class_='sku-title')
if title_elem:
data['title'] = title_elem.text.strip()
# Extract price
price_elem = soup.find('div', class_='pricing-price__regular-price')
if price_elem:
data['price'] = self.normalize_price(price_elem.text)
# Extract availability
button_elem = soup.find('button', class_='add-to-cart-button')
if button_elem:
button_text = button_elem.text.lower()
data['availability'] = 'in_stock' if 'add to cart' in button_text else 'out_of_stock'
return data
def normalize_price(self, price_string: str) -> Optional[float]:
"""Convert Best Buy price string to float"""
if not price_string:
return None
# Remove currency symbols and clean
price_cleaned = re.sub(r'[^\d.]', '', price_string)
try:
return float(price_cleaned)
except ValueError:
return None
def _extract_sku(self, url: str) -> Optional[str]:
"""Extract SKU from Best Buy URL"""
sku_match = re.search(r'skuId=(\d+)', url)
if sku_match:
return sku_match.group(1)
return None
Pro Tip: Use undetected-chromedriver instead of regular Selenium for sites with advanced bot detection. It patches Chrome to avoid detection flags.
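Usage mirrors the static scrapers, with one extra responsibility: call close() when you're done so the Chrome process doesn't linger. The skuId below is a placeholder:
# Placeholder skuId; substitute a real Best Buy product URL
from scrapers.dynamic_scraper import BestBuyScraper
scraper = BestBuyScraper(headless=True)
try:
    result = scraper.scrape('https://www.bestbuy.com/site/example/0000000.p?skuId=0000000')
    if result:
        print(result['title'], result['price'])
finally:
    scraper.close()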
Building the Data Pipeline
Now let's create a robust pipeline to process, store, and analyze the scraped data:
Data Cleaning and Validation
# pipeline/cleaner.py
import re
from typing import Dict, Optional, List
from datetime import datetime
import logging
logger = logging.getLogger(__name__)
class DataCleaner:
"""Clean and validate scraped product data"""
def __init__(self):
self.required_fields = ['platform', 'product_id', 'title', 'price', 'timestamp', 'url']
self.price_range = (0.01, 1000000) # Reasonable price range
def clean(self, data: Dict) -> Optional[Dict]:
"""
Clean and validate product data
Args:
data: Raw product data from scraper
Returns:
Cleaned data or None if invalid
"""
if not self._validate_required_fields(data):
return None
cleaned = {
'platform': self._clean_platform(data.get('platform')),
'product_id': self._clean_product_id(data.get('product_id')),
'title': self._clean_title(data.get('title')),
'price': self._validate_price(data.get('price')),
'original_price': data.get('original_price'),
'discount_percentage': None,
'availability': self._clean_availability(data.get('availability')),
'rating': self._validate_rating(data.get('rating')),
'review_count': self._validate_review_count(data.get('review_count')),
'image_url': self._clean_url(data.get('image_url')),
'url': self._clean_url(data.get('url')),
'timestamp': datetime.fromtimestamp(data.get('timestamp', 0)),
'scraped_at': datetime.utcnow()
}
# Calculate discount if original price exists
if cleaned['original_price'] and cleaned['price']:
discount = (cleaned['original_price'] - cleaned['price']) / cleaned['original_price'] * 100
cleaned['discount_percentage'] = round(discount, 2)
return cleaned
def _validate_required_fields(self, data: Dict) -> bool:
"""Check if all required fields are present"""
for field in self.required_fields:
if field not in data or data[field] is None:
logger.warning(f"Missing required field: {field}")
return False
return True
def _clean_platform(self, platform: str) -> str:
"""Normalize platform name"""
if not platform:
return 'unknown'
return platform.lower().strip()
def _clean_product_id(self, product_id: str) -> str:
"""Clean product ID"""
if not product_id:
return 'unknown'
# Remove special characters except alphanumeric and hyphens
return re.sub(r'[^a-zA-Z0-9\-_]', '', str(product_id))
def _clean_title(self, title: str) -> str:
"""Clean product title"""
if not title:
return 'Unknown Product'
# Remove extra whitespace
title = ' '.join(title.split())
# Truncate if too long
if len(title) > 500:
title = title[:497] + '...'
return title
def _validate_price(self, price: float) -> Optional[float]:
"""Validate price is within reasonable range"""
if price is None:
return None
try:
price_float = float(price)
# Check if price is within reasonable range
if self.price_range[0] <= price_float <= self.price_range[1]:
return round(price_float, 2)
else:
logger.warning(f"Price {price_float} outside valid range")
return None
except (ValueError, TypeError):
logger.warning(f"Invalid price format: {price}")
return None
def _clean_availability(self, availability: str) -> str:
"""Normalize availability status"""
if not availability:
return 'unknown'
availability_lower = availability.lower().strip()
if any(term in availability_lower for term in ['in stock', 'available', 'in_stock']):
return 'in_stock'
elif any(term in availability_lower for term in ['out of stock', 'unavailable', 'out_of_stock']):
return 'out_of_stock'
else:
return 'unknown'
def _validate_rating(self, rating: float) -> Optional[float]:
"""Validate rating is between 0 and 5"""
if rating is None:
return None
try:
rating_float = float(rating)
if 0 <= rating_float <= 5:
return round(rating_float, 2)
else:
return None
except (ValueError, TypeError):
return None
def _validate_review_count(self, review_count: int) -> Optional[int]:
"""Validate review count is positive integer"""
if review_count is None:
return None
try:
count = int(review_count)
return count if count >= 0 else None
except (ValueError, TypeError):
return None
def _clean_url(self, url: str) -> Optional[str]:
"""Validate and clean URL"""
if not url:
return None
# Basic URL validation
if url.startswith(('http://', 'https://')):
return url.strip()
return None
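Wiring the cleaner between a scraper and storage takes only a couple of lines; here's a sketch (the ASIN is again a placeholder):
# Sketch: validate a freshly scraped record before it touches the database
from scrapers.amazon_scraper import AmazonScraper
from pipeline.cleaner import DataCleaner
scraper = AmazonScraper()
cleaner = DataCleaner()
raw = scraper.scrape('https://www.amazon.com/dp/B000000000')  # placeholder ASIN
record = cleaner.clean(raw) if raw else None
if record is None:
    print('Record rejected: missing or out-of-range fields')
else:
    print(record['title'], record['price'], record['availability'])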
Database Storage
# pipeline/storage.py
from sqlalchemy import create_engine, Column, String, Float, Integer, DateTime, Boolean, Index
from sqlalchemy.orm import declarative_base, sessionmaker, Session
from contextlib import contextmanager
from typing import List, Dict, Optional
from datetime import datetime, timedelta
import logging
Base = declarative_base()
logger = logging.getLogger(__name__)
class Product(Base):
"""Product model for database storage"""
__tablename__ = 'products'
id = Column(Integer, primary_key=True)
platform = Column(String(50), nullable=False)
product_id = Column(String(100), nullable=False)
title = Column(String(500), nullable=False)
url = Column(String(1000), nullable=False)
image_url = Column(String(1000))
# Create composite index for platform and product_id
__table_args__ = (
Index('ix_platform_product', 'platform', 'product_id'),
)
class PriceHistory(Base):
"""Price history model for tracking changes"""
__tablename__ = 'price_history'
id = Column(Integer, primary_key=True)
platform = Column(String(50), nullable=False)
product_id = Column(String(100), nullable=False)
price = Column(Float, nullable=False)
original_price = Column(Float)
discount_percentage = Column(Float)
availability = Column(String(20))
rating = Column(Float)
review_count = Column(Integer)
timestamp = Column(DateTime, nullable=False)
scraped_at = Column(DateTime, default=datetime.utcnow)
# Index for efficient queries
__table_args__ = (
Index('ix_product_timestamp', 'platform', 'product_id', 'timestamp'),
)
class PriceAlert(Base):
"""Price alert configuration"""
__tablename__ = 'price_alerts'
id = Column(Integer, primary_key=True)
platform = Column(String(50), nullable=False)
product_id = Column(String(100), nullable=False)
target_price = Column(Float, nullable=False)
alert_email = Column(String(200))
is_active = Column(Boolean, default=True)
created_at = Column(DateTime, default=datetime.utcnow)
last_triggered = Column(DateTime)
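The session handling and query helpers in storage.py continue beyond this excerpt, but the models above are already enough to create the schema and record a price point. A minimal sketch, assuming a record produced by DataCleaner:
# Minimal sketch: create the SQLite schema and persist one cleaned record
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
engine = create_engine('sqlite:///price_monitor.db')
Base.metadata.create_all(engine)  # creates products, price_history, price_alerts
SessionLocal = sessionmaker(bind=engine)
def save_price_point(record: dict) -> None:
    """Insert one cleaned record into the price_history table."""
    session = SessionLocal()
    try:
        session.add(PriceHistory(
            platform=record['platform'],
            product_id=record['product_id'],
            price=record['price'],
            original_price=record.get('original_price'),
            discount_percentage=record.get('discount_percentage'),
            availability=record.get('availability'),
            rating=record.get('rating'),
            review_count=record.get('review_count'),
            timestamp=record['timestamp'],
        ))
        session.commit()
    finally:
        session.close()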