Web Scraping at Scale: Distributed Architecture with Redis Queues
When your scraping project grows beyond a single machine, you need a distributed architecture. Redis queues are the backbone of most production scraping systems — they are fast, reliable, and simple to implement. This guide shows you how to build one.
Why Redis for Scraping?
Redis offers three features that make it perfect for scraping orchestration:
- Speed — In-memory operations handle millions of URL dispatches per second
- Persistence — Your queue survives restarts
- Atomic operations — No duplicate processing, no lost URLs
Architecture Overview
A distributed scraping system has four components:
[URL Generator] → [Redis Queue] → [Workers (N)] → [Results Store]
                       ↑               │
                       └─ retry queue ─┘
Implementation
Step 1: URL Queue Manager
import json
from datetime import datetime, timezone

import redis
class ScrapingQueue:
    """Redis-backed priority queue for distributing scrape URLs to workers.

    Storage layout:
      - ``scrape:pending``    sorted set; member = task JSON, score = priority
        (lower score pops first).
      - ``scrape:processing`` set of URLs currently claimed by a worker.
      - ``scrape:completed``  hash of url -> result JSON.
      - ``scrape:failed``     hash of url -> failure count.
    """

    def __init__(self, redis_url: str = "redis://localhost:6379"):
        self.redis = redis.from_url(redis_url)
        self.queue_key = "scrape:pending"
        self.processing_key = "scrape:processing"
        self.completed_key = "scrape:completed"
        self.failed_key = "scrape:failed"

    def add_urls(self, urls: list[str], priority: int = 0) -> None:
        """Add URLs to the queue with optional priority (lower = sooner)."""
        pipe = self.redis.pipeline()
        for url in urls:
            task = json.dumps({
                "url": url,
                "priority": priority,
                # Timezone-aware timestamp: datetime.utcnow() is deprecated
                # and produced a naive datetime.
                "added_at": datetime.now(timezone.utc).isoformat(),
                "retries": 0,
            })
            # The sorted set doubles as a priority queue keyed on score.
            pipe.zadd(self.queue_key, {task: priority})
        pipe.execute()
        print(f"Added {len(urls)} URLs to queue")

    def get_task(self) -> dict | None:
        """Pop the highest-priority task, or return None when queue is empty.

        ZPOPMIN removes the lowest-score member atomically, so no two
        workers can claim the same task.
        """
        result = self.redis.zpopmin(self.queue_key, count=1)
        if not result:
            return None
        task_json, score = result[0]
        task = json.loads(task_json)
        # NOTE(review): the pop and this sadd are two separate commands, so a
        # worker crash in between loses the task from tracking — consider a
        # Lua script or RPOPLPUSH-style pattern for stronger guarantees.
        self.redis.sadd(self.processing_key, task["url"])
        return task

    def mark_completed(self, url: str, data: dict) -> None:
        """Record a successful scrape and release the URL from processing."""
        self.redis.srem(self.processing_key, url)
        self.redis.hset(self.completed_key, url, json.dumps(data))

    def mark_failed(self, url: str, error: str, max_retries: int = 3) -> None:
        """Record a failure; re-queue the URL until max_retries is exceeded.

        The failure count lives in the ``scrape:failed`` hash (HINCRBY), so
        it survives the re-queue round trips.
        """
        self.redis.srem(self.processing_key, url)
        retries = self.redis.hincrby(self.failed_key, url, 1)
        if retries <= max_retries:
            # Re-queue with a worse (higher) score so retries yield to
            # fresh work — a simple linear backoff in priority space.
            self.add_urls([url], priority=retries * 10)

    def stats(self) -> dict:
        """Return current counts for each queue stage."""
        return {
            "pending": self.redis.zcard(self.queue_key),
            "processing": self.redis.scard(self.processing_key),
            "completed": self.redis.hlen(self.completed_key),
            "failed": self.redis.hlen(self.failed_key),
        }
Step 2: Worker Process
import httpx
import signal
import sys
from selectolax.parser import HTMLParser
class ScrapingWorker:
    """Pulls tasks from a ScrapingQueue and scrapes them until stopped.

    Installs SIGTERM/SIGINT handlers that flip a flag instead of exiting
    immediately, so the task in flight finishes before the loop stops
    (graceful shutdown).
    """

    def __init__(self, worker_id: str, queue: ScrapingQueue):
        self.worker_id = worker_id
        self.queue = queue
        self.running = True
        signal.signal(signal.SIGTERM, self._shutdown)
        signal.signal(signal.SIGINT, self._shutdown)

    def _shutdown(self, signum, frame):
        """Signal handler: request a graceful stop of the run loop."""
        print(f"Worker {self.worker_id} shutting down gracefully...")
        self.running = False

    def process_url(self, url: str) -> dict:
        """Scrape a single URL and return structured data.

        Raises:
            httpx.HTTPStatusError: for non-2xx responses (raise_for_status),
                which the run loop turns into a mark_failed/retry.
        """
        response = httpx.get(url, timeout=30, headers={
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36"
        })
        response.raise_for_status()
        tree = HTMLParser(response.text)
        # Query <title> once instead of twice (was css_first'd in both
        # the condition and the branch).
        title_node = tree.css_first("title")
        return {
            "title": title_node.text() if title_node else "",
            "body_text": tree.body.text()[:5000] if tree.body else "",
            "links": [a.attributes.get("href", "") for a in tree.css("a[href]")],
            "status": response.status_code,
        }

    def run(self):
        """Main loop: pop tasks and process them until asked to stop."""
        # Hoisted out of the loop — previously re-imported on every idle
        # iteration. Local import keeps this snippet self-contained.
        import time

        print(f"Worker {self.worker_id} started")
        while self.running:
            task = self.queue.get_task()
            if task is None:
                time.sleep(1)  # queue empty; back off briefly
                continue
            url = task["url"]
            try:
                data = self.process_url(url)
                self.queue.mark_completed(url, data)
                print(f"[{self.worker_id}] Scraped: {url}")
            except Exception as e:
                # Record the failure; the queue decides whether to retry.
                self.queue.mark_failed(url, str(e))
                print(f"[{self.worker_id}] Failed: {url} - {e}")
