Most scraping tutorials stop at "get the HTML." But in production, getting the HTML is just step one. You need to detect CAPTCHAs, solve them, extract structured data, validate it, handle duplicates, and load it into a database — reliably, at scale.
This is an ETL (Extract, Transform, Load) pipeline. Let's build one.
The Architecture
┌─────────────┐ ┌──────────────┐ ┌────────────┐
│ URL Queue │───▶│ Fetcher │───▶│ CAPTCHA │
│ (Redis) │ │ (aiohttp) │ │ Detector │
└─────────────┘ └──────────────┘ └────────────┘
│
┌──────────────┘
▼
┌──────────────┐ ┌────────────┐
│ CAPTCHA │───▶│ Parser │
│ Solver │ │ (BS4) │
└──────────────┘ └────────────┘
│
┌──────────────┘
▼
┌──────────────┐ ┌────────────┐
│ Validator │───▶│ Deduper │
│ (Pydantic) │ │ (bloom) │
└──────────────┘ └────────────┘
│
┌──────────────┘
▼
┌──────────────┐
│ Loader │
│ (Postgres) │
└──────────────┘
Step 1: Define Your Data Model
Start with what you want to end up with:
# models.py
from pydantic import BaseModel, HttpUrl, field_validator
from datetime import datetime
from decimal import Decimal
class Product(BaseModel):
    """Validated product record — the pipeline's target schema."""

    url: HttpUrl
    title: str
    price: Decimal
    currency: str = "USD"
    in_stock: bool
    description: str | None = None
    image_url: HttpUrl | None = None
    scraped_at: datetime
    source_domain: str

    @field_validator("title")
    @classmethod
    def title_not_empty(cls, v):
        # Reject whitespace-only or trivially short titles.
        stripped = v.strip()
        if len(stripped) < 3:
            raise ValueError("Title too short")
        return stripped

    @field_validator("price")
    @classmethod
    def price_positive(cls, v):
        # A non-positive price indicates a parse failure upstream.
        if v <= 0:
            raise ValueError("Price must be positive")
        return v
class ScrapedPage(BaseModel):
    """Raw fetch result, recorded before parsing/validation."""

    url: str
    html: str
    status_code: int
    captcha_solved: bool = False     # True when a solver token was used
    captcha_type: str | None = None  # e.g. "recaptcha_v2", "hcaptcha", "turnstile"
    fetch_time: float                # seconds spent fetching
    timestamp: datetime
Step 2: The Fetcher with CAPTCHA Handling
# fetcher.py
import aiohttp
import asyncio
import time
from dataclasses import dataclass, field
@dataclassclass FetchResult:
url: str
html: str = ""
status: int = 0
captcha_type: str | None = None
captcha_solved: bool = False
error: str | None = None
fetch_time: float = 0
class Fetcher:
    """Concurrency-limited async fetcher with optional CAPTCHA solving.

    Counters in ``self.stats`` (fetched / captchas / errors) let the
    pipeline report progress per batch.

    Fixes vs. original:
    - ``captcha_solved`` was ``bool(captcha)``, i.e. True whenever a CAPTCHA
      was merely *detected* — even with no solver configured or no token
      returned. It is now True only when a token was actually submitted.
    - After a successful resubmission the result carried the status of the
      original (CAPTCHA) response; it now carries the post-solve status.
    """

    def __init__(
        self,
        max_concurrent: int = 20,
        captcha_solver=None,
    ):
        # Semaphore caps in-flight requests across all fetch() calls.
        self.semaphore = asyncio.Semaphore(max_concurrent)
        # Solver must expose: async solve(captcha_type=..., sitekey=..., url=...)
        self.solver = captcha_solver
        self.stats = {
            "fetched": 0, "captchas": 0, "errors": 0
        }

    async def fetch(
        self,
        session: aiohttp.ClientSession,
        url: str
    ) -> FetchResult:
        """Fetch one URL, solving a detected CAPTCHA when a solver is set.

        Never raises: failures are reported via ``FetchResult.error``.
        """
        async with self.semaphore:
            start = time.monotonic()
            try:
                async with session.get(
                    url, timeout=aiohttp.ClientTimeout(total=30)
                ) as resp:
                    html = await resp.text()
                    status = resp.status
                fetch_time = time.monotonic() - start

                # Detect CAPTCHA in the fetched page.
                captcha = detect_captcha(html, url)
                solved = False
                if captcha and self.solver:
                    token = await self.solver.solve(
                        captcha_type=captcha["type"],
                        sitekey=captcha["sitekey"],
                        url=url
                    )
                    if token:
                        # Resubmit the page with the solver token.
                        async with session.post(
                            url,
                            data={captcha["field"]: token}
                        ) as resp2:
                            html = await resp2.text()
                            status = resp2.status  # report post-solve status
                        solved = True
                        self.stats["captchas"] += 1

                self.stats["fetched"] += 1
                return FetchResult(
                    url=url,
                    html=html,
                    status=status,
                    captcha_type=captcha["type"] if captcha else None,
                    captcha_solved=solved,
                    fetch_time=fetch_time,
                )
            except Exception as e:
                self.stats["errors"] += 1
                return FetchResult(
                    url=url,
                    error=str(e),
                    fetch_time=time.monotonic() - start,
                )
