Muhammad Ikramullah Khan

Scrapy Pipelines: The Complete Practical Guide (What the Docs Don't Tell You)

If you've scraped some data with Scrapy, you might be wondering: "Now what? How do I actually do something with all this data?"

That's where pipelines come in.

Pipelines are where your scraped data goes to get cleaned, validated, transformed, and stored. Think of them as an assembly line for your data—each pipeline does one specific job, and together they turn raw scraped data into something actually useful.

In this guide, I'll show you how to build practical pipelines that solve real problems. We'll cover the basics, sure, but more importantly, we'll dig into the stuff the documentation glosses over—the patterns that actually work in production.

What Are Pipelines, Really?

Here's the simple version: after your spider scrapes an item, that item flows through a series of pipelines. Each pipeline can:

  • Transform the item (clean text, format dates, calculate values)
  • Validate the item (check required fields, verify data types)
  • Enrich the item (add timestamps, fetch related data)
  • Store the item (save to database, file, API)
  • Drop the item (if it's invalid or a duplicate)

Pipelines run in order, and each one decides whether to pass the item to the next pipeline or stop processing it entirely.
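
Concretely, a pipeline is just a class with a process_item(self, item, spider) method. Here's a minimal sketch of that contract (the title field is only a placeholder):

# pipelines.py
from itemadapter import ItemAdapter
from scrapy.exceptions import DropItem

class ExamplePipeline:
    def process_item(self, item, spider):
        adapter = ItemAdapter(item)

        # Raising DropItem stops the item from reaching any later pipeline
        if not adapter.get('title'):
            raise DropItem('Missing title')

        # Otherwise transform it and pass it on by returning it
        adapter['title'] = adapter['title'].strip()
        return item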

When You Actually Need Pipelines

You DON'T need pipelines if:

  • You're just exploring/testing and Scrapy's default output (JSON/CSV) is fine
  • Your data is already clean and ready to use
  • You're scraping once and done

You DO need pipelines if:

  • You need to clean or transform data (dates, prices, text)
  • You want to validate items before storing them
  • You need to remove duplicates
  • You're storing data in a database
  • You want to add metadata (scrape timestamps, source URLs)
  • You're processing images or files

Your First Pipeline: Adding Timestamps

Let's start with something simple but super useful: adding a timestamp to every scraped item.

The Pipeline

# pipelines.py
from datetime import datetime, timezone
from itemadapter import ItemAdapter

class TimestampPipeline:
    def process_item(self, item, spider):
        adapter = ItemAdapter(item)
        adapter['scraped_at'] = datetime.now(timezone.utc).isoformat()
        adapter['spider_name'] = spider.name
        return item

Enable It

# settings.py
ITEM_PIPELINES = {
    'myproject.pipelines.TimestampPipeline': 100,
}

What's Happening Here

The number 100 is the priority—lower numbers run first. Scrapy processes pipelines from lowest to highest (100 → 200 → 300, etc.).

Why use ItemAdapter? It lets your pipeline work with both dict-based items and Scrapy Item objects. Always use it in production code.
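
Here's a tiny standalone sketch of what that buys you: the same code works whether an item is a plain dict or a scrapy.Item subclass (Product below is just an example class, not something Scrapy provides).

from datetime import datetime, timezone

import scrapy
from itemadapter import ItemAdapter

class Product(scrapy.Item):
    title = scrapy.Field()
    scraped_at = scrapy.Field()

# The same adapter code handles both item types
for item in ({'title': 'A plain dict'}, Product(title='An Item subclass')):
    adapter = ItemAdapter(item)
    adapter['scraped_at'] = datetime.now(timezone.utc).isoformat()
    print(adapter.asdict())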

Data Cleaning Pipeline (The Real-World Version)

The official docs show you how to validate prices. Let me show you how to clean messy real-world data.

The Problem

Real scraped data looks like this:

  • Prices: "$1,234.99", "1234.99 USD", "Price: $1,234"
  • Dates: "Jan 5, 2024", "2024-01-05", "5th January 2024"
  • Text: Extra whitespace, HTML entities, weird characters

The Solution

# pipelines.py
import re
from datetime import datetime
from html import unescape
from itemadapter import ItemAdapter
from scrapy.exceptions import DropItem

class DataCleaningPipeline:
    def process_item(self, item, spider):
        adapter = ItemAdapter(item)

        # Clean price
        if adapter.get('price'):
            adapter['price'] = self.clean_price(adapter['price'])

        # Clean text fields
        for field in ['title', 'description', 'author']:
            if adapter.get(field):
                adapter[field] = self.clean_text(adapter[field])

        # Parse dates
        if adapter.get('published_date'):
            adapter['published_date'] = self.parse_date(adapter['published_date'], spider)

        return item

    def clean_price(self, price_str):
        """Extract numeric price from messy string"""
        if not price_str:
            return None

        # Remove currency symbols, commas, text
        cleaned = re.sub(r'[^\d.]', '', str(price_str))

        try:
            return float(cleaned)
        except ValueError:
            return None

    def clean_text(self, text):
        """Clean text: strip whitespace, decode HTML entities, remove weird chars"""
        if not text:
            return ''

        # Decode HTML entities (&amp; -> &, etc.)
        text = unescape(str(text))

        # Strip whitespace
        text = ' '.join(text.split())

        # Remove non-printable characters
        text = ''.join(char for char in text if char.isprintable())

        return text.strip()

    def parse_date(self, date_str, spider):
        """Try multiple date formats"""
        if not date_str:
            return None

        date_formats = [
            '%Y-%m-%d',           # 2024-01-05
            '%d/%m/%Y',           # 05/01/2024
            '%B %d, %Y',          # January 5, 2024
            '%b %d, %Y',          # Jan 5, 2024
            '%d %B %Y',           # 5 January 2024
        ]

        for fmt in date_formats:
            try:
                return datetime.strptime(date_str.strip(), fmt).date().isoformat()
            except ValueError:
                continue

        spider.logger.warning(f'Could not parse date: {date_str}')
        return None

Why This Matters

Real-world data is messy. A robust cleaning pipeline saves you hours of manual data fixing later.
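
These cleaning helpers are plain methods, so you can sanity-check them in a REPL against the messy examples above (assuming the pipeline lives in myproject.pipelines, as in the settings snippets):

from scrapy.spiders import Spider
from myproject.pipelines import DataCleaningPipeline

pipeline = DataCleaningPipeline()
spider = Spider(name='demo')

print(pipeline.clean_price('$1,234.99'))               # 1234.99
print(pipeline.clean_price('1234.99 USD'))             # 1234.99
print(pipeline.parse_date('Jan 5, 2024', spider))      # 2024-01-05
print(pipeline.clean_text('  Hello &amp; goodbye  '))  # Hello & goodbye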

Validation Pipeline (Drop Bad Items)

Here's how to filter out items that don't meet your requirements.

# pipelines.py
from itemadapter import ItemAdapter
from scrapy.exceptions import DropItem

class ValidationPipeline:
    def __init__(self):
        self.required_fields = ['title', 'url']
        self.min_title_length = 5
        self.max_price = 10000

    def process_item(self, item, spider):
        adapter = ItemAdapter(item)

        # Check required fields exist
        for field in self.required_fields:
            if not adapter.get(field):
                raise DropItem(f'Missing required field: {field} in {adapter.get("url", "unknown")}')

        # Validate title length
        if len(adapter['title']) < self.min_title_length:
            raise DropItem(f'Title too short: {adapter["title"]}')

        # Validate price range
        if adapter.get('price'):
            if adapter['price'] < 0 or adapter['price'] > self.max_price:
                raise DropItem(f'Invalid price: {adapter["price"]}')

        # Validate URL format
        if not adapter['url'].startswith(('http://', 'https://')):
            raise DropItem(f'Invalid URL format: {adapter["url"]}')

        return item

Key Points

  • DropItem stops the item from continuing through pipelines
  • Always log why you're dropping items—debugging later is painful without logs
  • Put validation early in your pipeline order (low numbers like 200-300)
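
You can check the drop behaviour without running a crawl. A quick sketch:

from scrapy.exceptions import DropItem
from scrapy.spiders import Spider
from myproject.pipelines import ValidationPipeline

pipeline = ValidationPipeline()
spider = Spider(name='demo')

try:
    # No 'title' field, so the pipeline should refuse this item
    pipeline.process_item({'url': 'https://example.com/item/1'}, spider)
except DropItem as e:
    print(f'Dropped: {e}')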

Duplicate Detection (The Smart Way)

The docs show a basic duplicate filter. Here's a better version that handles multiple fields and provides detailed logging.

# pipelines.py
from itemadapter import ItemAdapter
from scrapy.exceptions import DropItem
import hashlib
import json

class SmartDuplicatesPipeline:
    def __init__(self):
        self.seen_hashes = set()
        self.duplicate_count = 0

    def open_spider(self, spider):
        spider.logger.info('Duplicate detection enabled')

    def close_spider(self, spider):
        spider.logger.info(
            f'Duplicate detection: Found {self.duplicate_count} duplicates, '
            f'{len(self.seen_hashes)} unique items processed'
        )

    def process_item(self, item, spider):
        adapter = ItemAdapter(item)

        # Create hash based on multiple fields
        item_hash = self.get_item_hash(adapter)

        if item_hash in self.seen_hashes:
            self.duplicate_count += 1
            raise DropItem(f'Duplicate item found: {adapter.get("url", "unknown")}')

        self.seen_hashes.add(item_hash)
        return item

    def get_item_hash(self, adapter):
        """Create unique hash based on key fields"""
        # Choose fields that make an item unique
        unique_fields = {
            'title': adapter.get('title', ''),
            'url': adapter.get('url', ''),
        }

        # Create stable hash
        hash_str = json.dumps(unique_fields, sort_keys=True)
        return hashlib.md5(hash_str.encode()).hexdigest()

What Makes This Better

  • Multi-field detection: Uses multiple fields (title + URL) instead of just one
  • Detailed logging: Reports exactly how many duplicates were found
  • Memory efficient: Only stores hashes, not entire items
  • Configurable: Easy to change which fields determine uniqueness (see the sketch below)
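
To back up that last point, here's one way to make the key fields configurable from settings.py. It builds on SmartDuplicatesPipeline above, and DUPLICATE_KEY_FIELDS is a made-up project setting, not something built into Scrapy:

# pipelines.py
import hashlib
import json

class ConfigurableDuplicatesPipeline(SmartDuplicatesPipeline):
    def __init__(self, key_fields):
        super().__init__()
        self.key_fields = key_fields

    @classmethod
    def from_crawler(cls, crawler):
        # DUPLICATE_KEY_FIELDS is defined in your own settings.py
        return cls(crawler.settings.getlist('DUPLICATE_KEY_FIELDS', ['title', 'url']))

    def get_item_hash(self, adapter):
        """Hash whichever fields the settings say make an item unique"""
        unique_fields = {field: adapter.get(field, '') for field in self.key_fields}
        return hashlib.md5(json.dumps(unique_fields, sort_keys=True).encode()).hexdigest()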

Database Storage (PostgreSQL Example)

Here's a production-ready database pipeline with proper error handling.

# pipelines.py
import psycopg2
from psycopg2 import sql, extras
from itemadapter import ItemAdapter

class PostgresPipeline:
    def __init__(self, db_settings):
        self.db_settings = db_settings
        self.connection = None
        self.cursor = None
        self.items_buffer = []
        self.buffer_size = 100  # Batch insert for performance

    @classmethod
    def from_crawler(cls, crawler):
        # Get database settings from settings.py
        db_settings = {
            'host': crawler.settings.get('POSTGRES_HOST', 'localhost'),
            'port': crawler.settings.get('POSTGRES_PORT', 5432),
            'database': crawler.settings.get('POSTGRES_DB'),
            'user': crawler.settings.get('POSTGRES_USER'),
            'password': crawler.settings.get('POSTGRES_PASSWORD'),
        }

        # Validate required settings
        if not all([db_settings['database'], db_settings['user']]):
            raise ValueError('POSTGRES_DB and POSTGRES_USER must be set in settings')

        return cls(db_settings)

    def open_spider(self, spider):
        """Connect to database when spider starts"""
        try:
            self.connection = psycopg2.connect(**self.db_settings)
            self.cursor = self.connection.cursor()
            spider.logger.info(f'Connected to PostgreSQL database: {self.db_settings["database"]}')

            # Create table if not exists
            self.create_table(spider)

        except psycopg2.Error as e:
            spider.logger.error(f'Database connection failed: {e}')
            raise

    def close_spider(self, spider):
        """Flush remaining items and close connection"""
        if self.items_buffer:
            self.flush_items(spider)

        if self.cursor:
            self.cursor.close()
        if self.connection:
            self.connection.close()
            spider.logger.info('Database connection closed')

    def create_table(self, spider):
        """Create table if it doesn't exist"""
        create_query = """
        CREATE TABLE IF NOT EXISTS scraped_items (
            id SERIAL PRIMARY KEY,
            title VARCHAR(500),
            url TEXT UNIQUE,
            price DECIMAL(10, 2),
            description TEXT,
            scraped_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
        )
        """

        try:
            self.cursor.execute(create_query)
            self.connection.commit()
            spider.logger.info('Table checked/created successfully')
        except psycopg2.Error as e:
            spider.logger.error(f'Table creation failed: {e}')
            self.connection.rollback()

    def process_item(self, item, spider):
        adapter = ItemAdapter(item)

        # Add to buffer
        self.items_buffer.append({
            'title': adapter.get('title'),
            'url': adapter.get('url'),
            'price': adapter.get('price'),
            'description': adapter.get('description'),
        })

        # Flush buffer if full
        if len(self.items_buffer) >= self.buffer_size:
            self.flush_items(spider)

        return item

    def flush_items(self, spider):
        """Batch insert items for better performance"""
        if not self.items_buffer:
            return

        insert_query = """
        INSERT INTO scraped_items (title, url, price, description)
        VALUES %s
        ON CONFLICT (url) DO UPDATE SET
            title = EXCLUDED.title,
            price = EXCLUDED.price,
            description = EXCLUDED.description,
            scraped_at = CURRENT_TIMESTAMP
        """

        try:
            # Batch insert with execute_values
            extras.execute_values(
                self.cursor,
                insert_query,
                [tuple(item.values()) for item in self.items_buffer],
                template='(%s, %s, %s, %s)',
                page_size=self.buffer_size
            )

            self.connection.commit()
            spider.logger.info(f'Inserted/updated {len(self.items_buffer)} items')
            self.items_buffer = []

        except psycopg2.Error as e:
            spider.logger.error(f'Batch insert failed: {e}')
            self.connection.rollback()
            self.items_buffer = []

Enable It

# settings.py
ITEM_PIPELINES = {
    'myproject.pipelines.TimestampPipeline': 100,
    'myproject.pipelines.DataCleaningPipeline': 200,
    'myproject.pipelines.ValidationPipeline': 300,
    'myproject.pipelines.SmartDuplicatesPipeline': 400,
    'myproject.pipelines.PostgresPipeline': 800,
}

# Database settings
POSTGRES_HOST = 'localhost'
POSTGRES_PORT = 5432
POSTGRES_DB = 'scrapy_data'
POSTGRES_USER = 'your_user'
POSTGRES_PASSWORD = 'your_password'

What Makes This Production-Ready

  • Batch inserts: Much faster than inserting one at a time
  • Connection handling: One connection per crawl, opened in open_spider and closed cleanly in close_spider
  • Error handling: Graceful handling of database errors
  • Upserts: Updates existing records instead of failing on duplicates
  • Logging: Detailed logs for debugging
  • Configuration: All settings externalized to settings.py (see the credentials note below)
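
One refinement worth making before this goes into a shared repo: keep credentials out of settings.py itself. Since settings.py is a normal Python module, you can read them from environment variables (the variable names here are just a convention):

# settings.py
import os

POSTGRES_HOST = os.environ.get('POSTGRES_HOST', 'localhost')
POSTGRES_PORT = int(os.environ.get('POSTGRES_PORT', 5432))
POSTGRES_DB = os.environ.get('POSTGRES_DB', 'scrapy_data')
POSTGRES_USER = os.environ.get('POSTGRES_USER')
POSTGRES_PASSWORD = os.environ.get('POSTGRES_PASSWORD')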

Conditional Pipelines (Per-Spider Logic)

Sometimes you want different pipelines for different spiders. Here's how:

# pipelines.py
from itemadapter import ItemAdapter

class ConditionalPipeline:
    def process_item(self, item, spider):
        # Only process items from specific spiders
        if spider.name not in ['amazon_spider', 'ebay_spider']:
            return item

        adapter = ItemAdapter(item)

        # Spider-specific logic
        if spider.name == 'amazon_spider':
            # Amazon-specific processing
            if adapter.get('asin'):
                adapter['product_id'] = f"AMZN-{adapter['asin']}"

        elif spider.name == 'ebay_spider':
            # eBay-specific processing
            if adapter.get('item_number'):
                adapter['product_id'] = f"EBAY-{adapter['item_number']}"

        return item

Alternative: Pipeline Settings Per Spider

# spiders/amazon.py
class AmazonSpider(scrapy.Spider):
    name = 'amazon'

    custom_settings = {
        'ITEM_PIPELINES': {
            'myproject.pipelines.AmazonPipeline': 300,
            'myproject.pipelines.DatabasePipeline': 800,
        }
    }

Image Download Pipeline (Beyond the Basics)

Scrapy has a built-in ImagesPipeline, but here's what the docs don't explain well:

Custom Image Pipeline with Smart Naming

# pipelines.py
import hashlib
from urllib.parse import urlparse

import scrapy
from scrapy.exceptions import DropItem
from scrapy.pipelines.images import ImagesPipeline

class SmartImagePipeline(ImagesPipeline):
    def get_media_requests(self, item, info):
        # Download images from image_urls field
        for image_url in item.get('image_urls', []):
            yield scrapy.Request(
                image_url,
                meta={'item_title': item.get('title', 'unknown')}
            )

    def file_path(self, request, response=None, info=None, *, item=None):
        """Generate meaningful filenames"""
        # Extract title from meta
        title = request.meta.get('item_title', 'unknown')

        # Clean title for filename
        safe_title = ''.join(c for c in title if c.isalnum() or c in (' ', '-', '_'))
        safe_title = safe_title.replace(' ', '_')[:50]  # Limit length

        # Add hash to avoid collisions
        url_hash = hashlib.md5(request.url.encode()).hexdigest()[:8]

        # Get file extension
        url_path = urlparse(request.url).path
        extension = url_path.split('.')[-1] if '.' in url_path else 'jpg'

        return f'{safe_title}_{url_hash}.{extension}'

    def item_completed(self, results, item, info):
        """Add downloaded image paths to item"""
        image_paths = [x['path'] for ok, x in results if ok]

        if not image_paths:
            raise DropItem('No images downloaded')

        item['image_paths'] = image_paths
        return item

Enable It

# settings.py
ITEM_PIPELINES = {
    'myproject.pipelines.SmartImagePipeline': 1,
}

IMAGES_STORE = './downloaded_images'
IMAGES_EXPIRES = 90  # Days before re-downloading
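
Because SmartImagePipeline inherits from the built-in ImagesPipeline, you also get thumbnail generation via the documented IMAGES_THUMBS setting (Pillow must be installed for any of the image pipelines to work):

# settings.py
IMAGES_THUMBS = {
    'small': (50, 50),
    'big': (270, 270),
}
# Thumbnails are saved under <IMAGES_STORE>/thumbs/<size_name>/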

Async Pipeline (For External API Calls)

Need to enrich items by calling an external API? Use async:

# pipelines.py
import aiohttp
from itemadapter import ItemAdapter

class AsyncAPIEnrichmentPipeline:
    def __init__(self):
        self.session = None

    async def close_spider(self, spider):
        # Close the shared aiohttp session when the spider finishes
        if self.session:
            await self.session.close()

    async def process_item(self, item, spider):
        adapter = ItemAdapter(item)

        # Create the session lazily, inside the running event loop
        if self.session is None:
            self.session = aiohttp.ClientSession()

        # Example: Fetch additional data from API
        if adapter.get('isbn'):
            book_data = await self.fetch_book_details(adapter['isbn'], spider)
            if book_data:
                adapter['author'] = book_data.get('author')
                adapter['publisher'] = book_data.get('publisher')

        return item

    async def fetch_book_details(self, isbn, spider):
        """Fetch book details from an external API"""
        url = f'https://api.example.com/books/{isbn}'

        try:
            async with self.session.get(url, timeout=aiohttp.ClientTimeout(total=5)) as response:
                if response.status == 200:
                    return await response.json()
        except Exception as e:
            # Log the failure but don't fail the item
            spider.logger.warning(f'Enrichment request failed for ISBN {isbn}: {e}')
        return None
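
One gotcha: coroutine process_item methods that use asyncio libraries like aiohttp only work when Scrapy runs on the asyncio Twisted reactor, so make sure it's enabled (recent Scrapy project templates set this by default):

# settings.py
TWISTED_REACTOR = 'twisted.internet.asyncioreactor.AsyncioSelectorReactor'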

Common Mistakes to Avoid

1. Not Returning the Item

# WRONG - Item disappears!
def process_item(self, item, spider):
    item['processed'] = True
    # Missing return!

# RIGHT
def process_item(self, item, spider):
    item['processed'] = True
    return item

2. Assuming Every Item Is a Plain Dict

# WRONG - class-based items (Item, dataclass, attrs) can reject this
def process_item(self, item, spider):
    item['new_field'] = 'value'  # Might fail!
    return item

# RIGHT - ItemAdapter gives one interface for every item type
# (the field still has to be declared on class-based items)
def process_item(self, item, spider):
    adapter = ItemAdapter(item)
    adapter['new_field'] = 'value'
    return item

3. Not Closing Resources

# WRONG - Database connection leak
class BadPipeline:
    def open_spider(self, spider):
        self.db = connect_to_db()
    # Missing close_spider!

# RIGHT
class GoodPipeline:
    def open_spider(self, spider):
        self.db = connect_to_db()

    def close_spider(self, spider):
        self.db.close()

4. Wrong Pipeline Order

# WRONG - Validation happens AFTER storage!
ITEM_PIPELINES = {
    'myproject.pipelines.DatabasePipeline': 300,
    'myproject.pipelines.ValidationPipeline': 800,  # Too late!
}

# RIGHT - Validate before storing
ITEM_PIPELINES = {
    'myproject.pipelines.ValidationPipeline': 300,
    'myproject.pipelines.DatabasePipeline': 800,
}

Testing Your Pipelines

Here's how to test pipelines without running the full spider:

# test_pipelines.py
from myproject.pipelines import DataCleaningPipeline
from scrapy.spiders import Spider

def test_price_cleaning():
    pipeline = DataCleaningPipeline()
    spider = Spider('test')

    # Test item
    item = {'price': '$1,234.99'}

    # Process it
    result = pipeline.process_item(item, spider)

    # Check result
    assert result['price'] == 1234.99
    print('✓ Price cleaning works!')

if __name__ == '__main__':
    test_price_cleaning()
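
If you use pytest, asserting on dropped items is just as easy. Here's a sketch for the duplicates pipeline; run it with pytest test_pipelines.py:

# test_pipelines.py
import pytest
from scrapy.exceptions import DropItem
from scrapy.spiders import Spider

from myproject.pipelines import SmartDuplicatesPipeline

def test_duplicate_items_are_dropped():
    pipeline = SmartDuplicatesPipeline()
    spider = Spider(name='test')
    item = {'title': 'Same product', 'url': 'https://example.com/p/1'}

    # First occurrence passes through untouched
    assert pipeline.process_item(dict(item), spider) == item

    # Second occurrence with the same title + URL gets dropped
    with pytest.raises(DropItem):
        pipeline.process_item(dict(item), spider)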

Production Pipeline Setup

Here's a complete, production-ready pipeline configuration:

# settings.py
ITEM_PIPELINES = {
    # 1. Add metadata (100-199)
    'myproject.pipelines.TimestampPipeline': 100,

    # 2. Clean data (200-299)
    'myproject.pipelines.DataCleaningPipeline': 200,

    # 3. Validate (300-399)
    'myproject.pipelines.ValidationPipeline': 300,

    # 4. Deduplicate (400-499)
    'myproject.pipelines.SmartDuplicatesPipeline': 400,

    # 5. Enrich (500-699)
    'myproject.pipelines.AsyncAPIEnrichmentPipeline': 500,

    # 6. Store (800-899)
    'myproject.pipelines.PostgresPipeline': 800,

    # 7. Images (special - run first)
    'myproject.pipelines.SmartImagePipeline': 1,
}

# Performance settings
CONCURRENT_ITEMS = 100  # Max items processed in parallel per response (100 is the default)

Final Thoughts

Pipelines are where good scrapers become great scrapers. They're what separate a quick script from a maintainable, production-ready system.

Key takeaways:

  • Order matters: Clean → Validate → Deduplicate → Store
  • Always use ItemAdapter: Future-proofs your code
  • Batch operations: Much faster for databases
  • Proper error handling: Log errors, don't crash
  • Close resources: Use open_spider/close_spider
  • Test your pipelines: Don't wait for production to find bugs

Start simple, add complexity as you need it. A good pipeline setup is like good infrastructure—you barely notice it when it's working right.


Next steps: Try building a pipeline that:

  • Sends items to an API endpoint
  • Generates thumbnails from images
  • Implements a circuit breaker for external services

Drop a comment if you want to see any of these in detail!
