How I Built a MongoDB Archiving System for Crawled Data
The Problem: Data Chaos at Scale
Imagine scraping 100+ college websites daily. Notices get updated. Events disappear. New announcements pop up every hour. Your database becomes a graveyard of duplicates, outdated entries, and lost history.
That was my reality building CollegeBuzz — an AICTE academic news aggregator.
The challenge wasn't just collecting data. It was managing its lifecycle:
- ❌ Overwrite everything? You lose historical context
- ❌ Blindly insert? Hello, duplicate hell
- ❌ Manual cleanup? Not scalable at 10,000+ records/day
I needed something smarter. Here's how I built an automated archiving system that keeps data fresh, preserves history, and stays performant at scale.
The Architecture: Active + Archive Pattern
Core Concept
Instead of one bloated collection, I split data into two purpose-built collections: an active collection (e.g. `news`) that holds only live records, and a paired archive collection (e.g. `news_archived`) that older records get moved into (see the sketch after the list below).
Why this works:
- Users query only fresh data (faster searches)
- Archive grows unbounded without impacting performance
- You can always restore or analyze historical trends
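Concretely, each content type gets a pair of collections. A minimal sketch with pymongo, using the database and collection names that appear in the complete code at the end of the post:

```python
from pymongo import MongoClient

client = MongoClient("localhost", 27017)
db = client["AICTE_Scraper"]

# Each active collection gets a paired "_archived" collection
active = db["news"]             # fresh, user-facing records only
archive = db["news_archived"]   # historical records, moved here once they age out
```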
The Implementation: Smart Deduplication
Problem 1: How do you detect duplicates?
A notice titled "Admissions Open 2025" might appear on multiple pages, or get re-crawled daily. I needed a deterministic unique identifier.
Solution: Composite Unique Keys
Instead of MongoDB's `_id`, I use business logic to create unique identifiers:
```python
def _create_unique_identifier(self, record):
    """
    Creates a composite key from title + URL
    Falls back to full record comparison if needed
    """
    unique_id = {}

    if 'title' in record and record['title']:
        unique_id['title'] = record['title']
    if 'url' in record and record['url']:
        unique_id['url'] = record['url']

    # If no title/URL, compare all fields except timestamps
    if not unique_id:
        unique_id = {
            k: v for k, v in record.items()
            if k not in ['crawled_at', 'last_updated_at', '_id']
        }

    return unique_id
```
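For the "Admissions Open 2025" notice from earlier, the composite key is just those two fields. A quick illustration (here `handler` is an instance of the handler class, and the URL is hypothetical):

```python
from datetime import datetime

record = {
    "title": "Admissions Open 2025",
    "url": "https://college.edu/notices/2025",  # hypothetical URL for illustration
    "crawled_at": datetime.now(),
}

handler._create_unique_identifier(record)
# -> {'title': 'Admissions Open 2025', 'url': 'https://college.edu/notices/2025'}
```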
Why not content hashing?
While SHA-256 hashing is elegant, I found composite keys more debuggable. If there's a collision, I can instantly see which title/URL caused it. Hashes hide this context.
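For contrast, a hash-based identifier might look like the sketch below (not what CollegeBuzz uses; the ignored fields match the fallback above). The digest is compact, but when two records collide all you see is two identical hex strings, not the title/URL that caused it:

```python
import hashlib
import json

def content_hash(record, ignore=('crawled_at', 'last_updated_at', '_id')):
    # Hash only the stable fields so re-crawls of identical content match
    stable = {k: v for k, v in sorted(record.items()) if k not in ignore}
    return hashlib.sha256(json.dumps(stable, default=str).encode()).hexdigest()
```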
The Magic: Timestamp Tracking
Every record carries two timestamps:
| Field | Purpose | Example |
|---|---|---|
| `crawled_at` | First time we ever saw this record | 2025-01-08 10:00:00 |
| `last_updated_at` | Most recent time we re-crawled it | 2025-10-03 14:30:00 |
The Critical Rule: `crawled_at` NEVER changes. `last_updated_at` ALWAYS updates.
This creates a paper trail:
- If `crawled_at` is 60 days old but `last_updated_at` is yesterday → the content is still live, just stable
- If both are 60 days old → probably abandoned
```python
# Preserve original crawl date, update last seen
if existing_record:
    original_crawled_at = existing_record['crawled_at']
else:
    original_crawled_at = datetime.now()

record['crawled_at'] = original_crawled_at   # Frozen in time
record['last_updated_at'] = datetime.now()   # Always fresh
```
The Archiving Logic: Time-Based + Event-Based
Records get archived under two conditions:
1. Age-Based Archiving (30-day threshold)
```python
def should_archive_by_age(record, threshold_days=30):
    """
    Archive if the record was FIRST CRAWLED more than 30 days ago
    """
    archive_threshold = datetime.now() - timedelta(days=threshold_days)
    return record['crawled_at'] < archive_threshold
```
2. Event-Based Archiving
```python
def should_archive_by_event_date(record):
    """
    Archive if the event date has passed
    Example: "Admission closes: 2025-09-15" → Auto-archive on Sept 16
    """
    event_date = parse_date(record.get('event_date'))
    if event_date and event_date < datetime.now().date():
        return True
    return False
```
Combined logic:
```python
should_archive = (
    should_archive_by_age(record, 30) or
    should_archive_by_event_date(record)
)
```
Real-World Example: Archiving Old News
Let's say you manually changed a record's `crawled_at` to January 8, 2025 (more than 30 days ago). Here's what happens:
```python
from mongodb_handler import MongoDBHandler
from datetime import datetime, timedelta

db = MongoDBHandler()

# Current record in 'news' collection:
# {
#     "title": "New Campus Opened",
#     "url": "https://college.edu/news/123",
#     "crawled_at": "2025-01-08 10:00:00",      # 9+ months old!
#     "last_updated_at": "2025-10-01 08:00:00"
# }

# Run archiving
db.archive_old_records()

# Output:
# [news] Found 1 records to archive.
#   Archiving: {'title': 'New Campus Opened', 'url': 'https://college.edu/news/123'}
#     - crawled_at: 2025-01-08 10:00:00
#     - last_updated_at: 2025-10-01 08:00:00
# ✅ Archived 1 record(s) from news
```
What happened under the hood:
- A MongoDB query finds records where `crawled_at < (today - 30 days)` (see the sketch below)
- The record is copied to the `news_archived` collection
- The record is deleted from the `news` collection
- All timestamps are preserved (this is crucial for analytics!)
Performance Optimization: Indexing Strategy
With 100K+ records, queries need to be fast. Here's my indexing setup:
```python
# Create indexes on both collections
active_collection.create_index("crawled_at")
active_collection.create_index("last_updated_at")
active_collection.create_index([("title", 1), ("url", 1)], unique=True)

archived_collection.create_index("crawled_at")
archived_collection.create_index("last_updated_at")
```
Why these indexes?
- `crawled_at` → fast archiving queries (`WHERE crawled_at < threshold`)
- `last_updated_at` → sort by freshness for user-facing queries
- `(title, url)` composite → indexed duplicate lookups instead of a full collection scan
Before indexing: Archive query took 4.2s on 50K records
After indexing: Same query in 120ms
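To confirm an index is actually being picked up, pymongo's `explain()` is the quickest check. A sketch using the names above:

```python
from datetime import datetime, timedelta

threshold = datetime.now() - timedelta(days=30)
plan = active_collection.find({"crawled_at": {"$lt": threshold}}).explain()

# Expect an IXSCAN stage in the winning plan, not a COLLSCAN
print(plan["queryPlanner"]["winningPlan"])
```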
Handling Edge Cases
Case 1: Date Strings vs DateTime Objects
Scraped dates come in wild formats:
"15 May 2024"
"2024-05-15"
"May 15, 2024"
My parser handles them all:
```python
import re
from datetime import datetime

def _parse_date(self, date_str):
    """
    Robust date parsing with multiple format fallbacks
    """
    if not date_str or not isinstance(date_str, str):
        return None

    date_formats = [
        "%d %B %Y",   # "15 May 2024"
        "%Y-%m-%d",   # "2024-05-15"
        "%d-%m-%Y",   # "15-05-2024"
        "%B %d, %Y",  # "May 15, 2024"
    ]

    # Collapse stray whitespace before matching
    clean_date = re.sub(r'\s+', ' ', date_str.strip())

    for fmt in date_formats:
        try:
            return datetime.strptime(clean_date, fmt).date()
        except ValueError:
            continue

    return None
```
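A quick sanity check against the formats above (`handler` is assumed to be an instance of the handler class):

```python
for raw in ["15 May 2024", "2024-05-15", "May 15, 2024", "not a date"]:
    print(repr(raw), "->", handler._parse_date(raw))

# '15 May 2024'  -> 2024-05-15
# '2024-05-15'   -> 2024-05-15
# 'May 15, 2024' -> 2024-05-15
# 'not a date'   -> None
```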
Case 2: Records Without Timestamps
If an old record exists without `crawled_at`, the system handles it gracefully:
```python
original_crawled_at = (
    existing_record.get('crawled_at') or
    datetime.now()  # Default to now if missing
)
```
Testing: Verify It Works
I built a self-test function to validate archiving:
```python
def test_archive_functionality(self, collection_name="news"):
    """
    1. Insert a record with crawled_at = 31 days ago
    2. Run archive_old_records()
    3. Verify it moved to archive
    """
    old_date = datetime.now() - timedelta(days=31)
    test_record = {
        "title": f"Test Record {datetime.now().timestamp()}",
        "url": f"https://test.com/{datetime.now().timestamp()}",
        "crawled_at": old_date,
        "last_updated_at": old_date
    }

    # Insert to active collection
    test_collection = self.db[collection_name]
    result = test_collection.insert_one(test_record)

    # Run archiving
    self.archive_old_records()

    # Verify: Should be in archive, not in active
    archived = self.db[f"{collection_name}_archived"].find_one({
        "title": test_record["title"]
    })
    active = test_collection.find_one({"_id": result.inserted_id})

    assert archived is not None, "Record not in archive!"
    assert active is None, "Record still in active!"
    print("✅ Archive test passed!")
```
Run it:
```bash
python mongodb_handler.py  # Runs built-in test
```
Lessons Learned (The Hard Way)
1. Never Trust Scraper Consistency
Websites change formats overnight. Your archiving logic needs to be defensive:
- Always validate dates before parsing
- Use try-except blocks around timestamp operations
- Log failed parses for manual review (a sketch of this follows below)
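A minimal sketch of that logging step, assuming Python's standard `logging` module and the handler's `_parse_date` method from earlier (logger name is arbitrary):

```python
import logging

logger = logging.getLogger("archiver")

def safe_parse_date(handler, raw_value):
    # Never let one bad date string break the whole crawl batch
    try:
        parsed = handler._parse_date(raw_value)
        if parsed is None and raw_value:
            logger.warning("Unparseable date, needs manual review: %r", raw_value)
        return parsed
    except Exception:
        logger.exception("Date parsing crashed on %r", raw_value)
        return None
```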
2. Indexes Are Non-Negotiable
I initially skipped indexing ("we'll add it later"). BAD IDEA. Archiving 10K records took 45 minutes. After indexing: 2 minutes.
Rule: Add indexes BEFORE your first large crawl.
3. Separate Active from Archive Early
I tried using a single collection with an `is_archived` flag. Query complexity exploded:
```python
# Nightmare query
active_records = collection.find({
    "is_archived": False,
    "crawled_at": {"$gt": threshold}
})
```
With separate collections:
```python
# Clean query
active_records = active_collection.find()
```
4. Manual Archive Triggers Are Essential
Sometimes you need to force-archive records (e.g., after fixing date parsing bugs):
```python
# Force archive everything older than 60 days
db.manually_archive_old_records(days_threshold=60)
```
Real-World Impact: By The Numbers
After deploying this system:
| Metric | Before | After |
|---|---|---|
| Active collection size | 450K records | 12K records |
| Average query time (user search) | 3.2s | 180ms |
| Duplicate notices | ~15% of dataset | 0% |
| Historical data available | ❌ Lost on updates | ✅ Full audit trail |
| Disk usage (with compression) | 8.4GB | 2.1GB (active) + 4.2GB (archive) |
The hidden win: Being able to answer questions like:
- "How long do admission notices usually stay live?"
- "Which colleges update their news most frequently?"
- "Can we restore that deleted event from March?"
The Complete Code
Here's the full archiving system (simplified for readability):
```python
from pymongo import MongoClient
from datetime import datetime, timedelta


class MongoDBHandler:
    def __init__(self, db_name="AICTE_Scraper"):
        self.client = MongoClient("localhost", 27017)
        self.db = self.client[db_name]
        self.active_collections = ["news", "events", "admissions"]

        # Create archive collections
        for collection in self.active_collections:
            self.create_collection_if_not_exists(f"{collection}_archived")

    def insert_data(self, collection_name, records):
        """
        Insert with smart deduplication and auto-archiving
        """
        current_time = datetime.now()
        archive_threshold = current_time - timedelta(days=30)

        active = self.db[collection_name]
        archive = self.db[f"{collection_name}_archived"]

        for record in records:
            unique_id = self._create_unique_identifier(record)
            existing = active.find_one(unique_id)

            # Preserve original crawl date
            if existing and 'crawled_at' in existing:
                original_crawled_at = existing['crawled_at']
            else:
                original_crawled_at = current_time

            record['crawled_at'] = original_crawled_at
            record['last_updated_at'] = current_time

            # Check if should archive
            should_archive = original_crawled_at < archive_threshold

            if should_archive:
                if existing:
                    active.delete_one(unique_id)
                archive.update_one(unique_id, {"$set": record}, upsert=True)
            else:
                active.update_one(unique_id, {"$set": record}, upsert=True)

    def archive_old_records(self):
        """
        Move records older than 30 days to archive
        """
        threshold = datetime.now() - timedelta(days=30)

        for collection_name in self.active_collections:
            active = self.db[collection_name]
            archive = self.db[f"{collection_name}_archived"]

            old_records = list(active.find({"crawled_at": {"$lt": threshold}}))

            for record in old_records:
                record_id = record.pop('_id')
                unique_id = self._create_unique_identifier(record)

                archive.update_one(unique_id, {"$set": record}, upsert=True)
                active.delete_one({"_id": record_id})

            print(f"✅ Archived {len(old_records)} from {collection_name}")

    # _create_unique_identifier (shown earlier) and create_collection_if_not_exists
    # are omitted here for brevity.
```
Full implementation with tests: GitHub Link
What's Next?
I'm working on:
- TTL-based archiving using MongoDB's built-in TTL indexes
- Incremental archiving (archive in batches to avoid blocking)
- Change detection (highlight what fields changed between versions)
- Archive compression (BSON → compressed JSON for long-term storage)
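The TTL idea at the top of that list is a one-liner in pymongo. A sketch (not yet part of CollegeBuzz) that would have MongoDB expire archived documents on its own:

```python
# MongoDB's TTL monitor deletes documents once last_updated_at is older than the TTL
archived_collection.create_index(
    "last_updated_at",
    expireAfterSeconds=60 * 60 * 24 * 365,  # keep archived records for one year
)
```

The trade-off is that expired documents are gone for good, so the TTL window effectively caps the audit trail.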
Closing Thoughts
Building this archiving system taught me that data management is harder than data collection.
If you're building any system that involves:
- 🕷️ Web scraping
- 📰 News aggregation
- 📅 Event tracking
- 📊 Time-series data
Think about archiving on Day 1, not Day 100.
Your future self (and your database) will thank you.
Resources
- MongoDB Time-Series Collections
- 🔧 Full code on GitHub
- 💬 Questions? Drop a comment or DM me [@pradippanjiyar]
Found this helpful? Hit that ❤️ and follow for more web scraping deep dives!
This post is part of a series on building CollegeBuzz. Next up: "Handling 100+ Website Scrapers with Python's asyncio"