def detect_captcha(html: str, url: str) -> dict | None:
"""Detect CAPTCHA type and extract sitekey."""
import re
patterns = {
"recaptcha_v2": {
"regex": r'class="g-recaptcha"[^>]*data-sitekey="([^"]+)"',
"field": "g-recaptcha-response",
},
"hcaptcha": {
"regex": r'class="h-captcha"[^>]*data-sitekey="([^"]+)"',
"field": "h-captcha-response",
},
"turnstile": {
"regex": r'class="cf-turnstile"[^>]*data-sitekey="([^"]+)"',
"field": "cf-turnstile-response",
},
}
for ctype, config in patterns.items():
match = re.search(config["regex"], html)
if match:
return {
"type": ctype,
"sitekey": match.group(1),
"field": config["field"],
}
return None
Step 3: The Parser
Extract structured data from raw HTML:
# parser.py
from bs4 import BeautifulSoup
from decimal import Decimal, InvalidOperation
from datetime import datetime
import re
class ProductParser:
    """Parse product data from HTML.
    Override for different site structures."""

    def parse(self, html: str, url: str) -> dict | None:
        """Return a raw product dict, or None when extraction fails."""
        soup = BeautifulSoup(html, "lxml")
        try:
            return {
                "url": url,
                "title": self._extract_title(soup),
                "price": self._extract_price(soup),
                "currency": self._extract_currency(soup),
                "in_stock": self._extract_stock(soup),
                "description": self._extract_description(soup),
                "image_url": self._extract_image(soup),
                "scraped_at": datetime.utcnow(),
                "source_domain": self._get_domain(url),
            }
        except Exception as e:
            print(f"Parse error for {url}: {e}")
            return None

    def _extract_title(self, soup) -> str:
        # Most-specific selectors first; bare <h1> is the last resort.
        for selector in (
            "h1.product-title",
            "h1[itemprop='name']",
            ".product-name h1",
            "h1",
        ):
            node = soup.select_one(selector)
            if node:
                text = node.text.strip()
                if text:
                    return text
        raise ValueError("No title found")

    def _extract_price(self, soup) -> Decimal:
        for selector in (
            "[itemprop='price']",
            ".price-current",
            ".product-price",
            "[data-price]",
        ):
            node = soup.select_one(selector)
            if node is None:
                continue
            # Prefer machine-readable attributes over visible text.
            raw = node.get("content") or node.get("data-price") or node.text
            cleaned = re.sub(r'[^\d.]', '', raw)
            try:
                return Decimal(cleaned)
            except InvalidOperation:
                continue  # malformed on this node; try the next selector
        raise ValueError("No price found")

    def _extract_stock(self, soup) -> bool:
        # Presence of any out-of-stock marker wins; otherwise assume in stock.
        markers = (
            ".out-of-stock",
            "[data-availability='OutOfStock']",
        )
        return not any(soup.select_one(m) for m in markers)

    def _extract_description(self, soup) -> str | None:
        node = soup.select_one(
            "[itemprop='description'], "
            ".product-description"
        )
        if node is None:
            return None
        return node.text.strip()[:500]  # cap stored description length

    def _extract_image(self, soup) -> str | None:
        node = soup.select_one(
            "[itemprop='image'], "
            ".product-image img"
        )
        return node.get("src") if node else None

    def _extract_currency(self, soup) -> str:
        node = soup.select_one(
            "[itemprop='priceCurrency']"
        )
        return node.get("content", "USD") if node else "USD"

    def _get_domain(self, url: str) -> str:
        from urllib.parse import urlparse
        return urlparse(url).netloc
