Muhammad Ikramullah Khan
Using ORM with Scrapy: The Complete Guide

In my last post, I learned about ORMs. Writing Python instead of SQL. Clean code. No more complex INSERT statements.

But how do I actually use it in my Scrapy spiders?

I tried adding ORM to my pipeline. It crashed. Session errors. Connection problems. Duplicate key errors. I had no idea how to integrate ORM properly with Scrapy.

After hours of trial and error, I figured it out. Now my spiders save data with clean Python code, handle relationships automatically, and update existing records seamlessly.

Let me show you how to use ORM with Scrapy the right way.


What We'll Build

In this guide, we'll build a product scraper that:

  • Scrapes products from an e-commerce site
  • Scrapes reviews for each product
  • Saves everything to database using ORM
  • Handles product-review relationships automatically
  • Updates existing products (no duplicates)
  • All with clean Python code (no SQL!)

Final result:

# Clean pipeline code
def process_item(self, item, spider):
    product = Product(**item)
    session.add(product)
    session.commit()
    # That's it!

No messy SQL. Just Python objects.
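For contrast, here's roughly what the same save looks like in raw SQL — a sketch using Python's built-in sqlite3 against the same products table (the upsert syntax needs SQLite 3.24+):

# The raw-SQL version the ORM replaces (sketch, not part of the project)
import sqlite3

conn = sqlite3.connect('ecommerce.db')
conn.execute(
    """
    INSERT INTO products (name, price, url)
    VALUES (?, ?, ?)
    ON CONFLICT(url) DO UPDATE SET name = excluded.name, price = excluded.price
    """,
    (item['name'], item['price'], item['url'])
)
conn.commit()

Every new column means editing that SQL string by hand. With the ORM, it's just another attribute.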


Project Setup

Step 1: Install Dependencies

pip install scrapy sqlalchemy

Step 2: Create Scrapy Project

scrapy startproject ecommerce
cd ecommerce

Step 3: Project Structure

ecommerce/
├── scrapy.cfg
├── ecommerce/
│   ├── __init__.py
│   ├── settings.py
│   ├── models.py          # NEW: ORM models
│   ├── pipelines.py       # ORM pipeline
│   ├── items.py
│   └── spiders/
│       └── products.py

Step 1: Define ORM Models

Create models.py with our database models:

# ecommerce/models.py
from sqlalchemy import create_engine, Column, Integer, String, Numeric, Text, DateTime, ForeignKey, Boolean
from sqlalchemy.orm import declarative_base, relationship, sessionmaker
from datetime import datetime

Base = declarative_base()

class Product(Base):
    """Product model"""
    __tablename__ = 'products'

    id = Column(Integer, primary_key=True)
    name = Column(String(200), nullable=False)
    price = Column(Numeric(10, 2))
    url = Column(String(500), unique=True, nullable=False)
    description = Column(Text)
    category = Column(String(100))
    in_stock = Column(Boolean, default=True)
    image_url = Column(String(500))
    created_at = Column(DateTime, default=datetime.now)
    updated_at = Column(DateTime, default=datetime.now, onupdate=datetime.now)

    # Relationship to reviews
    reviews = relationship("Review", back_populates="product", cascade="all, delete-orphan")

    def __repr__(self):
        return f"<Product(name='{self.name}', price={self.price})>"

class Review(Base):
    """Review model"""
    __tablename__ = 'reviews'

    id = Column(Integer, primary_key=True)
    product_id = Column(Integer, ForeignKey('products.id'), nullable=False)
    rating = Column(Integer)
    title = Column(String(200))
    text = Column(Text)
    author = Column(String(100))
    created_at = Column(DateTime, default=datetime.now)

    # Relationship to product
    product = relationship("Product", back_populates="reviews")

    def __repr__(self):
        return f"<Review(rating={self.rating}, author='{self.author}')>"

def create_tables(engine):
    """Create all tables"""
    Base.metadata.create_all(engine)

def get_engine(database_url='sqlite:///ecommerce.db'):
    """Create database engine"""
    return create_engine(database_url, echo=False)

def get_session(engine):
    """Create database session"""
    Session = sessionmaker(bind=engine)
    return Session()

What this does:

  • Defines Product table (stores product info)
  • Defines Review table (stores reviews)
  • Creates relationship (one product has many reviews)
  • Provides helper functions for setup
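Before wiring the models into Scrapy, it's worth poking at them in a throwaway script. Here's a minimal sanity check (try_models.py is a hypothetical name, not part of the project):

# try_models.py -- quick sanity check of the models (sketch)
from ecommerce.models import Product, Review, create_tables, get_engine, get_session

engine = get_engine('sqlite:///test.db')
create_tables(engine)
session = get_session(engine)

# Build a product and attach a review through the relationship --
# no foreign keys set by hand
product = Product(name='Test Widget', price=9.99, url='https://example.com/widget')
product.reviews.append(Review(rating=5, title='Great!', author='Alice'))

session.add(product)    # cascades: the review is inserted too
session.commit()

print(product.reviews)  # [<Review(rating=5, author='Alice')>]
session.close()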

Step 2: Define Scrapy Items

# ecommerce/items.py
import scrapy

class ProductItem(scrapy.Item):
    name = scrapy.Field()
    price = scrapy.Field()
    url = scrapy.Field()
    description = scrapy.Field()
    category = scrapy.Field()
    in_stock = scrapy.Field()
    image_url = scrapy.Field()

class ReviewItem(scrapy.Item):
    product_url = scrapy.Field()  # To link review to product
    rating = scrapy.Field()
    title = scrapy.Field()
    text = scrapy.Field()
    author = scrapy.Field()

Step 3: Create ORM Pipeline

This is the magic part:

# ecommerce/pipelines.py
from sqlalchemy.orm import sessionmaker
from ecommerce.models import Product, Review, get_engine, create_tables

class ORMPipeline:
    def __init__(self):
        """Initialize database connection"""
        # Create engine
        self.engine = get_engine('sqlite:///ecommerce.db')

        # Create tables if they don't exist
        create_tables(self.engine)

        # Create session factory
        Session = sessionmaker(bind=self.engine)
        self.session = Session()

    def process_item(self, item, spider):
        """Process item and save to database"""
        # Check item type
        if item.__class__.__name__ == 'ProductItem':
            return self.process_product(item, spider)
        elif item.__class__.__name__ == 'ReviewItem':
            return self.process_review(item, spider)

        return item

    def process_product(self, item, spider):
        """Save product to database"""
        try:
            # Check if product exists
            existing = self.session.query(Product).filter(
                Product.url == item['url']
            ).first()

            if existing:
                # Update existing product
                existing.name = item.get('name')
                existing.price = item.get('price')
                existing.description = item.get('description')
                existing.category = item.get('category')
                existing.in_stock = item.get('in_stock', True)
                existing.image_url = item.get('image_url')

                spider.logger.info(f"Updated product: {item['name']}")
            else:
                # Create new product
                product = Product(
                    name=item.get('name'),
                    price=item.get('price'),
                    url=item.get('url'),
                    description=item.get('description'),
                    category=item.get('category'),
                    in_stock=item.get('in_stock', True),
                    image_url=item.get('image_url')
                )

                self.session.add(product)
                spider.logger.info(f"Created product: {item['name']}")

            # Commit changes
            self.session.commit()

        except Exception as e:
            spider.logger.error(f"Error saving product: {e}")
            self.session.rollback()

        return item

    def process_review(self, item, spider):
        """Save review to database"""
        try:
            # Find the product this review belongs to
            product = self.session.query(Product).filter(
                Product.url == item['product_url']
            ).first()

            if not product:
                spider.logger.warning(f"Product not found for review: {item['product_url']}")
                return item

            # Create review
            review = Review(
                product_id=product.id,
                rating=item.get('rating'),
                title=item.get('title'),
                text=item.get('text'),
                author=item.get('author')
            )

            self.session.add(review)
            self.session.commit()

            spider.logger.info(f"Created review for: {product.name}")

        except Exception as e:
            spider.logger.error(f"Error saving review: {e}")
            self.session.rollback()

        return item

    def close_spider(self, spider):
        """Close database connection when spider closes"""
        # Get statistics
        total_products = self.session.query(Product).count()
        total_reviews = self.session.query(Review).count()

        spider.logger.info(f"Total products in database: {total_products}")
        spider.logger.info(f"Total reviews in database: {total_reviews}")

        # Close session
        self.session.close()

What this pipeline does:

  • Creates database connection on spider start
  • Handles both products and reviews
  • Updates existing products (no duplicates!)
  • Links reviews to products automatically
  • Logs statistics when spider closes
  • Handles errors gracefully
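One optional refactor: comparing item.__class__.__name__ works, but isinstance() is sturdier, since it also matches subclasses of your items. A sketch of the same dispatch:

# pipelines.py (alternative dispatch, sketch)
from ecommerce.items import ProductItem, ReviewItem

class ORMPipeline:
    # ... __init__ as above ...

    def process_item(self, item, spider):
        if isinstance(item, ProductItem):
            return self.process_product(item, spider)
        if isinstance(item, ReviewItem):
            return self.process_review(item, spider)
        return item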

Step 4: Create Spider

# ecommerce/spiders/products.py
import scrapy
from ecommerce.items import ProductItem, ReviewItem

class ProductSpider(scrapy.Spider):
    name = 'products'
    allowed_domains = ['example.com']
    start_urls = ['https://example.com/products']

    def parse(self, response):
        """Parse product listing page"""

        # Extract each product
        for product in response.css('.product'):
            product_url = response.urljoin(
                product.css('a::attr(href)').get()
            )

            # Go to product detail page
            yield scrapy.Request(
                product_url,
                callback=self.parse_product
            )

        # Follow pagination
        next_page = response.css('.next::attr(href)').get()
        if next_page:
            yield response.follow(next_page, self.parse)

    def parse_product(self, response):
        """Parse individual product page"""

        # Extract product data
        product = ProductItem()
        product['name'] = response.css('h1.product-name::text').get()
        product['price'] = self.extract_price(
            response.css('.price::text').get()
        )
        product['url'] = response.url
        product['description'] = response.css('.description::text').get()
        product['category'] = response.css('.category::text').get()
        product['in_stock'] = self.check_stock(
            response.css('.stock::text').get()
        )
        product['image_url'] = response.css('.product-image::attr(src)').get()

        yield product

        # Now scrape reviews for this product
        for review in response.css('.review'):
            review_item = ReviewItem()
            review_item['product_url'] = response.url
            review_item['rating'] = self.extract_rating(
                review.css('.rating::attr(data-rating)').get()
            )
            review_item['title'] = review.css('.review-title::text').get()
            review_item['text'] = review.css('.review-text::text').get()
            review_item['author'] = review.css('.author::text').get()

            yield review_item

    def extract_price(self, price_text):
        """Extract numeric price from text"""
        if not price_text:
            return None

        # Remove currency symbols and convert to float
        clean_price = price_text.replace('$', '').replace(',', '').strip()

        try:
            return float(clean_price)
        except ValueError:
            return None

    def check_stock(self, stock_text):
        """Check if product is in stock"""
        if not stock_text:
            return True

        return 'in stock' in stock_text.lower()

    def extract_rating(self, rating_text):
        """Extract numeric rating"""
        if not rating_text:
            return None

        try:
            return int(rating_text)
        except ValueError:
            return None

Step 5: Configure Settings

# ecommerce/settings.py

BOT_NAME = 'ecommerce'
SPIDER_MODULES = ['ecommerce.spiders']

# Enable pipeline
ITEM_PIPELINES = {
    'ecommerce.pipelines.ORMPipeline': 300,
}

# Be polite
ROBOTSTXT_OBEY = True
DOWNLOAD_DELAY = 1

# User agent
USER_AGENT = 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'

Step 6: Run the Spider

scrapy crawl products

What happens:

  1. Spider scrapes product pages
  2. For each product, yields ProductItem
  3. Pipeline saves product to database (or updates if exists)
  4. Spider scrapes reviews for that product
  5. Yields ReviewItem for each review
  6. Pipeline saves reviews (linked to product automatically!)
  7. Repeat for all products

No SQL written! All handled by ORM.
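If you're curious what SQL the ORM is writing for you, flip the engine's echo flag and SQLAlchemy logs every statement it generates:

# models.py -- temporarily enable SQL logging
def get_engine(database_url='sqlite:///ecommerce.db'):
    return create_engine(database_url, echo=True)  # prints each INSERT/UPDATE/SELECT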


Querying Your Data

After scraping, query your data:

# query_data.py
from ecommerce.models import Product, Review, get_engine, get_session

# Create session
engine = get_engine('sqlite:///ecommerce.db')
session = get_session(engine)

print("=== Product Statistics ===")
print(f"Total products: {session.query(Product).count()}")
print(f"Total reviews: {session.query(Review).count()}")

# Get products with reviews
print("\n=== Products with Reviews ===")
products_with_reviews = session.query(Product).filter(
    Product.reviews.any()
).all()

for product in products_with_reviews[:5]:
    print(f"\n{product.name} (${product.price})")
    print(f"  Reviews: {len(product.reviews)}")

    # Calculate average rating
    if product.reviews:
        avg_rating = sum(r.rating for r in product.reviews) / len(product.reviews)
        print(f"  Average rating: {avg_rating:.1f}/5")

    # Show first review
    if product.reviews:
        review = product.reviews[0]
        print(f"  Latest review: \"{review.title}\" by {review.author}")

# Find expensive products
print("\n=== Expensive Products (>$500) ===")
expensive = session.query(Product).filter(
    Product.price > 500
).order_by(Product.price.desc()).limit(5)

for product in expensive:
    print(f"{product.name}: ${product.price}")

# Find highly rated products
print("\n=== Highly Rated Products (4+ stars) ===")
from sqlalchemy import func

highly_rated = session.query(Product).join(Review).group_by(
    Product.id
).having(
    func.avg(Review.rating) >= 4
).all()

for product in highly_rated[:5]:
    avg_rating = sum(r.rating for r in product.reviews) / len(product.reviews)
    print(f"{product.name}: {avg_rating:.1f} stars ({len(product.reviews)} reviews)")

session.close()

The power of ORM:

  • Access reviews: product.reviews
  • Access product: review.product
  • Relationships handled automatically!
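The relationship works in both directions. For example, you can start from reviews and walk back to their products (continuing in query_data.py, before session.close()):

# Traverse from the review side -- no joins written by hand
five_star = session.query(Review).filter(Review.rating == 5).all()
for review in five_star:
    print(f'{review.product.name}: "{review.title}" by {review.author}')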

Advanced: Batch Processing

For better performance, save items in batches:

# pipelines.py (improved version)
class BatchORMPipeline:
    def __init__(self):
        self.engine = get_engine('sqlite:///ecommerce.db')
        create_tables(self.engine)
        Session = sessionmaker(bind=self.engine)
        self.session = Session()

        # Batch storage
        self.products_batch = []
        self.reviews_batch = []
        self.batch_size = 100

    def process_item(self, item, spider):
        if item.__class__.__name__ == 'ProductItem':
            self.products_batch.append(item)

            # Save batch when full
            if len(self.products_batch) >= self.batch_size:
                self.save_products_batch(spider)

        elif item.__class__.__name__ == 'ReviewItem':
            self.reviews_batch.append(item)

            if len(self.reviews_batch) >= self.batch_size:
                self.save_reviews_batch(spider)

        return item

    def save_products_batch(self, spider):
        """Save batch of products"""
        try:
            for item in self.products_batch:
                # Check if exists
                existing = self.session.query(Product).filter(
                    Product.url == item['url']
                ).first()

                if existing:
                    # Update
                    for key, value in item.items():
                        setattr(existing, key, value)
                else:
                    # Create
                    product = Product(**item)
                    self.session.add(product)

            self.session.commit()
            spider.logger.info(f"Saved batch of {len(self.products_batch)} products")

            # Clear batch
            self.products_batch = []

        except Exception as e:
            spider.logger.error(f"Error saving batch: {e}")
            self.session.rollback()
            self.products_batch = []

    def save_reviews_batch(self, spider):
        """Save batch of reviews"""
        try:
            for item in self.reviews_batch:
                # Find product
                product = self.session.query(Product).filter(
                    Product.url == item['product_url']
                ).first()

                if product:
                    review = Review(
                        product_id=product.id,
                        rating=item.get('rating'),
                        title=item.get('title'),
                        text=item.get('text'),
                        author=item.get('author')
                    )
                    self.session.add(review)

            self.session.commit()
            spider.logger.info(f"Saved batch of {len(self.reviews_batch)} reviews")

            self.reviews_batch = []

        except Exception as e:
            spider.logger.error(f"Error saving batch: {e}")
            self.session.rollback()
            self.reviews_batch = []

    def close_spider(self, spider):
        """Save remaining items"""
        if self.products_batch:
            self.save_products_batch(spider)

        if self.reviews_batch:
            self.save_reviews_batch(spider)

        # Statistics
        total_products = self.session.query(Product).count()
        total_reviews = self.session.query(Review).count()

        spider.logger.info(f"Final: {total_products} products, {total_reviews} reviews")

        self.session.close()

Batch processing benefits:

  • Faster (fewer commits)
  • More efficient (fewer database operations)
  • Better for large scraping jobs
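If a batch contains only brand-new rows, you can go faster still with SQLAlchemy's bulk_insert_mappings(), which skips per-object bookkeeping. A sketch, assuming you've already filtered out existing URLs (so no upsert logic is needed):

def save_new_products_bulk(self, spider):
    """Insert-only fast path -- no exists-check, so only for brand-new rows"""
    try:
        self.session.bulk_insert_mappings(
            Product,
            [dict(item) for item in self.products_batch]
        )
        self.session.commit()
        spider.logger.info(f"Bulk-inserted {len(self.products_batch)} products")
    except Exception as e:
        spider.logger.error(f"Bulk insert failed: {e}")
        self.session.rollback()
    finally:
        self.products_batch = []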

Using PostgreSQL Instead of SQLite

For production, use PostgreSQL:

Step 1: Install PostgreSQL Driver

pip install psycopg2-binary

Step 2: Update models.py

# ecommerce/models.py

def get_engine(database_url=None):
    """Create database engine"""
    if database_url is None:
        # Use PostgreSQL in production
        database_url = 'postgresql://username:password@localhost/ecommerce_db'

    return create_engine(database_url, echo=False, pool_size=10, max_overflow=20)

Step 3: Create PostgreSQL Database

# Open a PostgreSQL shell
psql -U postgres

-- Then, at the psql prompt:
CREATE DATABASE ecommerce_db;
CREATE USER scrapy_user WITH PASSWORD 'your_password';
GRANT ALL PRIVILEGES ON DATABASE ecommerce_db TO scrapy_user;

Step 4: Update settings.py

# ecommerce/settings.py

# Database URL
DATABASE_URL = 'postgresql://scrapy_user:your_password@localhost/ecommerce_db'

Step 5: Pass URL to Pipeline

# pipelines.py
class ORMPipeline:
    def __init__(self):
        from ecommerce.settings import DATABASE_URL
        self.engine = get_engine(DATABASE_URL)
        # Rest of code...

That's it! You're now running on PostgreSQL with the same ORM code.
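A cleaner variant: rather than importing the settings module, use Scrapy's from_crawler() hook, which hands the pipeline the project settings. A sketch:

# pipelines.py -- read DATABASE_URL through Scrapy's settings machinery
class ORMPipeline:
    def __init__(self, database_url):
        self.engine = get_engine(database_url)
        create_tables(self.engine)
        Session = sessionmaker(bind=self.engine)
        self.session = Session()

    @classmethod
    def from_crawler(cls, crawler):
        # Fall back to SQLite when DATABASE_URL isn't set in settings.py
        return cls(crawler.settings.get('DATABASE_URL', 'sqlite:///ecommerce.db'))

This also makes the pipeline easy to test: construct it directly with any database URL.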


Handling Multiple Spiders

For distributed crawling, use connection pooling:

# pipelines.py
from sqlalchemy import create_engine
from sqlalchemy.pool import QueuePool
from ecommerce.models import create_tables

class PooledORMPipeline:
    # Class-level engine (shared across spiders)
    _engine = None

    @classmethod
    def from_crawler(cls, crawler):
        """Create pipeline with shared engine"""
        if cls._engine is None:
            cls._engine = create_engine(
                'postgresql://user:pass@localhost/db',
                poolclass=QueuePool,
                pool_size=10,
                max_overflow=20
            )
            create_tables(cls._engine)

        return cls()

    def __init__(self):
        """Create session from shared engine"""
        Session = sessionmaker(bind=self._engine)
        self.session = Session()

    def process_item(self, item, spider):
        # Same as before
        pass

    def close_spider(self, spider):
        self.session.close()
        # Don't close engine (shared across spiders)

Benefits:

  • Multiple spiders share connection pool
  • Better performance
  • Handles concurrent access properly

Common Issues and Solutions

Issue 1: "Table already exists"

Error:

sqlalchemy.exc.OperationalError: table products already exists

Solution:
Route all table creation through one helper. Base.metadata.create_all() already skips existing tables by default (checkfirst=True), so if you still hit this error, tables are probably being created somewhere else too. Spelling out the flag documents the intent:

def create_tables(engine):
    Base.metadata.create_all(engine, checkfirst=True)

Issue 2: Session Errors

Error:

Session is closed

Solution:
Create a fresh session for each spider run, and never reuse one that has been closed:

def __init__(self):
    Session = sessionmaker(bind=self.engine)
    self.session = Session()  # New session

Issue 3: Duplicate Key Errors

Error:

UNIQUE constraint failed: products.url

Solution:
Check if product exists before inserting:

existing = self.session.query(Product).filter(
    Product.url == item['url']
).first()

if existing:
    # Update instead of insert
    pass

Issue 4: Relationship Not Working

Problem:
product.reviews returns an empty list even though reviews exist.

Solution:
Make sure product_id is set correctly:

review = Review(
    product_id=product.id,  # Must be set!
    rating=5
)
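Alternatively, skip the foreign key entirely and assign through the relationship; SQLAlchemy fills in product_id at commit time:

# Let the relationship set the foreign key for you
review = Review(product=product, rating=5)
# ...or equivalently, from the other side:
product.reviews.append(Review(rating=5))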

Best Practices

1. Always Use Transactions

try:
    # Database operations
    self.session.add(product)
    self.session.commit()
except Exception:
    self.session.rollback()
    raise

2. Close Sessions Properly

def close_spider(self, spider):
    self.session.close()

3. Handle None Values

product = Product(
    name=item.get('name', 'Unknown'),  # Default value
    price=item.get('price', 0.0)
)

4. Use Indexes

class Product(Base):
    __tablename__ = 'products'

    url = Column(String(500), unique=True, index=True)  # Index!
    category = Column(String(100), index=True)  # Index!

5. Log Everything

spider.logger.info(f"Saved product: {product.name}")
spider.logger.error(f"Error: {e}")

Complete Working Example

Here's everything together:

models.py

from sqlalchemy import create_engine, Column, Integer, String, Numeric, Text, DateTime, ForeignKey, Boolean
from sqlalchemy.orm import declarative_base, relationship, sessionmaker
from datetime import datetime

Base = declarative_base()

class Product(Base):
    __tablename__ = 'products'
    id = Column(Integer, primary_key=True)
    name = Column(String(200), nullable=False)
    price = Column(Numeric(10, 2))
    url = Column(String(500), unique=True, nullable=False, index=True)
    category = Column(String(100), index=True)
    created_at = Column(DateTime, default=datetime.now)
    reviews = relationship("Review", back_populates="product")

class Review(Base):
    __tablename__ = 'reviews'
    id = Column(Integer, primary_key=True)
    product_id = Column(Integer, ForeignKey('products.id'))
    rating = Column(Integer)
    text = Column(Text)
    product = relationship("Product", back_populates="reviews")

def get_engine(db_url='sqlite:///ecommerce.db'):
    return create_engine(db_url)

def create_tables(engine):
    Base.metadata.create_all(engine)

Run your spider:

scrapy crawl products

Query your data:

python query_data.py

Done!


Summary

What we learned:

  • How to integrate ORM with Scrapy
  • Define models for products and reviews
  • Create ORM pipeline
  • Handle relationships automatically
  • Update existing records
  • Batch processing for performance

Key code:

Models:

class Product(Base):
    __tablename__ = 'products'
    reviews = relationship("Review")

Pipeline:

def process_item(self, item, spider):
    product = Product(**item)
    self.session.add(product)
    self.session.commit()

Query:

product = session.get(Product, 1)  # fetch by primary key
for review in product.reviews:
    print(review.rating)

Benefits:

  • No SQL needed
  • Clean Python code
  • Automatic relationships
  • Easy updates
  • Type safety

Remember:

  • Define models clearly
  • Handle duplicates (check before insert)
  • Close sessions properly
  • Use batch processing for large jobs
  • PostgreSQL for production

You now have production-ready ORM integration with Scrapy!

Happy scraping! 🕷️
