If you've scraped some data with Scrapy, you might be wondering: "Now what? How do I actually do something with all this data?"
That's where pipelines come in.
Pipelines are where your scraped data goes to get cleaned, validated, transformed, and stored. Think of them as an assembly line for your data—each pipeline does one specific job, and together they turn raw scraped data into something actually useful.
In this guide, I'll show you how to build practical pipelines that solve real problems. We'll cover the basics, sure, but more importantly, we'll dig into the stuff the documentation glosses over—the patterns that actually work in production.
What Are Pipelines, Really?
Here's the simple version: after your spider scrapes an item, that item flows through a series of pipelines. Each pipeline can:
- Transform the item (clean text, format dates, calculate values)
- Validate the item (check required fields, verify data types)
- Enrich the item (add timestamps, fetch related data)
- Store the item (save to database, file, API)
- Drop the item (if it's invalid or a duplicate)
Pipelines run in order, and each one decides whether to pass the item to the next pipeline or stop processing it entirely.
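Here's what that contract looks like as code. This is a minimal sketch (the class and field names are illustrative, not from a real project): process_item() either returns the item, possibly modified, so it continues down the chain, or raises DropItem to stop it.

```python
from itemadapter import ItemAdapter
from scrapy.exceptions import DropItem


class ExamplePipeline:
    def process_item(self, item, spider):
        adapter = ItemAdapter(item)
        if not adapter.get('title'):
            # Stops here: later pipelines never see this item
            raise DropItem('Missing title')
        # Transform in place, then hand the item to the next pipeline
        adapter['title'] = adapter['title'].strip()
        return item
```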
When You Actually Need Pipelines
You DON'T need pipelines if:
- You're just exploring/testing and Scrapy's default output (JSON/CSV) is fine
- Your data is already clean and ready to use
- You're scraping once and done
You DO need pipelines if:
- You need to clean or transform data (dates, prices, text)
- You want to validate items before storing them
- You need to remove duplicates
- You're storing data in a database
- You want to add metadata (scrape timestamps, source URLs)
- You're processing images or files
Your First Pipeline: Adding Timestamps
Let's start with something simple but super useful: adding a timestamp to every scraped item.
The Pipeline
```python
# pipelines.py
from datetime import datetime, timezone
from itemadapter import ItemAdapter


class TimestampPipeline:
    def process_item(self, item, spider):
        adapter = ItemAdapter(item)
        adapter['scraped_at'] = datetime.now(timezone.utc).isoformat()
        adapter['spider_name'] = spider.name
        return item
```
Enable It
```python
# settings.py
ITEM_PIPELINES = {
    'myproject.pipelines.TimestampPipeline': 100,
}
```
What's Happening Here
The number 100 is the priority—lower numbers run first. Scrapy processes pipelines from lowest to highest (100 → 200 → 300, etc.).
Why use ItemAdapter? It gives your pipeline one consistent interface whether the spider yields plain dicts, Scrapy Item objects, dataclasses, or attrs objects. Always use it in production code.
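To see what that buys you, here's a small standalone sketch (the item classes are made up for illustration): the same adapter code works whether the spider yields a plain dict, a scrapy.Item subclass, or a dataclass.

```python
from dataclasses import dataclass

import scrapy
from itemadapter import ItemAdapter


class ArticleItem(scrapy.Item):
    title = scrapy.Field()
    scraped_at = scrapy.Field()


@dataclass
class ArticleRecord:
    title: str = ''
    scraped_at: str = ''


for item in ({'title': 'a dict'}, ArticleItem(title='an Item'), ArticleRecord(title='a dataclass')):
    adapter = ItemAdapter(item)
    adapter['scraped_at'] = '2024-01-05T00:00:00+00:00'  # same code, three item types
    print(adapter.asdict())
```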
Data Cleaning Pipeline (The Real-World Version)
The official docs show you how to validate prices. Let me show you how to clean messy real-world data.
The Problem
Real scraped data looks like this:
- Prices: "$1,234.99", "1234.99 USD", "Price: $1,234"
- Dates: "Jan 5, 2024", "2024-01-05", "5th January 2024"
- Text: extra whitespace, HTML entities, weird characters
The Solution
```python
# pipelines.py
import re
from datetime import datetime
from html import unescape

from itemadapter import ItemAdapter


class DataCleaningPipeline:
    def process_item(self, item, spider):
        adapter = ItemAdapter(item)

        # Clean price
        if adapter.get('price'):
            adapter['price'] = self.clean_price(adapter['price'])

        # Clean text fields
        for field in ['title', 'description', 'author']:
            if adapter.get(field):
                adapter[field] = self.clean_text(adapter[field])

        # Parse dates
        if adapter.get('published_date'):
            adapter['published_date'] = self.parse_date(adapter['published_date'], spider)

        return item

    def clean_price(self, price_str):
        """Extract a numeric price from a messy string"""
        if not price_str:
            return None
        # Remove currency symbols, commas, and surrounding text
        cleaned = re.sub(r'[^\d.]', '', str(price_str))
        try:
            return float(cleaned)
        except ValueError:
            return None

    def clean_text(self, text):
        """Clean text: strip whitespace, decode HTML entities, remove weird chars"""
        if not text:
            return ''
        # Decode HTML entities (&amp; -> &, etc.)
        text = unescape(str(text))
        # Collapse runs of whitespace
        text = ' '.join(text.split())
        # Remove non-printable characters
        text = ''.join(char for char in text if char.isprintable())
        return text.strip()

    def parse_date(self, date_str, spider):
        """Try multiple date formats"""
        if not date_str:
            return None
        date_formats = [
            '%Y-%m-%d',    # 2024-01-05
            '%d/%m/%Y',    # 05/01/2024
            '%B %d, %Y',   # January 5, 2024
            '%b %d, %Y',   # Jan 5, 2024
            '%d %B %Y',    # 5 January 2024
        ]
        for fmt in date_formats:
            try:
                return datetime.strptime(date_str.strip(), fmt).date().isoformat()
            except ValueError:
                continue
        # Dates with ordinal suffixes ("5th January 2024") need extra handling
        # (or a library like dateutil); for now, log and move on.
        spider.logger.warning(f'Could not parse date: {date_str}')
        return None
```
Why This Matters
Real-world data is messy. A robust cleaning pipeline saves you hours of manual data fixing later.
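If you want to sanity-check the cleaners against the messy examples from above, a quick standalone run looks like this (assuming the pipeline lives in myproject/pipelines.py, as in the rest of this guide):

```python
from scrapy import Spider

from myproject.pipelines import DataCleaningPipeline

cleaner = DataCleaningPipeline()
spider = Spider(name='demo')  # parse_date wants a spider for logging

print(cleaner.clean_price('Price: $1,234'))             # 1234.0
print(cleaner.clean_text('  Hello &amp;   welcome  '))  # 'Hello & welcome'
print(cleaner.parse_date('Jan 5, 2024', spider))        # '2024-01-05'
```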
Validation Pipeline (Drop Bad Items)
Here's how to filter out items that don't meet your requirements.
```python
# pipelines.py
from itemadapter import ItemAdapter
from scrapy.exceptions import DropItem


class ValidationPipeline:
    def __init__(self):
        self.required_fields = ['title', 'url']
        self.min_title_length = 5
        self.max_price = 10000

    def process_item(self, item, spider):
        adapter = ItemAdapter(item)

        # Check required fields exist
        for field in self.required_fields:
            if not adapter.get(field):
                raise DropItem(f'Missing required field: {field} in {adapter.get("url", "unknown")}')

        # Validate title length
        if len(adapter['title']) < self.min_title_length:
            raise DropItem(f'Title too short: {adapter["title"]}')

        # Validate price range
        if adapter.get('price'):
            if adapter['price'] < 0 or adapter['price'] > self.max_price:
                raise DropItem(f'Invalid price: {adapter["price"]}')

        # Validate URL format
        if not adapter['url'].startswith(('http://', 'https://')):
            raise DropItem(f'Invalid URL format: {adapter["url"]}')

        return item
```
Key Points
- DropItem stops the item from continuing through later pipelines
- Always say why you're dropping items: the DropItem message ends up in Scrapy's logs, and debugging later is painful without that context (see the stats sketch below for another option)
- Put validation early in your pipeline order (low numbers like 200-300)
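On the logging point: Scrapy already logs the DropItem message for you, but it can also help to count drop reasons in the crawl stats so they show up in the end-of-run stats dump. Here's a trimmed-down variant of the ValidationPipeline above showing just that idea (the stat key names are arbitrary):

```python
from itemadapter import ItemAdapter
from scrapy.exceptions import DropItem


class ValidationPipeline:
    def process_item(self, item, spider):
        adapter = ItemAdapter(item)
        if not adapter.get('title'):
            # Counter shows up in the final stats dump, e.g. validation/dropped/missing_title: 42
            spider.crawler.stats.inc_value('validation/dropped/missing_title')
            raise DropItem(f'Missing title: {adapter.get("url", "unknown")}')
        return item
```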
Duplicate Detection (The Smart Way)
The docs show a basic duplicate filter. Here's a better version that handles multiple fields and provides detailed logging.
```python
# pipelines.py
import hashlib
import json

from itemadapter import ItemAdapter
from scrapy.exceptions import DropItem


class SmartDuplicatesPipeline:
    def __init__(self):
        self.seen_hashes = set()
        self.duplicate_count = 0

    def open_spider(self, spider):
        spider.logger.info('Duplicate detection enabled')

    def close_spider(self, spider):
        spider.logger.info(
            f'Duplicate detection: Found {self.duplicate_count} duplicates, '
            f'{len(self.seen_hashes)} unique items processed'
        )

    def process_item(self, item, spider):
        adapter = ItemAdapter(item)

        # Create a hash based on multiple fields
        item_hash = self.get_item_hash(adapter)

        if item_hash in self.seen_hashes:
            self.duplicate_count += 1
            raise DropItem(f'Duplicate item found: {adapter.get("url", "unknown")}')

        self.seen_hashes.add(item_hash)
        return item

    def get_item_hash(self, adapter):
        """Create a unique hash based on key fields"""
        # Choose fields that make an item unique
        unique_fields = {
            'title': adapter.get('title', ''),
            'url': adapter.get('url', ''),
        }
        # Create a stable hash
        hash_str = json.dumps(unique_fields, sort_keys=True)
        return hashlib.md5(hash_str.encode()).hexdigest()
```
What Makes This Better
- Multi-field detection: Uses multiple fields (title + URL) instead of just one
- Detailed logging: Reports exactly how many duplicates were found
- Memory efficient: Only stores hashes, not entire items
- Configurable: Easy to change which fields determine uniqueness (see the settings-driven sketch below)
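And here's what that configurability could look like in practice. This is a sketch, not built-in behavior: DEDUPE_FIELDS is a made-up setting name you'd define yourself in settings.py.

```python
import hashlib
import json

from itemadapter import ItemAdapter
from scrapy.exceptions import DropItem


class ConfigurableDuplicatesPipeline:
    def __init__(self, unique_fields):
        self.unique_fields = unique_fields
        self.seen_hashes = set()

    @classmethod
    def from_crawler(cls, crawler):
        # Falls back to title + url if DEDUPE_FIELDS isn't set
        return cls(crawler.settings.getlist('DEDUPE_FIELDS', ['title', 'url']))

    def process_item(self, item, spider):
        adapter = ItemAdapter(item)
        item_hash = self.get_item_hash(adapter)
        if item_hash in self.seen_hashes:
            raise DropItem(f'Duplicate item found: {adapter.get("url", "unknown")}')
        self.seen_hashes.add(item_hash)
        return item

    def get_item_hash(self, adapter):
        values = {field: adapter.get(field, '') for field in self.unique_fields}
        return hashlib.md5(json.dumps(values, sort_keys=True).encode()).hexdigest()
```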
Database Storage (PostgreSQL Example)
Here's a production-ready database pipeline with proper error handling.
```python
# pipelines.py
import psycopg2
from psycopg2 import extras

from itemadapter import ItemAdapter


class PostgresPipeline:
    def __init__(self, db_settings):
        self.db_settings = db_settings
        self.connection = None
        self.cursor = None
        self.items_buffer = []
        self.buffer_size = 100  # Batch insert for performance

    @classmethod
    def from_crawler(cls, crawler):
        # Get database settings from settings.py
        db_settings = {
            'host': crawler.settings.get('POSTGRES_HOST', 'localhost'),
            'port': crawler.settings.get('POSTGRES_PORT', 5432),
            'database': crawler.settings.get('POSTGRES_DB'),
            'user': crawler.settings.get('POSTGRES_USER'),
            'password': crawler.settings.get('POSTGRES_PASSWORD'),
        }

        # Validate required settings
        if not all([db_settings['database'], db_settings['user']]):
            raise ValueError('POSTGRES_DB and POSTGRES_USER must be set in settings')

        return cls(db_settings)

    def open_spider(self, spider):
        """Connect to the database when the spider starts"""
        try:
            self.connection = psycopg2.connect(**self.db_settings)
            self.cursor = self.connection.cursor()
            spider.logger.info(f'Connected to PostgreSQL database: {self.db_settings["database"]}')

            # Create table if it doesn't exist
            self.create_table(spider)
        except psycopg2.Error as e:
            spider.logger.error(f'Database connection failed: {e}')
            raise

    def close_spider(self, spider):
        """Flush remaining items and close the connection"""
        if self.items_buffer:
            self.flush_items(spider)

        if self.cursor:
            self.cursor.close()
        if self.connection:
            self.connection.close()

        spider.logger.info('Database connection closed')

    def create_table(self, spider):
        """Create the table if it doesn't exist"""
        create_query = """
            CREATE TABLE IF NOT EXISTS scraped_items (
                id SERIAL PRIMARY KEY,
                title VARCHAR(500),
                url TEXT UNIQUE,
                price DECIMAL(10, 2),
                description TEXT,
                scraped_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            )
        """
        try:
            self.cursor.execute(create_query)
            self.connection.commit()
            spider.logger.info('Table checked/created successfully')
        except psycopg2.Error as e:
            spider.logger.error(f'Table creation failed: {e}')
            self.connection.rollback()

    def process_item(self, item, spider):
        adapter = ItemAdapter(item)

        # Add to buffer (dict order matches the column order in the INSERT below)
        self.items_buffer.append({
            'title': adapter.get('title'),
            'url': adapter.get('url'),
            'price': adapter.get('price'),
            'description': adapter.get('description'),
        })

        # Flush buffer if full
        if len(self.items_buffer) >= self.buffer_size:
            self.flush_items(spider)

        return item

    def flush_items(self, spider):
        """Batch insert items for better performance"""
        if not self.items_buffer:
            return

        # execute_values expects a single %s placeholder where the rows go
        insert_query = """
            INSERT INTO scraped_items (title, url, price, description)
            VALUES %s
            ON CONFLICT (url) DO UPDATE SET
                title = EXCLUDED.title,
                price = EXCLUDED.price,
                description = EXCLUDED.description,
                scraped_at = CURRENT_TIMESTAMP
        """
        try:
            # Batch insert with execute_values
            extras.execute_values(
                self.cursor,
                insert_query,
                [tuple(item.values()) for item in self.items_buffer],
                template='(%s, %s, %s, %s)',
                page_size=self.buffer_size
            )
            self.connection.commit()
            spider.logger.info(f'Inserted/updated {len(self.items_buffer)} items')
            self.items_buffer = []
        except psycopg2.Error as e:
            spider.logger.error(f'Batch insert failed: {e}')
            self.connection.rollback()
            self.items_buffer = []
```
Enable It
```python
# settings.py
ITEM_PIPELINES = {
    'myproject.pipelines.TimestampPipeline': 100,
    'myproject.pipelines.DataCleaningPipeline': 200,
    'myproject.pipelines.ValidationPipeline': 300,
    'myproject.pipelines.SmartDuplicatesPipeline': 400,
    'myproject.pipelines.PostgresPipeline': 800,
}

# Database settings
POSTGRES_HOST = 'localhost'
POSTGRES_PORT = 5432
POSTGRES_DB = 'scrapy_data'
POSTGRES_USER = 'your_user'
POSTGRES_PASSWORD = 'your_password'
```
What Makes This Production-Ready
- Batch inserts: Much faster than inserting one at a time
- Connection management: One connection per crawl, opened in open_spider and closed in close_spider
- Error handling: Graceful handling of database errors
- Upserts: Updates existing records instead of failing on duplicates
- Logging: Detailed logs for debugging
- Configuration: All settings externalized to settings.py
Conditional Pipelines (Per-Spider Logic)
Sometimes you want different pipelines for different spiders. Here's how:
```python
# pipelines.py
from itemadapter import ItemAdapter


class ConditionalPipeline:
    def process_item(self, item, spider):
        # Only process items from specific spiders
        if spider.name not in ['amazon_spider', 'ebay_spider']:
            return item

        adapter = ItemAdapter(item)

        # Spider-specific logic
        if spider.name == 'amazon_spider':
            # Amazon-specific processing
            if adapter.get('asin'):
                adapter['product_id'] = f"AMZN-{adapter['asin']}"
        elif spider.name == 'ebay_spider':
            # eBay-specific processing
            if adapter.get('item_number'):
                adapter['product_id'] = f"EBAY-{adapter['item_number']}"

        return item
```
Alternative: Pipeline Settings Per Spider
```python
# spiders/amazon.py
import scrapy


class AmazonSpider(scrapy.Spider):
    name = 'amazon'

    custom_settings = {
        'ITEM_PIPELINES': {
            'myproject.pipelines.AmazonPipeline': 300,
            'myproject.pipelines.DatabasePipeline': 800,
        }
    }
```
Image Download Pipeline (Beyond the Basics)
Scrapy has a built-in ImagesPipeline, but here's what the docs don't explain well:
Custom Image Pipeline with Smart Naming
```python
# pipelines.py
import hashlib
from urllib.parse import urlparse

import scrapy
from scrapy.exceptions import DropItem
from scrapy.pipelines.images import ImagesPipeline


class SmartImagePipeline(ImagesPipeline):
    def get_media_requests(self, item, info):
        # Download images from the image_urls field
        for image_url in item.get('image_urls', []):
            yield scrapy.Request(
                image_url,
                meta={'item_title': item.get('title', 'unknown')}
            )

    def file_path(self, request, response=None, info=None, *, item=None):
        """Generate meaningful filenames"""
        # Extract title from meta
        title = request.meta.get('item_title', 'unknown')

        # Clean title for filename
        safe_title = ''.join(c for c in title if c.isalnum() or c in (' ', '-', '_'))
        safe_title = safe_title.replace(' ', '_')[:50]  # Limit length

        # Add a hash to avoid collisions
        url_hash = hashlib.md5(request.url.encode()).hexdigest()[:8]

        # Get the file extension
        url_path = urlparse(request.url).path
        extension = url_path.split('.')[-1] if '.' in url_path else 'jpg'

        return f'{safe_title}_{url_hash}.{extension}'

    def item_completed(self, results, item, info):
        """Add downloaded image paths to the item"""
        image_paths = [x['path'] for ok, x in results if ok]
        if not image_paths:
            raise DropItem('No images downloaded')

        item['image_paths'] = image_paths
        return item
```
Enable It
```python
# settings.py
ITEM_PIPELINES = {
    'myproject.pipelines.SmartImagePipeline': 1,
}

# ImagesPipeline (and any subclass) needs Pillow installed: pip install Pillow
IMAGES_STORE = './downloaded_images'
IMAGES_EXPIRES = 90  # Days before re-downloading
```
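One thing worth spelling out: the pipeline above reads image_urls from the item and writes image_paths back to it, so both fields need to exist on your item. A sketch of a matching Item definition, using the field names from the code above:

```python
# items.py
import scrapy


class ProductItem(scrapy.Item):
    title = scrapy.Field()
    image_urls = scrapy.Field()   # input: list of image URLs to download
    image_paths = scrapy.Field()  # output: filled in by SmartImagePipeline
```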
Async Pipeline (For External API Calls)
Need to enrich items by calling an external API? Use async:
```python
# pipelines.py
import aiohttp
from itemadapter import ItemAdapter
from scrapy.utils.defer import deferred_from_coro


class AsyncAPIEnrichmentPipeline:
    def __init__(self):
        self.session = None

    def open_spider(self, spider):
        # Create an aiohttp session, reused for every request
        self.session = aiohttp.ClientSession()

    def close_spider(self, spider):
        # session.close() is a coroutine; returning it as a Deferred
        # lets Scrapy wait for it before shutting down
        if self.session:
            return deferred_from_coro(self.session.close())

    async def process_item(self, item, spider):
        adapter = ItemAdapter(item)

        # Example: fetch additional data from an external API
        if adapter.get('isbn'):
            book_data = await self.fetch_book_details(adapter['isbn'], spider)
            if book_data:
                adapter['author'] = book_data.get('author')
                adapter['publisher'] = book_data.get('publisher')

        return item

    async def fetch_book_details(self, isbn, spider):
        """Fetch book details from an external API"""
        url = f'https://api.example.com/books/{isbn}'
        timeout = aiohttp.ClientTimeout(total=5)
        try:
            async with self.session.get(url, timeout=timeout) as response:
                if response.status == 200:
                    return await response.json()
        except Exception as e:
            # Log but don't fail the item
            spider.logger.warning(f'Enrichment failed for ISBN {isbn}: {e}')
        return None
```
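One prerequisite the code doesn't show: awaiting an asyncio library like aiohttp from inside Scrapy needs the asyncio reactor enabled. In settings.py, that looks like this (the priority 500 matches the production setup later in this guide):

```python
# settings.py
# Needed so aiohttp (and other asyncio libraries) can be awaited inside Scrapy
TWISTED_REACTOR = 'twisted.internet.asyncioreactor.AsyncioSelectorReactor'

ITEM_PIPELINES = {
    'myproject.pipelines.AsyncAPIEnrichmentPipeline': 500,
}
```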
Common Mistakes to Avoid
1. Not Returning the Item
```python
# WRONG - Item disappears!
def process_item(self, item, spider):
    item['processed'] = True
    # Missing return!

# RIGHT
def process_item(self, item, spider):
    item['processed'] = True
    return item
```
2. Assuming Every Item Type Accepts New Fields
```python
# RISKY - scrapy.Item subclasses raise KeyError for fields they don't declare
def process_item(self, item, spider):
    item['new_field'] = 'value'  # Fails unless new_field is declared on the Item

# BETTER - ItemAdapter gives one interface for dicts, Items, and dataclasses
# (the field still has to be declared on Item subclasses and dataclasses)
def process_item(self, item, spider):
    adapter = ItemAdapter(item)
    adapter['new_field'] = 'value'
    return item
```
3. Not Closing Resources
```python
# WRONG - Database connection leak
class BadPipeline:
    def open_spider(self, spider):
        self.db = connect_to_db()
    # Missing close_spider!

# RIGHT
class GoodPipeline:
    def open_spider(self, spider):
        self.db = connect_to_db()

    def close_spider(self, spider):
        self.db.close()
```
4. Wrong Pipeline Order
```python
# WRONG - Validation happens AFTER storage!
ITEM_PIPELINES = {
    'myproject.pipelines.DatabasePipeline': 300,
    'myproject.pipelines.ValidationPipeline': 800,  # Too late!
}

# RIGHT - Validate before storing
ITEM_PIPELINES = {
    'myproject.pipelines.ValidationPipeline': 300,
    'myproject.pipelines.DatabasePipeline': 800,
}
```
Testing Your Pipelines
Here's how to test pipelines without running the full spider:
```python
# test_pipelines.py
from scrapy.spiders import Spider

from myproject.pipelines import DataCleaningPipeline


def test_price_cleaning():
    pipeline = DataCleaningPipeline()
    spider = Spider('test')

    # Test item
    item = {'price': '$1,234.99'}

    # Process it
    result = pipeline.process_item(item, spider)

    # Check result
    assert result['price'] == 1234.99
    print('✓ Price cleaning works!')


if __name__ == '__main__':
    test_price_cleaning()
```
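For pipelines that drop items, pytest makes the failure path easy to cover too. A sketch using pytest.raises with the ValidationPipeline from earlier:

```python
# test_pipelines.py
import pytest
from scrapy.spiders import Spider
from scrapy.exceptions import DropItem

from myproject.pipelines import ValidationPipeline


def test_item_without_title_is_dropped():
    pipeline = ValidationPipeline()
    spider = Spider('test')

    # No title, so the validation pipeline should raise DropItem
    with pytest.raises(DropItem):
        pipeline.process_item({'url': 'https://example.com/post'}, spider)
```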
Production Pipeline Setup
Here's a complete, production-ready pipeline configuration:
```python
# settings.py
ITEM_PIPELINES = {
    # 1. Add metadata (100-199)
    'myproject.pipelines.TimestampPipeline': 100,

    # 2. Clean data (200-299)
    'myproject.pipelines.DataCleaningPipeline': 200,

    # 3. Validate (300-399)
    'myproject.pipelines.ValidationPipeline': 300,

    # 4. Deduplicate (400-499)
    'myproject.pipelines.SmartDuplicatesPipeline': 400,

    # 5. Enrich (500-699)
    'myproject.pipelines.AsyncAPIEnrichmentPipeline': 500,

    # 6. Store (800-899)
    'myproject.pipelines.PostgresPipeline': 800,

    # 7. Images (special - runs first thanks to its priority of 1)
    'myproject.pipelines.SmartImagePipeline': 1,
}

# Performance settings
CONCURRENT_ITEMS = 100  # Max items processed in parallel per response (Scrapy's default)
```
Final Thoughts
Pipelines are where good scrapers become great scrapers. They're what separate a quick script from a maintainable, production-ready system.
Key takeaways:
- Order matters: Clean → Validate → Deduplicate → Store
- Always use ItemAdapter: Future-proofs your code
- Batch operations: Much faster for databases
- Proper error handling: Log errors, don't crash
- Close resources: Use open_spider/close_spider
- Test your pipelines: Don't wait for production to find bugs
Start simple, add complexity as you need it. A good pipeline setup is like good infrastructure—you barely notice it when it's working right.
Next steps: Try building a pipeline that:
- Sends items to an API endpoint
- Generates thumbnails from images
- Implements a circuit breaker for external services
Drop a comment if you want to see any of these in detail!