Step 4: Validation with Pydantic
Validate parsed data before loading:
# validator.py
from pydantic import ValidationError
from models import Product
from typing import Generator
class DataValidator:
    """Validate raw parser dicts against the Product model, keeping tallies."""

    def __init__(self):
        self.valid_count = 0
        self.invalid_count = 0
        self.errors = []  # one {"url", "errors"} entry per rejected item

    def validate(
        self, raw_data: dict
    ) -> Product | None:
        """Return a Product, or None (recording the errors) when invalid."""
        try:
            product = Product(**raw_data)
        except ValidationError as e:
            self.invalid_count += 1
            self.errors.append({
                "url": raw_data.get("url", "unknown"),
                "errors": e.errors(),
            })
            return None
        self.valid_count += 1
        return product

    def validate_batch(
        self, items: list[dict]
    ) -> Generator[Product, None, None]:
        """Yield only the items that validate successfully."""
        for item in items:
            if (product := self.validate(item)) is not None:
                yield product

    @property
    def stats(self) -> dict:
        """Running valid/invalid counts plus a formatted success rate."""
        total = self.valid_count + self.invalid_count
        rate = f"{self.valid_count/total:.1%}" if total > 0 else "N/A"
        return {
            "valid": self.valid_count,
            "invalid": self.invalid_count,
            "rate": rate,
        }
Step 5: Deduplication
Don't insert the same product twice:
# deduper.py
import hashlib
from typing import Generator
class Deduplicator:
    """Simple set-based deduplication.
    For production, use Redis or a bloom filter."""

    def __init__(self):
        self.seen = set()    # md5 digests of previously seen products
        self.dupes_count = 0

    def _make_key(self, product) -> str:
        """Create a unique key for each product."""
        # Identity = domain + title + price; md5 keeps the set memory-bounded
        # (not used for security).
        raw = f"{product.source_domain}:{product.title}:{product.price}"
        return hashlib.md5(raw.encode()).hexdigest()

    def is_duplicate(self, product) -> bool:
        """Return True for repeats; first sightings are recorded and pass."""
        digest = self._make_key(product)
        if digest not in self.seen:
            self.seen.add(digest)
            return False
        self.dupes_count += 1
        return True

    def filter_batch(
        self, products
    ) -> Generator:
        """Yield only first-seen products."""
        yield from (p for p in products if not self.is_duplicate(p))
