Alex Chen
Building an ETL Pipeline for Web Scraping: From CAPTCHA-Protected Pages to Clean Data

Most scraping tutorials stop at "get the HTML." But in production, getting the HTML is just step one. You need to detect CAPTCHAs, solve them, extract structured data, validate it, handle duplicates, and load it into a database — reliably, at scale.

This is an ETL (Extract, Transform, Load) pipeline. Let's build one.

The Architecture

┌──────────────┐    ┌──────────────┐    ┌────────────┐
│  URL Queue   │───▶│  Fetcher     │───▶│  CAPTCHA   │
│  (Redis)     │    │  (aiohttp)   │    │  Detector  │
└──────────────┘    └──────────────┘    └────────────┘
                                             │
                              ┌──────────────┘
                              ▼
                    ┌──────────────┐    ┌────────────┐
                    │  CAPTCHA     │───▶│  Parser    │
                    │  Solver      │    │  (BS4)     │
                    └──────────────┘    └────────────┘
                                             │
                              ┌──────────────┘
                              ▼
                    ┌──────────────┐    ┌────────────┐
                    │  Validator   │───▶│  Deduper   │
                    │  (Pydantic)  │    │  (bloom)   │
                    └──────────────┘    └────────────┘
                                             │
                              ┌──────────────┘
                              ▼
                    ┌──────────────┐
                    │  Loader      │
                    │  (Postgres)  │
                    └──────────────┘

Step 1: Define Your Data Model

Start with what you want to end up with:

# models.py
from pydantic import BaseModel, HttpUrl, field_validator
from datetime import datetime
from decimal import Decimal

class Product(BaseModel):
    url: HttpUrl
    title: str
    price: Decimal
    currency: str = "USD"
    in_stock: bool
    description: str | None = None
    image_url: HttpUrl | None = None
    scraped_at: datetime
    source_domain: str

    @field_validator("title")
    @classmethod
    def title_not_empty(cls, v):
        v = v.strip()
        if len(v) < 3:
            raise ValueError("Title too short")
        return v

    @field_validator("price")
    @classmethod
    def price_positive(cls, v):
        if v <= 0:
            raise ValueError("Price must be positive")
        return v

class ScrapedPage(BaseModel):
    url: str
    html: str
    status_code: int
    captcha_solved: bool = False
    captcha_type: str | None = None
    fetch_time: float
    timestamp: datetime

Step 2: The Fetcher with CAPTCHA Handling

# fetcher.py
import aiohttp
import asyncio
import time
from dataclasses import dataclass

@dataclass
class FetchResult:
    url: str
    html: str = ""
    status: int = 0
    captcha_type: str | None = None
    captcha_solved: bool = False
    error: str | None = None
    fetch_time: float = 0

class Fetcher:
    def __init__(
        self,
        max_concurrent: int = 20,
        captcha_solver = None,
    ):
        self.semaphore = asyncio.Semaphore(max_concurrent)
        self.solver = captcha_solver
        self.stats = {
            "fetched": 0, "captchas": 0, "errors": 0
        }

    async def fetch(
        self, 
        session: aiohttp.ClientSession,
        url: str
    ) -> FetchResult:
        async with self.semaphore:
            start = time.monotonic()
            try:
                async with session.get(
                    url, timeout=aiohttp.ClientTimeout(total=30)
                ) as resp:
                    html = await resp.text()
                    fetch_time = time.monotonic() - start

                    # Detect CAPTCHA
                    captcha = detect_captcha(html, url)
                    solved = False

                    if captcha and self.solver:
                        token = await self.solver.solve(
                            captcha_type=captcha["type"],
                            sitekey=captcha["sitekey"],
                            url=url,
                        )

                        if token:
                            # Resubmit with token
                            async with session.post(
                                url,
                                data={captcha["field"]: token},
                            ) as resp2:
                                html = await resp2.text()
                                solved = True
                                self.stats["captchas"] += 1

                    self.stats["fetched"] += 1
                    return FetchResult(
                        url=url,
                        html=html,
                        status=resp.status,
                        captcha_type=captcha["type"] if captcha else None,
                        captcha_solved=solved,
                        fetch_time=fetch_time,
                    )

            except Exception as e:
                self.stats["errors"] += 1
                return FetchResult(
                    url=url,
                    error=str(e),
                    fetch_time=time.monotonic() - start,
                )


def detect_captcha(html: str, url: str) -> dict | None:
    """Detect CAPTCHA type and extract sitekey."""
    import re

    patterns = {
        "recaptcha_v2": {
            "regex": r'class="g-recaptcha"[^>]*data-sitekey="([^"]+)"',
            "field": "g-recaptcha-response",
        },
        "hcaptcha": {
            "regex": r'class="h-captcha"[^>]*data-sitekey="([^"]+)"',
            "field": "h-captcha-response",
        },
        "turnstile": {
            "regex": r'class="cf-turnstile"[^>]*data-sitekey="([^"]+)"',
            "field": "cf-turnstile-response",
        },
    }

    for ctype, config in patterns.items():
        match = re.search(config["regex"], html)
        if match:
            return {
                "type": ctype,
                "sitekey": match.group(1),
                "field": config["field"],
            }

    return None
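A quick standalone sanity check of the detector's reCAPTCHA pattern (duplicated here so it runs on its own). Note the pattern assumes the `class` attribute appears before `data-sitekey`; real markup with a different attribute order would need a more forgiving regex, or a proper HTML parser:

```python
import re

# Same pattern as detect_captcha's recaptcha_v2 entry
RECAPTCHA_RE = r'class="g-recaptcha"[^>]*data-sitekey="([^"]+)"'

html = '<div class="g-recaptcha" data-sitekey="6LcEXAMPLE-sitekey"></div>'

match = re.search(RECAPTCHA_RE, html)
sitekey = match.group(1) if match else None
```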

Step 3: The Parser

Extract structured data from raw HTML:

# parser.py
from bs4 import BeautifulSoup
from decimal import Decimal, InvalidOperation
from datetime import datetime
import re

class ProductParser:
    """Parse product data from HTML.
    Override for different site structures."""

    def parse(self, html: str, url: str) -> dict | None:
        soup = BeautifulSoup(html, "lxml")

        try:
            return {
                "url": url,
                "title": self._extract_title(soup),
                "price": self._extract_price(soup),
                "currency": self._extract_currency(soup),
                "in_stock": self._extract_stock(soup),
                "description": self._extract_description(soup),
                "image_url": self._extract_image(soup),
                "scraped_at": datetime.utcnow(),
                "source_domain": self._get_domain(url),
            }
        except Exception as e:
            print(f"Parse error for {url}: {e}")
            return None

    def _extract_title(self, soup) -> str:
        selectors = [
            "h1.product-title",
            "h1[itemprop='name']",
            ".product-name h1",
            "h1",
        ]
        for sel in selectors:
            el = soup.select_one(sel)
            if el and el.text.strip():
                return el.text.strip()
        raise ValueError("No title found")

    def _extract_price(self, soup) -> Decimal:
        selectors = [
            "[itemprop='price']",
            ".price-current",
            ".product-price",
            "[data-price]",
        ]
        for sel in selectors:
            el = soup.select_one(sel)
            if el:
                # Try content attribute first
                price_str = (
                    el.get("content") 
                    or el.get("data-price") 
                    or el.text
                )
                price_str = re.sub(
                    r'[^\d.]', '', price_str
                )
                try:
                    return Decimal(price_str)
                except InvalidOperation:
                    continue
        raise ValueError("No price found")

    def _extract_stock(self, soup) -> bool:
        # Check for "out of stock" indicators
        oos_indicators = [
            ".out-of-stock",
            "[data-availability='OutOfStock']",
        ]
        for sel in oos_indicators:
            if soup.select_one(sel):
                return False
        return True

    def _extract_description(self, soup) -> str | None:
        el = soup.select_one(
            "[itemprop='description'], "
            ".product-description"
        )
        return el.text.strip()[:500] if el else None

    def _extract_image(self, soup) -> str | None:
        el = soup.select_one(
            "[itemprop='image'], "
            ".product-image img"
        )
        return el.get("src") if el else None

    def _extract_currency(self, soup) -> str:
        el = soup.select_one(
            "[itemprop='priceCurrency']"
        )
        return el.get("content", "USD") if el else "USD"

    def _get_domain(self, url: str) -> str:
        from urllib.parse import urlparse
        return urlparse(url).netloc

Step 4: Validation with Pydantic

Validate parsed data before loading:

# validator.py
from pydantic import ValidationError
from models import Product
from typing import Generator

class DataValidator:
    def __init__(self):
        self.valid_count = 0
        self.invalid_count = 0
        self.errors = []

    def validate(
        self, raw_data: dict
    ) -> Product | None:
        try:
            product = Product(**raw_data)
            self.valid_count += 1
            return product
        except ValidationError as e:
            self.invalid_count += 1
            self.errors.append({
                "url": raw_data.get("url", "unknown"),
                "errors": e.errors(),
            })
            return None

    def validate_batch(
        self, items: list[dict]
    ) -> Generator[Product, None, None]:
        for item in items:
            product = self.validate(item)
            if product:
                yield product

    @property
    def stats(self) -> dict:
        total = self.valid_count + self.invalid_count
        return {
            "valid": self.valid_count,
            "invalid": self.invalid_count,
            "rate": (
                f"{self.valid_count/total:.1%}" 
                if total > 0 else "N/A"
            ),
        }

Step 5: Deduplication

Don't insert the same product twice:

# deduper.py
import hashlib
from typing import Generator

class Deduplicator:
    """Simple set-based deduplication.
    For production, use Redis or a bloom filter."""

    def __init__(self):
        self.seen = set()
        self.dupes_count = 0

    def _make_key(self, product) -> str:
        """Create a unique key for each product."""
        raw = f"{product.source_domain}:{product.title}:{product.price}"
        return hashlib.md5(raw.encode()).hexdigest()

    def is_duplicate(self, product) -> bool:
        key = self._make_key(product)
        if key in self.seen:
            self.dupes_count += 1
            return True
        self.seen.add(key)
        return False

    def filter_batch(
        self, products
    ) -> Generator:
        for product in products:
            if not self.is_duplicate(product):
                yield product
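The docstring above points at bloom filters for production scale. A minimal pure-Python sketch of the idea — constant memory, no false negatives, a small tunable false-positive rate. The size and hash count here are illustrative; a real deployment would size them from the expected item count and target error rate, or use a library:

```python
import hashlib

class BloomFilter:
    """Minimal bloom filter. Membership tests can rarely
    return a false positive, never a false negative."""

    def __init__(self, size_bits: int = 1 << 20, num_hashes: int = 7):
        self.size = size_bits
        self.num_hashes = num_hashes
        self.bits = bytearray(size_bits // 8)

    def _positions(self, key: str):
        # Derive k bit positions from one sha256 digest
        # (7 hashes x 4 bytes fits in the 32-byte digest)
        digest = hashlib.sha256(key.encode()).digest()
        for i in range(self.num_hashes):
            chunk = digest[i * 4:(i + 1) * 4]
            yield int.from_bytes(chunk, "big") % self.size

    def add(self, key: str) -> None:
        for pos in self._positions(key):
            self.bits[pos // 8] |= 1 << (pos % 8)

    def __contains__(self, key: str) -> bool:
        return all(
            self.bits[pos // 8] & (1 << (pos % 8))
            for pos in self._positions(key)
        )
```

Dropping this in for the `set` in `Deduplicator` keeps memory flat no matter how many products flow through, at the cost of occasionally discarding a non-duplicate.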

Step 6: Database Loader

# loader.py
import asyncpg
from models import Product

class PostgresLoader:
    def __init__(self, dsn: str):
        self.dsn = dsn
        self.pool = None
        self.loaded_count = 0

    async def connect(self):
        self.pool = await asyncpg.create_pool(
            self.dsn, min_size=2, max_size=10
        )
        await self._create_table()

    async def _create_table(self):
        async with self.pool.acquire() as conn:
            await conn.execute("""
                CREATE TABLE IF NOT EXISTS products (
                    id SERIAL PRIMARY KEY,
                    url TEXT UNIQUE,
                    title TEXT NOT NULL,
                    price DECIMAL(10,2),
                    currency VARCHAR(3),
                    in_stock BOOLEAN,
                    description TEXT,
                    image_url TEXT,
                    scraped_at TIMESTAMP,
                    source_domain TEXT,
                    created_at TIMESTAMP DEFAULT NOW(),
                    updated_at TIMESTAMP DEFAULT NOW()
                )
            """)

    async def load(self, product: Product):
        async with self.pool.acquire() as conn:
            await self._insert(conn, product)

    async def _insert(self, conn, product: Product):
        await conn.execute("""
            INSERT INTO products
                (url, title, price, currency,
                 in_stock, description, image_url,
                 scraped_at, source_domain)
            VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9)
            ON CONFLICT (url) DO UPDATE SET
                price = EXCLUDED.price,
                in_stock = EXCLUDED.in_stock,
                scraped_at = EXCLUDED.scraped_at,
                updated_at = NOW()
        """,
            str(product.url),
            product.title,
            product.price,
            product.currency,
            product.in_stock,
            product.description,
            str(product.image_url) if product.image_url else None,
            product.scraped_at,
            product.source_domain,
        )
        self.loaded_count += 1

    async def load_batch(self, products: list[Product]):
        # Use a single connection so the transaction actually
        # covers every insert — calling self.load() here would
        # grab a fresh connection per product, outside it
        async with self.pool.acquire() as conn:
            async with conn.transaction():
                for product in products:
                    await self._insert(conn, product)

    async def close(self):
        if self.pool:
            await self.pool.close()

Step 7: The Pipeline Orchestrator

Wire everything together:

# pipeline.py
import asyncio
import aiohttp
import time
from fetcher import Fetcher
from parser import ProductParser
from validator import DataValidator
from deduper import Deduplicator
from loader import PostgresLoader
# CaptchaSolver is assumed to be provided by your solver
# client (e.g. the passxapi-python package mentioned below)

class ScrapingPipeline:
    def __init__(
        self,
        urls: list[str],
        db_dsn: str,
        max_concurrent: int = 20,
    ):
        self.urls = urls
        self.fetcher = Fetcher(
            max_concurrent=max_concurrent,
            captcha_solver=CaptchaSolver(
                api_base="https://www.passxapi.com"
            ),
        )
        self.parser = ProductParser()
        self.validator = DataValidator()
        self.deduper = Deduplicator()
        self.loader = PostgresLoader(db_dsn)

    async def run(self):
        start = time.monotonic()
        print(f"Starting pipeline for {len(self.urls)} URLs")

        # Connect to database
        await self.loader.connect()

        async with aiohttp.ClientSession(
            headers={
                "User-Agent": (
                    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
                    "AppleWebKit/537.36"
                )
            }
        ) as session:
            # Process in batches
            batch_size = 50
            for i in range(0, len(self.urls), batch_size):
                batch = self.urls[i:i + batch_size]
                await self._process_batch(session, batch)

                # Progress report
                elapsed = time.monotonic() - start
                print(
                    f"  Batch {i//batch_size + 1}: "
                    f"{self.fetcher.stats} | "
                    f"Valid: {self.validator.stats} | "
                    f"Loaded: {self.loader.loaded_count} | "
                    f"Dupes: {self.deduper.dupes_count} | "
                    f"{elapsed:.0f}s elapsed"
                )

        await self.loader.close()

        elapsed = time.monotonic() - start
        self._print_summary(elapsed)

    async def _process_batch(
        self, session, urls: list[str]
    ):
        # Extract: fetch all URLs concurrently
        fetch_tasks = [
            self.fetcher.fetch(session, url) 
            for url in urls
        ]
        results = await asyncio.gather(*fetch_tasks)

        # Transform: parse + validate + dedupe
        products = []
        for result in results:
            if result.error or not result.html:
                continue

            # Parse
            raw = self.parser.parse(result.html, result.url)
            if not raw:
                continue

            # Validate
            product = self.validator.validate(raw)
            if not product:
                continue

            # Deduplicate
            if not self.deduper.is_duplicate(product):
                products.append(product)

        # Load: batch insert
        if products:
            await self.loader.load_batch(products)

    def _print_summary(self, elapsed: float):
        print(f"\n{'='*50}")
        print(f"Pipeline completed in {elapsed:.0f}s")
        print(f"  Fetched: {self.fetcher.stats}")
        print(f"  Validation: {self.validator.stats}")
        print(f"  Duplicates: {self.deduper.dupes_count}")
        print(f"  Loaded to DB: {self.loader.loaded_count}")
        print(f"  Rate: {len(self.urls)/elapsed:.0f} URLs/s")
        print(f"{'='*50}")


# Run it
def load_urls_from_file(path: str) -> list[str]:
    with open(path) as f:
        return [line.strip() for line in f if line.strip()]

async def main():
    urls = load_urls_from_file("urls.txt")

    pipeline = ScrapingPipeline(
        urls=urls,
        db_dsn="postgresql://user:pass@localhost/scraping",
        max_concurrent=20,
    )

    await pipeline.run()

asyncio.run(main())

Adding a Dead Letter Queue

Pages that fail should be retried later, not lost:

# dlq.py
import json
from pathlib import Path
from datetime import datetime

class DeadLetterQueue:
    def __init__(self, path: str = "failed_urls.jsonl"):
        self.path = Path(path)
        self.count = 0

    def add(
        self, url: str, 
        error: str, 
        attempt: int = 1
    ):
        entry = {
            "url": url,
            "error": error,
            "attempt": attempt,
            "timestamp": datetime.utcnow().isoformat(),
        }
        with open(self.path, "a") as f:
            f.write(json.dumps(entry) + "\n")
        self.count += 1

    def get_retryable(
        self, max_attempts: int = 3
    ) -> list[str]:
        if not self.path.exists():
            return []

        # Track the highest attempt per URL so a re-queued
        # entry isn't retried again via one of its older lines
        attempts: dict[str, int] = {}
        with open(self.path) as f:
            for line in f:
                entry = json.loads(line)
                attempts[entry["url"]] = max(
                    attempts.get(entry["url"], 0),
                    entry["attempt"],
                )

        return [
            url for url, n in attempts.items()
            if n < max_attempts
        ]
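Wiring the DLQ into retries means re-attempting each failed URL and bumping its attempt counter if it fails again. A self-contained sketch of that loop over the JSONL file — the fetch itself is stubbed as a plain callable here; in the pipeline it would be an async call through `Fetcher`:

```python
import json
from pathlib import Path

def retry_failed(
    dlq_path: str,
    fetch,  # callable(url) -> bool; stub for the real fetcher
    max_attempts: int = 3,
) -> list[str]:
    """Retry DLQ entries; rewrite the file with only the
    still-failing URLs, attempt counters bumped."""
    path = Path(dlq_path)
    if not path.exists():
        return []

    entries = [
        json.loads(line)
        for line in path.read_text().splitlines()
        if line
    ]
    still_failing, recovered = [], []

    for entry in entries:
        if entry["attempt"] >= max_attempts:
            # Exhausted — keep the entry for manual inspection
            still_failing.append(entry)
            continue
        if fetch(entry["url"]):
            recovered.append(entry["url"])
        else:
            entry["attempt"] += 1
            still_failing.append(entry)

    path.write_text(
        "".join(json.dumps(e) + "\n" for e in still_failing)
    )
    return recovered
```

Running this on a schedule (or at the end of each pipeline run) keeps transient failures from turning into permanent data gaps.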

Key Takeaways

  1. Model your output first — define Pydantic models before writing parsers
  2. CAPTCHAs are just a pipeline step — detect, solve, continue
  3. Validate everything — bad data is worse than no data
  4. Deduplicate before loading — saves DB writes and storage
  5. Batch operations — DB inserts, HTTP requests, everything works better in batches
  6. Track stats at every stage — know where your data is being lost
  7. Dead letter queue — never lose a URL, retry later

For the CAPTCHA-solving step in your pipeline, check out passxapi-python — it provides async and sync clients that integrate cleanly into ETL workflows.


How do you structure your scraping pipelines? What stages do you find most important? Let me know in the comments.
