DEV Community

Brad
Brad

Posted on

Python Web Scraper: Extract Any Product Data at Scale

Python Web Scraper: Extract Any Product Data at Scale

Price monitoring, competitor research, lead generation — web scraping unlocks valuable data. Here's how to build a scalable scraper that handles pagination, rate limits, and anti-bot measures.

The Robust Scraper Base

import requests
import time
import random
from bs4 import BeautifulSoup
from typing import Generator, Optional
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

class WebScraper:
    """Production-ready web scraper with retry, rotation, and rate limiting."""

    USER_AGENTS = [
        'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/121.0.0.0 Safari/537.36',
        'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/121.0.0.0 Safari/537.36',
        'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/121.0.0.0 Safari/537.36',
    ]

    def __init__(
        self,
        delay_range: tuple = (1, 3),
        max_retries: int = 3,
        timeout: int = 30
    ):
        self.delay_range = delay_range
        self.max_retries = max_retries
        self.timeout = timeout
        self.session = requests.Session()

    def _get_headers(self) -> dict:
        return {
            'User-Agent': random.choice(self.USER_AGENTS),
            'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
            'Accept-Language': 'en-US,en;q=0.5',
            'Accept-Encoding': 'gzip, deflate, br',
            'Connection': 'keep-alive',
        }

    def get_page(self, url: str, params: dict = None) -> Optional[BeautifulSoup]:
        """Fetch a page with retry logic and rate limiting."""

        for attempt in range(self.max_retries):
            try:
                time.sleep(random.uniform(*self.delay_range))

                response = self.session.get(
                    url,
                    headers=self._get_headers(),
                    params=params,
                    timeout=self.timeout
                )

                if response.status_code == 429:
                    wait_time = int(response.headers.get('Retry-After', 60))
                    logger.warning(f"Rate limited. Waiting {wait_time}s...")
                    time.sleep(wait_time)
                    continue

                response.raise_for_status()
                return BeautifulSoup(response.text, 'html.parser')

            except requests.exceptions.RequestException as e:
                if attempt < self.max_retries - 1:
                    wait = 2 ** attempt
                    logger.warning(f"Request failed (attempt {attempt + 1}): {e}. Retrying in {wait}s...")
                    time.sleep(wait)
                else:
                    logger.error(f"Max retries exceeded for {url}: {e}")
                    return None

        return None
Enter fullscreen mode Exit fullscreen mode

Amazon Product Scraper

from dataclasses import dataclass
from typing import List, Optional
import re

@dataclass
class AmazonProduct:
    title: str
    price: Optional[float]
    rating: Optional[float]
    review_count: Optional[int]
    availability: str
    asin: str
    image_url: Optional[str]

class AmazonScraper(WebScraper):
    BASE_URL = "https://www.amazon.com"

    def __init__(self):
        super().__init__(delay_range=(2, 5))

    def scrape_product(self, asin: str) -> Optional[AmazonProduct]:
        """Scrape a single product page by ASIN."""

        url = f"{self.BASE_URL}/dp/{asin}"
        soup = self.get_page(url)

        if not soup:
            return None

        # Extract data
        title_elem = soup.find('span', id='productTitle')
        title = title_elem.get_text().strip() if title_elem else "N/A"

        price_elem = soup.find('span', class_='a-price-whole')
        if price_elem:
            price_text = price_elem.get_text().replace(',', '').strip()
            price = float(price_text) if price_text else None
        else:
            price = None

        rating_elem = soup.find('span', class_='a-icon-alt')
        rating = None
        if rating_elem:
            match = re.search(r'(\d+\.\d+)', rating_elem.get_text())
            rating = float(match.group(1)) if match else None

        review_elem = soup.find('span', id='acrCustomerReviewText')
        review_count = None
        if review_elem:
            match = re.search(r'([\d,]+)', review_elem.get_text())
            review_count = int(match.group(1).replace(',', '')) if match else None

        avail_elem = soup.find('div', id='availability')
        availability = avail_elem.get_text().strip() if avail_elem else "Unknown"

        img_elem = soup.find('img', id='landingImage')
        image_url = img_elem.get('src') if img_elem else None

        return AmazonProduct(
            title=title,
            price=price,
            rating=rating,
            review_count=review_count,
            availability=availability,
            asin=asin,
            image_url=image_url
        )

    def search_products(self, query: str, pages: int = 3) -> List[AmazonProduct]:
        """Search Amazon and return product results."""

        products = []

        for page in range(1, pages + 1):
            url = f"{self.BASE_URL}/s"
            soup = self.get_page(url, params={'k': query, 'page': page})

            if not soup:
                break

            # Extract ASINs from search results
            product_divs = soup.find_all('div', attrs={'data-asin': True})

            for div in product_divs:
                asin = div.get('data-asin')
                if asin:
                    product = self.scrape_product(asin)
                    if product:
                        products.append(product)

        return products

