Web Scraping Pipeline: From Development to Production in 2026

Building a scraper is the easy part. Running it reliably in production — with scheduling, monitoring, retries, and data storage — is where most projects fail. This guide covers the full pipeline from development to production-grade scraping infrastructure.

Pipeline Architecture

A production scraping pipeline has five stages:

  1. URL Discovery — Find what to scrape
  2. Fetching — Download pages with proxy rotation
  3. Parsing — Extract structured data
  4. Storage — Save to database or data lake
  5. Monitoring — Track success rates and raise alerts
┌─────────┐    ┌─────────┐    ┌─────────┐    ┌─────────┐    ┌──────────┐
│  URL    │───▶│ Fetcher │───▶│ Parser  │───▶│ Storage │───▶│ Monitor  │
│ Queue   │    │ +Proxy  │    │         │    │         │    │          │
└─────────┘    └─────────┘    └─────────┘    └─────────┘    └──────────┘

Stage 1: URL Queue

import sqlite3
from datetime import datetime, timedelta
from enum import Enum

class URLStatus(Enum):
    PENDING = "pending"
    IN_PROGRESS = "in_progress"
    COMPLETED = "completed"
    FAILED = "failed"
    RETRY = "retry"

class URLQueue:
    def __init__(self, db_path="scraper.db"):
        self.conn = sqlite3.connect(db_path)
        self.conn.execute("""
            CREATE TABLE IF NOT EXISTS url_queue (
                url TEXT PRIMARY KEY,
                status TEXT DEFAULT 'pending',
                retries INTEGER DEFAULT 0,
                max_retries INTEGER DEFAULT 3,
                last_attempt TIMESTAMP,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                data TEXT
            )
        """)
        self.conn.commit()

    def add(self, urls):
        # INSERT OR IGNORE silently skips URLs that are already queued
        self.conn.executemany(
            "INSERT OR IGNORE INTO url_queue (url) VALUES (?)",
            [(url,) for url in urls]
        )
        self.conn.commit()

    def get_batch(self, size=10):
        # Pick URLs that are pending or queued for retry, skipping anything
        # attempted within the last 5 minutes (a simple retry back-off)
        cursor = self.conn.execute(
            """SELECT url FROM url_queue
            WHERE status IN ('pending', 'retry')
            AND (last_attempt IS NULL OR last_attempt < ?)
            LIMIT ?""",
            ((datetime.now() - timedelta(minutes=5)).isoformat(), size)
        )
        urls = [row[0] for row in cursor.fetchall()]

        for url in urls:
            self.conn.execute(
                "UPDATE url_queue SET status=?, last_attempt=? WHERE url=?",
                (URLStatus.IN_PROGRESS.value, datetime.now().isoformat(), url)
            )
        self.conn.commit()
        return urls

    def mark_completed(self, url, data=None):
        self.conn.execute(
            "UPDATE url_queue SET status=?, data=? WHERE url=?",
            (URLStatus.COMPLETED.value, data, url)
        )
        self.conn.commit()

    def mark_failed(self, url):
        self.conn.execute(
            """UPDATE url_queue SET
            status = CASE WHEN retries < max_retries THEN 'retry' ELSE 'failed' END,
            retries = retries + 1
            WHERE url = ?""",
            (url,)
        )
        self.conn.commit()
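
A quick usage sketch, assuming the class above is in scope: seed the queue once, then pull work in small batches (the URLs are placeholders).

queue = URLQueue("scraper.db")
queue.add([
    "https://example.com/products?page=1",
    "https://example.com/products?page=2",
])

batch = queue.get_batch(size=5)   # marks each URL as in_progress
for url in batch:
    queue.mark_completed(url)     # or queue.mark_failed(url) on error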

Stage 2: Fetcher with Retry Logic

import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
import random
import time

class Fetcher:
    def __init__(self, proxies=None, max_retries=3):
        self.session = requests.Session()

        retry_strategy = Retry(
            total=max_retries,
            backoff_factor=1,
            status_forcelist=[429, 500, 502, 503, 504],
        )
        adapter = HTTPAdapter(max_retries=retry_strategy)
        self.session.mount("http://", adapter)
        self.session.mount("https://", adapter)

        self.proxies = proxies or []
        self.user_agents = [
            "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36",
            "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36",
            "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36",
        ]

    def fetch(self, url, timeout=30):
        headers = {
            "User-Agent": random.choice(self.user_agents),
            "Accept": "text/html,application/xhtml+xml",
            "Accept-Language": "en-US,en;q=0.9",
        }

        proxy = None
        if self.proxies:
            proxy_url = random.choice(self.proxies)
            proxy = {"http": proxy_url, "https": proxy_url}

        response = self.session.get(
            url, headers=headers, proxies=proxy, timeout=timeout
        )
        response.raise_for_status()
        return response

    def fetch_with_delay(self, url, min_delay=1, max_delay=5):
        time.sleep(random.uniform(min_delay, max_delay))
        return self.fetch(url)
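
Typical usage looks like this; the proxy endpoints below are placeholders standing in for whatever pool you actually use.

fetcher = Fetcher(proxies=[
    "http://user:pass@proxy1.example.com:8080",   # placeholder endpoints
    "http://user:pass@proxy2.example.com:8080",
])

response = fetcher.fetch_with_delay("https://example.com", min_delay=1, max_delay=3)
print(response.status_code, len(response.text))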

Stage 3: Parser Framework

from bs4 import BeautifulSoup
from abc import ABC, abstractmethod

class BaseParser(ABC):
    @abstractmethod
    def parse(self, html, url):
        pass

    def clean_text(self, text):
        if text:
            return " ".join(text.split()).strip()
        return ""

class ProductParser(BaseParser):
    def parse(self, html, url):
        soup = BeautifulSoup(html, "html.parser")

        products = []
        for card in soup.select(".product-card"):
            products.append({
                "name": self.clean_text(card.select_one(".title").get_text()),
                "price": self.clean_text(card.select_one(".price").get_text()),
                "url": url,
                "scraped_at": datetime.now().isoformat(),
            })

        return products

class ArticleParser(BaseParser):
    def parse(self, html, url):
        soup = BeautifulSoup(html, "html.parser")

        title = soup.select_one("h1")
        content = soup.select_one("article")

        return {
            "title": self.clean_text(title.get_text()) if title else None,
            "content": self.clean_text(content.get_text()) if content else None,
            "url": url,
            "scraped_at": datetime.now().isoformat(),
        }
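
A minimal sanity check with inline HTML; the .product-card, .title, and .price selectors match the example markup assumed above, not any real site's.

sample_html = """
<div class="product-card">
  <span class="title"> Widget  Pro </span>
  <span class="price">$19.99</span>
</div>
"""

parser = ProductParser()
print(parser.parse(sample_html, "https://example.com/products"))
# [{'name': 'Widget Pro', 'price': '$19.99', 'url': '...', 'scraped_at': '...'}]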

Stage 4: Storage Layer

import json
import csv

class Storage:
    def __init__(self, db_path="scraped_data.db"):
        self.conn = sqlite3.connect(db_path)
        self.conn.execute("""
            CREATE TABLE IF NOT EXISTS scraped_items (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                url TEXT,
                data TEXT,
                source TEXT,
                scraped_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            )
        """)
        self.conn.commit()

    def save(self, url, data, source="default"):
        self.conn.execute(
            "INSERT INTO scraped_items (url, data, source) VALUES (?, ?, ?)",
            (url, json.dumps(data, ensure_ascii=False), source)
        )
        self.conn.commit()

    def export_csv(self, filename, source=None):
        # Parameterised query avoids SQL injection through the source value
        query = "SELECT * FROM scraped_items"
        params = ()
        if source:
            query += " WHERE source = ?"
            params = (source,)

        cursor = self.conn.execute(query, params)
        rows = cursor.fetchall()

        with open(filename, "w", newline="", encoding="utf-8") as f:
            writer = csv.writer(f)
            writer.writerow(["id", "url", "data", "source", "scraped_at"])
            writer.writerows(rows)
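
Saving a parsed batch and exporting it later is then a couple of calls; the file names and source label here are arbitrary.

storage = Storage("scraped_data.db")
storage.save(
    "https://example.com/products?page=1",
    [{"name": "Widget Pro", "price": "$19.99"}],
    source="products",
)
storage.export_csv("products.csv", source="products")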

Stage 5: Monitoring

from collections import defaultdict

class PipelineMonitor:
    def __init__(self):
        self.stats = defaultdict(int)
        self.errors = []
        self.start_time = datetime.now()

    def record_success(self, url):
        self.stats["success"] += 1

    def record_failure(self, url, error):
        self.stats["failure"] += 1
        self.errors.append({"url": url, "error": str(error), "time": datetime.now().isoformat()})

    def get_report(self):
        total = self.stats["success"] + self.stats["failure"]
        elapsed = (datetime.now() - self.start_time).total_seconds()

        return {
            "total_processed": total,
            "successful": self.stats["success"],
            "failed": self.stats["failure"],
            "success_rate": f"{(self.stats['success'] / max(total, 1)) * 100:.1f}%",
            "elapsed_seconds": round(elapsed, 1),
            "rate_per_minute": round(total / max(elapsed / 60, 0.01), 1),
            "recent_errors": self.errors[-5:],
        }
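
The monitor only aggregates numbers; since this stage is also about alerts, here is a minimal sketch of a check you could call after each batch. The threshold and the print-based notification are placeholders for whatever channel you use (Slack, email, webhook).

def check_alerts(monitor, min_success_rate=80.0, min_sample=20):
    report = monitor.get_report()
    # Wait until enough URLs have been processed for the rate to be meaningful
    if report["total_processed"] < min_sample:
        return
    rate = float(report["success_rate"].rstrip("%"))
    if rate < min_success_rate:
        # Swap this print for your actual notification channel
        print(f"ALERT: success rate dropped to {rate:.1f}%")
        for err in report["recent_errors"]:
            print("  ", err["url"], "->", err["error"])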

Putting It All Together

class ScrapingPipeline:
    def __init__(self, parser, storage=None, proxies=None):
        self.queue = URLQueue()
        self.fetcher = Fetcher(proxies=proxies)
        self.parser = parser
        self.storage = storage or Storage()
        self.monitor = PipelineMonitor()

    def run(self, batch_size=10, max_batches=100):
        for batch_num in range(max_batches):
            urls = self.queue.get_batch(batch_size)
            if not urls:
                print("No URLs ready to process. Stopping.")
                break

            print(f"Batch {batch_num + 1}: processing {len(urls)} URLs")

            for url in urls:
                try:
                    response = self.fetcher.fetch_with_delay(url)
                    data = self.parser.parse(response.text, url)
                    self.storage.save(url, data)
                    self.queue.mark_completed(url)
                    self.monitor.record_success(url)
                except Exception as e:
                    self.queue.mark_failed(url)
                    self.monitor.record_failure(url, e)

            report = self.monitor.get_report()
            print(f"  Success rate: {report['success_rate']} | "
                  f"Rate: {report['rate_per_minute']}/min")

        return self.monitor.get_report()

Scheduling

# schedule_scraper.py
import schedule
import time

def run_daily_scrape():
    pipeline = ScrapingPipeline(
        parser=ProductParser(),
        proxies=["http://proxy1:8080", "http://proxy2:8080"]
    )
    pipeline.queue.add(discover_urls())  # discover_urls() is your own URL discovery step
    report = pipeline.run()
    print(json.dumps(report, indent=2))

schedule.every().day.at("02:00").do(run_daily_scrape)

while True:
    schedule.run_pending()
    time.sleep(60)
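
If you prefer to let the operating system handle scheduling instead of keeping a Python process alive, a one-shot entry point plus a crontab line does the same job. The paths in the crontab example are assumptions; adjust them to your setup.

# run_once.py - one-shot entry point for cron
if __name__ == "__main__":
    run_daily_scrape()

# Example crontab entry (crontab -e), running daily at 02:00:
#   0 2 * * * cd /opt/scraper && /usr/bin/python3 run_once.py >> scrape.log 2>&1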

Cloud Deployment

For production scrapers, consider platforms like Apify, which provide built-in scheduling, proxy management, storage, and monitoring — eliminating the need to build this infrastructure yourself.

Proxy Infrastructure

Production scrapers need reliable proxies. ThorData offers residential proxy pools with automatic rotation, session management, and geographic targeting for large-scale scraping operations.
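
Whichever provider you choose, the pipeline only needs a list of proxy URLs. The endpoint and credential format below is purely illustrative; the real values come from your provider's dashboard.

# Illustrative only: hostnames, ports, and credential syntax vary by provider
RESIDENTIAL_PROXIES = [
    "http://USERNAME:PASSWORD@gateway.example-proxy.com:7777",
    "http://USERNAME:PASSWORD@gateway.example-proxy.com:7778",
]

pipeline = ScrapingPipeline(
    parser=ProductParser(),
    proxies=RESIDENTIAL_PROXIES,
)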

Conclusion

Building a production scraping pipeline is primarily an infrastructure challenge, not a coding one. The key components — queue management, retry logic, monitoring, and proxy rotation — matter more than the parsing code itself. Start simple, monitor everything, and scale gradually.
