In the last blog post, I learned about ORM. Writing Python instead of SQL. Clean code. No more complex INSERT statements.
But how do I actually use it in my Scrapy spiders?
I tried adding ORM to my pipeline. It crashed. Session errors. Connection problems. Duplicate key errors. I had no idea how to integrate ORM properly with Scrapy.
After hours of trial and error, I figured it out. Now my spiders save data with clean Python code, handle relationships automatically, and update existing records seamlessly.
Let me show you how to use ORM with Scrapy the right way.
What We'll Build
In this guide, we'll build a product scraper that:
- Scrapes products from an e-commerce site
- Scrapes reviews for each product
- Saves everything to database using ORM
- Handles product-review relationships automatically
- Updates existing products (no duplicates)
- All with clean Python code (no SQL!)
Final result:
# Clean pipeline code
def process_item(self, item, spider):
product = Product(**item)
session.merge(product)
session.commit()
# That's it!
No messy SQL. Just Python objects.
Project Setup
Step 1: Install Dependencies
pip install scrapy sqlalchemy
Step 2: Create Scrapy Project
scrapy startproject ecommerce
cd ecommerce
Step 3: Project Structure
ecommerce/
├── scrapy.cfg
├── ecommerce/
│ ├── __init__.py
│ ├── settings.py
│ ├── models.py # NEW: ORM models
│ ├── pipelines.py # ORM pipeline
│ ├── items.py
│ └── spiders/
│ └── products.py
Step 1: Define ORM Models
Create models.py with our database models:
# ecommerce/models.py
from sqlalchemy import create_engine, Column, Integer, String, Numeric, Text, DateTime, ForeignKey, Boolean
from sqlalchemy.orm import declarative_base, relationship, sessionmaker
from datetime import datetime
Base = declarative_base()
class Product(Base):
"""Product model"""
__tablename__ = 'products'
id = Column(Integer, primary_key=True)
name = Column(String(200), nullable=False)
price = Column(Numeric(10, 2))
url = Column(String(500), unique=True, nullable=False)
description = Column(Text)
category = Column(String(100))
in_stock = Column(Boolean, default=True)
image_url = Column(String(500))
created_at = Column(DateTime, default=datetime.now)
updated_at = Column(DateTime, default=datetime.now, onupdate=datetime.now)
# Relationship to reviews
reviews = relationship("Review", back_populates="product", cascade="all, delete-orphan")
def __repr__(self):
return f"<Product(name='{self.name}', price={self.price})>"
class Review(Base):
"""Review model"""
__tablename__ = 'reviews'
id = Column(Integer, primary_key=True)
product_id = Column(Integer, ForeignKey('products.id'), nullable=False)
rating = Column(Integer)
title = Column(String(200))
text = Column(Text)
author = Column(String(100))
created_at = Column(DateTime, default=datetime.now)
# Relationship to product
product = relationship("Product", back_populates="reviews")
def __repr__(self):
return f"<Review(rating={self.rating}, author='{self.author}')>"
def create_tables(engine):
"""Create all tables"""
Base.metadata.create_all(engine)
def get_engine(database_url='sqlite:///ecommerce.db'):
"""Create database engine"""
return create_engine(database_url, echo=False)
def get_session(engine):
"""Create database session"""
Session = sessionmaker(bind=engine)
return Session()
What this does:
- Defines Product table (stores product info)
- Defines Review table (stores reviews)
- Creates relationship (one product has many reviews)
- Provides helper functions for setup
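If you want to see the relationship and cascade behavior before wiring up Scrapy, here's a small sanity check you can run in a plain Python shell. This is just a sketch; the file name check_models.py and the throwaway test.db database are assumptions, not part of the project.

# check_models.py (hypothetical sanity check)
from ecommerce.models import Product, Review, get_engine, get_session, create_tables

engine = get_engine('sqlite:///test.db')
create_tables(engine)
session = get_session(engine)

# Attach reviews through the relationship; adding the product saves them too
product = Product(name='Test Widget', price=9.99, url='https://example.com/widget')
product.reviews = [Review(rating=5, text='Great'), Review(rating=3, text='Okay')]
session.add(product)
session.commit()

print(product.reviews)                   # both reviews, already persisted
print(product.reviews[0].product.name)   # 'Test Widget' -- back-reference works
session.close()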
Step 2: Define Scrapy Items
# ecommerce/items.py
import scrapy
class ProductItem(scrapy.Item):
name = scrapy.Field()
price = scrapy.Field()
url = scrapy.Field()
description = scrapy.Field()
category = scrapy.Field()
in_stock = scrapy.Field()
image_url = scrapy.Field()
class ReviewItem(scrapy.Item):
product_url = scrapy.Field() # To link review to product
rating = scrapy.Field()
title = scrapy.Field()
text = scrapy.Field()
author = scrapy.Field()
Step 3: Create ORM Pipeline
This is the magic part:
# ecommerce/pipelines.py
from sqlalchemy.orm import sessionmaker
from ecommerce.models import Product, Review, get_engine, create_tables
class ORMPipeline:
def __init__(self):
"""Initialize database connection"""
# Create engine
self.engine = get_engine('sqlite:///ecommerce.db')
# Create tables if they don't exist
create_tables(self.engine)
# Create session factory
Session = sessionmaker(bind=self.engine)
self.session = Session()
def process_item(self, item, spider):
"""Process item and save to database"""
# Check item type
if item.__class__.__name__ == 'ProductItem':
return self.process_product(item, spider)
elif item.__class__.__name__ == 'ReviewItem':
return self.process_review(item, spider)
return item
def process_product(self, item, spider):
"""Save product to database"""
try:
# Check if product exists
existing = self.session.query(Product).filter(
Product.url == item['url']
).first()
if existing:
# Update existing product
existing.name = item.get('name')
existing.price = item.get('price')
existing.description = item.get('description')
existing.category = item.get('category')
existing.in_stock = item.get('in_stock', True)
existing.image_url = item.get('image_url')
spider.logger.info(f"Updated product: {item['name']}")
else:
# Create new product
product = Product(
name=item.get('name'),
price=item.get('price'),
url=item.get('url'),
description=item.get('description'),
category=item.get('category'),
in_stock=item.get('in_stock', True),
image_url=item.get('image_url')
)
self.session.add(product)
spider.logger.info(f"Created product: {item['name']}")
# Commit changes
self.session.commit()
except Exception as e:
spider.logger.error(f"Error saving product: {e}")
self.session.rollback()
return item
def process_review(self, item, spider):
"""Save review to database"""
try:
# Find the product this review belongs to
product = self.session.query(Product).filter(
Product.url == item['product_url']
).first()
if not product:
spider.logger.warning(f"Product not found for review: {item['product_url']}")
return item
# Create review
review = Review(
product_id=product.id,
rating=item.get('rating'),
title=item.get('title'),
text=item.get('text'),
author=item.get('author')
)
self.session.add(review)
self.session.commit()
spider.logger.info(f"Created review for: {product.name}")
except Exception as e:
spider.logger.error(f"Error saving review: {e}")
self.session.rollback()
return item
def close_spider(self, spider):
"""Close database connection when spider closes"""
# Get statistics
total_products = self.session.query(Product).count()
total_reviews = self.session.query(Review).count()
spider.logger.info(f"Total products in database: {total_products}")
spider.logger.info(f"Total reviews in database: {total_reviews}")
# Close session
self.session.close()
What this pipeline does:
- Creates database connection on spider start
- Handles both products and reviews
- Updates existing products (no duplicates!)
- Links reviews to products automatically
- Logs statistics when spider closes
- Handles errors gracefully
Step 4: Create Spider
# ecommerce/spiders/products.py
import scrapy
from ecommerce.items import ProductItem, ReviewItem
class ProductSpider(scrapy.Spider):
name = 'products'
allowed_domains = ['example.com']
start_urls = ['https://example.com/products']
def parse(self, response):
"""Parse product listing page"""
# Extract each product
for product in response.css('.product'):
product_url = response.urljoin(
product.css('a::attr(href)').get()
)
# Go to product detail page
yield scrapy.Request(
product_url,
callback=self.parse_product
)
# Follow pagination
next_page = response.css('.next::attr(href)').get()
if next_page:
yield response.follow(next_page, self.parse)
def parse_product(self, response):
"""Parse individual product page"""
# Extract product data
product = ProductItem()
product['name'] = response.css('h1.product-name::text').get()
product['price'] = self.extract_price(
response.css('.price::text').get()
)
product['url'] = response.url
product['description'] = response.css('.description::text').get()
product['category'] = response.css('.category::text').get()
product['in_stock'] = self.check_stock(
response.css('.stock::text').get()
)
product['image_url'] = response.css('.product-image::attr(src)').get()
yield product
# Now scrape reviews for this product
for review in response.css('.review'):
review_item = ReviewItem()
review_item['product_url'] = response.url
review_item['rating'] = self.extract_rating(
review.css('.rating::attr(data-rating)').get()
)
review_item['title'] = review.css('.review-title::text').get()
review_item['text'] = review.css('.review-text::text').get()
review_item['author'] = review.css('.author::text').get()
yield review_item
def extract_price(self, price_text):
"""Extract numeric price from text"""
if not price_text:
return None
# Remove currency symbols and convert to float
clean_price = price_text.replace('$', '').replace(',', '').strip()
try:
return float(clean_price)
except ValueError:
return None
def check_stock(self, stock_text):
"""Check if product is in stock"""
if not stock_text:
return True
return 'in stock' in stock_text.lower()
def extract_rating(self, rating_text):
"""Extract numeric rating"""
if not rating_text:
return None
try:
return int(rating_text)
except ValueError:
return None
Step 5: Configure Settings
# ecommerce/settings.py
BOT_NAME = 'ecommerce'
SPIDER_MODULES = ['ecommerce.spiders']
# Enable pipeline
ITEM_PIPELINES = {
'ecommerce.pipelines.ORMPipeline': 300,
}
# Be polite
ROBOTSTXT_OBEY = True
DOWNLOAD_DELAY = 1
# User agent
USER_AGENT = 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
Step 6: Run the Spider
scrapy crawl products
What happens:
- Spider scrapes product pages
- For each product, yields ProductItem
- Pipeline saves product to database (or updates if exists)
- Spider scrapes reviews for that product
- Yields ReviewItem for each review
- Pipeline saves reviews (linked to product automatically!)
- Repeat for all products
No SQL written! All handled by ORM.
Querying Your Data
After scraping, query your data:
# query_data.py
from ecommerce.models import Product, Review, get_engine, get_session
# Create session
engine = get_engine('sqlite:///ecommerce.db')
session = get_session(engine)
print("=== Product Statistics ===")
print(f"Total products: {session.query(Product).count()}")
print(f"Total reviews: {session.query(Review).count()}")
# Get products with reviews
print("\n=== Products with Reviews ===")
products_with_reviews = session.query(Product).filter(
Product.reviews.any()
).all()
for product in products_with_reviews[:5]:
print(f"\n{product.name} (${product.price})")
print(f" Reviews: {len(product.reviews)}")
# Calculate average rating
if product.reviews:
avg_rating = sum(r.rating for r in product.reviews) / len(product.reviews)
print(f" Average rating: {avg_rating:.1f}/5")
# Show first review
if product.reviews:
review = product.reviews[0]
print(f" Latest review: \"{review.title}\" by {review.author}")
# Find expensive products
print("\n=== Expensive Products (>$500) ===")
expensive = session.query(Product).filter(
Product.price > 500
).order_by(Product.price.desc()).limit(5)
for product in expensive:
print(f"{product.name}: ${product.price}")
# Find highly rated products
print("\n=== Highly Rated Products (4+ stars) ===")
from sqlalchemy import func
highly_rated = session.query(Product).join(Review).group_by(
Product.id
).having(
func.avg(Review.rating) >= 4
).all()
for product in highly_rated[:5]:
avg_rating = sum(r.rating for r in product.reviews) / len(product.reviews)
print(f"{product.name}: {avg_rating:.1f} stars ({len(product.reviews)} reviews)")
session.close()
The power of ORM:
- Access reviews: product.reviews
- Access product: review.product
- Relationships handled automatically (see the short example below)
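For example, once you have a session you can walk the relationship in both directions without writing a join (a small sketch, reusing the session from query_data.py above):

# Navigate relationships in both directions
some_product = session.query(Product).first()
if some_product:
    for review in some_product.reviews:   # product -> its reviews
        print(review.rating, review.author)
        print(review.product.url)         # review -> back to its product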
Advanced: Batch Processing
For better performance, save items in batches:
# pipelines.py (improved version)
class BatchORMPipeline:
def __init__(self):
self.engine = get_engine('sqlite:///ecommerce.db')
create_tables(self.engine)
Session = sessionmaker(bind=self.engine)
self.session = Session()
# Batch storage
self.products_batch = []
self.reviews_batch = []
self.batch_size = 100
def process_item(self, item, spider):
if item.__class__.__name__ == 'ProductItem':
self.products_batch.append(item)
# Save batch when full
if len(self.products_batch) >= self.batch_size:
self.save_products_batch(spider)
elif item.__class__.__name__ == 'ReviewItem':
self.reviews_batch.append(item)
if len(self.reviews_batch) >= self.batch_size:
self.save_reviews_batch(spider)
return item
def save_products_batch(self, spider):
"""Save batch of products"""
try:
for item in self.products_batch:
# Check if exists
existing = self.session.query(Product).filter(
Product.url == item['url']
).first()
if existing:
# Update
for key, value in item.items():
setattr(existing, key, value)
else:
# Create
product = Product(**item)
self.session.add(product)
self.session.commit()
spider.logger.info(f"Saved batch of {len(self.products_batch)} products")
# Clear batch
self.products_batch = []
except Exception as e:
spider.logger.error(f"Error saving batch: {e}")
self.session.rollback()
self.products_batch = []
def save_reviews_batch(self, spider):
"""Save batch of reviews"""
try:
for item in self.reviews_batch:
# Find product
product = self.session.query(Product).filter(
Product.url == item['product_url']
).first()
if product:
review = Review(
product_id=product.id,
rating=item.get('rating'),
title=item.get('title'),
text=item.get('text'),
author=item.get('author')
)
self.session.add(review)
self.session.commit()
spider.logger.info(f"Saved batch of {len(self.reviews_batch)} reviews")
self.reviews_batch = []
except Exception as e:
spider.logger.error(f"Error saving batch: {e}")
self.session.rollback()
self.reviews_batch = []
def close_spider(self, spider):
"""Save remaining items"""
if self.products_batch:
self.save_products_batch(spider)
if self.reviews_batch:
self.save_reviews_batch(spider)
# Statistics
total_products = self.session.query(Product).count()
total_reviews = self.session.query(Review).count()
spider.logger.info(f"Final: {total_products} products, {total_reviews} reviews")
self.session.close()
Batch processing benefits:
- Faster (fewer commits)
- More efficient (fewer database operations)
- Better for large scraping jobs
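To switch over, point ITEM_PIPELINES at the batch class instead (assuming it lives in the same pipelines.py):

# ecommerce/settings.py
ITEM_PIPELINES = {
    'ecommerce.pipelines.BatchORMPipeline': 300,
}

One caveat with batching as written: a review can reach the pipeline before its product batch has been committed, in which case the product lookup fails and that review is silently skipped. Flushing the product batch before saving reviews, or using a smaller batch_size, avoids this.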
Using PostgreSQL Instead of SQLite
For production, use PostgreSQL:
Step 1: Install PostgreSQL Driver
pip install psycopg2-binary
Step 2: Update models.py
# ecommerce/models.py
def get_engine(database_url=None):
"""Create database engine"""
if database_url is None:
# Use PostgreSQL in production
database_url = 'postgresql://username:password@localhost/ecommerce_db'
return create_engine(database_url, echo=False, pool_size=10, max_overflow=20)
Step 3: Create PostgreSQL Database
# In PostgreSQL
psql -U postgres
CREATE DATABASE ecommerce_db;
CREATE USER scrapy_user WITH PASSWORD 'your_password';
GRANT ALL PRIVILEGES ON DATABASE ecommerce_db TO scrapy_user;
Step 4: Update settings.py
# ecommerce/settings.py
# Database URL
DATABASE_URL = 'postgresql://scrapy_user:your_password@localhost/ecommerce_db'
Step 5: Pass URL to Pipeline
# pipelines.py
class ORMPipeline:
def __init__(self):
from ecommerce.settings import DATABASE_URL
self.engine = get_engine(DATABASE_URL)
# Rest of code...
That's it! Now using PostgreSQL with ORM.
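Importing settings directly works, but a more idiomatic Scrapy alternative (shown here as a sketch, not the only way) is to receive the URL through from_crawler:

# pipelines.py -- alternative: read DATABASE_URL from Scrapy settings
# (same imports as the pipeline above)
class ORMPipeline:
    @classmethod
    def from_crawler(cls, crawler):
        return cls(crawler.settings.get('DATABASE_URL', 'sqlite:///ecommerce.db'))

    def __init__(self, database_url='sqlite:///ecommerce.db'):
        self.engine = get_engine(database_url)
        create_tables(self.engine)
        Session = sessionmaker(bind=self.engine)
        self.session = Session()

    # process_item / close_spider unchanged from above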
Handling Multiple Spiders
When several spiders in the same process write to one database, share a connection pool:
# pipelines.py
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from sqlalchemy.pool import QueuePool
from ecommerce.models import create_tables
class PooledORMPipeline:
# Class-level engine (shared across spiders)
_engine = None
@classmethod
def from_crawler(cls, crawler):
"""Create pipeline with shared engine"""
if cls._engine is None:
cls._engine = create_engine(
'postgresql://user:pass@localhost/db',
poolclass=QueuePool,
pool_size=10,
max_overflow=20
)
create_tables(cls._engine)
return cls()
def __init__(self):
"""Create session from shared engine"""
Session = sessionmaker(bind=self._engine)
self.session = Session()
    def process_item(self, item, spider):
        # Same logic as ORMPipeline.process_item above
        return item
def close_spider(self, spider):
self.session.close()
# Don't close engine (shared across spiders)
Benefits:
- Multiple spiders share connection pool
- Better performance
- Handles concurrent access properly
Common Issues and Solutions
Issue 1: "Table already exists"
Error:
sqlalchemy.exc.OperationalError: table products already exists
Solution:
create_all() already skips tables that exist (checkfirst defaults to True), so call create_tables() once at startup; you can also make the check explicit:
def create_tables(engine):
Base.metadata.create_all(engine, checkfirst=True)
Issue 2: Session Errors
Error:
Session is closed
Solution:
Always create new session for each spider:
def __init__(self):
Session = sessionmaker(bind=self.engine)
self.session = Session() # New session
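If you still see closed-session errors mid-crawl (for example after a rollback), another option is to keep only the session factory on the pipeline and open a short-lived session per item. A sketch, assuming the same models module and imports as above:

# Alternative: short-lived session per item
class ORMPipeline:
    def __init__(self):
        self.engine = get_engine('sqlite:///ecommerce.db')
        create_tables(self.engine)
        self.Session = sessionmaker(bind=self.engine)  # keep the factory, not a session

    def process_item(self, item, spider):
        session = self.Session()
        try:
            # ... same save/update logic as before, using this session ...
            session.commit()
        except Exception as e:
            session.rollback()
            spider.logger.error(f"DB error: {e}")
        finally:
            session.close()
        return item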
Issue 3: Duplicate Key Errors
Error:
UNIQUE constraint failed: products.url
Solution:
Check if product exists before inserting:
existing = self.session.query(Product).filter(
Product.url == item['url']
).first()
if existing:
# Update instead of insert
pass
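A related shortcut is session.merge(), which the teaser at the top uses. Be aware that merge() matches on the primary key, not on url, so it only prevents duplicates when your item already carries the existing product's id (existing_id below is hypothetical):

# merge() upserts by primary key only
product = session.merge(Product(id=existing_id, name=item.get('name'), url=item['url']))
session.commit()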
Issue 4: Relationship Not Working
Problem:
product.reviews returns an empty list even though reviews exist.
Solution:
Make sure product_id is set correctly:
review = Review(
product_id=product.id, # Must be set!
rating=5
)
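Alternatively, let the ORM fill in the foreign key by appending to the relationship (a small sketch, assuming the product was loaded in the same session):

# Appending through the relationship sets product_id for you
review = Review(rating=5, text='Great product')
product.reviews.append(review)
session.commit()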
Best Practices
1. Always Use Transactions
try:
# Database operations
self.session.add(product)
self.session.commit()
except:
self.session.rollback()
raise
2. Close Sessions Properly
def close_spider(self, spider):
self.session.close()
3. Handle None Values
product = Product(
name=item.get('name', 'Unknown'), # Default value
price=item.get('price', 0.0)
)
4. Use Indexes
class Product(Base):
__tablename__ = 'products'
url = Column(String(500), unique=True, index=True) # Index!
category = Column(String(100), index=True) # Index!
5. Log Everything
spider.logger.info(f"Saved product: {product.name}")
spider.logger.error(f"Error: {e}")
Complete Working Example
Here's a trimmed-down version of the key files (keep the full Step 1 models if you run the pipeline and spider above unchanged):
models.py
from sqlalchemy import create_engine, Column, Integer, String, Numeric, Text, DateTime, ForeignKey, Boolean
from sqlalchemy.orm import declarative_base, relationship, sessionmaker
from datetime import datetime
Base = declarative_base()
class Product(Base):
__tablename__ = 'products'
id = Column(Integer, primary_key=True)
name = Column(String(200), nullable=False)
price = Column(Numeric(10, 2))
url = Column(String(500), unique=True, nullable=False, index=True)
category = Column(String(100), index=True)
created_at = Column(DateTime, default=datetime.now)
reviews = relationship("Review", back_populates="product")
class Review(Base):
__tablename__ = 'reviews'
id = Column(Integer, primary_key=True)
product_id = Column(Integer, ForeignKey('products.id'))
rating = Column(Integer)
text = Column(Text)
product = relationship("Product", back_populates="reviews")
def get_engine(db_url='sqlite:///ecommerce.db'):
return create_engine(db_url)
def create_tables(engine):
Base.metadata.create_all(engine)
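And a matching pipelines.py, trimmed to the fields kept in this smaller model (the full pipeline with review handling is in Step 3 above):

pipelines.py
from sqlalchemy.orm import sessionmaker
from ecommerce.models import Product, get_engine, create_tables

class ORMPipeline:
    def __init__(self):
        self.engine = get_engine()
        create_tables(self.engine)
        Session = sessionmaker(bind=self.engine)
        self.session = Session()

    def process_item(self, item, spider):
        if item.__class__.__name__ != 'ProductItem':
            return item  # review handling omitted here; see Step 3
        existing = self.session.query(Product).filter(Product.url == item['url']).first()
        if existing:
            existing.name = item.get('name')        # update in place
            existing.price = item.get('price')
            existing.category = item.get('category')
        else:
            self.session.add(Product(
                name=item.get('name'),
                price=item.get('price'),
                url=item['url'],
                category=item.get('category'),
            ))
        self.session.commit()
        return item

    def close_spider(self, spider):
        self.session.close()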
Run your spider:
scrapy crawl products
Query your data:
python query_data.py
Done!
Summary
What we learned:
- How to integrate an ORM with Scrapy
- How to define models for products and reviews
- How to build an ORM pipeline
- How to handle relationships automatically
- How to update existing records instead of duplicating them
- How to batch writes for performance
Key code:
Models:
class Product(Base):
__tablename__ = 'products'
reviews = relationship("Review")
Pipeline:
def process_item(self, item, spider):
product = Product(**item)
self.session.add(product)
self.session.commit()
Query:
product = session.query(Product).get(1)
for review in product.reviews:
print(review.rating)
Benefits:
- No SQL needed
- Clean Python code
- Automatic relationships
- Easy updates
- Type safety
Remember:
- Define models clearly
- Handle duplicates (check before insert)
- Close sessions properly
- Use batch processing for large jobs
- PostgreSQL for production
You now have production-ready ORM integration with Scrapy!
Happy scraping! 🕷️