Building a scraper is the easy part. Running it reliably in production — with scheduling, monitoring, retries, and data storage — is where most projects fail. This guide covers the full pipeline from development to production-grade scraping infrastructure.
Pipeline Architecture
A production scraping pipeline has five stages:
- URL Discovery — Find what to scrape
- Fetching — Download pages with proxy rotation
- Parsing — Extract structured data
- Storage — Save to database or data lake
- Monitoring — Track success rates and send alerts
┌─────────┐ ┌─────────┐ ┌─────────┐ ┌─────────┐ ┌──────────┐
│ URL │───▶│ Fetcher │───▶│ Parser │───▶│ Storage │───▶│ Monitor │
│ Queue │ │ +Proxy │ │ │ │ │ │ │
└─────────┘ └─────────┘ └─────────┘ └─────────┘ └──────────┘
Stage 1: URL Queue
import sqlite3
from datetime import datetime, timedelta
from enum import Enum
class URLStatus(Enum):
    """Lifecycle states for a URL in the queue (stored as TEXT in SQLite)."""
    PENDING = "pending"          # queued, never attempted
    IN_PROGRESS = "in_progress"  # handed out to a worker
    COMPLETED = "completed"      # scraped successfully
    FAILED = "failed"            # retry budget exhausted
    RETRY = "retry"              # failed, eligible for another attempt


class URLQueue:
    """SQLite-backed work queue for scrape targets with retry tracking.

    Timestamps are stored as ISO-8601 strings ("YYYY-MM-DD HH:MM:SS..."),
    which compare lexicographically and match SQLite's CURRENT_TIMESTAMP
    format. This avoids sqlite3's default datetime adapter, deprecated
    since Python 3.12.
    """

    # Workers that die mid-scrape leave rows stuck in 'in_progress';
    # rows whose last attempt is older than this are handed out again.
    COOLDOWN = timedelta(minutes=5)

    def __init__(self, db_path="scraper.db"):
        """Open (or create) the queue database at *db_path*."""
        self.conn = sqlite3.connect(db_path)
        self.conn.execute("""
            CREATE TABLE IF NOT EXISTS url_queue (
                url TEXT PRIMARY KEY,
                status TEXT DEFAULT 'pending',
                retries INTEGER DEFAULT 0,
                max_retries INTEGER DEFAULT 3,
                last_attempt TIMESTAMP,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                data TEXT
            )
        """)
        self.conn.commit()

    @staticmethod
    def _now():
        """Current local time as a sortable ISO string."""
        return datetime.now().isoformat(sep=" ")

    def add(self, urls):
        """Enqueue *urls*; duplicates are ignored.

        INSERT OR IGNORE already swallows primary-key conflicts, so the
        per-row try/except IntegrityError in the original was dead code.
        """
        self.conn.executemany(
            "INSERT OR IGNORE INTO url_queue (url) VALUES (?)",
            ((url,) for url in urls),
        )
        self.conn.commit()

    def get_batch(self, size=10):
        """Return up to *size* URLs to work on and mark them in-progress.

        Selects pending/retry rows outside the cooldown window, plus
        'in_progress' rows whose last_attempt is older than COOLDOWN —
        reclaiming work lost to crashed workers (the original query
        excluded 'in_progress' entirely, so such rows were stuck forever).
        """
        cutoff = (datetime.now() - self.COOLDOWN).isoformat(sep=" ")
        cursor = self.conn.execute(
            """SELECT url FROM url_queue
               WHERE status IN ('pending', 'retry', 'in_progress')
                 AND (last_attempt IS NULL OR last_attempt < ?)
               LIMIT ?""",
            (cutoff, size),
        )
        urls = [row[0] for row in cursor.fetchall()]
        if urls:
            now = self._now()
            self.conn.executemany(
                "UPDATE url_queue SET status=?, last_attempt=? WHERE url=?",
                [(URLStatus.IN_PROGRESS.value, now, url) for url in urls],
            )
            self.conn.commit()
        return urls

    def mark_completed(self, url, data=None):
        """Record *url* as done, optionally storing extracted *data*."""
        self.conn.execute(
            "UPDATE url_queue SET status=?, data=? WHERE url=?",
            (URLStatus.COMPLETED.value, data, url),
        )
        self.conn.commit()

    def mark_failed(self, url):
        """Record a failed attempt.

        Bumps the retry counter; the row stays 'retry' while the
        pre-increment count is below max_retries, then becomes 'failed'
        (i.e. one initial attempt plus max_retries retries).
        """
        self.conn.execute(
            """UPDATE url_queue SET
                   status = CASE WHEN retries < max_retries
                                 THEN 'retry' ELSE 'failed' END,
                   retries = retries + 1
               WHERE url = ?""",
            (url,),
        )
        self.conn.commit()
