Pradip Panjiyar

How I Built a MongoDB Archiving System for Crawled Data

The Problem: Data Chaos at Scale

Imagine scraping 100+ college websites daily. Notices get updated. Events disappear. New announcements pop up every hour. Your database becomes a graveyard of duplicates, outdated entries, and lost history.

That was my reality building CollegeBuzz — an AICTE academic news aggregator.

The challenge wasn't just collecting data. It was managing its lifecycle:

  • ❌ Overwrite everything? You lose historical context
  • ❌ Blindly insert? Hello, duplicate hell
  • ❌ Manual cleanup? Not scalable at 10,000+ records/day

I needed something smarter. Here's how I built an automated archiving system that keeps data fresh, preserves history, and stays performant at scale.


The Architecture: Active + Archive Pattern

Core Concept

Instead of one bloated collection, I split data into two purpose-built collections:

A simple block diagram showing an “Active + Archive” data pattern. On top is the Active Collection (user-facing, always fresh). An arrow points downward with the note “after 30 days or event passes” leading to the Archive Collection (historical records for analytics and audits).
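In pymongo terms, the split is nothing fancier than two collections per data type; the database and collection names below match the ones used in the full code later in the post:

from pymongo import MongoClient

client = MongoClient("localhost", 27017)
db = client["AICTE_Scraper"]

active = db["news"]            # user-facing, always fresh
archive = db["news_archived"]  # historical records for analytics and audits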

Why this works:

  • Users query only fresh data (faster searches)
  • Archive grows unbounded without impacting performance
  • You can always restore or analyze historical trends

The Implementation: Smart Deduplication

Problem 1: How do you detect duplicates?

A notice titled "Admissions Open 2025" might appear on multiple pages, or get re-crawled daily. I needed a deterministic unique identifier.

Solution: Composite Unique Keys

Instead of MongoDB's _id, I use business logic to create unique identifiers:

def _create_unique_identifier(self, record):
    """
    Creates a composite key from title + URL
    Falls back to full record comparison if needed
    """
    unique_id = {}

    if 'title' in record and record['title']:
        unique_id['title'] = record['title']

    if 'url' in record and record['url']:
        unique_id['url'] = record['url']

    # If no title/URL, compare all fields except timestamps
    if not unique_id:
        unique_id = {
            k: v for k, v in record.items() 
            if k not in ['crawled_at', 'last_updated_at', '_id']
        }

    return unique_id

Why not content hashing?
While SHA-256 hashing is elegant, I found composite keys more debuggable. If there's a collision, I can instantly see which title/URL caused it. Hashes hide this context.
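For comparison, the hash-based alternative would look roughly like this (a sketch of the approach I decided against, not code from the project):

import hashlib
import json

def content_hash(record):
    # Hash of the normalized title + URL. Compact, but when two records
    # collide you can't see which title/URL pair caused it without extra logging.
    payload = json.dumps(
        {"title": record.get("title"), "url": record.get("url")},
        sort_keys=True,
        default=str,
    )
    return hashlib.sha256(payload.encode("utf-8")).hexdigest()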


The Magic: Timestamp Tracking

Every record carries two timestamps:

Field           | Purpose                             | Example
crawled_at      | First time we ever saw this record  | 2025-01-08 10:00:00
last_updated_at | Most recent time we re-crawled it   | 2025-10-03 14:30:00

The Critical Rule:

crawled_at NEVER changes. last_updated_at ALWAYS updates.

This creates a paper trail:

  • If crawled_at is 60 days old but last_updated_at is yesterday → the content is still live, just stable
  • If both are 60 days old → probably abandoned

# Preserve original crawl date, update last seen
if existing_record:
    original_crawled_at = existing_record['crawled_at']
else:
    original_crawled_at = datetime.now()

record['crawled_at'] = original_crawled_at  # Frozen in time
record['last_updated_at'] = datetime.now()  # Always fresh

The Archiving Logic: Time-Based + Event-Based

Records get archived under two conditions:

1. Age-Based Archiving (30-day threshold)

def should_archive_by_age(record, threshold_days=30):
    """
    Archive if the record was FIRST CRAWLED more than 30 days ago
    """
    archive_threshold = datetime.now() - timedelta(days=threshold_days)

    return record['crawled_at'] < archive_threshold

2. Event-Based Archiving

def should_archive_by_event_date(record):
    """
    Archive if the event date has passed
    Example: "Admission closes: 2025-09-15" → Auto-archive on Sept 16
    """
    event_date = parse_date(record.get('event_date'))

    if event_date and event_date < datetime.now().date():
        return True

    return False

Combined logic:

should_archive = (
    should_archive_by_age(record, 30) or 
    should_archive_by_event_date(record)
)

Real-World Example: Archiving Old News

Let's say you manually changed a record's crawled_at to January 8, 2025 (more than 30 days ago). Here's what happens:

from mongodb_handler import MongoDBHandler
from datetime import datetime, timedelta

db = MongoDBHandler()

# Current record in 'news' collection:
# {
#   "title": "New Campus Opened",
#   "url": "https://college.edu/news/123",
#   "crawled_at": "2025-01-08 10:00:00",  # 9+ months old!
#   "last_updated_at": "2025-10-01 08:00:00"
# }

# Run archiving
db.archive_old_records()

# Output:
# [news] Found 1 record(s) to archive.
# Archiving: {'title': 'New Campus Opened', 'url': 'https://college.edu/news/123'}
#   - crawled_at: 2025-01-08 10:00:00
#   - last_updated_at: 2025-10-01 08:00:00
# ✅ Archived 1 record(s) from news

What happened under the hood:

  1. MongoDB query finds records where crawled_at < (today - 30 days)
  2. Record is copied to news_archived collection
  3. Record is deleted from news collection
  4. All timestamps are preserved (this is crucial for analytics!)

Performance Optimization: Indexing Strategy

With 100K+ records, queries need to be fast. Here's my indexing setup:

# Create indexes on both collections
active_collection.create_index("crawled_at")
active_collection.create_index("last_updated_at")
active_collection.create_index([("title", 1), ("url", 1)], unique=True)

archived_collection.create_index("crawled_at")
archived_collection.create_index("last_updated_at")

Why these indexes?

  • crawled_at → Fast archiving queries (WHERE crawled_at < threshold)
  • last_updated_at → Sort by freshness for user-facing queries
  • (title, url) composite → near-instant duplicate checks, and the unique constraint rejects true duplicates at write time

Before indexing: Archive query took 4.2s on 50K records
After indexing: Same query in 120ms
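To confirm the archive query really hits the crawled_at index rather than doing a collection scan, pymongo's explain() shows the winning plan (a quick sanity check, reusing active_collection from the snippet above):

from datetime import datetime, timedelta

threshold = datetime.now() - timedelta(days=30)
plan = active_collection.find({"crawled_at": {"$lt": threshold}}).explain()
print(plan["queryPlanner"]["winningPlan"])  # expect an IXSCAN stage, not a COLLSCAN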


Handling Edge Cases

Case 1: Date Strings vs DateTime Objects

Scraped dates come in wild formats:

  • "15 May 2024"
  • "2024-05-15"
  • "May 15, 2024"

My parser handles them all:

import re
from datetime import datetime

def _parse_date(self, date_str):
    """
    Robust date parsing with multiple format fallbacks
    """
    if not date_str or not isinstance(date_str, str):
        return None

    date_formats = [
        "%d %B %Y",     # "15 May 2024"
        "%Y-%m-%d",     # "2024-05-15"
        "%d-%m-%Y",     # "15-05-2024"
        "%B %d, %Y",    # "May 15, 2024"
    ]

    clean_date = re.sub(r'\s+', ' ', date_str.strip())

    for fmt in date_formats:
        try:
            return datetime.strptime(clean_date, fmt).date()
        except ValueError:
            continue

    return None

Case 2: Records Without Timestamps

If an old record exists without crawled_at, the system handles it gracefully:

original_crawled_at = (
    existing_record.get('crawled_at') or 
    datetime.now()  # Default to now if missing
)

Testing: Verify It Works

I built a self-test function to validate archiving:

def test_archive_functionality(self, collection_name="news"):
    """
    1. Insert a record with crawled_at = 31 days ago
    2. Run archive_old_records()
    3. Verify it moved to archive
    """
    old_date = datetime.now() - timedelta(days=31)
    test_record = {
        "title": f"Test Record {datetime.now().timestamp()}",
        "url": f"https://test.com/{datetime.now().timestamp()}",
        "crawled_at": old_date,
        "last_updated_at": old_date
    }

    # Insert to active collection
    test_collection = self.db[collection_name]
    result = test_collection.insert_one(test_record)

    # Run archiving
    self.archive_old_records()

    # Verify: Should be in archive, not in active
    archived = self.db[f"{collection_name}_archived"].find_one({
        "title": test_record["title"]
    })

    active = test_collection.find_one({"_id": result.inserted_id})

    assert archived is not None, "Record not in archive!"
    assert active is None, "Record still in active!"

    print("✅ Archive test passed!")

Run it:

python mongodb_handler.py  # Runs built-in test

Lessons Learned (The Hard Way)

1. Never Trust Scraper Consistency

Websites change formats overnight. Your archiving logic needs to be defensive (a minimal sketch follows this list):

  • Always validate dates before parsing
  • Use try-except blocks around timestamp operations
  • Log failed parses for manual review
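A minimal defensive wrapper covering all three points could look like this (illustrative, not the project's exact code):

import logging

logger = logging.getLogger("archiver")

def safe_parse_event_date(record):
    # A malformed date should never crash the pipeline; log it for review instead.
    raw = record.get("event_date")
    try:
        return parse_date(raw)  # same parser as in the edge-case section above
    except Exception as exc:
        logger.warning("Could not parse event_date %r: %s", raw, exc)
        return None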

2. Indexes Are Non-Negotiable

I initially skipped indexing ("we'll add it later"). BAD IDEA. Archiving 10K records took 45 minutes. After indexing: 2 minutes.

Rule: Add indexes BEFORE your first large crawl.

3. Separate Active from Archive Early

I tried using a single collection with an is_archived flag. Query complexity exploded:

# Nightmare query
active_records = collection.find({
    "is_archived": False,
    "crawled_at": {"$gt": threshold}
})

With separate collections:

# Clean query
active_records = active_collection.find()

4. Manual Archive Triggers Are Essential

Sometimes you need to force-archive records (e.g., after fixing date parsing bugs):

# Force archive everything older than 60 days
db.manually_archive_old_records(days_threshold=60)
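Under the hood it boils down to the same query with a caller-supplied cutoff. A simplified sketch (mirroring the archive_old_records method shown later, not the exact implementation):

def manually_archive_old_records(self, days_threshold=30):
    # Same logic as archive_old_records, but the age cutoff is configurable.
    threshold = datetime.now() - timedelta(days=days_threshold)

    for collection_name in self.active_collections:
        active = self.db[collection_name]
        archive = self.db[f"{collection_name}_archived"]

        for record in active.find({"crawled_at": {"$lt": threshold}}):
            record_id = record.pop("_id")
            archive.update_one(
                self._create_unique_identifier(record),
                {"$set": record},
                upsert=True,
            )
            active.delete_one({"_id": record_id})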

Real-World Impact: By The Numbers

After deploying this system:

Metric                           | Before            | After
Active collection size           | 450K records      | 12K records
Average query time (user search) | 3.2s              | 180ms
Duplicate notices                | ~15% of dataset   | 0%
Historical data available        | ❌ Lost on updates | ✅ Full audit trail
Disk usage (with compression)    | 8.4GB             | 2.1GB (active) + 4.2GB (archive)

The hidden win: being able to answer questions like these (see the aggregation sketch after this list):

  • "How long do admission notices usually stay live?"
  • "Which colleges update their news most frequently?"
  • "Can we restore that deleted event from March?"

The Complete Code

Here's the full archiving system (simplified for readability):

from pymongo import MongoClient
from datetime import datetime, timedelta

class MongoDBHandler:
    def __init__(self, db_name="AICTE_Scraper"):
        self.client = MongoClient("localhost", 27017)
        self.db = self.client[db_name]
        self.active_collections = ["news", "events", "admissions"]

        # Create archive collections
        for collection in self.active_collections:
            self.create_collection_if_not_exists(f"{collection}_archived")

    def insert_data(self, collection_name, records):
        """
        Insert with smart deduplication and auto-archiving
        """
        current_time = datetime.now()
        archive_threshold = current_time - timedelta(days=30)

        active = self.db[collection_name]
        archive = self.db[f"{collection_name}_archived"]

        for record in records:
            unique_id = self._create_unique_identifier(record)
            existing = active.find_one(unique_id)

            # Preserve original crawl date
            if existing and 'crawled_at' in existing:
                original_crawled_at = existing['crawled_at']
            else:
                original_crawled_at = current_time

            record['crawled_at'] = original_crawled_at
            record['last_updated_at'] = current_time

            # Check if should archive
            should_archive = original_crawled_at < archive_threshold

            if should_archive:
                if existing:
                    active.delete_one(unique_id)
                archive.update_one(unique_id, {"$set": record}, upsert=True)
            else:
                active.update_one(unique_id, {"$set": record}, upsert=True)

    def archive_old_records(self):
        """
        Move records older than 30 days to archive
        """
        threshold = datetime.now() - timedelta(days=30)

        for collection_name in self.active_collections:
            active = self.db[collection_name]
            archive = self.db[f"{collection_name}_archived"]

            old_records = list(active.find({"crawled_at": {"$lt": threshold}}))

            for record in old_records:
                record_id = record.pop('_id')
                unique_id = self._create_unique_identifier(record)

                archive.update_one(unique_id, {"$set": record}, upsert=True)
                active.delete_one({"_id": record_id})

            print(f"✅ Archived {len(old_records)} from {collection_name}")

Full implementation with tests: GitHub Link


What's Next?

I'm working on:

  1. TTL-based archiving using MongoDB's built-in TTL indexes (rough sketch after this list)
  2. Incremental archiving (archive in batches to avoid blocking)
  3. Change detection (highlight what fields changed between versions)
  4. Archive compression (BSON → compressed JSON for long-term storage)
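The TTL idea comes with one caveat worth spelling out: a TTL index deletes expired documents rather than moving them, so the copy-to-archive step still has to happen before MongoDB expires anything. A rough sketch of the index itself (it would replace the plain crawled_at index from earlier, since MongoDB rejects a second index on the same key with conflicting options):

active_collection.create_index(
    "crawled_at",
    expireAfterSeconds=30 * 24 * 60 * 60,  # expire ~30 days after first crawl
)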

Closing Thoughts

Building this archiving system taught me that data management is harder than data collection.

If you're building any system that involves:

  • 🕷️ Web scraping
  • 📰 News aggregation
  • 📅 Event tracking
  • 📊 Time-series data

Think about archiving on Day 1, not Day 100.

Your future self (and your database) will thank you.



Found this helpful? Hit that ❤️ and follow for more web scraping deep dives!


This post is part of a series on building CollegeBuzz. Next up: "Handling 100+ Website Scrapers with Python's asyncio"

Top comments (2)

OnlineProxy

In addition to the classic Active + Archive paradigm, I've used several complementary data lifecycle patterns: time-partitioned collections, hot/warm/cold tiering, and hybrid setups that pair databases with object storage. For relational backends like PostgreSQL, range partitioning and partial indexing keep active data separate, and exporting data to Parquet on S3 makes long-term analytics economical. TTL indexes suit volatile data just fine, but deliberate archiving or soft deletes fit better for immutable, audited datasets. I usually employ schema versioning and projection layers to handle field evolution in archives. Generally speaking, you want the hot path to remain lean and snappy while ensuring older data stays queryable, compliant, and inexpensive to store.

Pradip Panjiyar

That’s a great perspective; I hadn’t considered combining S3 + Parquet exports for long-term analytics. My focus was mostly on MongoDB-level archiving, but your hybrid approach would definitely scale better for large datasets.