Chad Dower

Effective Web Scraping with Python: Building a Robust Data Pipeline for Price Monitoring

Why Price Monitoring Matters

The e-commerce landscape changes by the minute. Manual price tracking is not only tedious—it's impossible at scale. Automated price monitoring gives you superpowers:

Key benefits:

  • Track competitor pricing strategies in real-time
  • Identify pricing errors and arbitrage opportunities instantly
  • Analyze market trends and seasonal patterns automatically
  • Receive alerts when prices drop below thresholds
  • Scale monitoring across thousands of products effortlessly

But here's the catch: modern websites don't make it easy. They employ sophisticated anti-bot measures, serve content dynamically, and constantly change their HTML structure. That's why we need more than just a simple scraping script—we need a robust pipeline.

Prerequisites

Before we dive in, make sure you have:

  • Python 3.8+ installed with pip
  • Basic understanding of HTML/CSS selectors
  • Familiarity with HTTP requests and responses
  • Experience with Python classes and decorators
  • A PostgreSQL or SQLite database (we'll use SQLite for simplicity)

Setting Up Your Scraping Environment

Let's start by creating a well-structured project and installing the necessary tools.

Project Structure

price-monitor/
├── scrapers/
│   ├── __init__.py
│   ├── base_scraper.py
│   ├── amazon_scraper.py
│   └── walmart_scraper.py
├── pipeline/
│   ├── __init__.py
│   ├── cleaner.py
│   ├── storage.py
│   └── notifier.py
├── utils/
│   ├── __init__.py
│   ├── proxy_manager.py
│   └── user_agents.py
├── config.py
├── requirements.txt
└── main.py

Installing Dependencies

# Create virtual environment
python -m venv venv
source venv/bin/activate  # On Windows: venv\Scripts\activate

# Install required packages
pip install scrapy beautifulsoup4 requests selenium pandas sqlalchemy
pip install python-dotenv fake-useragent rotating-proxies schedule
pip install lxml html5lib cloudscraper undetected-chromedriver

Create your requirements.txt:

scrapy==2.11.0
beautifulsoup4==4.12.2
requests==2.31.0
selenium==4.15.0
pandas==2.1.3
sqlalchemy==2.0.23
python-dotenv==1.0.0
fake-useragent==1.4.0
cloudscraper==1.2.71
lxml==4.9.3
html5lib==1.1
schedule==1.2.0
undetected-chromedriver==3.5.4
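
The project tree also lists a config.py. Since python-dotenv is among the installed packages, a minimal version could simply centralize settings read from a .env file. This is only an illustrative sketch; every setting name in it is an assumption, not part of the original project:

# config.py -- illustrative sketch; all setting names here are assumptions
import os
from dotenv import load_dotenv

load_dotenv()  # pick up variables from a local .env file, if one exists

DATABASE_URL = os.getenv('DATABASE_URL', 'sqlite:///price_monitor.db')
REQUEST_DELAY_SECONDS = float(os.getenv('REQUEST_DELAY_SECONDS', '2'))
MAX_RETRIES = int(os.getenv('MAX_RETRIES', '3'))
ALERT_EMAIL = os.getenv('ALERT_EMAIL')  # where price alerts should be sent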

Building the Base Scraper Class

Every good scraping system starts with a solid foundation. Let's create a base scraper class that handles common functionality:

# scrapers/base_scraper.py
import time
import random
import logging
from abc import ABC, abstractmethod
from typing import Dict, List, Optional
import requests
from fake_useragent import UserAgent
from bs4 import BeautifulSoup
import cloudscraper

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

class BaseScraper(ABC):
    """Abstract base class for all scrapers"""

    def __init__(self, use_cloudscraper: bool = False):
        """
        Initialize base scraper with session management

        Args:
            use_cloudscraper: Use cloudscraper for Cloudflare bypass
        """
        self.ua = UserAgent()

        if use_cloudscraper:
            self.session = cloudscraper.create_scraper()
        else:
            self.session = requests.Session()

        # Set default headers
        self.session.headers.update({
            'User-Agent': self.ua.random,
            'Accept-Language': 'en-US,en;q=0.9',
            'Accept-Encoding': 'gzip, deflate, br',
            'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
            'Connection': 'keep-alive',
            'Upgrade-Insecure-Requests': '1'
        })

    def get_page(self, url: str, **kwargs) -> Optional[str]:
        """
        Fetch page content with retry logic and rate limiting

        Args:
            url: URL to scrape
            **kwargs: Additional requests parameters

        Returns:
            HTML content or None if failed
        """
        max_retries = 3
        retry_delay = 5

        for attempt in range(max_retries):
            try:
                # Random delay between requests (1-3 seconds)
                time.sleep(random.uniform(1, 3))

                # Rotate user agent for each request
                self.session.headers['User-Agent'] = self.ua.random

                response = self.session.get(url, timeout=10, **kwargs)
                response.raise_for_status()

                logger.info(f"Successfully fetched: {url}")
                return response.text

            except requests.exceptions.RequestException as e:
                logger.warning(f"Attempt {attempt + 1} failed for {url}: {str(e)}")

                if attempt < max_retries - 1:
                    # Exponential backoff: wait longer after each failed attempt
                    time.sleep(retry_delay * (2 ** attempt))
                else:
                    logger.error(f"Failed to fetch {url} after {max_retries} attempts")
                    return None

    def parse_html(self, html: str) -> BeautifulSoup:
        """Parse HTML content with BeautifulSoup"""
        return BeautifulSoup(html, 'lxml')

    @abstractmethod
    def extract_product_data(self, soup: BeautifulSoup, url: str) -> Dict:
        """
        Extract product data from parsed HTML

        Must be implemented by child classes
        """
        pass

    @abstractmethod
    def normalize_price(self, price_string: str) -> Optional[float]:
        """
        Convert price string to float

        Must be implemented by child classes
        """
        pass

    def scrape(self, url: str) -> Optional[Dict]:
        """
        Main scraping method

        Args:
            url: Product URL to scrape

        Returns:
            Extracted product data or None if failed
        """
        html = self.get_page(url)

        if not html:
            return None

        soup = self.parse_html(html)

        try:
            data = self.extract_product_data(soup, url)
            data['timestamp'] = time.time()
            data['url'] = url
            return data
        except Exception as e:
            logger.error(f"Failed to extract data from {url}: {str(e)}")
            return None

Pro Tip: Always implement exponential backoff for retries. It reduces server load and increases your chances of successful scraping.
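
If you want to reuse that retry behavior outside the base class, one option is to package it as a decorator. The sketch below is illustrative; the backoff_retry name and its parameters are my own, not part of the original project:

# utils/retry.py -- illustrative helper, not part of the original project
import time
import random
import logging
from functools import wraps

logger = logging.getLogger(__name__)

def backoff_retry(max_retries: int = 3, base_delay: float = 5.0):
    """Retry the wrapped function with exponential backoff plus jitter"""
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            for attempt in range(max_retries):
                try:
                    return func(*args, **kwargs)
                except Exception as exc:
                    if attempt == max_retries - 1:
                        raise
                    # 5s, 10s, 20s... plus up to one second of random jitter
                    delay = base_delay * (2 ** attempt) + random.uniform(0, 1)
                    logger.warning(f"Attempt {attempt + 1} failed ({exc}); retrying in {delay:.1f}s")
                    time.sleep(delay)
        return wrapper
    return decorator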

Implementing Platform-Specific Scrapers

Now let's create scrapers for specific e-commerce platforms. Each site has unique HTML structures and anti-bot measures.

Amazon Scraper

# scrapers/amazon_scraper.py
import re
from typing import Dict, Optional
from bs4 import BeautifulSoup
from .base_scraper import BaseScraper

class AmazonScraper(BaseScraper):
    """Scraper specifically for Amazon products"""

    def __init__(self):
        # Amazon often requires cloudscraper for Cloudflare bypass
        super().__init__(use_cloudscraper=True)

        # Amazon-specific headers
        self.session.headers.update({
            'Host': 'www.amazon.com',
            'Referer': 'https://www.amazon.com/',
        })

    def extract_product_data(self, soup: BeautifulSoup, url: str) -> Dict:
        """Extract product information from Amazon page"""

        data = {
            'platform': 'amazon',
            'product_id': self._extract_asin(url),
            'title': None,
            'price': None,
            'availability': None,
            'rating': None,
            'review_count': None,
            'image_url': None
        }

        # Extract title
        title_elem = soup.find('span', {'id': 'productTitle'})
        if title_elem:
            data['title'] = title_elem.text.strip()

        # Extract price - Amazon has multiple price selectors
        price_selectors = [
            'span.a-price-whole',
            'span#priceblock_dealprice',
            'span#priceblock_ourprice',
            'span.a-price.a-text-price.a-size-medium.apexPriceToPay',
            'span.a-price-range'
        ]

        for selector in price_selectors:
            price_elem = soup.select_one(selector)
            if price_elem:
                price_text = price_elem.text.strip()
                data['price'] = self.normalize_price(price_text)
                if data['price']:
                    break

        # Extract availability
        availability_elem = soup.find('div', {'id': 'availability'})
        if availability_elem:
            availability_text = availability_elem.text.strip()
            data['availability'] = 'in_stock' if 'in stock' in availability_text.lower() else 'out_of_stock'

        # Extract rating
        rating_elem = soup.find('span', {'class': 'a-icon-alt'})
        if rating_elem:
            rating_match = re.search(r'(\d+\.?\d*) out of', rating_elem.text)
            if rating_match:
                data['rating'] = float(rating_match.group(1))

        # Extract review count
        review_elem = soup.find('span', {'id': 'acrCustomerReviewText'})
        if review_elem:
            review_match = re.search(r'(\d+(?:,\d+)*)', review_elem.text)
            if review_match:
                data['review_count'] = int(review_match.group(1).replace(',', ''))

        # Extract main image
        image_elem = soup.find('img', {'id': 'landingImage'})
        if image_elem and 'src' in image_elem.attrs:
            data['image_url'] = image_elem['src']

        return data

    def normalize_price(self, price_string: str) -> Optional[float]:
        """Convert Amazon price string to float"""
        if not price_string:
            return None

        # Remove currency symbols and clean the string
        price_cleaned = re.sub(r'[^\d.,]', '', price_string)

        # Handle price ranges (take the lower price)
        if '-' in price_cleaned:
            price_cleaned = price_cleaned.split('-')[0].strip()

        # Convert to float
        try:
            # Remove thousands separator and convert
            price_cleaned = price_cleaned.replace(',', '')
            return float(price_cleaned)
        except ValueError:
            return None

    def _extract_asin(self, url: str) -> Optional[str]:
        """Extract ASIN from Amazon URL"""
        asin_match = re.search(r'/dp/([A-Z0-9]{10})', url)
        if asin_match:
            return asin_match.group(1)
        return None

Walmart Scraper

# scrapers/walmart_scraper.py
import json
import re
from typing import Dict, Optional
from bs4 import BeautifulSoup
from .base_scraper import BaseScraper

class WalmartScraper(BaseScraper):
    """Scraper for Walmart products"""

    def __init__(self):
        super().__init__(use_cloudscraper=False)

        # Walmart-specific headers
        self.session.headers.update({
            'Host': 'www.walmart.com',
            'Referer': 'https://www.walmart.com/',
        })

    def extract_product_data(self, soup: BeautifulSoup, url: str) -> Dict:
        """Extract product information from Walmart page"""

        data = {
            'platform': 'walmart',
            'product_id': self._extract_product_id(url),
            'title': None,
            'price': None,
            'availability': None,
            'rating': None,
            'review_count': None,
            'image_url': None
        }

        # Walmart often stores data in JSON-LD scripts
        json_ld = soup.find('script', {'type': 'application/ld+json'})
        if json_ld:
            try:
                product_data = json.loads(json_ld.string)

                # Handle different JSON-LD structures
                if isinstance(product_data, list):
                    product_data = product_data[0]

                if 'name' in product_data:
                    data['title'] = product_data['name']

                if 'offers' in product_data:
                    offers = product_data['offers']
                    # offers may be a single object or a list of offer objects
                    if isinstance(offers, list):
                        offers = offers[0]
                    if 'price' in offers:
                        data['price'] = float(offers['price'])
                    if 'availability' in offers:
                        data['availability'] = 'in_stock' if 'InStock' in offers['availability'] else 'out_of_stock'

                if 'aggregateRating' in product_data:
                    rating = product_data['aggregateRating']
                    if 'ratingValue' in rating:
                        data['rating'] = float(rating['ratingValue'])
                    if 'reviewCount' in rating:
                        data['review_count'] = int(rating['reviewCount'])

                if 'image' in product_data:
                    data['image_url'] = product_data['image']

            except (json.JSONDecodeError, TypeError, KeyError, ValueError, IndexError):
                # Fall back to HTML parsing if JSON-LD is missing or malformed
                pass

        # Fallback HTML parsing
        if not data['title']:
            title_elem = soup.find('h1', {'itemprop': 'name'})
            if title_elem:
                data['title'] = title_elem.text.strip()

        if not data['price']:
            price_elem = soup.find('span', {'itemprop': 'price'})
            if price_elem:
                data['price'] = self.normalize_price(price_elem.text)

        return data

    def normalize_price(self, price_string: str) -> Optional[float]:
        """Convert Walmart price string to float"""
        if not price_string:
            return None

        # Extract numeric value
        price_match = re.search(r'[\d,]+\.?\d*', price_string)
        if price_match:
            price_cleaned = price_match.group().replace(',', '')
            try:
                return float(price_cleaned)
            except ValueError:
                return None

        return None

    def _extract_product_id(self, url: str) -> Optional[str]:
        """Extract product ID from Walmart URL"""
        id_match = re.search(r'/(\d+)(?:\?|$)', url)
        if id_match:
            return id_match.group(1)
        return None
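
With both scrapers in place, using them is just a matter of instantiating the right class for each URL. A quick sketch; the product URLs below are placeholders, not real listings:

# Quick usage sketch -- product URLs are placeholders, not real listings
from scrapers.amazon_scraper import AmazonScraper
from scrapers.walmart_scraper import WalmartScraper

scrapers = {
    'amazon': AmazonScraper(),
    'walmart': WalmartScraper(),
}

urls = {
    'amazon': 'https://www.amazon.com/dp/B000000000',            # placeholder ASIN
    'walmart': 'https://www.walmart.com/ip/example/123456789',   # placeholder product ID
}

for platform, url in urls.items():
    result = scrapers[platform].scrape(url)
    if result:
        print(f"{platform}: {result['title']} -> {result['price']}")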

⚠️ Warning: Always check a website's robots.txt and terms of service before scraping. Respect rate limits and consider reaching out to the website owner for API access if available.
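
Python's standard library can handle a basic robots.txt check for you. A minimal sketch (the example URL is a placeholder):

# Minimal robots.txt check using only the standard library
from urllib.robotparser import RobotFileParser
from urllib.parse import urlparse

def is_allowed(url: str, user_agent: str = '*') -> bool:
    """Return True if robots.txt permits fetching this URL for the given agent"""
    parsed = urlparse(url)
    robots_url = f"{parsed.scheme}://{parsed.netloc}/robots.txt"
    parser = RobotFileParser()
    parser.set_url(robots_url)
    parser.read()
    return parser.can_fetch(user_agent, url)

# Example: skip a URL the site has asked bots not to crawl
if not is_allowed('https://www.example.com/some-product'):
    print("Disallowed by robots.txt -- skipping")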

Handling Dynamic Content with Selenium

Some websites load prices dynamically with JavaScript. For these cases, we need Selenium:

# scrapers/dynamic_scraper.py
import re
import logging
from typing import Dict, Optional

from bs4 import BeautifulSoup
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.common.exceptions import TimeoutException
import undetected_chromedriver as uc
from .base_scraper import BaseScraper

logger = logging.getLogger(__name__)

class DynamicScraper(BaseScraper):
    """Scraper for JavaScript-heavy websites using Selenium"""

    def __init__(self, headless: bool = True):
        super().__init__()
        self.headless = headless
        self.driver = None

    def _setup_driver(self):
        """Configure and create Chrome driver with anti-detection measures"""
        options = uc.ChromeOptions()

        if self.headless:
            options.add_argument('--headless')

        # Anti-detection configurations
        options.add_argument('--disable-blink-features=AutomationControlled')
        options.add_argument('--disable-dev-shm-usage')
        options.add_argument('--no-sandbox')
        options.add_argument('--disable-gpu')
        options.add_argument(f'user-agent={self.ua.random}')

        # Disable images for faster loading
        prefs = {"profile.managed_default_content_settings.images": 2}
        options.add_experimental_option("prefs", prefs)

        # Use undetected Chrome driver to bypass detection
        self.driver = uc.Chrome(options=options)

        # Execute script to remove webdriver property
        self.driver.execute_script("Object.defineProperty(navigator, 'webdriver', {get: () => undefined})")

    def get_page(self, url: str, wait_selector: Optional[str] = None, wait_time: int = 10) -> Optional[str]:
        """
        Fetch page content using Selenium

        Args:
            url: URL to scrape
            wait_selector: CSS selector to wait for before getting page source
            wait_time: Maximum time to wait for selector

        Returns:
            HTML content or None if failed
        """
        if not self.driver:
            self._setup_driver()

        try:
            self.driver.get(url)

            # Wait for specific element if selector provided
            if wait_selector:
                wait = WebDriverWait(self.driver, wait_time)
                wait.until(EC.presence_of_element_located((By.CSS_SELECTOR, wait_selector)))

            # Get page source after JavaScript execution
            return self.driver.page_source

        except TimeoutException:
            logger.error(f"Timeout waiting for selector {wait_selector} on {url}")
            return None
        except Exception as e:
            logger.error(f"Error fetching {url} with Selenium: {str(e)}")
            return None

    def close(self):
        """Clean up driver resources"""
        if self.driver:
            self.driver.quit()
            self.driver = None

    def __del__(self):
        """Ensure driver is closed on deletion"""
        self.close()

# Example usage for a dynamic price site
class BestBuyScraper(DynamicScraper):
    """Scraper for Best Buy using Selenium for dynamic content"""

    def extract_product_data(self, soup: BeautifulSoup, url: str) -> Dict:
        """Extract product data from Best Buy page"""

        data = {
            'platform': 'bestbuy',
            'product_id': self._extract_sku(url),
            'title': None,
            'price': None,
            'availability': None,
            'rating': None,
            'review_count': None,
            'image_url': None
        }

        # Re-fetch with an explicit wait so the dynamically rendered
        # price block is present (the soup passed in may not include it yet)
        html = self.get_page(url, wait_selector='div.pricing-price__regular-price', wait_time=15)

        if not html:
            return data

        soup = self.parse_html(html)

        # Extract title
        title_elem = soup.find('h1', class_='sku-title')
        if title_elem:
            data['title'] = title_elem.text.strip()

        # Extract price
        price_elem = soup.find('div', class_='pricing-price__regular-price')
        if price_elem:
            data['price'] = self.normalize_price(price_elem.text)

        # Extract availability
        button_elem = soup.find('button', class_='add-to-cart-button')
        if button_elem:
            button_text = button_elem.text.lower()
            data['availability'] = 'in_stock' if 'add to cart' in button_text else 'out_of_stock'

        return data

    def normalize_price(self, price_string: str) -> Optional[float]:
        """Convert Best Buy price string to float"""
        if not price_string:
            return None

        # Remove currency symbols and clean
        price_cleaned = re.sub(r'[^\d.]', '', price_string)

        try:
            return float(price_cleaned)
        except ValueError:
            return None

    def _extract_sku(self, url: str) -> Optional[str]:
        """Extract SKU from Best Buy URL"""
        sku_match = re.search(r'skuId=(\d+)', url)
        if sku_match:
            return sku_match.group(1)
        return None

Pro Tip: Use undetected-chromedriver instead of regular Selenium for sites with advanced bot detection. It patches Chrome to avoid detection flags.
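
Because Selenium drives a real browser process, remember to release it when you're done. A short usage sketch; the product URL is a placeholder:

# Usage sketch -- the product URL is a placeholder
from scrapers.dynamic_scraper import BestBuyScraper

scraper = BestBuyScraper(headless=True)
try:
    result = scraper.scrape('https://www.bestbuy.com/site/example/0000000.p?skuId=0000000')
    if result:
        print(result['title'], result['price'])
finally:
    scraper.close()  # always release the Chrome process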

Building the Data Pipeline

Now let's create a robust pipeline to process, store, and analyze the scraped data:

Data Cleaning and Validation

# pipeline/cleaner.py
import re
from typing import Dict, Optional, List
from datetime import datetime
import logging

logger = logging.getLogger(__name__)

class DataCleaner:
    """Clean and validate scraped product data"""

    def __init__(self):
        self.required_fields = ['platform', 'product_id', 'title', 'price', 'timestamp', 'url']
        self.price_range = (0.01, 1000000)  # Reasonable price range

    def clean(self, data: Dict) -> Optional[Dict]:
        """
        Clean and validate product data

        Args:
            data: Raw product data from scraper

        Returns:
            Cleaned data or None if invalid
        """
        if not self._validate_required_fields(data):
            return None

        cleaned = {
            'platform': self._clean_platform(data.get('platform')),
            'product_id': self._clean_product_id(data.get('product_id')),
            'title': self._clean_title(data.get('title')),
            'price': self._validate_price(data.get('price')),
            'original_price': data.get('original_price'),
            'discount_percentage': None,
            'availability': self._clean_availability(data.get('availability')),
            'rating': self._validate_rating(data.get('rating')),
            'review_count': self._validate_review_count(data.get('review_count')),
            'image_url': self._clean_url(data.get('image_url')),
            'url': self._clean_url(data.get('url')),
            'timestamp': datetime.fromtimestamp(data.get('timestamp', 0)),
            'scraped_at': datetime.utcnow()
        }

        # Calculate discount if original price exists
        if cleaned['original_price'] and cleaned['price']:
            discount = (cleaned['original_price'] - cleaned['price']) / cleaned['original_price'] * 100
            cleaned['discount_percentage'] = round(discount, 2)

        return cleaned

    def _validate_required_fields(self, data: Dict) -> bool:
        """Check if all required fields are present"""
        for field in self.required_fields:
            if field not in data or data[field] is None:
                logger.warning(f"Missing required field: {field}")
                return False
        return True

    def _clean_platform(self, platform: str) -> str:
        """Normalize platform name"""
        if not platform:
            return 'unknown'
        return platform.lower().strip()

    def _clean_product_id(self, product_id: str) -> str:
        """Clean product ID"""
        if not product_id:
            return 'unknown'
        # Remove special characters except alphanumeric and hyphens
        return re.sub(r'[^a-zA-Z0-9\-_]', '', str(product_id))

    def _clean_title(self, title: str) -> str:
        """Clean product title"""
        if not title:
            return 'Unknown Product'

        # Remove extra whitespace
        title = ' '.join(title.split())

        # Truncate if too long
        if len(title) > 500:
            title = title[:497] + '...'

        return title

    def _validate_price(self, price: float) -> Optional[float]:
        """Validate price is within reasonable range"""
        if price is None:
            return None

        try:
            price_float = float(price)

            # Check if price is within reasonable range
            if self.price_range[0] <= price_float <= self.price_range[1]:
                return round(price_float, 2)
            else:
                logger.warning(f"Price {price_float} outside valid range")
                return None
        except (ValueError, TypeError):
            logger.warning(f"Invalid price format: {price}")
            return None

    def _clean_availability(self, availability: str) -> str:
        """Normalize availability status"""
        if not availability:
            return 'unknown'

        availability_lower = availability.lower().strip()

        if any(term in availability_lower for term in ['in stock', 'available', 'in_stock']):
            return 'in_stock'
        elif any(term in availability_lower for term in ['out of stock', 'unavailable', 'out_of_stock']):
            return 'out_of_stock'
        else:
            return 'unknown'

    def _validate_rating(self, rating: float) -> Optional[float]:
        """Validate rating is between 0 and 5"""
        if rating is None:
            return None

        try:
            rating_float = float(rating)
            if 0 <= rating_float <= 5:
                return round(rating_float, 2)
            else:
                return None
        except (ValueError, TypeError):
            return None

    def _validate_review_count(self, review_count: int) -> Optional[int]:
        """Validate review count is positive integer"""
        if review_count is None:
            return None

        try:
            count = int(review_count)
            return count if count >= 0 else None
        except (ValueError, TypeError):
            return None

    def _clean_url(self, url: str) -> Optional[str]:
        """Validate and clean URL"""
        if not url:
            return None

        # Basic URL validation
        if url.startswith(('http://', 'https://')):
            return url.strip()
        return None
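
Raw scraper output feeds straight into the cleaner, and anything that fails validation comes back as None. A quick sketch of that hand-off (the ASIN is a placeholder):

# Quick sketch: scraper output flows through the cleaner before storage
from scrapers.amazon_scraper import AmazonScraper
from pipeline.cleaner import DataCleaner

scraper = AmazonScraper()
cleaner = DataCleaner()

raw = scraper.scrape('https://www.amazon.com/dp/B000000000')  # placeholder ASIN
if raw:
    cleaned = cleaner.clean(raw)
    if cleaned:
        print(cleaned['title'], cleaned['price'], cleaned['scraped_at'])
    else:
        print("Record failed validation and was dropped")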

Database Storage

# pipeline/storage.py
from sqlalchemy import create_engine, Column, String, Float, Integer, DateTime, Boolean, Index
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker, Session
from contextlib import contextmanager
from typing import List, Dict, Optional
from datetime import datetime, timedelta
import logging

Base = declarative_base()
logger = logging.getLogger(__name__)

class Product(Base):
    """Product model for database storage"""
    __tablename__ = 'products'

    id = Column(Integer, primary_key=True)
    platform = Column(String(50), nullable=False)
    product_id = Column(String(100), nullable=False)
    title = Column(String(500), nullable=False)
    url = Column(String(1000), nullable=False)
    image_url = Column(String(1000))

    # Create composite index for platform and product_id
    __table_args__ = (
        Index('ix_platform_product', 'platform', 'product_id'),
    )

class PriceHistory(Base):
    """Price history model for tracking changes"""
    __tablename__ = 'price_history'

    id = Column(Integer, primary_key=True)
    platform = Column(String(50), nullable=False)
    product_id = Column(String(100), nullable=False)
    price = Column(Float, nullable=False)
    original_price = Column(Float)
    discount_percentage = Column(Float)
    availability = Column(String(20))
    rating = Column(Float)
    review_count = Column(Integer)
    timestamp = Column(DateTime, nullable=False)
    scraped_at = Column(DateTime, default=datetime.utcnow)

    # Index for efficient queries
    __table_args__ = (
        Index('ix_product_timestamp', 'platform', 'product_id', 'timestamp'),
    )

class PriceAlert(Base):
    """Price alert configuration"""
    __tablename__ = 'price_alerts'

    id = Column(Integer, primary_key=True)
    platform = Column(String(50), nullable=False)
    product_id = Column(String(100), nullable=False)
    target_price = Column(Float, nullable=False)
    alert_email = Column(String(200))
    is_active = Column(Boolean, default=True)
    created_at = Column(DateTime, default=datetime.utcnow)
    last_triggered = Column(DateTime)
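
The imports at the top of storage.py (create_engine, sessionmaker, contextmanager) anticipate a manager that owns the engine and sessions. Here is one minimal sketch of how that could look; the StorageManager name and its methods are illustrative, not a prescribed design:

# pipeline/storage.py (continued) -- illustrative sketch

class StorageManager:
    """Create tables, persist cleaned records, and query price history"""

    def __init__(self, db_url: str = 'sqlite:///price_monitor.db'):
        self.engine = create_engine(db_url)
        Base.metadata.create_all(self.engine)
        # expire_on_commit=False keeps returned objects usable after the session closes
        self.SessionLocal = sessionmaker(bind=self.engine, expire_on_commit=False)

    @contextmanager
    def session_scope(self):
        """Provide a transactional scope around a series of operations"""
        session = self.SessionLocal()
        try:
            yield session
            session.commit()
        except Exception:
            session.rollback()
            raise
        finally:
            session.close()

    def save_price(self, cleaned: Dict) -> None:
        """Insert one cleaned record from the DataCleaner into price_history"""
        with self.session_scope() as session:
            session.add(PriceHistory(
                platform=cleaned['platform'],
                product_id=cleaned['product_id'],
                price=cleaned['price'],
                original_price=cleaned.get('original_price'),
                discount_percentage=cleaned.get('discount_percentage'),
                availability=cleaned.get('availability'),
                rating=cleaned.get('rating'),
                review_count=cleaned.get('review_count'),
                timestamp=cleaned['timestamp'],
            ))

    def get_price_history(self, platform: str, product_id: str, days: int = 30) -> List[PriceHistory]:
        """Return recent price points for one product, oldest first"""
        cutoff = datetime.utcnow() - timedelta(days=days)
        with self.session_scope() as session:
            return (session.query(PriceHistory)
                    .filter(PriceHistory.platform == platform,
                            PriceHistory.product_id == product_id,
                            PriceHistory.timestamp >= cutoff)
                    .order_by(PriceHistory.timestamp)
                    .all())

A full run of the pipeline is then roughly scrape → clean → save_price, with the alert models above ready for a notifier to query.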