Step 3: Orchestrator
import subprocess
import sys
def launch_workers(num_workers: int = 4, urls: list[str] | None = None) -> list:
    """Seed the queue and launch worker subprocesses.

    Args:
        num_workers: how many worker processes to start.
        urls: URLs to enqueue before launching; defaults to the demo
            example.com pages so the original behavior is preserved.

    Returns:
        The list of subprocess.Popen handles, so the caller can wait on
        or terminate the workers.
    """
    queue = ScrapingQueue()
    if urls is None:
        # Demo seed data — replace with your real URL source.
        urls = [f"https://example.com/page/{i}" for i in range(10000)]
    queue.add_urls(urls)
    # Each worker runs in its own process: crashes stay isolated and the
    # OS scheduler spreads them across cores.
    processes = []
    for i in range(num_workers):
        p = subprocess.Popen(
            [sys.executable, "worker.py", f"--id=worker-{i}"],
        )
        processes.append(p)
    print(f"Launched {num_workers} workers")
    return processes
Step 4: Monitoring Dashboard
import time
def monitor(queue: ScrapingQueue, interval: int = 5):
    """Print queue stats every ``interval`` seconds, including throughput.

    Runs forever; intended to be launched in its own terminal or process.
    Throughput is the completed-count delta between samples divided by
    the sample interval.
    """
    prev_completed = 0
    while True:
        stats = queue.stats()
        completed = stats["completed"]
        rate = (completed - prev_completed) / interval
        prev_completed = completed
        # BUG FIX: the dict keys must be string literals — stats[pending]
        # etc. referenced undefined names and raised NameError.
        print(
            f"Pending: {stats['pending']:,} | "
            f"Processing: {stats['processing']} | "
            f"Completed: {completed:,} | "
            f"Failed: {stats['failed']:,} | "
            f"Rate: {rate:.1f}/sec"
        )
        time.sleep(interval)
Scaling Beyond One Machine
Once you outgrow a single server, the Redis queue architecture scales horizontally. Each worker machine connects to the same Redis instance and pulls tasks independently.
For the proxy layer, ScraperAPI works well with distributed systems — each worker makes independent API calls and the service handles proxy rotation across all of them.
When running workers across multiple regions, residential proxies from ThorData let you geo-target requests from each worker to match the region its proxy pool covers.
To optimize proxy costs across your fleet, ScrapeOps helps benchmark which proxy provider gives the best success rate for each target domain.
Production Considerations
- Deduplication: Use a Redis set or Bloom filter to avoid scraping the same URL twice
- Rate limiting: Add per-domain delays using Redis sorted sets with timestamps
- Dead letter queue: After max retries, move failed URLs to a separate queue for manual review
- Backpressure: Stop feeding URLs when the queue exceeds a threshold
- Graceful shutdown: Handle SIGTERM so workers finish their current task before exiting
# Per-domain rate limiting
def rate_limited_get(queue: ScrapingQueue, url: str, delay: float = 1.0):
    """Fetch a URL while enforcing a per-domain minimum delay via Redis.

    A short-lived key acts as a token: SET NX succeeds only when no
    request has hit this domain within ``delay`` seconds; otherwise we
    spin-wait until the key expires.
    """
    from urllib.parse import urlparse
    domain = urlparse(url).netloc
    lock_key = f"ratelimit:{domain}"
    # Use PX (milliseconds) instead of EX: ex=int(delay) truncated
    # sub-second delays to 0, which Redis rejects as an invalid expire.
    ttl_ms = max(1, int(delay * 1000))
    while not queue.redis.set(lock_key, 1, nx=True, px=ttl_ms):
        time.sleep(0.1)
    return httpx.get(url, timeout=30)
Key Takeaways
- Redis sorted sets make excellent priority queues for scraping
- Atomic operations prevent duplicate processing across workers
- Retry with backoff handles transient failures gracefully
- Horizontal scaling is trivial — just add more workers pointing at the same Redis
- Monitor everything — rate, queue depth, failure rate, and worker health
Top comments (0)