# Usage
scraper = AmazonScraper()
products = scraper.search_products("python programming book", pages=2)

for p in products:
    print(f"{p.title[:50]} | ${p.price} | ⭐ {p.rating} ({p.review_count} reviews)")
Enter fullscreen mode Exit fullscreen mode

Price Monitor with Alerts

import json
import smtplib
from datetime import datetime
from pathlib import Path

class PriceMonitor:
    def __init__(self, products_file: str = "monitored_products.json"):
        self.products_file = products_file
        self.scraper = AmazonScraper()
        self.products = self._load()

    def _load(self) -> dict:
        if Path(self.products_file).exists():
            with open(self.products_file, 'r') as f:
                return json.load(f)
        return {}

    def _save(self):
        with open(self.products_file, 'w') as f:
            json.dump(self.products, f, indent=2)

    def add_product(self, asin: str, target_price: float):
        self.products[asin] = {
            'target_price': target_price,
            'last_price': None,
            'price_history': []
        }
        self._save()

    def check_prices(self) -> list:
        """Check all monitored products and return alerts."""

        alerts = []

        for asin, data in self.products.items():
            product = self.scraper.scrape_product(asin)

            if not product or product.price is None:
                continue

            # Record price
            data['price_history'].append({
                'price': product.price,
                'date': datetime.now().isoformat()
            })
            data['last_price'] = product.price

            # Check if price hit target
            if product.price <= data['target_price']:
                alerts.append({
                    'asin': asin,
                    'title': product.title,
                    'current_price': product.price,
                    'target_price': data['target_price'],
                    'url': f"https://amazon.com/dp/{asin}"
                })

        self._save()
        return alerts

# Run daily price check
monitor = PriceMonitor()
monitor.add_product("B07PDHSJ1P", target_price=25.99)

alerts = monitor.check_prices()
for alert in alerts:
    print(f"🔔 PRICE DROP: {alert['title'][:40]}")
    print(f"   Current: ${alert['current_price']:.2f} (Target: ${alert['target_price']:.2f})")
    print(f"   Buy: {alert['url']}")
Enter fullscreen mode Exit fullscreen mode

Export to CSV/Database

import csv
import sqlite3

def save_to_csv(products: list, filename: str):
    """Save scraped products to CSV."""

    with open(filename, 'w', newline='') as f:
        writer = csv.DictWriter(f, fieldnames=['asin', 'title', 'price', 'rating', 'review_count'])
        writer.writeheader()
        for p in products:
            writer.writerow({
                'asin': p.asin,
                'title': p.title,
                'price': p.price,
                'rating': p.rating,
                'review_count': p.review_count
            })
    print(f"Saved {len(products)} products to {filename}")

def save_to_sqlite(products: list, db_path: str):
    """Save scraped products to SQLite database."""

    conn = sqlite3.connect(db_path)
    conn.execute("""
        CREATE TABLE IF NOT EXISTS products (
            asin TEXT PRIMARY KEY,
            title TEXT,
            price REAL,
            rating REAL,
            review_count INTEGER,
            scraped_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
        )
    """)

    for p in products:
        conn.execute(
            "INSERT OR REPLACE INTO products (asin, title, price, rating, review_count) VALUES (?, ?, ?, ?, ?)",
            (p.asin, p.title, p.price, p.rating, p.review_count)
        )

    conn.commit()
    conn.close()
    print(f"Saved {len(products)} products to {db_path}")
Enter fullscreen mode Exit fullscreen mode

Want a Complete Web Scraping Toolkit?

This is one of many scraping and automation tools I use daily.

👉 Get 50+ Python automation scripts — scrapers for Amazon, news sites, job boards + email automation, file organizers, report generators, and more.

Stop manually collecting data. Automate it once.

Top comments (0)