The first time I ran my spider twice, I ended up with duplicate data. Product "Widget A" appeared 5 times in my database. Same URL, same data, just duplicated.
I thought "easy, I'll just check if the URL exists before inserting." But that doesn't work when Scrapy processes thousands of items in parallel.
Then I tried storing URLs in memory. That worked until my spider crashed and I lost the entire set. When I restarted, I got duplicates again.
After dealing with millions of items, I've learned the right ways to handle duplicates. Let me show you all the approaches and when to use each.
The Problem: Why Duplicates Happen
Duplicates come from several sources:
1. Running Spider Multiple Times
- Yesterday's scrape + today's scrape = duplicates
- No memory of what you already scraped
2. Pagination Bugs
- Page 1 and Page 2 might show same item
- Website bug, not your fault
3. Same Item in Multiple Categories
- Product appears in "Electronics" and "Sale Items"
- Same product, different paths
4. URL Variations
- https://example.com/product/123
- https://example.com/product/123?ref=homepage
- Same product, different URLs
5. Rerunning After Crash
- Spider crashes halfway
- Restart duplicates first half
You need a strategy for each scenario.
Strategy 1: Drop Duplicates in Pipeline (Simple)
The simplest approach: check if item already exists before saving.
Basic Duplicate Filter
# pipelines.py
from scrapy.exceptions import DropItem
class DuplicatesPipeline:
    def __init__(self):
        self.urls_seen = set()

    def process_item(self, item, spider):
        url = item['url']
        if url in self.urls_seen:
            raise DropItem(f'Duplicate item found: {url}')
        self.urls_seen.add(url)
        return item
Enable it:
# settings.py
ITEM_PIPELINES = {
    'myproject.pipelines.DuplicatesPipeline': 100,  # Run first
    'myproject.pipelines.SavePipeline': 300,        # Save after dedup
}
What the Docs Don't Tell You
Problem 1: Lost on restart
If spider crashes, urls_seen is lost. When you restart, you get duplicates.
Problem 2: Memory usage
With millions of URLs, the set grows huge and uses lots of memory.
Problem 3: Multiple spiders
Each spider has its own urls_seen. No sharing between runs or spiders.
This works for:
- Single spider run
- Small number of items (< 100k)
- Development/testing
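If memory (Problem 2 above) is the main worry but you still want the simple approach, one option is to store fixed-size hashes instead of full URL strings. A minimal sketch, my own variation rather than anything Scrapy ships with:
import hashlib

from scrapy.exceptions import DropItem

class HashedUrlDuplicatesPipeline:
    def __init__(self):
        self.seen_hashes = set()

    def process_item(self, item, spider):
        # Store a 20-byte SHA-1 digest per URL instead of the full string
        url_hash = hashlib.sha1(item['url'].encode()).digest()
        if url_hash in self.seen_hashes:
            raise DropItem(f"Duplicate URL: {item['url']}")
        self.seen_hashes.add(url_hash)
        return item
It still dies with the process, though, which is why the persistent strategies below matter more.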
Strategy 2: Persistent Duplicate Filter (Better)
Save seen URLs to a file so they survive crashes:
# pipelines.py
from scrapy.exceptions import DropItem
import os
class PersistentDuplicatesPipeline:
    def __init__(self):
        self.urls_seen = set()
        self.filename = 'seen_urls.txt'

    def open_spider(self, spider):
        # Load previously seen URLs
        if os.path.exists(self.filename):
            with open(self.filename, 'r') as f:
                self.urls_seen = set(line.strip() for line in f)
        spider.logger.info(f'Loaded {len(self.urls_seen)} seen URLs')

    def close_spider(self, spider):
        # Save URLs for next run
        with open(self.filename, 'w') as f:
            for url in self.urls_seen:
                f.write(url + '\n')
        spider.logger.info(f'Saved {len(self.urls_seen)} URLs')

    def process_item(self, item, spider):
        url = item['url']
        if url in self.urls_seen:
            raise DropItem(f'Duplicate: {url}')
        self.urls_seen.add(url)
        return item
Now URLs persist between runs!
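One caveat I hit: close_spider doesn't always get a chance to run on a hard crash, so URLs from that run can still be lost. A variation of the pipeline above (my own sketch) appends each new URL to the file as soon as it's seen:
import os

from scrapy.exceptions import DropItem

class AppendingDuplicatesPipeline:
    def __init__(self):
        self.urls_seen = set()
        self.filename = 'seen_urls.txt'

    def open_spider(self, spider):
        if os.path.exists(self.filename):
            with open(self.filename, 'r') as f:
                self.urls_seen = set(line.strip() for line in f)
        # Append mode: new URLs are written as they arrive
        self.file = open(self.filename, 'a')

    def close_spider(self, spider):
        self.file.close()

    def process_item(self, item, spider):
        url = item['url']
        if url in self.urls_seen:
            raise DropItem(f'Duplicate: {url}')
        self.urls_seen.add(url)
        self.file.write(url + '\n')
        self.file.flush()  # don't rely on close_spider running
        return item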
Multi-Field Deduplication
Sometimes the URL alone isn't unique. Build the key from multiple fields:
from scrapy.exceptions import DropItem

class MultiFieldDuplicatesPipeline:
    def __init__(self):
        self.items_seen = set()

    def process_item(self, item, spider):
        # Create a unique key from multiple fields
        key = (item['name'], item['category'], item['price'])
        if key in self.items_seen:
            raise DropItem(f'Duplicate: {key}')
        self.items_seen.add(key)
        return item
Or hash the entire item:
import hashlib
import json

from scrapy.exceptions import DropItem

class HashDuplicatesPipeline:
    def __init__(self):
        self.hashes_seen = set()

    def process_item(self, item, spider):
        # Create a hash of the item's content
        item_dict = dict(item)
        item_json = json.dumps(item_dict, sort_keys=True)
        item_hash = hashlib.md5(item_json.encode()).hexdigest()
        if item_hash in self.hashes_seen:
            raise DropItem(f'Duplicate content: {item_hash}')
        self.hashes_seen.add(item_hash)
        return item
Strategy 3: Database-Level Deduplication (Production)
Let the database handle duplicates with UNIQUE constraints:
PostgreSQL with UPSERT
# pipelines.py
import psycopg2
class PostgresUpsertPipeline:
    def open_spider(self, spider):
        self.conn = psycopg2.connect(
            host='localhost',
            database='scrapy_db',
            user='user',
            password='pass'
        )
        self.cursor = self.conn.cursor()
        # Create table with UNIQUE constraint
        self.cursor.execute('''
            CREATE TABLE IF NOT EXISTS products (
                id SERIAL PRIMARY KEY,
                url TEXT UNIQUE,  -- UNIQUE constraint
                name TEXT,
                price DECIMAL,
                last_seen TIMESTAMP DEFAULT NOW()
            )
        ''')
        self.conn.commit()

    def close_spider(self, spider):
        self.conn.close()

    def process_item(self, item, spider):
        # UPSERT: insert, or update if the URL already exists
        self.cursor.execute('''
            INSERT INTO products (url, name, price, last_seen)
            VALUES (%s, %s, %s, NOW())
            ON CONFLICT (url)
            DO UPDATE SET
                name = EXCLUDED.name,
                price = EXCLUDED.price,
                last_seen = NOW()
        ''', (item['url'], item['name'], item['price']))
        self.conn.commit()
        return item
Benefits:
- Database guarantees uniqueness
- Survives crashes
- Updates existing items
- Shared across spider runs
- No memory overhead in Python
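One tuning note: committing on every item gets slow at volume. If throughput matters, you can buffer items and upsert them in batches - a sketch using psycopg2's execute_values and the same products table as above:
import psycopg2
from psycopg2.extras import execute_values

class BatchedPostgresUpsertPipeline:
    BATCH_SIZE = 500  # assumption: tune to your workload

    def open_spider(self, spider):
        self.conn = psycopg2.connect(host='localhost', database='scrapy_db')
        self.cursor = self.conn.cursor()
        self.buffer = []

    def close_spider(self, spider):
        self.flush()
        self.conn.close()

    def process_item(self, item, spider):
        self.buffer.append((item['url'], item['name'], item['price']))
        if len(self.buffer) >= self.BATCH_SIZE:
            self.flush()
        return item

    def flush(self):
        # Write the whole buffer in one round trip, then commit once
        if not self.buffer:
            return
        execute_values(self.cursor, '''
            INSERT INTO products (url, name, price)
            VALUES %s
            ON CONFLICT (url) DO UPDATE SET
                name = EXCLUDED.name,
                price = EXCLUDED.price,
                last_seen = NOW()
        ''', self.buffer)
        self.conn.commit()
        self.buffer = []
The trade-off: anything still sitting in the buffer when the process dies hasn't been written yet.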
MongoDB with Upsert
import pymongo
class MongoUpsertPipeline:
    def open_spider(self, spider):
        self.client = pymongo.MongoClient('mongodb://localhost:27017')
        self.db = self.client['scrapy_db']
        self.collection = self.db[spider.name]
        # Create unique index
        self.collection.create_index('url', unique=True)

    def close_spider(self, spider):
        self.client.close()

    def process_item(self, item, spider):
        # Upsert: update if it exists, insert if not
        self.collection.update_one(
            {'url': item['url']},
            {'$set': dict(item)},
            upsert=True
        )
        return item
Strategy 4: Scrapy's Built-In Duplicate Filter
Scrapy has built-in duplicate filtering for requests (not items):
# settings.py
DUPEFILTER_CLASS = 'scrapy.dupefilters.RFPDupeFilter' # Default
DUPEFILTER_DEBUG = True # Log duplicates
This prevents visiting the same URL twice in a single run.
How It Works
def parse(self, response):
    # First time visiting this URL
    yield scrapy.Request('https://example.com/page', callback=self.parse_page)
    # This gets filtered (same URL)
    yield scrapy.Request('https://example.com/page', callback=self.parse_page)
The second request is dropped automatically.
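The flip side: sometimes you want to re-fetch a URL on purpose (a listing page you poll, a login endpoint). Scrapy's Request accepts dont_filter=True to bypass the duplicate filter for that one request:
import scrapy

def parse(self, response):
    # dont_filter=True lets this request through even if the
    # same URL was already scheduled earlier in the crawl
    yield scrapy.Request(
        'https://example.com/page',
        callback=self.parse_page,
        dont_filter=True,
    )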
Persistent Request Filtering
Make it survive crashes:
# settings.py
DUPEFILTER_CLASS = 'scrapy.dupefilters.RFPDupeFilter'
JOBDIR = 'crawl_state' # Saves state
Now run:
scrapy crawl myspider
State saves to crawl_state/. If spider crashes, restart and it continues where it left off without duplicates.
What the Docs Don't Tell You
This only filters requests, not items!
Same URL might generate different items (e.g., product list with changing inventory). You still need item deduplication.
Strategy 5: Scrapy Deltafetch (Only New Items)
The scrapy-deltafetch extension skips requests for pages that already produced items in earlier runs, so only new pages get scraped:
pip install scrapy-deltafetch
# settings.py
SPIDER_MIDDLEWARES = {
    'scrapy_deltafetch.DeltaFetch': 100,
}
DELTAFETCH_ENABLED = True
How it works:
- First run: scrapes everything
- Subsequent runs: skips requests whose pages already yielded items
- Keeps request fingerprints in a small local database between runs
Great for incremental crawls (new listings, new articles). It won't re-visit pages it has already scraped, so for monitoring price or stock changes you either reset its state or combine it with another strategy.
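Two knobs worth knowing about, going from memory of the scrapy-deltafetch README (double-check the option names against the version you install): DELTAFETCH_RESET wipes the stored state, and a deltafetch_key request meta entry overrides the key stored for a page.
# settings.py  (option name per the scrapy-deltafetch README; verify for your version)
DELTAFETCH_RESET = True  # forget everything and re-scrape from scratch

# In a callback: store this page under a custom key instead of the request fingerprint
# (product_id here is a hypothetical variable from your own parsing code)
yield scrapy.Request(
    url,
    callback=self.parse_product,
    meta={'deltafetch_key': product_id},
)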
Strategy 6: Time-Based Deduplication
Only keep recent items, drop old duplicates:
from datetime import datetime, timedelta

from scrapy.exceptions import DropItem

class TimedDuplicatesPipeline:
    def __init__(self):
        self.items_with_time = {}  # url: timestamp
        self.max_age = timedelta(days=7)

    def process_item(self, item, spider):
        url = item['url']
        now = datetime.now()
        if url in self.items_with_time:
            last_seen = self.items_with_time[url]
            age = now - last_seen
            if age < self.max_age:
                # Seen recently, drop
                raise DropItem(f'Duplicate (seen {age} ago): {url}')
        # New, or old enough to re-scrape
        self.items_with_time[url] = now
        return item
Useful when you want to periodically re-scrape items.
Strategy 7: Fingerprinting (Advanced)
Create unique fingerprints for complex deduplication:
import hashlib

from scrapy.exceptions import DropItem

class FingerprintDuplicatesPipeline:
    def __init__(self):
        self.fingerprints = set()

    def process_item(self, item, spider):
        # Create a fingerprint from the important fields
        fields = [
            item.get('name', ''),
            item.get('brand', ''),
            item.get('sku', ''),
        ]
        content = '|'.join(str(f) for f in fields)
        fp = hashlib.sha1(content.encode()).hexdigest()
        if fp in self.fingerprints:
            raise DropItem(f'Duplicate fingerprint: {fp}')
        self.fingerprints.add(fp)
        return item
Combining Strategies
Use multiple approaches together:
# settings.py
ITEM_PIPELINES = {
    'myproject.pipelines.DuplicatesPipeline': 100,     # Memory check (fast)
    'myproject.pipelines.DatabaseCheckPipeline': 200,  # DB check (thorough)
    'myproject.pipelines.SavePipeline': 300,           # Save non-duplicates
}
Pipeline 1: Fast in-memory check for this run
Pipeline 2: Check database for previous runs
Pipeline 3: Save if it passed both checks
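The first and third pipelines appear earlier in the post; the middle one isn't shown anywhere above, so here's a rough sketch of what it could look like, assuming the same products table from the Postgres example:
import psycopg2
from scrapy.exceptions import DropItem

class DatabaseCheckPipeline:
    def open_spider(self, spider):
        self.conn = psycopg2.connect(host='localhost', database='scrapy_db')
        self.cursor = self.conn.cursor()

    def close_spider(self, spider):
        self.conn.close()

    def process_item(self, item, spider):
        # Drop the item if a previous run already stored this URL
        self.cursor.execute('SELECT 1 FROM products WHERE url = %s', (item['url'],))
        if self.cursor.fetchone():
            raise DropItem(f"Already in database: {item['url']}")
        return item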
Handling URL Variations
Same product, different URLs. Normalize them:
from urllib.parse import urlparse, parse_qs, urlencode

from scrapy.exceptions import DropItem

def normalize_url(url):
    # Strip tracking parameters like ?ref=, ?utm_source=, etc.
    parsed = urlparse(url)
    query = parse_qs(parsed.query)
    # Keep only important params
    important_params = ['id', 'product', 'sku']
    clean_query = {k: v for k, v in query.items() if k in important_params}
    # Rebuild URL
    clean_url = f"{parsed.scheme}://{parsed.netloc}{parsed.path}"
    if clean_query:
        clean_url += '?' + urlencode(clean_query, doseq=True)
    return clean_url.rstrip('/')

class NormalizedDuplicatesPipeline:
    def __init__(self):
        self.urls_seen = set()

    def process_item(self, item, spider):
        url = normalize_url(item['url'])
        if url in self.urls_seen:
            raise DropItem(f'Duplicate (normalized): {url}')
        self.urls_seen.add(url)
        return item
Handles:
- https://example.com/product/123?ref=homepage
- https://example.com/product/123?utm_source=google
- https://example.com/product/123/
All become: https://example.com/product/123
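A quick sanity check with the normalize_url function above:
urls = [
    'https://example.com/product/123?ref=homepage',
    'https://example.com/product/123?utm_source=google',
    'https://example.com/product/123/',
]
for url in urls:
    print(normalize_url(url))
# Prints https://example.com/product/123 three times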
Monitoring Duplicates
Track how many duplicates you're catching:
from scrapy.exceptions import DropItem

class DuplicatesStatsPipeline:
    def __init__(self):
        self.urls_seen = set()
        self.duplicates_count = 0
        self.items_count = 0

    def process_item(self, item, spider):
        url = item['url']
        self.items_count += 1
        if url in self.urls_seen:
            self.duplicates_count += 1
            spider.logger.warning(
                f'Duplicate rate: {self.duplicates_count}/{self.items_count} '
                f'({self.duplicates_count / self.items_count * 100:.1f}%)'
            )
            raise DropItem(f'Duplicate: {url}')
        self.urls_seen.add(url)
        return item

    def close_spider(self, spider):
        spider.logger.info(
            f'Total duplicates caught: {self.duplicates_count} '
            f'out of {self.items_count} items'
        )
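You can also push these counters into Scrapy's stats collector so they show up in the stats dump at the end of the crawl. A small sketch using the standard from_crawler hook (the stat names here are just ones I picked):
from scrapy.exceptions import DropItem

class StatsAwareDuplicatesPipeline:
    def __init__(self, stats):
        self.stats = stats
        self.urls_seen = set()

    @classmethod
    def from_crawler(cls, crawler):
        # Scrapy calls this with the crawler; its .stats is the stats collector
        return cls(crawler.stats)

    def process_item(self, item, spider):
        url = item['url']
        if url in self.urls_seen:
            self.stats.inc_value('duplicates/dropped')
            raise DropItem(f'Duplicate: {url}')
        self.urls_seen.add(url)
        self.stats.inc_value('duplicates/unique_items')
        return item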
Complete Production Example
Here's a production-ready duplicate handling pipeline:
# pipelines.py
from scrapy.exceptions import DropItem
import psycopg2
from urllib.parse import urlparse, parse_qs, urlencode
import os

class ProductionDuplicatesPipeline:
    def __init__(self):
        # In-memory cache for fast checking
        self.memory_cache = set()
        # Persistent cache file
        self.cache_file = 'seen_urls.txt'
        # Stats
        self.duplicates = 0
        self.new_items = 0

    def open_spider(self, spider):
        # Load cache from file
        if os.path.exists(self.cache_file):
            with open(self.cache_file, 'r') as f:
                self.memory_cache = set(line.strip() for line in f)
        spider.logger.info(f'Loaded {len(self.memory_cache)} cached URLs')
        # Connect to database
        self.conn = psycopg2.connect(
            host='localhost',
            database='scrapy_db'
        )
        self.cursor = self.conn.cursor()

    def close_spider(self, spider):
        # Save cache
        with open(self.cache_file, 'w') as f:
            for url in self.memory_cache:
                f.write(url + '\n')
        # Close database
        self.conn.close()
        # Log stats
        spider.logger.info(f'New items: {self.new_items}')
        spider.logger.info(f'Duplicates: {self.duplicates}')

    def process_item(self, item, spider):
        # Normalize URL
        url = self.normalize_url(item['url'])
        # Check memory cache first (fast)
        if url in self.memory_cache:
            self.duplicates += 1
            raise DropItem(f'Duplicate (cache): {url}')
        # Check database (slower but thorough)
        self.cursor.execute(
            'SELECT 1 FROM products WHERE url = %s',
            (url,)
        )
        if self.cursor.fetchone():
            self.duplicates += 1
            self.memory_cache.add(url)  # Cache for future checks
            raise DropItem(f'Duplicate (database): {url}')
        # New item!
        self.new_items += 1
        self.memory_cache.add(url)
        item['url'] = url  # Use normalized URL
        return item

    def normalize_url(self, url):
        # Remove tracking parameters
        parsed = urlparse(url)
        query = parse_qs(parsed.query)
        # Keep only important params
        important = ['id', 'product', 'sku']
        clean_query = {k: v for k, v in query.items() if k in important}
        # Rebuild
        clean_url = f"{parsed.scheme}://{parsed.netloc}{parsed.path}"
        if clean_query:
            clean_url += '?' + urlencode(clean_query, doseq=True)
        return clean_url.rstrip('/')
This pipeline:
- Checks memory first (fast)
- Checks database if not in memory (thorough)
- Normalizes URLs
- Persists cache across runs
- Tracks statistics
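Enable it ahead of whatever pipeline does the saving (SavePipeline here stands in for your project's own storage pipeline):
# settings.py
ITEM_PIPELINES = {
    'myproject.pipelines.ProductionDuplicatesPipeline': 100,
    'myproject.pipelines.SavePipeline': 300,
}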
Quick Reference
Simple In-Memory
class DuplicatesPipeline:
    def __init__(self):
        self.seen = set()

    def process_item(self, item, spider):
        if item['url'] in self.seen:
            raise DropItem('Duplicate')
        self.seen.add(item['url'])
        return item
Persistent File-Based
def open_spider(self, spider):
    with open('seen.txt', 'r') as f:
        self.seen = set(line.strip() for line in f)

def close_spider(self, spider):
    with open('seen.txt', 'w') as f:
        for url in self.seen:
            f.write(url + '\n')
Database UPSERT
self.cursor.execute('''
    INSERT INTO products (url, name, price)
    VALUES (%s, %s, %s)
    ON CONFLICT (url) DO UPDATE
    SET name = EXCLUDED.name, price = EXCLUDED.price
''', (url, name, price))
Summary
For Small Projects:
- In-memory set with urls_seen - simple and fast
For Production:
- Database with UNIQUE constraints
- Persistent across runs and crashes
For Monitoring:
- Time-based deduplication
- Deltafetch for change detection
Best Practices:
- Normalize URLs before checking
- Use database UPSERT for production
- Cache in memory for speed
- Track duplicate statistics
- Persist cache across runs
Start simple, upgrade as needed. The right approach depends on your scale and requirements.
Happy scraping! 🕷️