Step 6: Database Loader
# loader.py
import asyncpg
from models import Product
class PostgresLoader:
    """Upsert validated products into Postgres via an asyncpg pool.

    Fix: ``load_batch`` opened a transaction on one pooled connection and
    then called ``self.load()``, which acquired a *different* connection
    from the pool for every product — so the transaction wrapped nothing
    and the batch was not atomic. Inserts now run on the held connection
    via the shared ``_insert`` helper.
    """

    def __init__(self, dsn: str):
        self.dsn = dsn
        self.pool = None  # created in connect()
        self.loaded_count = 0

    async def connect(self):
        """Create the connection pool and ensure the target table exists."""
        self.pool = await asyncpg.create_pool(
            self.dsn, min_size=2, max_size=10
        )
        await self._create_table()

    async def _create_table(self):
        # Idempotent schema bootstrap; url is the natural unique key.
        async with self.pool.acquire() as conn:
            await conn.execute("""
                CREATE TABLE IF NOT EXISTS products (
                    id SERIAL PRIMARY KEY,
                    url TEXT UNIQUE,
                    title TEXT NOT NULL,
                    price DECIMAL(10,2),
                    currency VARCHAR(3),
                    in_stock BOOLEAN,
                    description TEXT,
                    image_url TEXT,
                    scraped_at TIMESTAMP,
                    source_domain TEXT,
                    created_at TIMESTAMP DEFAULT NOW(),
                    updated_at TIMESTAMP DEFAULT NOW()
                )
            """)

    async def _insert(self, conn, product: Product):
        """Upsert one product on the given connection (conflict key: url)."""
        await conn.execute("""
            INSERT INTO products
                (url, title, price, currency,
                 in_stock, description, image_url,
                 scraped_at, source_domain)
            VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9)
            ON CONFLICT (url) DO UPDATE SET
                price = EXCLUDED.price,
                in_stock = EXCLUDED.in_stock,
                scraped_at = EXCLUDED.scraped_at,
                updated_at = NOW()
        """,
            str(product.url),
            product.title,
            product.price,
            product.currency,
            product.in_stock,
            product.description,
            str(product.image_url) if product.image_url else None,
            product.scraped_at,
            product.source_domain,
        )
        self.loaded_count += 1

    async def load(self, product: Product):
        """Upsert a single product on its own pooled connection."""
        async with self.pool.acquire() as conn:
            await self._insert(conn, product)

    async def load_batch(self, products: list[Product]):
        """Upsert a batch atomically inside one transaction."""
        async with self.pool.acquire() as conn:
            async with conn.transaction():
                for product in products:
                    await self._insert(conn, product)

    async def close(self):
        """Release the pool (safe to call when connect() never ran)."""
        if self.pool:
            await self.pool.close()
Step 7: The Pipeline Orchestrator
Wire everything together:
# pipeline.py
import asyncio
import aiohttp
import time
from fetcher import Fetcher
from parser import ProductParser
from validator import DataValidator
from deduper import Deduplicator
from loader import PostgresLoader
class ScrapingPipeline:
    """End-to-end ETL: fetch -> parse -> validate -> dedupe -> load."""

    def __init__(
        self,
        urls: list[str],
        db_dsn: str,
        max_concurrent: int = 20,
    ):
        self.urls = urls
        # NOTE(review): CaptchaSolver is never imported in this module —
        # confirm which package provides it.
        self.fetcher = Fetcher(
            max_concurrent=max_concurrent,
            captcha_solver=CaptchaSolver(
                api_base="https://www.passxapi.com"
            ),
        )
        self.parser = ProductParser()
        self.validator = DataValidator()
        self.deduper = Deduplicator()
        self.loader = PostgresLoader(db_dsn)

    async def run(self):
        """Process every URL in fixed-size batches, reporting progress."""
        start = time.monotonic()
        print(f"Starting pipeline for {len(self.urls)} URLs")

        # Connect to database
        await self.loader.connect()

        headers = {
            "User-Agent": (
                "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
                "AppleWebKit/537.36"
            )
        }
        async with aiohttp.ClientSession(headers=headers) as session:
            # Process in batches
            batch_size = 50
            for offset in range(0, len(self.urls), batch_size):
                await self._process_batch(
                    session, self.urls[offset:offset + batch_size]
                )
                # Progress report
                elapsed = time.monotonic() - start
                print(
                    f" Batch {offset//batch_size + 1}: "
                    f"{self.fetcher.stats} | "
                    f"Valid: {self.validator.stats} | "
                    f"Loaded: {self.loader.loaded_count} | "
                    f"Dupes: {self.deduper.dupes_count} | "
                    f"{elapsed:.0f}s elapsed"
                )

        await self.loader.close()
        self._print_summary(time.monotonic() - start)

    async def _process_batch(
        self, session, urls: list[str]
    ):
        """Run one batch of URLs through every ETL stage."""
        # Extract: fetch all URLs concurrently.
        results = await asyncio.gather(
            *(self.fetcher.fetch(session, u) for u in urls)
        )

        # Transform: parse -> validate -> dedupe, dropping failures at
        # each stage (the components keep their own loss counters).
        to_load = []
        for result in results:
            if result.error or not result.html:
                continue
            raw = self.parser.parse(result.html, result.url)
            if not raw:
                continue
            product = self.validator.validate(raw)
            if not product:
                continue
            if not self.deduper.is_duplicate(product):
                to_load.append(product)

        # Load: batch insert.
        if to_load:
            await self.loader.load_batch(to_load)

    def _print_summary(self, elapsed: float):
        """Final per-stage report, so data loss is attributable."""
        print(f"\n{'='*50}")
        print(f"Pipeline completed in {elapsed:.0f}s")
        print(f" Fetched: {self.fetcher.stats}")
        print(f" Validation: {self.validator.stats}")
        print(f" Duplicates: {self.deduper.dupes_count}")
        print(f" Loaded to DB: {self.loader.loaded_count}")
        print(f" Rate: {len(self.urls)/elapsed:.0f} URLs/s")
        print(f"{'='*50}")