Stage 2: Fetcher with Retry Logic
# Implementation is proprietary (that IS the moat).
# Skip the build — use our ready-made Apify actor:
# see the CTA below for the link (fpr=yw6md3).
Stage 3: Parser Framework
# Implementation is proprietary (that IS the moat).
# Skip the build — use our ready-made Apify actor:
# see the CTA below for the link (fpr=yw6md3).
Stage 4: Storage Layer
import json
import csv
class Storage:
    """Append-only store for scraped items, backed by SQLite."""

    def __init__(self, db_path="scraped_data.db"):
        """Open (or create) the results database at *db_path*."""
        self.conn = sqlite3.connect(db_path)
        self.conn.execute("""
            CREATE TABLE IF NOT EXISTS scraped_items (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                url TEXT,
                data TEXT,
                source TEXT,
                scraped_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            )
        """)
        self.conn.commit()

    def save(self, url, data, source="default"):
        """Persist one scraped item; *data* is stored as a JSON string."""
        self.conn.execute(
            "INSERT INTO scraped_items (url, data, source) VALUES (?, ?, ?)",
            (url, json.dumps(data, ensure_ascii=False), source),
        )
        self.conn.commit()

    def export_csv(self, filename, source=None):
        """Dump items (optionally filtered by *source*) to a CSV file.

        The source filter is bound as a query parameter; the original
        f-string interpolation was an SQL-injection vector and broke on
        any source value containing a single quote.
        """
        query = "SELECT * FROM scraped_items"
        params = ()
        if source:
            query += " WHERE source = ?"
            params = (source,)
        cursor = self.conn.execute(query, params)
        rows = cursor.fetchall()
        with open(filename, "w", newline="", encoding="utf-8") as f:
            writer = csv.writer(f)
            writer.writerow(["id", "url", "data", "source", "scraped_at"])
            writer.writerows(rows)
Stage 5: Monitoring
from collections import defaultdict
class PipelineMonitor:
    """Counts successes and failures and produces a summary report.

    Keeps an in-memory error log; `get_report` derives throughput from
    the wall-clock time since construction.
    """

    def __init__(self):
        self.stats = defaultdict(int)
        self.errors = []
        self.start_time = datetime.now()

    def record_success(self, url):
        """Count one successfully processed URL."""
        self.stats["success"] += 1

    def record_failure(self, url, error):
        """Count one failure and remember the URL, message, and time."""
        self.stats["failure"] += 1
        entry = {
            "url": url,
            "error": str(error),
            "time": datetime.now().isoformat(),
        }
        self.errors.append(entry)

    def get_report(self):
        """Return a summary dict: counts, rates, and the last 5 errors."""
        succeeded = self.stats["success"]
        failed = self.stats["failure"]
        total = succeeded + failed
        elapsed = (datetime.now() - self.start_time).total_seconds()
        # max(..., 1) / max(..., 0.01) guard against division by zero
        # when nothing has been processed yet.
        pct = succeeded / max(total, 1) * 100
        per_minute = total / max(elapsed / 60, 0.01)
        return {
            "total_processed": total,
            "successful": succeeded,
            "failed": failed,
            "success_rate": f"{pct:.1f}%",
            "elapsed_seconds": round(elapsed, 1),
            "rate_per_minute": round(per_minute, 1),
            "recent_errors": self.errors[-5:],
        }
Putting It All Together
# Implementation is proprietary (that IS the moat).
# Skip the build — use our ready-made Apify actor:
# see the CTA below for the link (fpr=yw6md3).
Scheduling with Cron
# Implementation is proprietary (that IS the moat).
# Skip the build — use our ready-made Apify actor:
# see the CTA below for the link (fpr=yw6md3).
Cloud Deployment
For production scrapers, consider platforms like Apify which provide built-in scheduling, proxy management, storage, and monitoring — eliminating the need to build infrastructure yourself.
Proxy Infrastructure
Production scrapers need reliable proxies. ThorData offers residential proxy pools with automatic rotation, session management, and geographic targeting for large-scale scraping operations.
Conclusion
Building a production scraping pipeline is primarily an infrastructure challenge, not a coding one. The key components — queue management, retry logic, monitoring, and proxy rotation — matter more than the parsing code itself. Start simple, monitor everything, and scale gradually.
Top comments (0)