Web Scraping Pipeline: From Development to Production in 2026

Building a scraper is the easy part. Running it reliably in production — with scheduling, monitoring, retries, and data storage — is where most projects fail. This guide covers the full pipeline from development to production-grade scraping infrastructure.

Pipeline Architecture

A production scraping pipeline has five stages:

  1. URL Discovery — Find what to scrape
  2. Fetching — Download pages with proxy rotation
  3. Parsing — Extract structured data
  4. Storage — Save to database or data lake
  5. Monitoring — Track success rates and raise alerts
┌─────────┐    ┌─────────┐    ┌─────────┐    ┌─────────┐    ┌──────────┐
│  URL    │───▶│ Fetcher │───▶│ Parser  │───▶│ Storage │───▶│ Monitor  │
│ Queue   │    │ +Proxy  │    │         │    │         │    │          │
└─────────┘    └─────────┘    └─────────┘    └─────────┘    └──────────┘

Stage 1: URL Queue

import sqlite3
from datetime import datetime, timedelta
from enum import Enum

class URLStatus(Enum):
    PENDING = "pending"
    IN_PROGRESS = "in_progress"
    COMPLETED = "completed"
    FAILED = "failed"
    RETRY = "retry"

class URLQueue:
    def __init__(self, db_path="scraper.db"):
        self.conn = sqlite3.connect(db_path)
        self.conn.execute("""
            CREATE TABLE IF NOT EXISTS url_queue (
                url TEXT PRIMARY KEY,
                status TEXT DEFAULT 'pending',
                retries INTEGER DEFAULT 0,
                max_retries INTEGER DEFAULT 3,
                last_attempt TIMESTAMP,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                data TEXT
            )
        """)
        self.conn.commit()

    def add(self, urls):
        # INSERT OR IGNORE already skips URLs that are in the queue,
        # so no exception handling is needed
        self.conn.executemany(
            "INSERT OR IGNORE INTO url_queue (url) VALUES (?)",
            [(url,) for url in urls]
        )
        self.conn.commit()

    def get_batch(self, size=10):
        # Skip URLs attempted within the last 5 minutes; timestamps are stored
        # as ISO-formatted text so they compare correctly in SQLite
        cutoff = (datetime.now() - timedelta(minutes=5)).isoformat(" ")
        cursor = self.conn.execute(
            """SELECT url FROM url_queue
            WHERE status IN ('pending', 'retry')
            AND (last_attempt IS NULL OR last_attempt < ?)
            LIMIT ?""",
            (cutoff, size)
        )
        urls = [row[0] for row in cursor.fetchall()]

        for url in urls:
            self.conn.execute(
                "UPDATE url_queue SET status=?, last_attempt=? WHERE url=?",
                (URLStatus.IN_PROGRESS.value, datetime.now().isoformat(" "), url)
            )
        self.conn.commit()
        return urls

    def mark_completed(self, url, data=None):
        self.conn.execute(
            "UPDATE url_queue SET status=?, data=? WHERE url=?",
            (URLStatus.COMPLETED.value, data, url)
        )
        self.conn.commit()

    def mark_failed(self, url):
        self.conn.execute(
            """UPDATE url_queue SET
            status = CASE WHEN retries < max_retries THEN 'retry' ELSE 'failed' END,
            retries = retries + 1
            WHERE url = ?""",
            (url,)
        )
        self.conn.commit()
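
A quick usage sketch; the database file and seed URLs are placeholders:

# Illustrative usage of the queue above; seed URLs are placeholders
queue = URLQueue("scraper.db")
queue.add([
    "https://example.com/page/1",
    "https://example.com/page/2",
])

for url in queue.get_batch(size=5):        # these rows are now marked in_progress
    try:
        # ... fetch and parse here ...
        queue.mark_completed(url, data='{"title": "..."}')
    except Exception:
        queue.mark_failed(url)             # re-queued until max_retries is exhausted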

Stage 2: Fetcher with Retry Logic

The full production fetcher is proprietary (that is the moat). If you would rather skip the build, use our ready-made Apify actor; see the CTA below for the link (fpr=yw6md3).
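
For orientation, here is a minimal sketch of a fetcher with retries, exponential backoff, and optional proxy rotation. The proxy list format, headers, and backoff values are illustrative assumptions, not the production implementation.

import random
import time
import requests

class Fetcher:
    """Downloads pages with retries, exponential backoff, and optional proxy rotation."""

    def __init__(self, proxies=None, max_retries=3, timeout=15):
        # proxies is an optional list like ["http://user:pass@host:port", ...] (placeholder format)
        self.proxies = proxies or []
        self.max_retries = max_retries
        self.timeout = timeout

    def fetch(self, url):
        last_error = None
        for attempt in range(self.max_retries):
            # Pick a random proxy per attempt so one bad exit node doesn't block every retry
            proxy = random.choice(self.proxies) if self.proxies else None
            try:
                response = requests.get(
                    url,
                    timeout=self.timeout,
                    proxies={"http": proxy, "https": proxy} if proxy else None,
                    headers={"User-Agent": "Mozilla/5.0 (compatible; pipeline-bot)"},
                )
                if response.status_code in (429, 503):
                    # Rate limited or temporarily unavailable: back off and retry
                    time.sleep(2 ** attempt)
                    continue
                response.raise_for_status()  # raise on other 4xx/5xx responses
                return response.text
            except requests.RequestException as exc:
                last_error = exc
                time.sleep(2 ** attempt)
        raise RuntimeError(f"Giving up on {url} after {self.max_retries} attempts: {last_error}")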

Stage 3: Parser Framework

As with the fetcher, the production parser ships inside the same Apify actor (linked in the CTA below).
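
A minimal sketch of a selector-driven parser, assuming BeautifulSoup (beautifulsoup4) is installed; the field names and CSS selectors are placeholders:

from bs4 import BeautifulSoup

class Parser:
    """Extracts structured data from HTML using a map of field name -> CSS selector."""

    def __init__(self, field_selectors):
        self.field_selectors = field_selectors

    def parse(self, html, url):
        soup = BeautifulSoup(html, "html.parser")
        item = {"url": url}
        for field, selector in self.field_selectors.items():
            node = soup.select_one(selector)
            item[field] = node.get_text(strip=True) if node else None
        return item

# Example configuration for a hypothetical product page; selectors are placeholders
product_parser = Parser({
    "title": "h1.product-title",
    "price": "span.price",
    "description": "div.description",
})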

Stage 4: Storage Layer

import csv
import json
import sqlite3

class Storage:
    def __init__(self, db_path="scraped_data.db"):
        self.conn = sqlite3.connect(db_path)
        self.conn.execute("""
            CREATE TABLE IF NOT EXISTS scraped_items (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                url TEXT,
                data TEXT,
                source TEXT,
                scraped_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            )
        """)
        self.conn.commit()

    def save(self, url, data, source="default"):
        self.conn.execute(
            "INSERT INTO scraped_items (url, data, source) VALUES (?, ?, ?)",
            (url, json.dumps(data, ensure_ascii=False), source)
        )
        self.conn.commit()

    def export_csv(self, filename, source=None):
        # Parameterize the filter so the source value can't inject SQL
        if source:
            cursor = self.conn.execute(
                "SELECT * FROM scraped_items WHERE source = ?", (source,)
            )
        else:
            cursor = self.conn.execute("SELECT * FROM scraped_items")
        rows = cursor.fetchall()

        with open(filename, "w", newline="", encoding="utf-8") as f:
            writer = csv.writer(f)
            writer.writerow(["id", "url", "data", "source", "scraped_at"])
            writer.writerows(rows)
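
Usage is a call per item; the URL, fields, and file names here are placeholders:

# Illustrative usage; URL, fields, and file names are placeholders
storage = Storage("scraped_data.db")
storage.save(
    "https://example.com/page/1",
    {"title": "Example product", "price": "19.99"},
    source="example",
)
storage.export_csv("export.csv", source="example")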

Stage 5: Monitoring

from collections import defaultdict
from datetime import datetime

class PipelineMonitor:
    def __init__(self):
        self.stats = defaultdict(int)
        self.errors = []
        self.start_time = datetime.now()

    def record_success(self, url):
        self.stats["success"] += 1

    def record_failure(self, url, error):
        self.stats["failure"] += 1
        self.errors.append({"url": url, "error": str(error), "time": datetime.now().isoformat()})

    def get_report(self):
        total = self.stats["success"] + self.stats["failure"]
        elapsed = (datetime.now() - self.start_time).total_seconds()

        return {
            "total_processed": total,
            "successful": self.stats["success"],
            "failed": self.stats["failure"],
            "success_rate": f"{(self.stats['success'] / max(total, 1)) * 100:.1f}%",
            "elapsed_seconds": round(elapsed, 1),
            "rate_per_minute": round(total / max(elapsed / 60, 0.01), 1),
            "recent_errors": self.errors[-5:],
        }
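
In the main loop, the report doubles as a simple alerting hook; the 80% threshold below is an arbitrary example value:

# Illustrative usage; the 80% threshold is an arbitrary example
monitor = PipelineMonitor()
monitor.record_success("https://example.com/page/1")
monitor.record_failure("https://example.com/page/2", TimeoutError("read timed out"))

report = monitor.get_report()
print(report)

if float(report["success_rate"].rstrip("%")) < 80.0:
    print("ALERT: success rate below 80%, check proxies and parsers")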

Putting It All Together

The production orchestration also lives in the Apify actor, but the pieces above compose in a straightforward loop: pull a batch from the queue, fetch, parse, store, and record the outcome.
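
A minimal sketch of that loop, assuming the URLQueue, Storage, and PipelineMonitor classes above plus the Fetcher and Parser sketches (or your own implementations) live in the same module; seed URLs and selectors are placeholders:

import json

def run_pipeline(seed_urls, field_selectors, batch_size=10):
    queue = URLQueue()
    fetcher = Fetcher()                  # sketch from Stage 2, or your own fetcher
    parser = Parser(field_selectors)     # sketch from Stage 3, or your own parser
    storage = Storage()
    monitor = PipelineMonitor()

    queue.add(seed_urls)

    while True:
        batch = queue.get_batch(size=batch_size)
        if not batch:
            # Retryable URLs whose backoff window hasn't elapsed are picked up on the next run
            break
        for url in batch:
            try:
                html = fetcher.fetch(url)
                item = parser.parse(html, url)
                storage.save(url, item)
                queue.mark_completed(url, json.dumps(item))
                monitor.record_success(url)
            except Exception as exc:
                queue.mark_failed(url)
                monitor.record_failure(url, exc)

    print(monitor.get_report())

if __name__ == "__main__":
    # Seed URLs and selectors are placeholders for your target site
    run_pipeline(
        ["https://example.com/page/1", "https://example.com/page/2"],
        {"title": "h1", "price": "span.price"},
    )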

Scheduling with Cron

On a plain Linux server, cron is the simplest way to run the pipeline on a schedule; the hosted Apify actor linked below handles scheduling for you.
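
A typical crontab entry, assuming the orchestration above is saved as run_pipeline.py; the paths are placeholders for your environment:

# Run the pipeline every 6 hours and append output to a log file
0 */6 * * * cd /path/to/scraper && /usr/bin/python3 run_pipeline.py >> pipeline.log 2>&1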

Cloud Deployment

For production scrapers, consider platforms like Apify which provide built-in scheduling, proxy management, storage, and monitoring — eliminating the need to build infrastructure yourself.

Proxy Infrastructure

Production scrapers need reliable proxies. ThorData offers residential proxy pools with automatic rotation, session management, and geographic targeting for large-scale scraping operations.

Conclusion

Building a production scraping pipeline is primarily an infrastructure challenge, not a coding one. The key components — queue management, retry logic, monitoring, and proxy rotation — matter more than the parsing code itself. Start simple, monitor everything, and scale gradually.
