agenthustler
Web Scraping at Scale: Distributed Architecture with Redis Queues


When your scraping project grows beyond a single machine, you need a distributed architecture. Redis queues are the backbone of most production scraping systems — they are fast, reliable, and simple to implement. This guide shows you how to build one.

Why Redis for Scraping?

Redis offers three features that make it perfect for scraping orchestration:

  1. Speed — In-memory operations dispatch hundreds of thousands of URLs per second on a single instance
  2. Persistence — With RDB snapshots or AOF enabled, your queue survives restarts
  3. Atomic operations — Commands like ZPOPMIN guarantee no duplicate processing and no lost URLs

Architecture Overview

A distributed scraping system has four components:

[URL Generator] → [Redis Queue] → [Workers (N)] → [Results Store]
                       ↑                 │
                       └── retry queue ──┘
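Before bringing Redis into the picture, the whole flow fits in a few lines of plain Python, with `heapq` standing in for the priority queue (hypothetical URLs; Redis replaces the heap once workers live on separate machines):

```python
import heapq

# In-process stand-in for the Redis sorted set: (priority, url) tuples,
# lowest priority value popped first -- the same ordering ZPOPMIN gives.
pending = []
for i in range(3):
    heapq.heappush(pending, (0, f"https://example.com/page/{i}"))  # fresh URLs

heapq.heappush(pending, (10, "https://example.com/retry"))  # retried URL sinks

completed = []
while pending:
    priority, url = heapq.heappop(pending)  # a worker pulls the next task
    completed.append(url)                   # ...and writes to the results store
```

Fresh URLs drain first; the retried URL comes out last because its score is higher.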

Implementation

Step 1: URL Queue Manager

import redis
import json
from datetime import datetime

class ScrapingQueue:
    def __init__(self, redis_url: str = "redis://localhost:6379"):
        self.redis = redis.from_url(redis_url)
        self.queue_key = "scrape:pending"
        self.processing_key = "scrape:processing"
        self.completed_key = "scrape:completed"
        self.failed_key = "scrape:failed"

    def add_urls(self, urls: list[str], priority: int = 0):
        """Add URLs to the queue with optional priority"""
        pipe = self.redis.pipeline()
        for url in urls:
            task = json.dumps({
                "url": url,
                "priority": priority,
                "added_at": datetime.utcnow().isoformat(),
                "retries": 0,
            })
            # Use sorted set for priority queue
            pipe.zadd(self.queue_key, {task: priority})
        pipe.execute()
        print(f"Added {len(urls)} URLs to queue")

    def get_task(self) -> dict | None:
        """Atomically pop highest priority task"""
        # ZPOPMIN gets lowest score (highest priority)
        result = self.redis.zpopmin(self.queue_key, count=1)
        if not result:
            return None
        task_json, score = result[0]
        task = json.loads(task_json)
        # Track in processing set
        self.redis.sadd(self.processing_key, task["url"])
        return task

    def mark_completed(self, url: str, data: dict):
        self.redis.srem(self.processing_key, url)
        self.redis.hset(self.completed_key, url, json.dumps(data))

    def mark_failed(self, url: str, error: str, max_retries: int = 3):
        self.redis.srem(self.processing_key, url)
        # Retry count lives in a Redis hash so it survives re-queueing
        retries = self.redis.hincrby(self.failed_key, url, 1)
        if retries <= max_retries:
            # Re-queue with a higher score so fresh URLs are popped first
            self.add_urls([url], priority=retries * 10)

    def stats(self) -> dict:
        return {
            "pending": self.redis.zcard(self.queue_key),
            "processing": self.redis.scard(self.processing_key),
            "completed": self.redis.hlen(self.completed_key),
            "failed": self.redis.hlen(self.failed_key),
        }
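Even without a Redis server handy, the scoring scheme is easy to sanity-check: ZPOPMIN always returns the member with the lowest score, so fresh URLs (score 0) drain before retried ones (score retries × 10). A quick simulation with plain Python (hypothetical URLs):

```python
# Scores as the queue assigns them: 0 for fresh, retries * 10 after failures
tasks = {
    "https://example.com/fresh-1": 0,
    "https://example.com/failed-twice": 2 * 10,
    "https://example.com/fresh-2": 0,
    "https://example.com/failed-once": 1 * 10,
}

# Simulate successive ZPOPMIN calls by sorting on (score, member)
pop_order = [url for url, _ in sorted(tasks.items(), key=lambda kv: (kv[1], kv[0]))]
```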

Step 2: Worker Process

import signal
import time

import httpx
from selectolax.parser import HTMLParser

class ScrapingWorker:
    def __init__(self, worker_id: str, queue: ScrapingQueue):
        self.worker_id = worker_id
        self.queue = queue
        self.running = True
        signal.signal(signal.SIGTERM, self._shutdown)
        signal.signal(signal.SIGINT, self._shutdown)

    def _shutdown(self, signum, frame):
        print(f"Worker {self.worker_id} shutting down gracefully...")
        self.running = False

    def process_url(self, url: str) -> dict:
        """Scrape a single URL and return structured data"""
        response = httpx.get(url, timeout=30, headers={
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36"
        })
        response.raise_for_status()
        tree = HTMLParser(response.text)
        return {
            "title": tree.css_first("title").text() if tree.css_first("title") else "",
            "body_text": tree.body.text()[:5000] if tree.body else "",
            "links": [a.attributes.get("href", "") for a in tree.css("a[href]")],
            "status": response.status_code,
        }

    def run(self):
        print(f"Worker {self.worker_id} started")
        while self.running:
            task = self.queue.get_task()
            if task is None:
                time.sleep(1)  # Queue empty; back off briefly
                continue

            url = task["url"]
            try:
                data = self.process_url(url)
                self.queue.mark_completed(url, data)
                print(f"[{self.worker_id}] Scraped: {url}")
            except Exception as e:
                self.queue.mark_failed(url, str(e))
                print(f"[{self.worker_id}] Failed: {url} - {e}")
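The graceful-shutdown wiring above can be exercised in isolation. This sketch uses `signal.raise_signal` (Python 3.8+) to simulate the orchestrator sending SIGTERM; the handler flips a flag exactly as `_shutdown` does:

```python
import signal

running = True

def _shutdown(signum, frame):
    # Flip the flag; a worker loop checking it exits after the current task
    global running
    running = False

signal.signal(signal.SIGTERM, _shutdown)
signal.raise_signal(signal.SIGTERM)  # stands in for `kill <pid>` from outside
```

After the signal fires, `running` is False and the worker's `while self.running` loop would finish its current URL and return, so no task is abandoned mid-flight.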

Step 3: Orchestrator

import subprocess
import sys

def launch_workers(num_workers: int = 4):
    """Launch multiple worker processes"""
    queue = ScrapingQueue()

    # Seed the queue
    urls = [f"https://example.com/page/{i}" for i in range(10000)]
    queue.add_urls(urls)

    # Launch workers as subprocesses
    processes = []
    for i in range(num_workers):
        p = subprocess.Popen(
            [sys.executable, "worker.py", f"--id=worker-{i}"],
        )
        processes.append(p)

    print(f"Launched {num_workers} workers")
    return processes
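The Popen call above assumes a worker.py entrypoint that accepts `--id`. A minimal sketch of that file, matching the flag the orchestrator passes (the filename and argument are the only contract between the two):

```python
import argparse

def parse_worker_args(argv=None):
    # Matches the --id=worker-N flag the orchestrator passes via Popen
    parser = argparse.ArgumentParser(description="Scraping worker")
    parser.add_argument("--id", required=True, help="Unique worker identifier")
    return parser.parse_args(argv)

if __name__ == "__main__":
    args = parse_worker_args()
    # queue = ScrapingQueue()
    # ScrapingWorker(args.id, queue).run()
```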

Step 4: Monitoring Dashboard

import time

def monitor(queue: ScrapingQueue, interval: int = 5):
    """Print queue stats every N seconds"""
    prev_completed = 0
    while True:
        stats = queue.stats()
        completed = stats["completed"]
        rate = (completed - prev_completed) / interval
        prev_completed = completed

        print(
            f"Pending: {stats['pending']:,} | "
            f"Processing: {stats['processing']} | "
            f"Completed: {completed:,} | "
            f"Failed: {stats['failed']:,} | "
            f"Rate: {rate:.1f}/sec"
        )
        time.sleep(interval)
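Raw per-interval rates can be jumpy when workers stall on slow pages. One optional refinement (not in the monitor above) is smoothing the samples with an exponential moving average before printing:

```python
def smooth_rate(samples: list[float], alpha: float = 0.3) -> float:
    """Exponential moving average over per-interval rates.

    alpha controls responsiveness: higher alpha weights recent samples more.
    """
    ema = samples[0]
    for s in samples[1:]:
        ema = alpha * s + (1 - alpha) * ema
    return ema
```

A steady stream stays put, while a one-off spike is damped rather than swinging the displayed rate.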

Scaling Beyond One Machine

Once you outgrow a single server, the Redis queue architecture scales horizontally. Each worker machine connects to the same Redis instance and pulls tasks independently.
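A minimal sketch of that setup, assuming each worker host is configured with a hypothetical `REDIS_URL` environment variable pointing at the shared instance:

```python
import os

def redis_url_from_env() -> str:
    # Every worker machine reads the same central Redis address from its
    # environment; localhost is only the single-machine fallback.
    return os.environ.get("REDIS_URL", "redis://localhost:6379")
```

Deploying to a new machine then means setting one variable and starting workers; no code changes.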

For the proxy layer, ScraperAPI works well with distributed systems — each worker makes independent API calls and the service handles proxy rotation across all of them.

When running workers across multiple regions, residential proxies from ThorData let each worker geo-target its requests to match the region it serves.

To optimize proxy costs across your fleet, ScrapeOps helps benchmark which proxy provider gives the best success rate for each target domain.

Production Considerations

  1. Deduplication: Use a Redis set or Bloom filter to avoid scraping the same URL twice
  2. Rate limiting: Add per-domain delays using Redis sorted sets with timestamps
  3. Dead letter queue: After max retries, move failed URLs to a separate queue for manual review
  4. Backpressure: Stop feeding URLs when the queue exceeds a threshold
  5. Graceful shutdown: Handle SIGTERM so workers finish their current task before exiting
# Per-domain rate limiting
import time
from urllib.parse import urlparse

import httpx

def rate_limited_get(queue: ScrapingQueue, url: str, delay: float = 1.0):
    domain = urlparse(url).netloc
    lock_key = f"ratelimit:{domain}"

    # SET NX PX acts as an expiring per-domain lock; spin until it frees up
    while not queue.redis.set(lock_key, 1, nx=True, px=int(delay * 1000)):
        time.sleep(0.1)

    return httpx.get(url, timeout=30)
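For consideration #1, deduplication, the simplest version is an exact-membership filter over URL fingerprints. This is a local stand-in sketch; in production you would use a Redis set (SADD returns 1 only on first insert) or a Bloom filter to bound memory:

```python
import hashlib

class SeenFilter:
    """URL dedup via hashed fingerprints -- a single-process stand-in for a
    shared Redis set or Bloom filter."""

    def __init__(self):
        self._seen = set()

    def check_and_add(self, url: str) -> bool:
        # Returns True if the URL is new, False if it was already queued.
        # Storing a 16-hex-char digest instead of the full URL saves memory.
        fp = hashlib.sha1(url.encode()).hexdigest()[:16]
        if fp in self._seen:
            return False
        self._seen.add(fp)
        return True
```

Calling `check_and_add` before `add_urls` keeps duplicate discoveries (the same link found on many pages) from re-entering the queue.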

Key Takeaways

  1. Redis sorted sets make excellent priority queues for scraping
  2. Atomic operations prevent duplicate processing across workers
  3. Retry with backoff handles transient failures gracefully
  4. Horizontal scaling is trivial — just add more workers pointing at the same Redis
  5. Monitor everything — rate, queue depth, failure rate, and worker health
