Pradip Panjiyar

How I Built a MongoDB Archiving System for Crawled Data

The Problem: Data Chaos at Scale

Imagine scraping 100+ college websites daily. Notices get updated. Events disappear. New announcements pop up every hour. Your database becomes a graveyard of duplicates, outdated entries, and lost history.

That was my reality building CollegeBuzz — an AICTE academic news aggregator.

The challenge wasn't just collecting data. It was managing its lifecycle:

  • ❌ Overwrite everything? You lose historical context
  • ❌ Blindly insert? Hello, duplicate hell
  • ❌ Manual cleanup? Not scalable at 10,000+ records/day

I needed something smarter. Here's how I built an automated archiving system that keeps data fresh, preserves history, and stays performant at scale.


The Architecture: Active + Archive Pattern

Core Concept

Instead of one bloated collection, I split data into two purpose-built collections:

A simple block diagram showing an “Active + Archive” data pattern. On top is the Active Collection (user-facing, always fresh). An arrow points downward with the note “after 30 days or event passes” leading to the Archive Collection (historical records for analytics and audits).

Why this works (a minimal setup sketch follows the list below):

  • Users query only fresh data (faster searches)
  • Archive grows unbounded without impacting performance
  • You can always restore or analyze historical trends
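
To make the split concrete, here's a minimal pymongo sketch of the two collections. The database and collection names match the ones used later in this post; the query at the end is purely illustrative.

from pymongo import MongoClient

client = MongoClient("localhost", 27017)
db = client["AICTE_Scraper"]

# User-facing reads only ever touch the active collection
active = db["news"]
# Historical records accumulate here without slowing down user queries
archive = db["news_archived"]

# Example: show users the 20 most recently refreshed notices
fresh_notices = active.find().sort("last_updated_at", -1).limit(20)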

The Implementation: Smart Deduplication

Problem 1: How do you detect duplicates?

A notice titled "Admissions Open 2025" might appear on multiple pages, or get re-crawled daily. I needed a deterministic unique identifier.

Solution: Composite Unique Keys

Instead of MongoDB's _id, I use business logic to create unique identifiers:

def _create_unique_identifier(self, record):
    """
    Creates a composite key from title + URL
    Falls back to full record comparison if needed
    """
    unique_id = {}

    if 'title' in record and record['title']:
        unique_id['title'] = record['title']

    if 'url' in record and record['url']:
        unique_id['url'] = record['url']

    # If no title/URL, compare all fields except timestamps
    if not unique_id:
        unique_id = {
            k: v for k, v in record.items() 
            if k not in ['crawled_at', 'last_updated_at', '_id']
        }

    return unique_id

Why not content hashing?
While SHA-256 hashing is elegant, I found composite keys more debuggable. If there's a collision, I can instantly see which title/URL caused it. Hashes hide this context.
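
For contrast, here's roughly what the content-hash alternative would look like. It's compact, but when two records collide all you get is an opaque hex string (a sketch only, not what CollegeBuzz ships):

import hashlib
import json

def content_hash(record):
    # Hash only the stable fields; timestamps change on every crawl
    stable = {k: v for k, v in record.items()
              if k not in ('crawled_at', 'last_updated_at', '_id')}
    payload = json.dumps(stable, sort_keys=True, default=str)
    return hashlib.sha256(payload.encode("utf-8")).hexdigest()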


The Magic: Timestamp Tracking

Every record carries two timestamps:

Field            Purpose                              Example
crawled_at       First time we ever saw this record   2025-01-08 10:00:00
last_updated_at  Most recent time we re-crawled it    2025-10-03 14:30:00

The Critical Rule:

crawled_at NEVER changes. last_updated_at ALWAYS updates.

This creates a paper trail:

  • If crawled_at is 60 days old but last_updated_at is yesterday → the content is still live, just stable
  • If both are 60 days old → probably abandoned

# Preserve original crawl date, update last seen
if existing_record:
    original_crawled_at = existing_record['crawled_at']
else:
    original_crawled_at = datetime.now()

record['crawled_at'] = original_crawled_at  # Frozen in time
record['last_updated_at'] = datetime.now()  # Always fresh

The Archiving Logic: Time-Based + Event-Based

Records get archived under two conditions:

1. Age-Based Archiving (30-day threshold)

def should_archive_by_age(record, threshold_days=30):
    """
    Archive if the record was FIRST CRAWLED more than 30 days ago
    """
    archive_threshold = datetime.now() - timedelta(days=threshold_days)

    return record['crawled_at'] < archive_threshold

2. Event-Based Archiving

def should_archive_by_event_date(record):
    """
    Archive if the event date has passed
    Example: "Admission closes: 2025-09-15" → Auto-archive on Sept 16
    """
    # parse_date: the robust parser shown in "Handling Edge Cases" below
    event_date = parse_date(record.get('event_date'))

    if event_date and event_date < datetime.now().date():
        return True

    return False

Combined logic:

should_archive = (
    should_archive_by_age(record, 30) or 
    should_archive_by_event_date(record)
)

Real-World Example: Archiving Old News

Let's say you manually changed a record's crawled_at to January 8, 2025 (more than 30 days ago). Here's what happens:

from mongodb_handler import MongoDBHandler
from datetime import datetime, timedelta

db = MongoDBHandler()

# Current record in 'news' collection:
# {
#   "title": "New Campus Opened",
#   "url": "https://college.edu/news/123",
#   "crawled_at": "2025-01-08 10:00:00",  # 9+ months old!
#   "last_updated_at": "2025-10-01 08:00:00"
# }

# Run archiving
db.archive_old_records()

# Output:
# [news] Found 1 record(s) to archive.
# Archiving: {'title': 'New Campus Opened', 'url': 'https://college.edu/news/123'}
#   - crawled_at: 2025-01-08 10:00:00
#   - last_updated_at: 2025-10-01 08:00:00
# ✅ Archived 1 record(s) from news

What happened under the hood (sketched in code after the list below):

  1. MongoDB query finds records where crawled_at < (today - 30 days)
  2. Record is copied to news_archived collection
  3. Record is deleted from news collection
  4. All timestamps are preserved (this is crucial for analytics!)
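
Per record, the move is a copy-then-delete keyed by the same composite identifier used for deduplication. A stripped-down sketch of that loop (the full archive_old_records method is at the end of the post):

from datetime import datetime, timedelta

threshold = datetime.now() - timedelta(days=30)

# active / archive are the "news" and "news_archived" handles from the earlier sketch
for record in active.find({"crawled_at": {"$lt": threshold}}):
    record_id = record.pop("_id")   # don't carry the old _id into the archive
    unique_id = {"title": record.get("title"), "url": record.get("url")}

    # Step 2: copy into the archive (upsert keeps re-runs idempotent);
    # the record is copied whole, so both timestamps survive (step 4)
    archive.update_one(unique_id, {"$set": record}, upsert=True)
    # Step 3: only remove from the active collection after the copy succeeds
    active.delete_one({"_id": record_id})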

Performance Optimization: Indexing Strategy

With 100K+ records, queries need to be fast. Here's my indexing setup:

# Create indexes on both collections
active_collection.create_index("crawled_at")
active_collection.create_index("last_updated_at")
active_collection.create_index([("title", 1), ("url", 1)], unique=True)

archived_collection.create_index("crawled_at")
archived_collection.create_index("last_updated_at")

Why these indexes?

  • crawled_at → Fast archiving queries (WHERE crawled_at < threshold)
  • last_updated_at → Sort by freshness for user-facing queries
  • (title, url) composite → O(1) duplicate detection

Before indexing: Archive query took 4.2s on 50K records
After indexing: Same query in 120ms
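
If you want to confirm an index is actually being picked up, explain() on the archiving query is enough: an IXSCAN stage in the winning plan means the crawled_at index is used, while COLLSCAN means you're still doing full collection scans.

from datetime import datetime, timedelta

threshold = datetime.now() - timedelta(days=30)

plan = active_collection.find({"crawled_at": {"$lt": threshold}}).explain()

# Look for "IXSCAN" (index scan) vs "COLLSCAN" (full collection scan)
print(plan["queryPlanner"]["winningPlan"])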


Handling Edge Cases

Case 1: Date Strings vs DateTime Objects

Scraped dates come in wild formats:

  • "15 May 2024"
  • "2024-05-15"
  • "May 15, 2024"

My parser handles them all:

import re  # module-level import; _parse_date relies on it for whitespace cleanup

def _parse_date(self, date_str):
    """
    Robust date parsing with multiple format fallbacks
    """
    if not date_str or not isinstance(date_str, str):
        return None

    date_formats = [
        "%d %B %Y",     # "15 May 2024"
        "%Y-%m-%d",     # "2024-05-15"
        "%d-%m-%Y",     # "15-05-2024"
        "%B %d, %Y",    # "May 15, 2024"
    ]

    clean_date = re.sub(r'\s+', ' ', date_str.strip())

    for fmt in date_formats:
        try:
            return datetime.strptime(clean_date, fmt).date()
        except ValueError:
            continue

    return None

Case 2: Records Without Timestamps

If an old record exists without crawled_at, the system handles it gracefully:

original_crawled_at = (
    existing_record.get('crawled_at') or 
    datetime.now()  # Default to now if missing
)

Testing: Verify It Works

I built a self-test function to validate archiving:

def test_archive_functionality(self, collection_name="news"):
    """
    1. Insert a record with crawled_at = 31 days ago
    2. Run archive_old_records()
    3. Verify it moved to archive
    """
    old_date = datetime.now() - timedelta(days=31)
    test_record = {
        "title": f"Test Record {datetime.now().timestamp()}",
        "url": f"https://test.com/{datetime.now().timestamp()}",
        "crawled_at": old_date,
        "last_updated_at": old_date
    }

    # Insert to active collection
    test_collection = self.db[collection_name]
    result = test_collection.insert_one(test_record)

    # Run archiving
    self.archive_old_records()

    # Verify: Should be in archive, not in active
    archived = self.db[f"{collection_name}_archived"].find_one({
        "title": test_record["title"]
    })

    active = test_collection.find_one({"_id": result.inserted_id})

    assert archived is not None, "Record not in archive!"
    assert active is None, "Record still in active!"

    print("✅ Archive test passed!")

Run it:

python mongodb_handler.py  # Runs built-in test
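
The "built-in test" just means the module ends with a main guard along these lines (assumed wiring; adapt to the actual file):

if __name__ == "__main__":
    db = MongoDBHandler()
    db.test_archive_functionality("news")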

Lessons Learned (The Hard Way)

1. Never Trust Scraper Consistency

Websites change formats overnight. Your archiving logic needs to be defensive:

  • Always validate dates before parsing
  • Use try-except blocks around timestamp operations
  • Log failed parses for manual review (see the sketch below)
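
A minimal sketch of that last point: wrap whatever parser you use so bad inputs get logged instead of silently dropped (the logger name is illustrative):

import logging

logger = logging.getLogger("collegebuzz.archiver")

def safe_parse_date(raw_value, parser):
    """Call the date parser defensively; log anything it can't handle."""
    try:
        parsed = parser(raw_value)
    except Exception:
        logger.exception("Date parsing raised for value %r", raw_value)
        return None
    if parsed is None:
        logger.warning("Unparseable date string: %r", raw_value)
    return parsed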

2. Indexes Are Non-Negotiable

I initially skipped indexing ("we'll add it later"). BAD IDEA. Archiving 10K records took 45 minutes. After indexing: 2 minutes.

Rule: Add indexes BEFORE your first large crawl.

3. Separate Active from Archive Early

I tried using a single collection with an is_archived flag. Query complexity exploded:

# Nightmare query
active_records = collection.find({
    "is_archived": False,
    "crawled_at": {"$gt": threshold}
})

With separate collections:

# Clean query
active_records = active_collection.find()

4. Manual Archive Triggers Are Essential

Sometimes you need to force-archive records (e.g., after fixing date parsing bugs):

# Force archive everything older than 60 days
db.manually_archive_old_records(days_threshold=60)

Real-World Impact: By The Numbers

After deploying this system:

Metric                             Before              After
Active collection size             450K records        12K records
Average query time (user search)   3.2s                180ms
Duplicate notices                  ~15% of dataset     0%
Historical data available          ❌ Lost on updates   ✅ Full audit trail
Disk usage (with compression)      8.4GB               2.1GB (active) + 4.2GB (archive)

The hidden win: Being able to answer questions like:

  • "How long do admission notices usually stay live?"
  • "Which colleges update their news most frequently?"
  • "Can we restore that deleted event from March?"

The Complete Code

Here's the full archiving system (simplified for readability):

from pymongo import MongoClient
from datetime import datetime, timedelta

class MongoDBHandler:
    def __init__(self, db_name="AICTE_Scraper"):
        self.client = MongoClient("localhost", 27017)
        self.db = self.client[db_name]
        self.active_collections = ["news", "events", "admissions"]

        # Make sure the *_archived collections exist up front
        # (create_collection_if_not_exists is a small helper, omitted here)
        for collection in self.active_collections:
            self.create_collection_if_not_exists(f"{collection}_archived")

    def insert_data(self, collection_name, records):
        """
        Insert with smart deduplication and auto-archiving
        """
        current_time = datetime.now()
        archive_threshold = current_time - timedelta(days=30)

        active = self.db[collection_name]
        archive = self.db[f"{collection_name}_archived"]

        for record in records:
            # Composite key builder shown earlier in this post
            unique_id = self._create_unique_identifier(record)
            existing = active.find_one(unique_id)

            # Preserve original crawl date
            if existing and 'crawled_at' in existing:
                original_crawled_at = existing['crawled_at']
            else:
                original_crawled_at = current_time

            record['crawled_at'] = original_crawled_at
            record['last_updated_at'] = current_time

            # Check if should archive
            should_archive = original_crawled_at < archive_threshold

            if should_archive:
                if existing:
                    active.delete_one(unique_id)
                archive.update_one(unique_id, {"$set": record}, upsert=True)
            else:
                active.update_one(unique_id, {"$set": record}, upsert=True)

    def archive_old_records(self):
        """
        Move records older than 30 days to archive
        """
        threshold = datetime.now() - timedelta(days=30)

        for collection_name in self.active_collections:
            active = self.db[collection_name]
            archive = self.db[f"{collection_name}_archived"]

            old_records = list(active.find({"crawled_at": {"$lt": threshold}}))

            for record in old_records:
                record_id = record.pop('_id')
                unique_id = self._create_unique_identifier(record)

                archive.update_one(unique_id, {"$set": record}, upsert=True)
                active.delete_one({"_id": record_id})

            print(f"✅ Archived {len(old_records)} from {collection_name}")

Full implementation with tests: GitHub Link


What's Next?

I'm working on:

  1. TTL-based archiving using MongoDB's built-in TTL indexes (see the sketch after this list)
  2. Incremental archiving (archive in batches to avoid blocking)
  3. Change detection (highlight what fields changed between versions)
  4. Archive compression (BSON → compressed JSON for long-term storage)
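
On the TTL idea: a TTL index can only delete documents, not move them, so in this design it would most likely cap how long the archive is retained rather than replace the archiving step itself. A minimal sketch, assuming roughly one year of retention on the archive side only:

# TTL index: MongoDB removes documents once last_updated_at is older than
# expireAfterSeconds. Never put this on the active collection -- it would
# silently delete live records.
ONE_YEAR_SECONDS = 365 * 24 * 60 * 60

archived_collection.create_index(
    "last_updated_at",
    expireAfterSeconds=ONE_YEAR_SECONDS,
)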

Closing Thoughts

Building this archiving system taught me that data management is harder than data collection.

If you're building any system that involves:

  • 🕷️ Web scraping
  • 📰 News aggregation
  • 📅 Event tracking
  • 📊 Time-series data

Think about archiving on Day 1, not Day 100.

Your future self (and your database) will thank you.


Resources

Found this helpful? Hit that ❤️ and follow for more web scraping deep dives!


This post is part of a series on building CollegeBuzz. Next up: "Handling 100+ Website Scrapers with Python's asyncio"
