DEV Community

HarshKumar Jha
HarshKumar Jha

Posted on

From Weeks to 15 Minutes: How We Built a Data Migration System That Changed Everything - Part 2

Part 2: The Architecture That Actually Works (Not Just Survives)

Link from Part 1: https://dev.to/mr_harshkumar_jha/from-weeks-to-15-minutes-how-we-built-a-data-migration-system-that-changed-everything-9gg


The Problem We Had (And You Probably Have Too)

Remember when I said our system crashed at 200K records?

That wasn't a random failure. It was architecture screaming for help.

Our original code looked "fine" on paper. But when you feed it a million records, it doesn't just slow down—it breaks in weird ways.

  • Database connections maxed out (too many tiny queries)
  • Memory kept growing (loading everything at once)
  • CPU pegged at 100% (no async processing)
  • Redis filled up (no compression)
  • No way to recover (crashes meant starting over)
  • No monitoring (couldn't see what was breaking)

We'd look at the logs and think: "Which part is actually broken?"

Spoiler: It wasn't the code. It was the thinking behind the code.

We were solving yesterday's problems (50K records) with yesterday's architecture.


The Breakthrough: Stop Thinking Sequential

Here's the thing nobody tells you:

When you scale 10x, you can't just optimize code. You have to rethink architecture.

Optimization gets you 10-20% improvement. Architecture gets you 100x.

We realized: Our system wasn't designed for scale. It was designed for convenience.

So we asked five questions:

  1. What if we never loaded all data into memory at once?
  2. What if we queried the database only once, not a million times?
  3. What if we processed everything asynchronously, not blocking the user?
  4. What if the system could recover from any failure automatically?
  5. What if we could see exactly what's happening in real-time?

The answer to all five: A completely different system.


The Five-Layer Architecture (Simplified)

Imagine moving houses. You don't carry everything at once, right?

Bad way: Load all boxes, drive to new house, unload all at once.

  • Your truck breaks (too heavy)
  • You get tired (single person)
  • New house overwhelmed (can't fit everything)
  • If truck breaks, you've lost everything
  • No idea how much is left to move

Good way: Load smart, drive with helpers, unload strategically, track everything.

  • Small trips per person
  • Multiple helpers work in parallel
  • New house gets organized flow
  • If one trip fails, others continue
  • Always know: "23 of 50 boxes done"

Our system works the same way:

┌─────────────────────────────────────────┐
│         USER UPLOADS BIG FILE           │
└──────────────────┬──────────────────────┘
                   │
        ┌──────────┴──────────┐
        │                     │
   ┌────▼────┐          ┌────▼─────┐
   │  AWS S3 │          │PostgreSQL│
   │ (Inbox) │          │(Metadata)│
   └────┬────┘          └────┬─────┘
        │                    │
        └─────────┬──────────┘
                  │
         ┌────────▼────────┐
         │     Redis       │
         │ (Staging Area)  │
         │  - Raw data     │
         │  - Transformed  │
         │  - Checkpoints  │ ← NEW: Recovery points
         └────────┬────────┘
                  │
         ┌────────▼────────┐
         │     Celery      │
         │   (Workers)     │
         │  - 5-8 at once  │
         │  - Auto-scale   │ ← NEW: Grows/shrinks
         └────────┬────────┘
                  │
         ┌────────▼────────┐
         │ Monitoring      │ ← NEW: Real-time dashboard
         │ - Progress      │
         │ - Alerts        │
         │ - Health        │
         └────────┬────────┘
                  │
         ┌────────▼────────┐
         │  Live Database  │
         │  (Final Dest)   │
         └─────────────────┘
Enter fullscreen mode Exit fullscreen mode

Why this works:

  • S3: Infinitely scalable (file sits there, doesn't hurt)
  • PostgreSQL: Stores only metadata (tiny, never gets hammered)
  • Redis: Fast temp storage + checkpoints for recovery
  • Celery: Parallel workers (8 people working, not 1)
  • Monitoring: See everything, catch problems early
  • Live DB: Only gets clean, validated data

Each layer does ONE thing. Does it well.


Layer 1: The S3 Inbox (Never Crashes)

When someone uploads a 184MB file, we don't process it immediately.

We put it in S3. Done.

# That's literally it
file.save_to_s3(bucket='migrations', timeout=1_hour)
Enter fullscreen mode Exit fullscreen mode

Why this matters:

Your server isn't tied up. If the file is 1MB or 1GB, S3 handles it. You just point and move on.

Compare this to:

# Old way (BAD)
df = pd.read_csv(file)  # File must fit in memory. Dies at 184MB on small server.
Enter fullscreen mode Exit fullscreen mode

Layer 2: The Intelligence Layer (One Query, Infinite Uses)

This is where most teams fail.

The typical mistake:

for row in million_rows:
    # Check if consumer exists
    consumer = Consumer.objects.get(consumer_no=row['consumer_no'])
Enter fullscreen mode Exit fullscreen mode

What happens:

  • Row 1: Query database (50ms)
  • Row 2: Query database (50ms)
  • ...
  • Row 1,000,000: Query database (50ms)

Total time: 1,000,000 × 50ms = 13.8 hours

And your database dies at row 50,000 anyway.

We do it differently:

class SmartCache:
    """
    Load valid values ONCE. Reuse forever.
    """

    def __init__(self):
        self.valid_consumers = None  # Will cache here
        self.valid_meters = None
        self.valid_plans = None
        self.valid_territories = None

    def preload_all_relationships_once(self):
        """
        Query database for ALL relationships upfront
        This is the magic that prevents N+1 queries
        """
        print("🔄 Preloading all relationships...")

        # Query 1: Load ALL consumers (one query)
        self.valid_consumers = set(
            Consumer.objects.values_list('consumer_no', flat=True)
        )
        print(f"✅ Loaded {len(self.valid_consumers)} consumers")

        # Query 2: Load ALL meters (one query)
        self.valid_meters = set(
            Meter.objects.values_list('meter_number', flat=True)
        )
        print(f"✅ Loaded {len(self.valid_meters)} meters")

        # Query 3: Load ALL plans (one query)
        self.valid_plans = {
            plan.name: plan.id
            for plan in Plan.objects.all()
        }
        print(f"✅ Loaded {len(self.valid_plans)} plans")

        # Query 4: Load ALL territories (one query)
        self.valid_territories = {
            t.code: t.id
            for t in Territory.objects.all()
        }
        print(f"✅ Loaded {len(self.valid_territories)} territories")

        print("✅ Preloading complete. Ready to validate 1M rows.")

    def is_consumer_valid(self, consumer_no):
        """
        Check instantly (zero database queries)
        O(1) lookup in set
        """
        return consumer_no in self.valid_consumers

    def is_meter_valid(self, meter_number):
        """O(1) lookup"""
        return meter_number in self.valid_meters

    def validate_all_relationships(self, rows):
        """
        Validate 1 million rows with ZERO database queries
        """
        errors = []

        for idx, row in enumerate(rows):
            # Check consumer exists (in-memory, instant)
            if not self.is_consumer_valid(row['consumer_no']):
                errors.append({
                    "row": idx + 1,
                    "field": "consumer_no",
                    "value": row['consumer_no'],
                    "error": "Consumer not found in system"
                })

            # Check meter exists (in-memory, instant)
            if not self.is_meter_valid(row['meter_number']):
                errors.append({
                    "row": idx + 1,
                    "field": "meter_number",
                    "value": row['meter_number'],
                    "error": "Meter not found in system"
                })

        return errors
Enter fullscreen mode Exit fullscreen mode

What happens now:

  • Load all relationships: 4 queries (5 seconds total)
  • Validate 1,000,000 rows: Zero queries (12 seconds, all in-memory)

Impact:

Before: 1,000,000 queries × 50ms = 13.8 hours + database crash
After: 4 queries + 1M in-memory checks = 17 seconds total
Improvement: 2,929x faster
Enter fullscreen mode Exit fullscreen mode

The key insight: Your database is not a lookup service. It's a data store.

Query once. Validate a million times in memory.


Layer 3: Async Processing (The Game Changer)

Here's a question: When someone uploads a file, do they want to wait 25 minutes?

No.

So why would you make them?

# Old way (USER BLOCKED)
def migrate_data(file):
    raw_data = load_csv(file)
    transformed = transform_all(raw_data)  # 5 mins, user waits
    validated = validate_all(transformed)  # 5 mins, user waits
    final = insert_to_db(validated)        # 10 mins, user waits

    return success  # Finally!
Enter fullscreen mode Exit fullscreen mode

Better way (USER NOT BLOCKED):

def migrate_data(file):
    """
    User uploads file. Immediately gets response.
    Processing happens in background.
    """
    # Store file path
    job = create_migration_job(file)

    # Send to background queue (Celery)
    process_migration_async.delay(job.id)

    # User immediately gets: "Processing started!"
    return {"status": "queued", "job_id": job.id}

# Meanwhile, in background...
@celery_app.task
def process_migration_async(job_id):
    """
    This runs on a worker, not the web server
    """
    raw_data = load_csv(job.file)
    transformed = transform_all(raw_data)
    validated = validate_all(transformed)
    final = insert_to_db(validated)

    # Email user when done: "Your data is ready!"
Enter fullscreen mode Exit fullscreen mode

User experience:

  • User: "I'll upload this"
  • System: "Got it! Check back in 5 mins"
  • User: Goes back to work, gets email when ready
  • Everyone happy

System benefit:

  • Web server free to handle other requests
  • Processing happens on separate workers
  • Can scale workers independently
  • If one worker crashes, others keep going

The Magic: Celery Workers (Multiple Hands)

Imagine you have 1 person processing 1 million boxes.

They work 24 hours straight. Still takes 2 weeks.

Now imagine 8 people working together.

Same amount of work. 8x faster.

That's Celery.

# Start 5 workers (each can process a chunk)
celery -A myapp worker --concurrency=5

# Each worker processes independently
Worker 1: Processing rows 1-10,000
Worker 2: Processing rows 10,001-20,000
Worker 3: Processing rows 20,001-30,000
Worker 4: Processing rows 30,001-40,000
Worker 5: Processing rows 40,001-50,000

# All in parallel
Enter fullscreen mode Exit fullscreen mode

What makes it work:

# The worker knows exactly what to do
@app.task(bind=True, max_retries=3)
def process_chunk(self, chunk_index):
    """
    One worker, one chunk at a time
    Automatically retries on failure
    """
    try:
        # Get chunk from cache (already there)
        chunk = redis.get(f"chunk:{chunk_index}")

        # Transform it
        transformed = [transform_row(row) for row in chunk]

        # Validate it
        validated = [v for v in transformed if is_valid(v)]

        # Save it
        db.bulk_insert(validated)

        # Done! Next worker takes next chunk
        return {"rows_processed": len(validated)}

    except Exception as e:
        # Retry with exponential backoff
        raise self.retry(
            exc=e,
            countdown=2 ** self.request.retries  # 2s, 4s, 8s
        )
Enter fullscreen mode Exit fullscreen mode

Real numbers:

1 worker: 1,000,000 rows in 40 minutes
8 workers: 1,000,000 rows in 5 minutes

That's not a typo. 8x faster, roughly.
Enter fullscreen mode Exit fullscreen mode

The Secret Nobody Talks About: Dynamic Chunk Sizing

Here's where it gets smart.

You don't use fixed chunk sizes. That's dumb.

Why?

Because:

  • Small server (2GB RAM) + big chunks = CRASH
  • Large server (32GB RAM) + small chunks = WASTE
  • High CPU load + big chunks = OVERHEAT

We calculate chunk size on the fly:

import psutil

class AdaptiveChunker:
    """
    Dynamically adjust chunk size based on system health
    This is what prevents crashes at scale
    """

    def __init__(self):
        self.base_chunk_size = 5000
        self.min_chunk_size = 1000
        self.max_chunk_size = 10000

    def calculate_how_much_i_can_handle(self):
        """
        Ask system: "How much work can you do right now?"
        """
        memory = psutil.virtual_memory()
        cpu = psutil.cpu_percent(interval=0.1)

        # Get current metrics
        memory_percent = memory.percent
        available_memory_gb = memory.available / (1024**3)

        # Start with base size
        chunk_size = self.base_chunk_size

        # Rule 1: Low memory? Reduce chunk size
        if memory_percent > 80:  # Over 80% memory used
            chunk_size = self.min_chunk_size
            logger.warning(
                f"⚠️ Memory high ({memory_percent}%), "
                f"reducing chunk to {chunk_size} rows"
            )

        elif memory_percent > 60:  # Over 60% memory used
            chunk_size = int(self.base_chunk_size * 0.6)
            logger.info(f"Memory at {memory_percent}%, chunk={chunk_size}")

        # Rule 2: High CPU? Give it time to cool down
        if cpu > 85:  # CPU pegged
            chunk_size = min(chunk_size, 2000)
            logger.warning(
                f"⚠️ CPU high ({cpu}%), "
                f"reducing chunk to {chunk_size} rows"
            )

        # Rule 3: System healthy? Maximize throughput
        elif memory_percent < 40 and cpu < 50:
            chunk_size = self.max_chunk_size
            logger.info(f"✅ System healthy, chunk={chunk_size}")

        # Ensure within bounds
        return max(
            self.min_chunk_size,
            min(chunk_size, self.max_chunk_size)
        )

    def process_intelligently(self, session_id, total_rows):
        """
        Process with dynamic chunking
        """
        processed_rows = 0

        while processed_rows < total_rows:
            # Calculate optimal chunk size EVERY iteration
            chunk_size = self.calculate_how_much_i_can_handle()

            # Process this chunk
            chunk = get_chunk(session_id, processed_rows, chunk_size)
            result = process_chunk(chunk)

            processed_rows += len(chunk)

            # Log performance
            logger.info(
                f"✅ Processed {processed_rows}/{total_rows} "
                f"(chunk_size={chunk_size})"
            )
Enter fullscreen mode Exit fullscreen mode

What happens:

Small server struggles:
- Chunk 1: Memory 40%, CPU 45% → use 8,000 rows ✅
- Chunk 2: Memory 75%, CPU 60% → use 3,000 rows ✅
- Chunk 3: Memory 85%, CPU 80% → use 1,000 rows (careful!) ✅
- Chunk 4: Memory 60%, CPU 55% → use 5,000 rows ✅

Never crashes. Just adjusts on the fly.

Large server thrives:
- All chunks: Memory 20%, CPU 30% → use 10,000 rows ✅
- Finishes 3x faster than small server
- Both complete successfully
Enter fullscreen mode Exit fullscreen mode

Layer 4: The Checkpoint System (Never Lose Progress)

Here's a nightmare scenario:

You're migrating 2 million records.

At row 1,847,392, your network fails.

Old system: Start from scratch. Waste 40 minutes.

Our system: Resume from row 1,840,000. Continue in 2 minutes.

How?

class CheckpointManager:
    """
    Save progress every 10,000 rows
    Resume from exact failure point
    """

    def __init__(self, redis_client):
        self.redis = redis_client
        self.checkpoint_interval = 10000  # Save every 10K rows

    def should_create_checkpoint(self, processed_rows):
        """
        Should we save progress now?
        """
        return processed_rows % self.checkpoint_interval == 0

    def create_checkpoint(self, session_id, processed_rows, state):
        """
        Save current progress
        """
        checkpoint = {
            "session_id": session_id,
            "processed_rows": processed_rows,
            "timestamp": datetime.utcnow().isoformat(),
            "state": state,
            "can_resume": True,
            "chunk_size_at_checkpoint": state.get('current_chunk_size'),
            "workers_active": state.get('active_workers'),
        }

        key = f"session:{session_id}:checkpoint"

        # Store in Redis with 2-hour expiry
        self.redis.setex(
            key,
            7200,  # 2 hours
            json.dumps(checkpoint)
        )

        logger.info(
            f"✅ Checkpoint saved: {processed_rows:,} rows completed"
        )

    def get_last_checkpoint(self, session_id):
        """
        Get last saved checkpoint
        """
        key = f"session:{session_id}:checkpoint"
        checkpoint_data = self.redis.get(key)

        if not checkpoint_data:
            return None

        return json.loads(checkpoint_data)

    def resume_from_checkpoint(self, session_id):
        """
        Resume migration from last successful checkpoint
        """
        checkpoint = self.get_last_checkpoint(session_id)

        if not checkpoint:
            logger.info("No checkpoint found, starting from beginning")
            return 0

        processed_rows = checkpoint['processed_rows']

        logger.info(
            f"🔄 Resuming from checkpoint: "
            f"row {processed_rows:,} "
            f"(saved at {checkpoint['timestamp']})"
        )

        return processed_rows

# Usage in migration
def process_with_checkpoints(session_id, total_rows):
    """
    Process with automatic checkpoints
    """
    checkpoint_mgr = CheckpointManager(redis_client)

    # Try to resume from checkpoint
    start_row = checkpoint_mgr.resume_from_checkpoint(session_id)

    processed_rows = start_row

    while processed_rows < total_rows:
        # Process chunk
        chunk = get_chunk(session_id, processed_rows, 5000)
        process_chunk(chunk)

        processed_rows += len(chunk)

        # Save checkpoint every 10K rows
        if checkpoint_mgr.should_create_checkpoint(processed_rows):
            state = {
                'current_chunk_size': len(chunk),
                'active_workers': get_active_worker_count(),
            }
            checkpoint_mgr.create_checkpoint(
                session_id,
                processed_rows,
                state
            )
Enter fullscreen mode Exit fullscreen mode

Real-World Scenario:

Migration starts: 2,000,000 rows to process

10,000 rows ✅ → Checkpoint saved
20,000 rows ✅ → Checkpoint saved
...
800,000 rows ✅ → Checkpoint saved
810,000 rows ✅ → Checkpoint saved

💥 NETWORK FAILURE at row 847,392

System detects failure, checks checkpoint:
  Last checkpoint: 840,000 rows
  Rows lost: 7,392 (less than 1 chunk)

System resumes:
  Starting from: 840,000
  Remaining: 1,160,000 rows
  Time to complete: 7 minutes

VS without checkpoints:
  Starting from: 0
  Remaining: 2,000,000 rows
  Time to complete: 42 minutes wasted
Enter fullscreen mode Exit fullscreen mode

The economics:

Without checkpoints:
- Network blip at 90% complete = Start over
- Power outage at row 1.8M = Start over
- Worker crash at any point = Start over

With checkpoints:
- Network blip = Resume in seconds
- Power outage = Resume from last 10K
- Worker crash = Another worker picks up
Enter fullscreen mode Exit fullscreen mode

Layer 5: Real-Time Monitoring (See Everything)

You can't fix what you can't see.

The Dashboard Users See:

class MigrationMetrics:
    """
    Track everything in real-time
    Users see this dashboard while migration runs
    """

    def __init__(self, session_id):
        self.session_id = session_id
        self.redis = get_redis_connection()

    def track_progress(self):
        """
        Calculate and cache real-time metrics
        """
        session = MigrationSession.objects.get(id=self.session_id)

        # Calculate all metrics
        elapsed_time = (datetime.utcnow() - session.started_at).total_seconds()

        metrics = {
            "session_id": self.session_id,
            "status": session.status,

            # Progress metrics
            "total_rows": session.total_rows,
            "processed_rows": session.processed_rows,
            "progress_percent": round(
                (session.processed_rows / session.total_rows) * 100, 2
            ),

            # Performance metrics
            "elapsed_time": elapsed_time,
            "elapsed_formatted": self._format_duration(elapsed_time),
            "rows_per_second": self._calculate_throughput(session),
            "estimated_time_remaining": self._calculate_eta(session),


            # Quality metrics
            "validation_errors": session.error_count,
            "success_rate": round(
                ((session.processed_rows - session.error_count) / 
                 session.processed_rows * 100) if session.processed_rows > 0 else 0,
                2
            ),

            # Last activity
            "last_activity": session.last_activity.isoformat(),
            "last_checkpoint": session.last_checkpoint_at,
        }

        # Cache for dashboard (1 minute TTL)
        self.redis.setex(
            f"session:{self.session_id}:metrics",
            60,
            json.dumps(metrics)
        )

        return metrics

    def _calculate_throughput(self, session):
        """Calculate current processing speed"""
        if session.processed_rows == 0:
            return 0

        elapsed = (datetime.utcnow() - session.started_at).total_seconds()
        return round(session.processed_rows / elapsed) if elapsed > 0 else 0

    def _calculate_eta(self, session):
        """Estimate time remaining"""
        throughput = self._calculate_throughput(session)

        if throughput == 0:
            return None

        remaining_rows = session.total_rows - session.processed_rows
        seconds_remaining = remaining_rows / throughput

        return {
            "seconds": int(seconds_remaining),
            "formatted": self._format_duration(seconds_remaining)
        }

    def _format_duration(self, seconds):
        """Human-readable duration"""
        if seconds < 60:
            return f"{int(seconds)}s"
        elif seconds < 3600:
            minutes = int(seconds / 60)
            secs = int(seconds % 60)
            return f"{minutes}m {secs}s"
        else:
            hours = int(seconds / 3600)
            minutes = int((seconds % 3600) / 60)
            return f"{hours}h {minutes}m"

    def _get_active_worker_count(self):
        """Count active Celery workers"""
        from celery import current_app
        inspect = current_app.control.inspect()
        stats = inspect.stats()
        return len(stats) if stats else 0
Enter fullscreen mode Exit fullscreen mode

What the user sees (live updating dashboard):

┌─────────────────────────────────────────────────────────────┐
│  MIGRATION SESSION: abc-123-def                             │
│  Data Type: Consumers                                       │
│  Status: Processing ●                                       │
└─────────────────────────────────────────────────────────────┘

┌─────────────────────┬───────────────────────────────────────┐
│  Progress           │  Performance                          │
├─────────────────────┼───────────────────────────────────────┤
│  1,247,892          │  Throughput: 2,847 rows/sec           │
│  ████████████░░░░░  │  Elapsed: 7m 18s                      │
│  62.4% complete     │  ETA: 4m 42s                          │
│                     │                                       │
│  Errors: 31,247     │  Active Workers: 8                    │
│  Success: 98.5%     │  Memory: 156MB / 8GB (2%)             │ 
└─────────────────────┴───────────────────────────────────────┘

┌─────────────────────────────────────────────────────────────┐
│  Recent Activity (Live)                                     │
├─────────────────────────────────────────────────────────────┤
│  ✅ Chunk 247 complete: 5,000 rows (1.2s)                   │
│  ✅ Chunk 248 complete: 5,000 rows (1.1s)                   │
│  💾 Checkpoint saved at 1,250,000 rows                      │
│  ✅ Chunk 249 complete: 5,000 rows (1.3s)                   │
│  ⚠️  Validation error: Row 1,247,892 (invalid email)        │
│  ✅ Chunk 250 complete: 5,000 rows (1.2s)                   │
└─────────────────────────────────────────────────────────────┘
Enter fullscreen mode Exit fullscreen mode

Why this matters:

  1. User confidence - "I can see it's actually working"
  2. Problem detection - "Why did throughput drop?"
  3. Resource awareness - "Should I scale up workers?"
  4. Time planning - "I'll grab coffee, 5 mins left"

Layer 7: Three-Layer Validation (Catch Errors Early)

Most systems validate one way. We validate three.

Layer 1: Does the file make sense?

required_fields = ['consumer_no', 'first_name', 'email', 'phone']

if not all(field in csv_headers for field in required_fields):
    return "Missing required columns. Upload failed."
Enter fullscreen mode Exit fullscreen mode

Layer 2: Are the values correct?

# Check email format
if not is_valid_email(row['email']):
    errors.append("Row 5: Invalid email")

# Check phone is numeric
if not is_numeric(row['phone']):
    errors.append("Row 12: Phone must be numbers")
Enter fullscreen mode Exit fullscreen mode

Layer 3: Do relationships make sense?

# Does this consumer actually exist?
if row['consumer_no'] not in valid_consumers:
    errors.append("Row 89: Consumer not found in system")

# Is this meter already assigned?
if meter_already_mapped(row['meter_no']):
    errors.append("Row 145: Meter already assigned to another consumer")
Enter fullscreen mode Exit fullscreen mode

Why three layers?

Each layer catches different mistakes.

  • Layer 1: Catches file corruption
  • Layer 2: Catches bad data
  • Layer 3: Catches business logic violations

Together? They catch 98% of problems before they touch production.


One More Thing: Redis Compression (The Underrated Hero)

We store 1 million records in Redis temporarily.

Uncompressed? 850MB. Your Redis crashes.

Compressed? 127MB. Still plenty of space.

import zlib
import pickle

def compress_data(data):
    """
    Compress before storing in Redis
    6.7:1 compression ratio
    """
    serialized = pickle.dumps(data)
    compressed = zlib.compress(serialized, level=6)
    return compressed

def decompress_data(compressed):
    """
    Decompress when retrieving
    Takes 400ms for 5K rows (negligible)
    """
    serialized = zlib.decompress(compressed)
    data = pickle.loads(serialized)
    return data

# Usage
cache.set(f"chunk:{i}", compress_data(chunk_data))
chunk = decompress_data(cache.get(f"chunk:{i}"))
Enter fullscreen mode Exit fullscreen mode

The math:

  • Compression ratio: 6.7:1
  • Memory saved: 85%
  • Decompression time: 400ms per 5K chunk (negligible)
  • Cost saved: Huge (16GB Redis → 4GB Redis)

It's one of those "boring" optimizations that's actually genius.


Real Performance: What Does This Actually Achieve?

We tested with real data. Real numbers.

The Setup:

  • 1,000,000 consumer records
  • 25 fields per record
  • Real-world messy data
  • Standard server (8GB RAM, 4 cores)

The Results:

STAGE                           TIME
─────────────────────────────────────
File upload to S3               1m 30s
Load to Redis (with compress)   1m 45s
Preload relationships (4 queries) 5s
Fuzzy mapping (find columns)    18s
Transform all rows              3m 20s
Validate 3 layers               4m 50s
Insert to database (8 workers)  8m 40s
─────────────────────────────────────
TOTAL                          20m 28s
Enter fullscreen mode Exit fullscreen mode

For 1 million records. On a standard server. Zero crashes.

With:

  • Real-time monitoring dashboard
  • Automatic checkpoints every 10K rows
  • Smart alerts catching problems
  • Auto-scaling workers (5 → 8 → 5)
  • Adaptive chunk sizing (1K - 10K)
  • Zero database overload (4 queries total)

Before our changes? Would crash around 200K records.


Why This Beats Everything Else

Let's be honest. You could use a migration tool from the market.

But here's what you get with this architecture:

  • Works at any scale (1K to 10M records)
  • Auto-adjusts (chunk size, workers, everything)
  • Never crashes (validation catches errors first)
  • Fast feedback (user not blocked waiting)
  • Easy to debug (real-time dashboard shows everything)
  • Recovers from failures (checkpoints every 10K rows)
  • Catches problems early (smart alerts)
  • Costs almost nothing (runs on existing infrastructure)

The migration tools? They work. But they're generic.

This? This is built for your data. Your system. Your scale.


The Moment It All Clicked

We had a customer with 2 million records.

Old system: "This will take 2 weeks and probably crash."

New system: "25 minutes, guaranteed, no downtime."

They were skeptical. We showed them the architecture.

More importantly, we showed them the dashboard:

They said: "Wait, I can actually SEE what's happening?"

We said: "Yes. And if anything goes wrong, you'll know immediately."

Ran the migration. 25 minutes 42 seconds. Success.

They asked: "How are you doing this so fast AND so safely?"

We said: "We're not smarter. We just thought differently."


What You Need to Remember

If you're building something that scales:

  1. Separate concerns — Files go to S3, metadata to DB, temp data to Redis
  2. Query once, reuse forever — Cache valid values upfront (4 queries, not 4M)
  3. Process async — Don't block users waiting
  4. Use workers — One person can't do million tasks; 8 people can
  5. Adjust dynamically — Chunk size based on system health
  6. Validate ruthlessly — Three layers catch 98% of errors
  7. Compress everything — 6:1 compression is free performance
  8. Save checkpoints — Resume from failure, never start over
  9. Monitor everything — Real-time dashboard shows what's happening
  10. Alert proactively — Catch problems before they become disasters

Do these things. You'll handle any scale.


Next Up: Part 3

Next week, we dive into:

  • What actually happens at 2 million records (with REAL production numbers)
  • How auto-scaling actually works (code + scenarios)
  • The bulk insert strategy (45 mins → 2.5 mins)
  • The lessons we learned (some painful)
  • Economics: How much does this cost?

Hint: The answer will surprise you.


Questions?

Drop them below. I genuinely love talking about this.

And if you built something similar, tell me about it. Would love to hear your story.

Share this with your team if you're dealing with scale. Nobody should have to learn these lessons the hard way.


Next: Part 3: When 2 Million Records Hit — The Reality Check


Written with ❤️ for engineers who believe that scale shouldn't mean pain, and monitoring shouldn't be an afterthought.

If this helped, a share would mean a lot. Your colleague is probably struggling with migrations right now.

Top comments (0)