# Run it
async def main():
    """Entry point: load the URL list and run the pipeline."""
    # NOTE(review): load_urls_from_file is not defined in this article —
    # presumably it reads one URL per line from urls.txt; confirm.
    urls = load_urls_from_file("urls.txt")
    pipeline = ScrapingPipeline(
        urls=urls,
        db_dsn="postgresql://user:pass@localhost/scraping",
        max_concurrent=20,
    )
    await pipeline.run()


# Fix: guard the entry point — the original called asyncio.run(main()) at
# module level, so merely importing this module would start a scrape.
if __name__ == "__main__":
    asyncio.run(main())
Adding a Dead Letter Queue
Pages that fail should be retried later, not lost:
# dlq.py
import json
from pathlib import Path
from datetime import datetime
class DeadLetterQueue:
    """Append-only JSONL log of failed URLs, for later retry.

    Each line is a JSON object: {"url", "error", "attempt", "timestamp"}.

    Fixes vs. original ``get_retryable``:
    - a URL logged at attempts 1 and 2 was returned twice (once per entry);
    - a URL whose *latest* attempt had reached the limit was still returned,
      because its older, lower-attempt entries also matched the filter.
    Now only the highest attempt per URL is considered, each URL at most once.
    """

    def __init__(self, path: str = "failed_urls.jsonl"):
        self.path = Path(path)
        self.count = 0  # entries written by this instance

    def add(
        self, url: str,
        error: str,
        attempt: int = 1
    ):
        """Record one failure by appending a JSON line."""
        from datetime import timezone  # local: module only imports datetime
        entry = {
            "url": url,
            "error": error,
            "attempt": attempt,
            # Timezone-aware UTC; datetime.utcnow() is deprecated (3.12+).
            "timestamp": datetime.now(timezone.utc).isoformat(),
        }
        with open(self.path, "a") as f:
            f.write(json.dumps(entry) + "\n")
        self.count += 1

    def get_retryable(
        self, max_attempts: int = 3
    ) -> list[str]:
        """Return URLs whose latest logged attempt is below max_attempts."""
        if not self.path.exists():
            return []
        latest: dict[str, int] = {}
        with open(self.path) as f:
            for line in f:
                entry = json.loads(line)
                # Keep the highest attempt number seen per URL.
                prev = latest.get(entry["url"], 0)
                latest[entry["url"]] = max(prev, entry["attempt"])
        return [url for url, n in latest.items() if n < max_attempts]
Key Takeaways
- Model your output first — define Pydantic models before writing parsers
- CAPTCHAs are just a pipeline step — detect, solve, continue
- Validate everything — bad data is worse than no data
- Deduplicate before loading — saves DB writes and storage
- Batch operations — DB inserts, HTTP requests, everything works better in batches
- Track stats at every stage — know where your data is being lost
- Dead letter queue — never lose a URL, retry later
For the CAPTCHA-solving step in your pipeline, check out passxapi-python — it provides async and sync clients that integrate cleanly into ETL workflows.
How do you structure your scraping pipelines? What stages do you find most important? Let me know in the comments.
Top comments (0)