Building a scraper is the easy part. Running it reliably in production — with scheduling, monitoring, retries, and data storage — is where most projects fail. This guide covers the full pipeline from development to production-grade scraping infrastructure.
Pipeline Architecture
A production scraping pipeline has five stages:
- URL Discovery — Find what to scrape
- Fetching — Download pages with proxy rotation
- Parsing — Extract structured data
- Storage — Save to database or data lake
- Monitoring — Track success rates and alerts
┌─────────┐    ┌─────────┐    ┌─────────┐    ┌─────────┐    ┌──────────┐
│   URL   │───▶│ Fetcher │───▶│ Parser  │───▶│ Storage │───▶│ Monitor  │
│  Queue  │    │ +Proxy  │    │         │    │         │    │          │
└─────────┘    └─────────┘    └─────────┘    └─────────┘    └──────────┘
Stage 1: URL Queue
import sqlite3
from datetime import datetime, timedelta
from enum import Enum

class URLStatus(Enum):
    PENDING = "pending"
    IN_PROGRESS = "in_progress"
    COMPLETED = "completed"
    FAILED = "failed"
    RETRY = "retry"

class URLQueue:
    def __init__(self, db_path="scraper.db"):
        self.conn = sqlite3.connect(db_path)
        self.conn.execute("""
            CREATE TABLE IF NOT EXISTS url_queue (
                url TEXT PRIMARY KEY,
                status TEXT DEFAULT 'pending',
                retries INTEGER DEFAULT 0,
                max_retries INTEGER DEFAULT 3,
                last_attempt TIMESTAMP,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                data TEXT
            )
        """)
        self.conn.commit()

    def add(self, urls):
        # INSERT OR IGNORE silently skips URLs that are already queued,
        # so no duplicate-key handling is needed
        for url in urls:
            self.conn.execute(
                "INSERT OR IGNORE INTO url_queue (url) VALUES (?)", (url,)
            )
        self.conn.commit()

    def get_batch(self, size=10):
        # Claim a batch of pending/retry URLs, skipping anything
        # attempted within the last five minutes
        cursor = self.conn.execute(
            """SELECT url FROM url_queue
               WHERE status IN ('pending', 'retry')
               AND (last_attempt IS NULL OR last_attempt < ?)
               LIMIT ?""",
            (datetime.now() - timedelta(minutes=5), size)
        )
        urls = [row[0] for row in cursor.fetchall()]
        for url in urls:
            self.conn.execute(
                "UPDATE url_queue SET status=?, last_attempt=? WHERE url=?",
                (URLStatus.IN_PROGRESS.value, datetime.now(), url)
            )
        self.conn.commit()
        return urls

    def mark_completed(self, url, data=None):
        self.conn.execute(
            "UPDATE url_queue SET status=?, data=? WHERE url=?",
            (URLStatus.COMPLETED.value, data, url)
        )
        self.conn.commit()

    def mark_failed(self, url):
        # Move to 'retry' until max_retries is exhausted, then 'failed'
        self.conn.execute(
            """UPDATE url_queue SET
                   status = CASE WHEN retries < max_retries THEN 'retry' ELSE 'failed' END,
                   retries = retries + 1
               WHERE url = ?""",
            (url,)
        )
        self.conn.commit()
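A quick usage sketch, with placeholder URLs:

queue = URLQueue("scraper.db")
queue.add([
    "https://example.com/page/1",
    "https://example.com/page/2",
])

batch = queue.get_batch(size=5)  # claimed as in_progress until marked
for url in batch:
    queue.mark_completed(url)  # or queue.mark_failed(url) on error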
Stage 2: Fetcher with Retry Logic
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
import random
import time

class Fetcher:
    def __init__(self, proxies=None, max_retries=3):
        self.session = requests.Session()
        # Retry transient failures (rate limits, server errors) with
        # exponential backoff at the transport level
        retry_strategy = Retry(
            total=max_retries,
            backoff_factor=1,
            status_forcelist=[429, 500, 502, 503, 504],
        )
        adapter = HTTPAdapter(max_retries=retry_strategy)
        self.session.mount("http://", adapter)
        self.session.mount("https://", adapter)
        self.proxies = proxies or []
        self.user_agents = [
            "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36",
            "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36",
            "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36",
        ]

    def fetch(self, url, timeout=30):
        headers = {
            "User-Agent": random.choice(self.user_agents),
            "Accept": "text/html,application/xhtml+xml",
            "Accept-Language": "en-US,en;q=0.9",
        }
        # Rotate through the proxy pool, one proxy per request
        proxy = None
        if self.proxies:
            proxy_url = random.choice(self.proxies)
            proxy = {"http": proxy_url, "https": proxy_url}
        response = self.session.get(
            url, headers=headers, proxies=proxy, timeout=timeout
        )
        response.raise_for_status()
        return response

    def fetch_with_delay(self, url, min_delay=1, max_delay=5):
        # Randomized delay keeps request timing from looking mechanical
        time.sleep(random.uniform(min_delay, max_delay))
        return self.fetch(url)
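On its own, the Fetcher can be exercised like this (the proxy URL is a placeholder, not a working endpoint):

fetcher = Fetcher(proxies=["http://user:pass@proxy1.example.com:8080"])
try:
    response = fetcher.fetch_with_delay("https://example.com")
    print(response.status_code, len(response.text))
except requests.RequestException as e:
    # Raised only after the adapter's retries are exhausted
    print(f"Fetch failed: {e}")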
Stage 3: Parser Framework
from abc import ABC, abstractmethod
from datetime import datetime

from bs4 import BeautifulSoup

class BaseParser(ABC):
    @abstractmethod
    def parse(self, html, url):
        pass

    def clean_text(self, text):
        # Collapse runs of whitespace and trim
        if text:
            return " ".join(text.split()).strip()
        return ""

class ProductParser(BaseParser):
    def parse(self, html, url):
        soup = BeautifulSoup(html, "html.parser")
        products = []
        for card in soup.select(".product-card"):
            # Guard against missing elements so one malformed card
            # doesn't crash the whole page
            name = card.select_one(".title")
            price = card.select_one(".price")
            products.append({
                "name": self.clean_text(name.get_text()) if name else None,
                "price": self.clean_text(price.get_text()) if price else None,
                "url": url,
                "scraped_at": datetime.now().isoformat(),
            })
        return products

class ArticleParser(BaseParser):
    def parse(self, html, url):
        soup = BeautifulSoup(html, "html.parser")
        title = soup.select_one("h1")
        content = soup.select_one("article")
        return {
            "title": self.clean_text(title.get_text()) if title else None,
            "content": self.clean_text(content.get_text()) if content else None,
            "url": url,
            "scraped_at": datetime.now().isoformat(),
        }
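A minimal smoke test against inline HTML shows the output shape; the markup is invented to match the .product-card selectors:

sample_html = """
<div class="product-card">
  <span class="title">Widget  Pro</span>
  <span class="price"> $19.99 </span>
</div>
"""

parser = ProductParser()
print(parser.parse(sample_html, "https://example.com/products"))
# [{'name': 'Widget Pro', 'price': '$19.99', 'url': '...', 'scraped_at': '...'}]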
Stage 4: Storage Layer
import csv
import json
import sqlite3

class Storage:
    def __init__(self, db_path="scraped_data.db"):
        self.conn = sqlite3.connect(db_path)
        self.conn.execute("""
            CREATE TABLE IF NOT EXISTS scraped_items (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                url TEXT,
                data TEXT,
                source TEXT,
                scraped_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            )
        """)
        self.conn.commit()

    def save(self, url, data, source="default"):
        self.conn.execute(
            "INSERT INTO scraped_items (url, data, source) VALUES (?, ?, ?)",
            (url, json.dumps(data, ensure_ascii=False), source)
        )
        self.conn.commit()

    def export_csv(self, filename, source=None):
        # Use a parameterized query -- never interpolate values into SQL
        if source:
            cursor = self.conn.execute(
                "SELECT * FROM scraped_items WHERE source = ?", (source,)
            )
        else:
            cursor = self.conn.execute("SELECT * FROM scraped_items")
        with open(filename, "w", newline="", encoding="utf-8") as f:
            writer = csv.writer(f)
            writer.writerow(["id", "url", "data", "source", "scraped_at"])
            writer.writerows(cursor.fetchall())
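Saving and exporting, with placeholder data:

storage = Storage("scraped_data.db")
storage.save(
    "https://example.com/products",
    [{"name": "Widget Pro", "price": "$19.99"}],
    source="example-shop",
)
storage.export_csv("example_shop.csv", source="example-shop")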
Stage 5: Monitoring
from collections import defaultdict
from datetime import datetime

class PipelineMonitor:
    def __init__(self):
        self.stats = defaultdict(int)
        self.errors = []
        self.start_time = datetime.now()

    def record_success(self, url):
        self.stats["success"] += 1

    def record_failure(self, url, error):
        self.stats["failure"] += 1
        self.errors.append({
            "url": url,
            "error": str(error),
            "time": datetime.now().isoformat(),
        })

    def get_report(self):
        total = self.stats["success"] + self.stats["failure"]
        elapsed = (datetime.now() - self.start_time).total_seconds()
        return {
            "total_processed": total,
            "successful": self.stats["success"],
            "failed": self.stats["failure"],
            "success_rate": f"{(self.stats['success'] / max(total, 1)) * 100:.1f}%",
            "elapsed_seconds": round(elapsed, 1),
            "rate_per_minute": round(total / max(elapsed / 60, 0.01), 1),
            "recent_errors": self.errors[-5:],  # last five errors only
        }
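The monitor holds everything in memory, so a quick check shows the report format:

monitor = PipelineMonitor()
monitor.record_success("https://example.com/a")
monitor.record_failure("https://example.com/b", TimeoutError("read timed out"))
print(monitor.get_report())
# {'total_processed': 2, 'successful': 1, 'failed': 1, 'success_rate': '50.0%', ...}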
Putting It All Together
class ScrapingPipeline:
    def __init__(self, parser, storage=None, proxies=None):
        self.queue = URLQueue()
        self.fetcher = Fetcher(proxies=proxies)
        self.parser = parser
        self.storage = storage or Storage()
        self.monitor = PipelineMonitor()

    def run(self, batch_size=10, max_batches=100):
        for batch_num in range(max_batches):
            urls = self.queue.get_batch(batch_size)
            if not urls:
                print("Queue empty. Pipeline complete.")
                break
            print(f"Batch {batch_num + 1}: processing {len(urls)} URLs")
            for url in urls:
                try:
                    response = self.fetcher.fetch_with_delay(url)
                    data = self.parser.parse(response.text, url)
                    self.storage.save(url, data)
                    self.queue.mark_completed(url)
                    self.monitor.record_success(url)
                except Exception as e:
                    self.queue.mark_failed(url)
                    self.monitor.record_failure(url, e)
            report = self.monitor.get_report()
            print(f"  Success rate: {report['success_rate']} | "
                  f"Rate: {report['rate_per_minute']}/min")
        return self.monitor.get_report()
Scheduling
For a long-running process, the schedule library (pip install schedule) handles timing in-process; a cron-based alternative is shown after the code.
# schedule_scraper.py
import json
import time

import schedule

def run_daily_scrape():
    pipeline = ScrapingPipeline(
        parser=ProductParser(),
        proxies=["http://proxy1:8080", "http://proxy2:8080"]
    )
    # discover_urls() is your own seeding function (sitemap walk,
    # category pages, an API, etc.)
    pipeline.queue.add(discover_urls())
    report = pipeline.run()
    print(json.dumps(report, indent=2))

schedule.every().day.at("02:00").do(run_daily_scrape)

while True:
    schedule.run_pending()
    time.sleep(60)
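If you would rather let the operating system own the timing, a crontab entry works just as well. The sketch below assumes a hypothetical run_scrape.py wrapper that seeds the queue, calls pipeline.run() once, and exits:

# m h dom mon dow  command -- run the scraper every day at 02:00
0 2 * * * cd /opt/scraper && /usr/bin/python3 run_scrape.py >> scrape.log 2>&1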
Cloud Deployment
For production scrapers, consider a platform like Apify, which provides built-in scheduling, proxy management, storage, and monitoring — eliminating the need to build that infrastructure yourself.
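As a rough illustration of the hosted model, the official apify-client package follows a call-an-actor, read-the-dataset pattern; the token, actor ID, and start URL below are placeholders, and the exact run-input schema depends on the actor you choose:

from apify_client import ApifyClient

client = ApifyClient("YOUR_APIFY_TOKEN")  # placeholder token

# Run a scraping actor and wait for it to finish
run = client.actor("apify/web-scraper").call(run_input={
    "startUrls": [{"url": "https://example.com"}],
})

# Stream the items the actor stored in its default dataset
for item in client.dataset(run["defaultDatasetId"]).iterate_items():
    print(item)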
Proxy Infrastructure
Production scrapers need reliable proxies. ThorData offers residential proxy pools with automatic rotation, session management, and geographic targeting for large-scale scraping operations.
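Whichever provider you use, rotating endpoints typically slot straight into the Fetcher above as standard user:pass@host:port proxy URLs. The hostnames and credentials below are placeholders, not real provider endpoints:

# Substitute your provider's actual gateway and auth details
PROXIES = [
    "http://USER:PASS@gateway1.example-proxy.com:7777",
    "http://USER:PASS@gateway2.example-proxy.com:7777",
]

pipeline = ScrapingPipeline(parser=ProductParser(), proxies=PROXIES)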
Conclusion
Building a production scraping pipeline is primarily an infrastructure challenge, not a coding one. The key components — queue management, retry logic, monitoring, and proxy rotation — matter more than the parsing code itself. Start simple, monitor everything, and scale gradually.