Arijit Ghosh

Zero-Downtime Database Migration: The Complete Engineering Guide

Mission Critical: Migrating a production database is like replacing the engine of a plane mid-flight: users continue using your application while you swap the core infrastructure beneath it. One mistake can result in downtime, data corruption, or significant revenue loss.

The gold standard: Zero Downtime Migration (ZDM).

Table of Contents

  1. Why Migrate Databases?
  2. The Universal 5-Phase Migration Framework
  3. Migration Scenario Deep Dives
  4. Observability & Monitoring
  5. Risk Mitigation & Rollback Strategies
  6. Operational Playbook
  7. Best Practices & Lessons Learned

🎯 Why Migrate Databases?

Database migrations aren't undertaken lightly. Here are the primary business and technical drivers:

Business Drivers

  • 💰 Cost Optimization: Oracle licenses can cost millions annually
  • 🚀 Scalability Requirements: Handle billions of records across distributed systems
  • ⚡ Performance Improvements: Modern databases offer superior query performance
  • 🔓 Vendor Independence: Avoid lock-in with proprietary solutions
  • ☁️ Cloud Migration: Leverage managed services and elastic scaling

Technical Drivers

  • 🛡️ Security & Compliance: Modern encryption, audit trails, regulatory requirements
  • 🔄 Feature Requirements: ACID transactions, complex queries, or flexible schemas
  • 🧩 System Consolidation: Merge databases after acquisitions or reorganizations
  • 📈 Operational Excellence: Automated backups, patching, monitoring

🏗️ The Universal 5-Phase Migration Framework

Every successful zero-downtime migration follows this battle-tested pattern, regardless of source and target database types:

Architectural Diagram: Universal 5-Phase Migration Framework

Phase 1: Preparation & Planning

The foundation of any successful migration. Poor planning is the #1 cause of migration failures.

| Component | Description | Tools/Techniques | Duration |
| --- | --- | --- | --- |
| Schema Mapping | Analyze source vs target structure compatibility | AWS SCT, ora2pg, custom analysis scripts | 1-2 weeks |
| Tool Selection | Choose CDC, replication, and ETL technologies | Debezium, DMS, GoldenGate, Kafka Connect | 3-5 days |
| Rollback Strategy | Design failure recovery procedures | Feature flags, DNS switching, backup restoration | 1 week |
| SLA Definition | Define acceptable latency and downtime thresholds | Business stakeholder meetings | 2-3 days |

Critical Planning Questions:

  • What is the maximum acceptable replication lag?
  • How will schema differences be handled?
  • What is the rollback time requirement?
  • Which application components need updates?
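
It pays to capture the answers as a machine-readable plan that later phases can check against, rather than leaving them in a meeting doc. A minimal sketch of that idea; the field names and thresholds here are illustrative, not part of any specific tool:

from dataclasses import dataclass, field

@dataclass(frozen=True)
class MigrationPlan:
    """Planning decisions that later phases validate against (illustrative fields)."""
    max_replication_lag_seconds: int = 5      # maximum acceptable CDC lag
    max_cutover_downtime_seconds: int = 0     # zero-downtime target
    rollback_time_budget_minutes: int = 15    # how quickly we must be able to revert
    schema_mappings: dict = field(default_factory=dict)  # source column -> target column
    components_to_update: tuple = ()          # app services that need new data-access code

plan = MigrationPlan(
    max_replication_lag_seconds=2,
    schema_mappings={"created_date": "created_at"},
    components_to_update=("order-service", "reporting-jobs"),
)

def cutover_allowed(current_lag_seconds: float, plan: MigrationPlan) -> bool:
    """Gate the cutover on the agreed SLA instead of gut feeling."""
    return current_lag_seconds <= plan.max_replication_lag_seconds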

Phase 2: Bulk Load (Historical Data)

Transfer existing data from source to target database. This is typically the longest phase.

Architectural Diagram for Bulk Historical Data Load

Implementation Strategies by Migration Type:

| Migration Type | Recommended Approach | Tools | Typical Duration |
| --- | --- | --- | --- |
| PostgreSQL → Cassandra | Spark with JDBC + Cassandra connectors | Spark, DataStax drivers | 2-7 days |
| MongoDB → MySQL | mongoexport + custom ETL scripts | Python, Pandas, SQLAlchemy | 1-3 days |
| Oracle → PostgreSQL | pgloader or AWS DMS | pgloader, DMS, ora2pg | 3-10 days |
| On-Prem → Cloud | Native dump/restore + parallel processing | pg_dump, aws s3 cp, parallel | 1-5 days |
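
Whichever tool does the copy, the bulk-load pattern underneath is the same: read the source in primary-key-ordered chunks, write batches to the target, and persist a watermark so the job can resume after a failure. A minimal sketch of that loop; read_chunk, write_batch, and save_watermark are hypothetical stand-ins for your actual source and target drivers:

import time

def bulk_copy(read_chunk, write_batch, save_watermark, start_after=0, chunk_size=10_000):
    """Keyset-paginated bulk copy: resumable, bounded memory, no OFFSET scans.

    read_chunk(last_key, limit) -> list of row dicts ordered by primary key
    write_batch(rows)           -> writes the rows to the target database
    save_watermark(last_key)    -> persists progress so the job can resume
    (All three callables are placeholders for your real source/target drivers.)
    """
    last_key = start_after
    total = 0
    while True:
        rows = read_chunk(last_key, chunk_size)
        if not rows:
            break
        write_batch(rows)
        last_key = rows[-1]["id"]          # assumes an ordered primary key column named "id"
        save_watermark(last_key)
        total += len(rows)
        time.sleep(0.01)                   # light throttle to protect the source database
    return total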

Phase 3: Change Data Capture (CDC)

Real-time synchronization of ongoing changes while the bulk load completes and during the dual-write phase.

Architectural Diagram for CDC

CDC Tool Selection Matrix:

| Source DB | Target DB | Recommended Tool | Complexity | Latency | Cost |
| --- | --- | --- | --- | --- | --- |
| PostgreSQL | Cassandra | Debezium + Kafka + Custom Sink | Medium | <1s | $ |
| MySQL | MongoDB | Debezium + Kafka + MongoDB Connector | Low | <2s | $ |
| Oracle | PostgreSQL | Oracle GoldenGate + DMS | High | <5s | $$$ |
| SQL Server | Azure SQL | Native CDC + Azure Data Factory | Low | <3s | $$ |
| MongoDB | Elasticsearch | MongoDB Change Streams + Logstash | Medium | <2s | $ |
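
Whatever produces the change events, the consuming side looks broadly the same: read each event, inspect the operation code, and apply an idempotent upsert or delete to the target. A sketch of such an apply loop, assuming Debezium-style JSON events and the kafka-python client; apply_upsert and apply_delete are placeholders for your target's writer:

import json
from kafka import KafkaConsumer  # assumes the kafka-python package

def run_cdc_apply_loop(topic, bootstrap_servers, apply_upsert, apply_delete):
    """Apply Debezium-style change events to the target (c/u/r = upsert, d = delete)."""
    consumer = KafkaConsumer(
        topic,
        bootstrap_servers=bootstrap_servers,
        value_deserializer=lambda b: json.loads(b.decode("utf-8")) if b else None,
        enable_auto_commit=False,
        auto_offset_reset="earliest",
        group_id="cdc-apply",
    )
    for message in consumer:
        event = message.value
        if not event:                          # tombstone record, nothing to apply
            consumer.commit()
            continue
        payload = event.get("payload", event)  # events may or may not carry a schema envelope
        op = payload.get("op")
        if op in ("c", "u", "r"):              # create, update, snapshot read
            apply_upsert(payload["after"])
        elif op == "d":
            apply_delete(payload["before"])
        consumer.commit()                      # commit only after a successful apply (at-least-once)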

Phase 4: Dual Writes (Safety Layer)

Write to both databases simultaneously to ensure data consistency and provide a safety net during cutover.

import logging
from typing import Any, Dict, Optional
from dataclasses import dataclass
from enum import Enum

class WriteResult(Enum):
    SUCCESS = "success"
    SOURCE_FAILED = "source_failed"
    TARGET_FAILED = "target_failed"
    BOTH_FAILED = "both_failed"

@dataclass
class DatabaseOperation:
    operation_type: str
    table: str
    data: Dict[str, Any]
    primary_key: Optional[str] = None

class DatabaseMigrationProxy:
    def __init__(self, source_db, target_db, feature_flag_service, metrics_service):
        self.source_db = source_db
        self.target_db = target_db
        self.feature_flags = feature_flag_service
        self.metrics = metrics_service
        self.logger = logging.getLogger(__name__)

    def execute_write(self, operation: DatabaseOperation) -> WriteResult:
        """Execute write operation with dual-write support and comprehensive error handling"""

        source_success = False
        target_success = False

        try:
            # Primary write (source database - source of truth)
            self.logger.debug(f"Executing {operation.operation_type} on source DB")
            source_result = self._execute_on_source(operation)
            source_success = True
            self.metrics.increment('source_write_success')

        except Exception as e:
            self.logger.error(f"Source write failed: {e}")
            self.metrics.increment('source_write_failure')
            return WriteResult.SOURCE_FAILED

        # Secondary write (target database) - only if enabled via feature flag
        if self.feature_flags.is_enabled('dual_write_mode'):
            try:
                self.logger.debug(f"Executing {operation.operation_type} on target DB")
                transformed_operation = self._transform_for_target(operation)
                target_result = self._execute_on_target(transformed_operation)
                target_success = True
                self.metrics.increment('target_write_success')

            except Exception as e:
                self.logger.warning(f"Target write failed (non-blocking): {e}")
                self.metrics.increment('target_write_failure')
                # Target failure is non-blocking during dual-write phase
                self._enqueue_for_retry(operation)

        # Log dual-write results for monitoring
        if source_success and target_success:
            self.metrics.increment('dual_write_success')
            return WriteResult.SUCCESS
        elif source_success:
            return WriteResult.SUCCESS  # Target failure is acceptable during migration
        else:
            return WriteResult.SOURCE_FAILED

    def execute_read(self, query: str, params: Dict[str, Any] = None):
        """Route read operations based on migration phase"""

        read_percentage = self.feature_flags.get_percentage('read_from_target')

        if self._should_read_from_target(read_percentage):
            try:
                self.logger.debug("Reading from target database")
                result = self.target_db.execute(query, params)
                self.metrics.increment('target_read_success')
                return result
            except Exception as e:
                self.logger.error(f"Target read failed, falling back to source: {e}")
                self.metrics.increment('target_read_failure')
                # Fallback to source on target read failure

        self.logger.debug("Reading from source database")
        result = self.source_db.execute(query, params)
        self.metrics.increment('source_read_success')
        return result

    def _execute_on_source(self, operation: DatabaseOperation):
        """Execute operation on source database"""
        if operation.operation_type == 'INSERT':
            return self.source_db.insert(operation.table, operation.data)
        elif operation.operation_type == 'UPDATE':
            return self.source_db.update(operation.table, operation.data, operation.primary_key)
        elif operation.operation_type == 'DELETE':
            return self.source_db.delete(operation.table, operation.primary_key)

    def _execute_on_target(self, operation: DatabaseOperation):
        """Execute operation on target database with schema transformation"""
        if operation.operation_type == 'INSERT':
            return self.target_db.insert(operation.table, operation.data)
        elif operation.operation_type == 'UPDATE':
            return self.target_db.update(operation.table, operation.data, operation.primary_key)
        elif operation.operation_type == 'DELETE':
            return self.target_db.delete(operation.table, operation.primary_key)

    def _transform_for_target(self, operation: DatabaseOperation) -> DatabaseOperation:
        """Transform operation for target database schema"""
        # Example transformation logic (customize based on your schema differences)
        transformed_data = operation.data.copy()

        # Handle column name changes
        column_mappings = {
            'user_id': 'id',
            'created_date': 'created_at',
            'modified_date': 'updated_at'
        }

        for old_col, new_col in column_mappings.items():
            if old_col in transformed_data:
                transformed_data[new_col] = transformed_data.pop(old_col)

        # Handle data type conversions
        if 'created_at' in transformed_data and isinstance(transformed_data['created_at'], str):
            from datetime import datetime
            transformed_data['created_at'] = datetime.fromisoformat(transformed_data['created_at'])

        return DatabaseOperation(
            operation_type=operation.operation_type,
            table=operation.table,
            data=transformed_data,
            primary_key=operation.primary_key
        )

    def _should_read_from_target(self, percentage: int) -> bool:
        """Determine if read should be routed to target based on percentage rollout"""
        import random
        return random.randint(1, 100) <= percentage

    def _enqueue_for_retry(self, operation: DatabaseOperation):
        """Enqueue failed target writes for retry processing"""
        # Implementation would depend on your retry mechanism
        # Could use Redis queue, database table, or message queue
        self.logger.info(f"Enqueued operation for retry: {operation}")
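
A quick usage sketch of the proxy above, wired to in-memory stand-ins for the database adapters, feature flags, and metrics (all of the stub classes here are illustrative, not any particular library):

class StubDB:
    """Minimal stand-in for a real database adapter (illustrative only)."""
    def __init__(self, name): self.name = name
    def insert(self, table, data): print(f"{self.name}.insert into {table}"); return data
    def update(self, table, data, pk): print(f"{self.name}.update {table} pk={pk}"); return data
    def delete(self, table, pk): print(f"{self.name}.delete from {table} pk={pk}")
    def execute(self, query, params=None): print(f"{self.name}.execute"); return []

class InMemoryFlags:
    def __init__(self, flags=None, percentages=None):
        self.flags, self.percentages = flags or {}, percentages or {}
    def is_enabled(self, name): return self.flags.get(name, False)
    def get_percentage(self, name): return self.percentages.get(name, 0)

class CountingMetrics:
    def __init__(self): self.counters = {}
    def increment(self, name): self.counters[name] = self.counters.get(name, 0) + 1

proxy = DatabaseMigrationProxy(
    source_db=StubDB("source"),
    target_db=StubDB("target"),
    feature_flag_service=InMemoryFlags({'dual_write_mode': True}, {'read_from_target': 10}),
    metrics_service=CountingMetrics(),
)

result = proxy.execute_write(DatabaseOperation(
    operation_type='INSERT',
    table='users',
    data={'user_id': 42, 'created_date': '2024-03-15T10:30:00'},
))
print(result)  # WriteResult.SUCCESS, with the write mirrored to both databases

Turning dual_write_mode off or raising read_from_target is then just a flag change, which is exactly what makes the gradual cutover in Phase 5 possible.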

Phase 5: Cutover & Verification

The final phase where traffic is gradually shifted to the target database with comprehensive validation.

Architectural Diagram for Cutover & Verification
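
In practice, verification boils down to two checks repeated before and after every traffic shift: do the row counts line up, and do sampled rows match field by field? A minimal sketch, assuming you supply callables that count rows and fetch a row by primary key from each side:

import hashlib
import random

def row_fingerprint(row: dict) -> str:
    """Stable hash of a row's values for cheap cross-database comparison."""
    canonical = "|".join(f"{k}={row[k]}" for k in sorted(row))
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()

def verify_table(count_source, count_target, fetch_source_row, fetch_target_row,
                 all_ids, sample_size=100):
    """Compare total counts plus a random sample of rows by fingerprint.

    count_*      -> callables returning the table row count in each database
    fetch_*_row  -> callables returning a row dict by primary key (or None)
    all_ids      -> list of primary keys to sample from
    (All callables are placeholders for your actual database access layer.)
    """
    report = {
        "source_count": count_source(),
        "target_count": count_target(),
        "mismatched_ids": [],
    }
    for pk in random.sample(all_ids, min(sample_size, len(all_ids))):
        src, tgt = fetch_source_row(pk), fetch_target_row(pk)
        if src is None or tgt is None or row_fingerprint(src) != row_fingerprint(tgt):
            report["mismatched_ids"].append(pk)
    report["counts_match"] = report["source_count"] == report["target_count"]
    report["sample_clean"] = not report["mismatched_ids"]
    return report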


🔄 Migration Scenario Deep Dives

Scenario 1: SQL → NoSQL (PostgreSQL to Cassandra)

Business Case

Why Migrate?

  • 🚀 Horizontal Scalability: Handle billions of IoT sensor readings across distributed commodity hardware
  • ⚡ High Write Throughput: Support 100K+ writes/second for real-time analytics
  • 🧩 Flexible Schema Evolution: Dynamic user profiles and metadata that change frequently
  • 🌍 Global Distribution: Multi-region deployment with local read/write capabilities

Architecture Overview

Architectural Diagram for SQL → NoSQL Migration

Implementation Deep Dive

Step 1: Schema Design & Transformation

PostgreSQL normalized tables need to be denormalized for Cassandra's wide-column model:

-- PostgreSQL Source Schema
CREATE TABLE users (
    user_id SERIAL PRIMARY KEY,
    email VARCHAR(255) UNIQUE NOT NULL,
    name VARCHAR(255) NOT NULL,
    created_at TIMESTAMP DEFAULT NOW()
);

CREATE TABLE user_sessions (
    session_id UUID PRIMARY KEY,
    user_id INTEGER REFERENCES users(user_id),
    device_info JSONB,
    created_at TIMESTAMP DEFAULT NOW(),
    last_activity TIMESTAMP DEFAULT NOW()
);

CREATE TABLE user_events (
    event_id UUID PRIMARY KEY,
    user_id INTEGER REFERENCES users(user_id),
    event_type VARCHAR(50),
    event_data JSONB,
    created_at TIMESTAMP DEFAULT NOW()
);
-- Cassandra Target Schema (Denormalized)
CREATE KEYSPACE user_data WITH REPLICATION = {
    'class': 'NetworkTopologyStrategy',
    'datacenter1': 3,
    'datacenter2': 3
};

-- User profile table (optimized for user lookup)
CREATE TABLE user_data.users (
    user_id BIGINT,
    email TEXT,
    name TEXT,
    created_at TIMESTAMP,
    session_count BIGINT,  -- plain BIGINT: Cassandra does not allow counters alongside regular columns
    last_login TIMESTAMP,
    PRIMARY KEY (user_id)
);

-- User events table (optimized for time-series queries)
CREATE TABLE user_data.user_events_by_user (
    user_id BIGINT,
    event_date DATE,
    event_time TIMESTAMP,
    event_id UUID,
    event_type TEXT,
    event_data TEXT,
    device_info TEXT,
    PRIMARY KEY ((user_id, event_date), event_time, event_id)
) WITH CLUSTERING ORDER BY (event_time DESC);

-- Events by type table (optimized for analytics)
CREATE TABLE user_data.events_by_type (
    event_type TEXT,
    event_date DATE,
    event_time TIMESTAMP,
    user_id BIGINT,
    event_id UUID,
    event_data TEXT,
    PRIMARY KEY ((event_type, event_date), event_time, event_id)
) WITH CLUSTERING ORDER BY (event_time DESC);

Step 2: Bulk Migration with Spark

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when, lit, current_timestamp, date_format
from pyspark.sql.types import *
import json

def create_spark_session():
    return SparkSession.builder \
        .appName("PostgreSQL_to_Cassandra_Migration") \
        .config("spark.cassandra.connection.host", "cassandra-cluster-1,cassandra-cluster-2") \
        .config("spark.cassandra.connection.port", "9042") \
        .config("spark.cassandra.auth.username", "cassandra_user") \
        .config("spark.cassandra.auth.password", "secure_password") \
        .config("spark.sql.adaptive.enabled", "true") \
        .config("spark.sql.adaptive.coalescePartitions.enabled", "true") \
        .getOrCreate()

def migrate_users_table(spark):
    """Migrate users table with data transformation"""

    # Extract from PostgreSQL with optimized partitioning
    users_df = spark.read \
        .format("jdbc") \
        .option("url", "jdbc:postgresql://pg-primary:5432/production") \
        .option("dbtable", "users") \
        .option("user", "migration_user") \
        .option("password", "secure_password") \
        .option("driver", "org.postgresql.Driver") \
        .option("numPartitions", "10") \
        .option("partitionColumn", "user_id") \
        .option("lowerBound", "1") \
        .option("upperBound", "1000000") \
        .load()

    # Transform for Cassandra
    cassandra_users_df = users_df.select(
        col("user_id").cast(LongType()).alias("user_id"),
        col("email").alias("email"),
        col("name").alias("name"),
        col("created_at").alias("created_at"),
        lit(0).alias("session_count"),  # Initialize counter
        lit(None).cast(TimestampType()).alias("last_login")
    )

    # Write to Cassandra with batch optimization
    cassandra_users_df.write \
        .format("org.apache.spark.sql.cassandra") \
        .options(table="users", keyspace="user_data") \
        .option("spark.cassandra.output.batch.size.rows", "1000") \
        .option("spark.cassandra.output.concurrent.writes", "10") \
        .mode("append") \
        .save()

    return users_df.count()

def migrate_events_with_denormalization(spark):
    """Migrate events with denormalization for time-series optimization"""

    # Join users and events for denormalization
    query = """
    (SELECT 
        e.event_id,
        e.user_id,
        u.email,
        u.name,
        e.event_type,
        e.event_data,
        s.device_info,
        e.created_at
     FROM user_events e
     JOIN users u ON e.user_id = u.user_id
     LEFT JOIN user_sessions s ON s.user_id = e.user_id
     ORDER BY e.created_at) as events_with_user_data
    """

    events_df = spark.read \
        .format("jdbc") \
        .option("url", "jdbc:postgresql://pg-primary:5432/production") \
        .option("dbtable", query) \
        .option("user", "migration_user") \
        .option("password", "secure_password") \
        .load()

    # Transform for Cassandra time-series tables
    events_by_user_df = events_df.select(
        col("user_id").cast(LongType()),
        date_format(col("created_at"), "yyyy-MM-dd").cast(DateType()).alias("event_date"),
        col("created_at").alias("event_time"),
        col("event_id"),
        col("event_type"),
        col("event_data").cast(StringType()),
        when(col("device_info").isNotNull(), col("device_info").cast(StringType())).alias("device_info")
    )

    # Write to user-centric events table
    events_by_user_df.write \
        .format("org.apache.spark.sql.cassandra") \
        .options(table="user_events_by_user", keyspace="user_data") \
        .option("spark.cassandra.output.batch.size.rows", "500") \
        .mode("append") \
        .save()

    # Write to type-centric events table for analytics
    events_by_type_df = events_df.select(
        col("event_type"),
        date_format(col("created_at"), "yyyy-MM-dd").cast(DateType()).alias("event_date"),
        col("created_at").alias("event_time"),
        col("user_id").cast(LongType()),
        col("event_id"),
        col("event_data").cast(StringType())
    )

    events_by_type_df.write \
        .format("org.apache.spark.sql.cassandra") \
        .options(table="events_by_type", keyspace="user_data") \
        .option("spark.cassandra.output.batch.size.rows", "500") \
        .mode("append") \
        .save()

def main():
    spark = create_spark_session()

    try:
        print("Starting PostgreSQL to Cassandra migration...")

        # Migrate users table
        user_count = migrate_users_table(spark)
        print(f"Migrated {user_count} users")

        # Migrate events with denormalization
        migrate_events_with_denormalization(spark)
        print("Events migration completed")

        print("Bulk migration completed successfully!")

    except Exception as e:
        print(f"Migration failed: {e}")
        raise
    finally:
        spark.stop()

if __name__ == "__main__":
    main()

Step 3: Real-time CDC with Debezium

# Debezium PostgreSQL Connector Configuration
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
  name: postgres-cassandra-connector
  labels:
    strimzi.io/cluster: migration-cluster
spec:
  class: io.debezium.connector.postgresql.PostgresConnector
  tasksMax: 3
  config:
    database.hostname: pg-primary.production.local
    database.port: 5432
    database.user: debezium_user
    database.password: secure_password
    database.dbname: production
    database.server.name: production-postgres

    # Table filtering
    table.include.list: public.users,public.user_sessions,public.user_events

    # Change event routing
    transforms: route
    transforms.route.type: io.debezium.transforms.ByLogicalTableRouter
    transforms.route.topic.regex: production-postgres.public.(.*)
    transforms.route.topic.replacement: postgres.events.$1

    # Snapshot configuration
    snapshot.mode: initial
    slot.name: debezium_migration_slot

    # Performance tuning
    max.batch.size: 2048
    max.queue.size: 8192

    # Logical decoding plugin (the PostgreSQL connector reads schema from the
    # database itself, so no separate schema-history topic is required)
    plugin.name: pgoutput

Step 4: Custom Kafka → Cassandra Sink Connector

@Component
public class CassandraSinkConnector extends SinkConnector {

    private Map<String, String> configProps;

    @Override
    public void start(Map<String, String> props) {
        this.configProps = props;
    }

    @Override
    public Class<? extends Task> taskClass() {
        return CassandraSinkTask.class;
    }

    @Override
    public List<Map<String, String>> taskConfigs(int maxTasks) {
        List<Map<String, String>> taskConfigs = new ArrayList<>();
        for (int i = 0; i < maxTasks; i++) {
            taskConfigs.add(new HashMap<>(configProps));
        }
        return taskConfigs;
    }
}

@Component
public class CassandraSinkTask extends SinkTask {

    private CqlSession cassandraSession;
    private Map<String, PreparedStatement> preparedStatements;

    @Override
    public void start(Map<String, String> props) {
        // Initialize Cassandra connection
        cassandraSession = CqlSession.builder()
            .addContactPoint(new InetSocketAddress(props.get("cassandra.host"), 9042))
            .withAuthCredentials(props.get("cassandra.username"), props.get("cassandra.password"))
            .withKeyspace("user_data")
            .build();

        // Prepare statements for each table
        prepareStatements();
    }

    @Override
    public void put(Collection<SinkRecord> records) {
        for (SinkRecord record : records) {
            try {
                processRecord(record);
            } catch (Exception e) {
                log.error("Failed to process record: {}", record, e);
                // Implement retry logic or DLQ
                handleFailedRecord(record, e);
            }
        }
    }

    private void processRecord(SinkRecord record) {
        String tableName = extractTableName(record.topic());
        Struct payload = (Struct) record.value();
        String operation = payload.getString("op"); // c=create, u=update, d=delete

        switch (operation) {
            case "c":
            case "u":
                handleUpsert(tableName, payload.getStruct("after"));
                break;
            case "d":
                handleDelete(tableName, payload.getStruct("before"));
                break;
        }
    }

    private void handleUpsert(String tableName, Struct data) {
        switch (tableName) {
            case "users":
                upsertUser(data);
                break;
            case "user_events":
                insertUserEvent(data);
                break;
            case "user_sessions":
                upsertUserSession(data);
                break;
        }
    }

    private void upsertUser(Struct userData) {
        PreparedStatement stmt = preparedStatements.get("upsert_user");
        cassandraSession.execute(stmt.bind(
            userData.getInt64("user_id"),
            userData.getString("email"),
            userData.getString("name"),
            Instant.ofEpochMilli(userData.getInt64("created_at"))
        ));
    }

    private void insertUserEvent(Struct eventData) {
        // Insert into both user-centric and type-centric tables for denormalization
        Long userId = eventData.getInt64("user_id");
        String eventType = eventData.getString("event_type");
        Instant eventTime = Instant.ofEpochMilli(eventData.getInt64("created_at"));
        LocalDate eventDate = eventTime.atZone(ZoneOffset.UTC).toLocalDate();

        // Insert into user_events_by_user table
        PreparedStatement userEventStmt = preparedStatements.get("insert_user_event");
        cassandraSession.execute(userEventStmt.bind(
            userId,
            eventDate,
            eventTime,
            UUID.fromString(eventData.getString("event_id")),
            eventType,
            eventData.getString("event_data"),
            eventData.getString("device_info")
        ));

        // Insert into events_by_type table for analytics
        PreparedStatement typeEventStmt = preparedStatements.get("insert_event_by_type");
        cassandraSession.execute(typeEventStmt.bind(
            eventType,
            eventDate,
            eventTime,
            userId,
            UUID.fromString(eventData.getString("event_id")),
            eventData.getString("event_data")
        ));
    }

    private void prepareStatements() {
        preparedStatements = new HashMap<>();

        preparedStatements.put("upsert_user",
            cassandraSession.prepare(
                "INSERT INTO users (user_id, email, name, created_at) VALUES (?, ?, ?, ?)"
            ));

        preparedStatements.put("insert_user_event",
            cassandraSession.prepare(
                "INSERT INTO user_events_by_user (user_id, event_date, event_time, event_id, event_type, event_data, device_info) VALUES (?, ?, ?, ?, ?, ?, ?)"
            ));

        preparedStatements.put("insert_event_by_type",
            cassandraSession.prepare(
                "INSERT INTO events_by_type (event_type, event_date, event_time, user_id, event_id, event_data) VALUES (?, ?, ?, ?, ?, ?)"
            ));
    }
}

Scenario 2: NoSQL → SQL (MongoDB to MySQL)

Business Case

Why Migrate?

  • 💰 ACID Transactions: Critical for financial operations, order processing, payment systems
  • 🔍 Complex Analytical Queries: JOINs, aggregations, window functions for business intelligence
  • 🛡️ Data Integrity Enforcement: Foreign key constraints, check constraints, schema validation
  • 📊 Mature Ecosystem: Better tooling for reporting, data warehousing, and analytics

Schema Transformation Strategy

MongoDB's document-oriented structure needs careful normalization for relational databases:

// MongoDB Document Structure (Before)
{
  "_id": ObjectId("507f1f77bcf86cd799439011"),
  "customer_id": 12345,
  "profile": {
    "first_name": "Alice",
    "last_name": "Johnson",
    "email": "alice@example.com",
    "phone": "+1-555-0123",
    "address": {
      "street": "123 Main St",
      "city": "San Francisco",
      "state": "CA",
      "zip_code": "94105"
    },
    "preferences": {
      "marketing_emails": true,
      "sms_notifications": false,
      "theme": "dark"
    }
  },
  "orders": [
    {
      "order_id": 1001,
      "order_date": ISODate("2024-03-15T10:30:00Z"),
      "total_amount": 299.99,
      "status": "completed",
      "items": [
        {
          "product_id": "PROD123",
          "product_name": "Wireless Headphones",
          "quantity": 1,
          "unit_price": 199.99,
          "category": "Electronics"
        },
        {
          "product_id": "PROD456",
          "product_name": "Phone Case",
          "quantity": 2,
          "unit_price": 50.00,
          "category": "Accessories"
        }
      ],
      "shipping": {
        "method": "express",
        "cost": 15.99,
        "tracking_number": "TRACK123456"
      }
    }
  ],
  "payment_methods": [
    {
      "method_id": "pm_001",
      "type": "credit_card",
      "last_four": "4242",
      "expires": "12/25",
      "is_default": true
    }
  ]
}
-- MySQL Normalized Schema (After)

-- Customer core information
CREATE TABLE customers (
    customer_id BIGINT PRIMARY KEY,
    first_name VARCHAR(100) NOT NULL,
    last_name VARCHAR(100) NOT NULL,
    email VARCHAR(255) UNIQUE NOT NULL,
    phone VARCHAR(20),
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,

    INDEX idx_email (email),
    INDEX idx_phone (phone)
) ENGINE=InnoDB;

-- Customer addresses (normalized)
CREATE TABLE customer_addresses (
    address_id BIGINT AUTO_INCREMENT PRIMARY KEY,
    customer_id BIGINT NOT NULL,
    address_type ENUM('billing', 'shipping', 'both') DEFAULT 'both',
    street VARCHAR(255) NOT NULL,
    city VARCHAR(100) NOT NULL,
    state VARCHAR(50) NOT NULL,
    zip_code VARCHAR(10) NOT NULL,
    country VARCHAR(50) DEFAULT 'US',
    is_active BOOLEAN DEFAULT TRUE,

    FOREIGN KEY (customer_id) REFERENCES customers(customer_id) ON DELETE CASCADE,
    INDEX idx_customer_addresses (customer_id)
) ENGINE=InnoDB;

-- Customer preferences (JSON for flexibility)
CREATE TABLE customer_preferences (
    customer_id BIGINT PRIMARY KEY,
    marketing_emails BOOLEAN DEFAULT FALSE,
    sms_notifications BOOLEAN DEFAULT FALSE,
    theme ENUM('light', 'dark', 'auto') DEFAULT 'light',
    preferences_json JSON, -- Store additional preferences as JSON

    FOREIGN KEY (customer_id) REFERENCES customers(customer_id) ON DELETE CASCADE
) ENGINE=InnoDB;

-- Orders table
CREATE TABLE orders (
    order_id BIGINT PRIMARY KEY,
    customer_id BIGINT NOT NULL,
    order_date TIMESTAMP NOT NULL,
    total_amount DECIMAL(12, 2) NOT NULL,
    status ENUM('pending', 'processing', 'shipped', 'completed', 'cancelled') NOT NULL,
    shipping_method VARCHAR(50),
    shipping_cost DECIMAL(8, 2),
    tracking_number VARCHAR(100),

    FOREIGN KEY (customer_id) REFERENCES customers(customer_id),
    INDEX idx_customer_orders (customer_id, order_date),
    INDEX idx_order_date (order_date),
    INDEX idx_order_status (status)
) ENGINE=InnoDB;

-- Order items table
CREATE TABLE order_items (
    item_id BIGINT AUTO_INCREMENT PRIMARY KEY,
    order_id BIGINT NOT NULL,
    product_id VARCHAR(50) NOT NULL,
    product_name VARCHAR(255) NOT NULL,
    category VARCHAR(100),
    quantity INT NOT NULL,
    unit_price DECIMAL(10, 2) NOT NULL,
    total_price DECIMAL(10, 2) GENERATED ALWAYS AS (quantity * unit_price) STORED,

    FOREIGN KEY (order_id) REFERENCES orders(order_id) ON DELETE CASCADE,
    INDEX idx_order_items (order_id),
    INDEX idx_product (product_id),
    INDEX idx_category (category)
) ENGINE=InnoDB;

-- Payment methods table
CREATE TABLE payment_methods (
    method_id VARCHAR(50) PRIMARY KEY,
    customer_id BIGINT NOT NULL,
    payment_type ENUM('credit_card', 'debit_card', 'paypal', 'apple_pay') NOT NULL,
    last_four VARCHAR(4),
    expires VARCHAR(7), -- MM/YY format
    is_default BOOLEAN DEFAULT FALSE,
    is_active BOOLEAN DEFAULT TRUE,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,

    FOREIGN KEY (customer_id) REFERENCES customers(customer_id) ON DELETE CASCADE,
    INDEX idx_customer_payment_methods (customer_id)
) ENGINE=InnoDB;

-- Ensure only one default payment method per customer.
-- MySQL does not allow a trigger to modify the table it fires on (error 1442),
-- so this rule cannot be enforced with a BEFORE INSERT trigger on payment_methods.
-- Enforce it in the application or the ETL instead, demoting the previous default
-- before inserting the new one:
--   UPDATE payment_methods
--      SET is_default = FALSE
--    WHERE customer_id = ? AND is_default = TRUE;

Advanced ETL Pipeline

import pymongo
import mysql.connector
from mysql.connector import Error
import json
import logging
from datetime import datetime
from typing import Dict, List, Any, Optional
import pandas as pd
from dataclasses import dataclass

@dataclass
class MigrationStats:
    customers_processed: int = 0
    orders_processed: int = 0
    items_processed: int = 0
    errors: int = 0

class MongoToMySQLMigrator:
    def __init__(self, mongo_config: Dict, mysql_config: Dict):
        self.mongo_client = pymongo.MongoClient(mongo_config['connection_string'])
        self.mongo_db = self.mongo_client[mongo_config['database']]

        self.mysql_conn = mysql.connector.connect(**mysql_config)
        self.mysql_cursor = self.mysql_conn.cursor()

        self.stats = MigrationStats()
        self.logger = logging.getLogger(__name__)

        # Prepare SQL statements
        self.prepare_sql_statements()

    def prepare_sql_statements(self):
        """Prepare all SQL statements for better performance"""
        self.sql_statements = {
            'insert_customer': """
                INSERT INTO customers (customer_id, first_name, last_name, email, phone)
                VALUES (%s, %s, %s, %s, %s)
                ON DUPLICATE KEY UPDATE
                first_name = VALUES(first_name),
                last_name = VALUES(last_name),
                email = VALUES(email),
                phone = VALUES(phone)
            """,

            'insert_address': """
                INSERT INTO customer_addresses 
                (customer_id, address_type, street, city, state, zip_code, country)
                VALUES (%s, %s, %s, %s, %s, %s, %s)
            """,

            'insert_preferences': """
                INSERT INTO customer_preferences 
                (customer_id, marketing_emails, sms_notifications, theme, preferences_json)
                VALUES (%s, %s, %s, %s, %s)
                ON DUPLICATE KEY UPDATE
                marketing_emails = VALUES(marketing_emails),
                sms_notifications = VALUES(sms_notifications),
                theme = VALUES(theme),
                preferences_json = VALUES(preferences_json)
            """,

            'insert_order': """
                INSERT INTO orders 
                (order_id, customer_id, order_date, total_amount, status, 
                 shipping_method, shipping_cost, tracking_number)
                VALUES (%s, %s, %s, %s, %s, %s, %s, %s)
                ON DUPLICATE KEY UPDATE
                total_amount = VALUES(total_amount),
                status = VALUES(status)
            """,

            'insert_order_item': """
                INSERT INTO order_items 
                (order_id, product_id, product_name, category, quantity, unit_price)
                VALUES (%s, %s, %s, %s, %s, %s)
            """,

            'insert_payment_method': """
                INSERT INTO payment_methods 
                (method_id, customer_id, payment_type, last_four, expires, is_default)
                VALUES (%s, %s, %s, %s, %s, %s)
                ON DUPLICATE KEY UPDATE
                is_default = VALUES(is_default)
            """
        }

    def migrate_customers(self, batch_size: int = 1000):
        """Migrate customers with batch processing"""
        collection = self.mongo_db.customers
        total_docs = collection.count_documents({})
        self.logger.info(f"Starting migration of {total_docs} customer documents")

        batch = []
        processed = 0

        try:
            for doc in collection.find().batch_size(batch_size):
                try:
                    batch.append(doc)

                    if len(batch) >= batch_size:
                        self.process_customer_batch(batch)
                        processed += len(batch)
                        self.logger.info(f"Processed {processed}/{total_docs} customers")
                        batch = []

                except Exception as e:
                    self.logger.error(f"Error processing customer {doc.get('customer_id')}: {e}")
                    self.stats.errors += 1

            # Process remaining batch
            if batch:
                self.process_customer_batch(batch)
                processed += len(batch)

            self.mysql_conn.commit()
            self.logger.info(f"Customer migration completed. Processed: {processed}, Errors: {self.stats.errors}")

        except Exception as e:
            self.mysql_conn.rollback()
            self.logger.error(f"Batch processing failed: {e}")
            raise

    def process_customer_batch(self, customer_batch: List[Dict]):
        """Process a batch of customers with all related data"""
        for doc in customer_batch:
            try:
                # Extract customer data
                customer_id = doc['customer_id']
                profile = doc.get('profile', {})

                # Insert customer
                self.mysql_cursor.execute(self.sql_statements['insert_customer'], (
                    customer_id,
                    profile.get('first_name'),
                    profile.get('last_name'),
                    profile.get('email'),
                    profile.get('phone')
                ))

                # Insert address if present
                if 'address' in profile:
                    address = profile['address']
                    self.mysql_cursor.execute(self.sql_statements['insert_address'], (
                        customer_id,
                        'both',  # default address type
                        address.get('street'),
                        address.get('city'),
                        address.get('state'),
                        address.get('zip_code'),
                        address.get('country', 'US')
                    ))

                # Insert preferences
                preferences = profile.get('preferences', {})
                self.mysql_cursor.execute(self.sql_statements['insert_preferences'], (
                    customer_id,
                    preferences.get('marketing_emails', False),
                    preferences.get('sms_notifications', False),
                    preferences.get('theme', 'light'),
                    json.dumps(preferences) if preferences else None
                ))

                # Process orders
                orders = doc.get('orders', [])
                for order in orders:
                    self.process_order(customer_id, order)

                # Process payment methods
                payment_methods = doc.get('payment_methods', [])
                for pm in payment_methods:
                    self.mysql_cursor.execute(self.sql_statements['insert_payment_method'], (
                        pm.get('method_id'),
                        customer_id,
                        pm.get('type'),
                        pm.get('last_four'),
                        pm.get('expires'),
                        pm.get('is_default', False)
                    ))

                self.stats.customers_processed += 1

            except Exception as e:
                self.logger.error(f"Error processing customer {doc.get('customer_id')}: {e}")
                self.stats.errors += 1

    def process_order(self, customer_id: int, order: Dict):
        """Process individual order with items"""
        order_id = order['order_id']

        # Parse order date
        order_date = order['order_date']
        if isinstance(order_date, str):
            order_date = datetime.fromisoformat(order_date.replace('Z', '+00:00'))

        # Insert order
        shipping = order.get('shipping', {})
        self.mysql_cursor.execute(self.sql_statements['insert_order'], (
            order_id,
            customer_id,
            order_date,
            order['total_amount'],
            order['status'],
            shipping.get('method'),
            shipping.get('cost'),
            shipping.get('tracking_number')
        ))

        # Insert order items
        items = order.get('items', [])
        for item in items:
            self.mysql_cursor.execute(self.sql_statements['insert_order_item'], (
                order_id,
                item['product_id'],
                item['product_name'],
                item.get('category'),
                item['quantity'],
                item['unit_price']
            ))
            self.stats.items_processed += 1

        self.stats.orders_processed += 1

    def validate_migration(self) -> Dict[str, Any]:
        """Validate migration results"""
        validation_results = {}

        # Count validation
        mongo_customer_count = self.mongo_db.customers.count_documents({})

        self.mysql_cursor.execute("SELECT COUNT(*) FROM customers")
        mysql_customer_count = self.mysql_cursor.fetchone()[0]

        validation_results['customer_count_match'] = mongo_customer_count == mysql_customer_count
        validation_results['mongo_customers'] = mongo_customer_count
        validation_results['mysql_customers'] = mysql_customer_count

        # Sample data validation
        self.mysql_cursor.execute("""
            SELECT c.customer_id, c.email, COUNT(o.order_id) as order_count
            FROM customers c
            LEFT JOIN orders o ON c.customer_id = o.customer_id
            GROUP BY c.customer_id, c.email
            LIMIT 10
        """)

        sample_customers = self.mysql_cursor.fetchall()
        validation_results['sample_customers'] = sample_customers

        return validation_results

    def close_connections(self):
        """Clean up database connections"""
        if self.mysql_cursor:
            self.mysql_cursor.close()
        if self.mysql_conn:
            self.mysql_conn.close()
        if self.mongo_client:
            self.mongo_client.close()

# Usage example
def main():
    mongo_config = {
        'connection_string': 'mongodb://mongo-cluster:27017',
        'database': 'ecommerce'
    }

    mysql_config = {
        'host': 'mysql-primary',
        'user': 'migration_user',
        'password': 'secure_password',
        'database': 'ecommerce_normalized',
        'autocommit': False,
        'use_unicode': True,
        'charset': 'utf8mb4'
    }

    migrator = MongoToMySQLMigrator(mongo_config, mysql_config)

    try:
        # Run migration
        migrator.migrate_customers(batch_size=500)

        # Validate results
        results = migrator.validate_migration()
        print(f"Migration validation: {results}")

    finally:
        migrator.close_connections()

if __name__ == "__main__":
    main()

Scenario 3: SQL → SQL (Oracle to PostgreSQL)

Business Case

Why Migrate?

  • 💸 Cost Reduction: Oracle licenses can cost $47,500 per processor + annual support fees
  • ⚡ Modern Features: Advanced JSON support, full-text search, extensions ecosystem
  • 🔓 Vendor Independence: Avoid Oracle's aggressive licensing audits and lock-in
  • 🌍 Cloud Native: Better integration with Kubernetes and cloud platforms

Migration Architecture

Architectural Diagram for SQL → SQL Migration

Schema Conversion Deep Dive

Oracle vs PostgreSQL Data Type Mapping:

| Oracle Type | PostgreSQL Type | Conversion Notes |
| --- | --- | --- |
| VARCHAR2(n) | VARCHAR(n) | Direct mapping |
| NUMBER | NUMERIC | Precision may need adjustment |
| NUMBER(*) | BIGINT | For integer values |
| DATE | TIMESTAMP | Oracle DATE includes time |
| CLOB | TEXT | Large object handling |
| BLOB | BYTEA | Binary data storage |
| RAW(n) | BYTEA | Binary string data |
| XMLTYPE | XML | Native XML support |
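
For schemas with hundreds of tables it is worth scripting this mapping rather than converting column by column; tools such as ora2pg do this for you, but a rough standalone helper along the lines of the table above might look like this (the regex rules and TEXT fallback are my own simplifications):

import re

# Ordered (pattern, replacement) pairs mirroring the mapping table above.
ORACLE_TO_POSTGRES = [
    (re.compile(r"^VARCHAR2\((\d+)\)$", re.I),        r"VARCHAR(\1)"),
    (re.compile(r"^NUMBER\((\d+),\s*0\)$", re.I),     r"BIGINT"),
    (re.compile(r"^NUMBER\((\d+),\s*(\d+)\)$", re.I), r"NUMERIC(\1,\2)"),
    (re.compile(r"^NUMBER(\(\*\))?$", re.I),          r"NUMERIC"),
    (re.compile(r"^DATE$", re.I),                     r"TIMESTAMP"),
    (re.compile(r"^CLOB$", re.I),                     r"TEXT"),
    (re.compile(r"^BLOB$", re.I),                     r"BYTEA"),
    (re.compile(r"^RAW\(\d+\)$", re.I),               r"BYTEA"),
    (re.compile(r"^XMLTYPE$", re.I),                  r"XML"),
]

def convert_type(oracle_type: str) -> str:
    """Map an Oracle column type to a PostgreSQL type, falling back to TEXT."""
    cleaned = oracle_type.strip()
    for pattern, replacement in ORACLE_TO_POSTGRES:
        if pattern.match(cleaned):
            return pattern.sub(replacement, cleaned)
    return "TEXT"  # conservative fallback; review these columns manually

print(convert_type("VARCHAR2(255)"))  # VARCHAR(255)
print(convert_type("NUMBER(10,0)"))   # BIGINT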

Advanced Schema Conversion Script:

-- Oracle source schema example
CREATE TABLE CUSTOMERS (
    CUSTOMER_ID NUMBER(10) PRIMARY KEY,
    FIRST_NAME VARCHAR2(50) NOT NULL,
    LAST_NAME VARCHAR2(100) NOT NULL,
    EMAIL VARCHAR2(255) UNIQUE,
    PHONE_NUMBER VARCHAR2(20),
    BIRTH_DATE DATE,
    CREATED_DATE DATE DEFAULT SYSDATE,
    ACCOUNT_BALANCE NUMBER(15,2) DEFAULT 0,
    CUSTOMER_DATA CLOB,
    PROFILE_IMAGE BLOB,
    STATUS NUMBER(1) CHECK (STATUS IN (0,1,2)),
    LAST_LOGIN_IP RAW(16)
);

-- Create sequence for auto-increment
CREATE SEQUENCE customer_seq START WITH 1 INCREMENT BY 1;

-- Create trigger for auto-increment
CREATE OR REPLACE TRIGGER customer_trigger
    BEFORE INSERT ON CUSTOMERS
    FOR EACH ROW
BEGIN
    IF :NEW.CUSTOMER_ID IS NULL THEN
        :NEW.CUSTOMER_ID := customer_seq.NEXTVAL;
    END IF;
END;

-- Oracle partitioned table
CREATE TABLE ORDER_HISTORY (
    ORDER_ID NUMBER(15) PRIMARY KEY,
    CUSTOMER_ID NUMBER(10) REFERENCES CUSTOMERS(CUSTOMER_ID),
    ORDER_DATE DATE NOT NULL,
    TOTAL_AMOUNT NUMBER(12,2),
    ORDER_STATUS VARCHAR2(20)
)
PARTITION BY RANGE (ORDER_DATE)
(
    PARTITION orders_2023 VALUES LESS THAN (DATE '2024-01-01'),
    PARTITION orders_2024 VALUES LESS THAN (DATE '2025-01-01'),
    PARTITION orders_future VALUES LESS THAN (MAXVALUE)
);
-- Converted PostgreSQL schema
CREATE TABLE customers (
    customer_id BIGINT GENERATED BY DEFAULT AS IDENTITY PRIMARY KEY, -- BY DEFAULT so existing Oracle IDs can be inserted
    first_name VARCHAR(50) NOT NULL,
    last_name VARCHAR(100) NOT NULL,
    email VARCHAR(255) UNIQUE,
    phone_number VARCHAR(20),
    birth_date DATE,
    created_date TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    account_balance NUMERIC(15,2) DEFAULT 0,
    customer_data TEXT,
    profile_image BYTEA,
    status SMALLINT CHECK (status IN (0,1,2)),
    last_login_ip INET, -- PostgreSQL native IP type

    -- PostgreSQL-specific check constraint
    CONSTRAINT customers_email_check CHECK (email ~* '^[A-Za-z0-9._%-]+@[A-Za-z0-9.-]+[.][A-Za-z]+$')
);

-- PostgreSQL native partitioning (v10+)
CREATE TABLE order_history (
    order_id BIGINT GENERATED BY DEFAULT AS IDENTITY,
    customer_id BIGINT REFERENCES customers(customer_id),
    order_date DATE NOT NULL,
    total_amount NUMERIC(12,2),
    order_status VARCHAR(20),

    PRIMARY KEY (order_id, order_date) -- Include partition key in PK
) PARTITION BY RANGE (order_date);

-- Create partitions
CREATE TABLE order_history_2023 PARTITION OF order_history
    FOR VALUES FROM ('2023-01-01') TO ('2024-01-01');

CREATE TABLE order_history_2024 PARTITION OF order_history
    FOR VALUES FROM ('2024-01-01') TO ('2025-01-01');

CREATE TABLE order_history_future PARTITION OF order_history
    FOR VALUES FROM ('2025-01-01') TO (MAXVALUE);

-- PostgreSQL specific optimizations
CREATE INDEX CONCURRENTLY idx_customers_email_hash ON customers USING HASH (email);
CREATE INDEX CONCURRENTLY idx_customers_created_date_brin ON customers USING BRIN (created_date);
CREATE INDEX CONCURRENTLY idx_order_history_customer_date ON order_history (customer_id, order_date DESC);

-- Enable row-level security (PostgreSQL feature)
ALTER TABLE customers ENABLE ROW LEVEL SECURITY;

-- Create materialized view for analytics (PostgreSQL feature)
CREATE MATERIALIZED VIEW customer_order_summary AS
SELECT 
    c.customer_id,
    c.first_name,
    c.last_name,
    COUNT(oh.order_id) as total_orders,
    SUM(oh.total_amount) as total_spent,
    MAX(oh.order_date) as last_order_date
FROM customers c
LEFT JOIN order_history oh ON c.customer_id = oh.customer_id
GROUP BY c.customer_id, c.first_name, c.last_name;

-- Auto-refresh materialized view
CREATE OR REPLACE FUNCTION refresh_customer_summary()
RETURNS TRIGGER AS $$
BEGIN
    -- Note: CONCURRENTLY requires a unique index on the materialized view
    REFRESH MATERIALIZED VIEW CONCURRENTLY customer_order_summary;
    RETURN NULL;
END;
$$ LANGUAGE plpgsql;

CREATE TRIGGER refresh_customer_summary_trigger
    AFTER INSERT OR UPDATE OR DELETE ON order_history
    FOR EACH STATEMENT
    EXECUTE FUNCTION refresh_customer_summary();

pgloader Configuration

Note that pgloader's native sources are MySQL, SQLite, and MS SQL Server; for Oracle the extract is usually handled by ora2pg or AWS DMS, so treat the oracle:// source line below as illustrative of pgloader's load options rather than a turnkey command.

LOAD DATABASE
    FROM oracle://oracle_user:password@oracle-host:1521/ORCL
    INTO postgresql://pg_user:password@postgres-host:5432/migrated_db

WITH 
    include drop,           -- Drop existing tables
    create tables,          -- Create table structure
    create indexes,         -- Recreate indexes
    reset sequences,        -- Reset identity sequences
    workers = 8,           -- Parallel workers
    concurrency = 4        -- Concurrent connections per worker

SET 
    work_mem to '256MB',
    maintenance_work_mem to '512MB'

CAST 
    type datetime to timestamptz drop default drop not null using zero-dates-to-null,
    type date drop not null drop default using zero-dates-to-null,
    column CUSTOMERS.CREATED_DATE to timestamptz using zero-dates-to-null,
    column ORDER_HISTORY.ORDER_DATE to date using zero-dates-to-null

EXCLUDING TABLE NAMES MATCHING ~/TEMP_/, ~/^BIN/

INCLUDING ONLY TABLE NAMES MATCHING 
    ~/CUSTOMERS/, ~/ORDER_HISTORY/, ~/PRODUCTS/, ~/CATEGORIES/

BEFORE LOAD DO
    $$ CREATE SCHEMA IF NOT EXISTS migrated_schema; $$,
    $$ SET search_path TO migrated_schema, public; $$

AFTER LOAD DO
    $$ CREATE INDEX CONCURRENTLY idx_customers_email ON customers(email); $$,
    $$ CREATE INDEX CONCURRENTLY idx_orders_date ON order_history USING BRIN(order_date); $$,
    $$ ANALYZE; $$;

Scenario 4: On-Premises → Cloud (PostgreSQL to AWS RDS)

Business Case

Why Migrate?

  • ☁️ Elastic Scalability: Auto-scaling based on demand, no hardware procurement delays
  • 🔧 Managed Operations: Automated backups, patching, monitoring, high availability
  • 💰 OPEX vs CAPEX: Pay-as-you-go model vs large upfront hardware investments
  • 🌍 Global Reach: Deploy in multiple regions with low-latency access
  • 🛡️ Enhanced Security: AWS security services, encryption at rest/in transit

Migration Architecture

Architectural Diagram for On-Premises → Cloud Migration

Step-by-Step Migration Process

Phase 1: Infrastructure Setup & Network Configuration

#!/bin/bash
# AWS Infrastructure Setup Script

# Create VPC for secure migration
aws ec2 create-vpc --cidr-block 10.0.0.0/16 --tag-specifications 'ResourceType=vpc,Tags=[{Key=Name,Value=migration-vpc}]'

# Create subnets for Multi-AZ RDS deployment
aws ec2 create-subnet --vpc-id vpc-12345678 --cidr-block 10.0.1.0/24 --availability-zone us-east-1a
aws ec2 create-subnet --vpc-id vpc-12345678 --cidr-block 10.0.2.0/24 --availability-zone us-east-1b

# Create DB subnet group
aws rds create-db-subnet-group \
    --db-subnet-group-name migration-subnet-group \
    --db-subnet-group-description "Subnet group for PostgreSQL migration" \
    --subnet-ids subnet-12345678 subnet-87654321

# Create security group for RDS
aws ec2 create-security-group \
    --group-name rds-migration-sg \
    --description "Security group for PostgreSQL RDS migration"

# Allow PostgreSQL port from on-premises network
aws ec2 authorize-security-group-ingress \
    --group-id sg-12345678 \
    --protocol tcp \
    --port 5432 \
    --cidr 192.168.0.0/16  # Your on-premises CIDR

# Set up Site-to-Site VPN
aws ec2 create-vpn-gateway --type ipsec.1
aws ec2 create-customer-gateway \
    --type ipsec.1 \
    --public-ip YOUR_PUBLIC_IP \
    --bgp-asn 65000

# Create RDS instance with appropriate configuration
aws rds create-db-instance \
    --db-instance-identifier postgres-migration-target \
    --db-instance-class db.r5.2xlarge \
    --engine postgres \
    --engine-version 15.4 \
    --master-username postgres \
    --master-user-password $(aws secretsmanager get-random-password --password-length 32 --exclude-characters "\"@/\\" --output text --query RandomPassword) \
    --allocated-storage 1000 \
    --storage-type gp3 \
    --storage-encrypted \
    --kms-key-id alias/aws/rds \
    --vpc-security-group-ids sg-12345678 \
    --db-subnet-group-name migration-subnet-group \
    --multi-az \
    --backup-retention-period 35 \
    --preferred-backup-window "03:00-04:00" \
    --preferred-maintenance-window "sun:04:00-sun:05:00" \
    --enable-performance-insights \
    --performance-insights-retention-period 7 \
    --monitoring-interval 60 \
    --monitoring-role-arn arn:aws:iam::ACCOUNT:role/rds-monitoring-role \
    --enable-cloudwatch-logs-exports postgresql \
    --deletion-protection

Phase 2: Schema and Initial Data Migration

import boto3
import psycopg2
import subprocess
import logging
from datetime import datetime, timedelta
import json
from typing import Dict, List

class OnPremToRDSMigrator:
    def __init__(self, source_config: Dict, target_config: Dict):
        self.source_config = source_config
        self.target_config = target_config
        self.dms_client = boto3.client('dms')
        self.s3_client = boto3.client('s3')
        self.logger = logging.getLogger(__name__)

    def create_dms_replication_instance(self):
        """Create DMS replication instance for migration"""
        response = self.dms_client.create_replication_instance(
            ReplicationInstanceIdentifier='postgres-migration-instance',
            ReplicationInstanceClass='dms.r5.xlarge',
            AllocatedStorage=100,
            VpcSecurityGroupIds=['sg-12345678'],
            ReplicationSubnetGroupIdentifier='migration-subnet-group',
            MultiAZ=True,
            EngineVersion='3.4.7',
            PubliclyAccessible=False,
            Tags=[
                {
                    'Key': 'Project',
                    'Value': 'DatabaseMigration'
                },
                {
                    'Key': 'Environment',
                    'Value': 'Production'
                }
            ]
        )
        return response['ReplicationInstance']['ReplicationInstanceArn']

    def create_dms_endpoints(self):
        """Create source and target endpoints for DMS"""

        # Source endpoint (on-premises PostgreSQL)
        source_endpoint = self.dms_client.create_endpoint(
            EndpointIdentifier='postgres-onprem-source',
            EndpointType='source',
            EngineName='postgres',
            Username=self.source_config['username'],
            Password=self.source_config['password'],
            ServerName=self.source_config['host'],
            Port=self.source_config['port'],
            DatabaseName=self.source_config['database'],
            PostgreSQLSettings={
                'AfterConnectScript': 'SET search_path TO public;',
                'CaptureDdls': True,
                'MaxFileSize': 32768,
                'DatabaseName': self.source_config['database'],
                'DdlArtifactsSchema': 'public',
                'ExecuteTimeout': 60,
                'FailTasksOnLobTruncation': True,
                'HeartbeatEnable': True,
                'HeartbeatSchema': 'public',
                'HeartbeatFrequency': 5000,
                'Password': self.source_config['password'],
                'Port': self.source_config['port'],
                'ServerName': self.source_config['host'],
                'Username': self.source_config['username'],
                'SlotName': 'dms_migration_slot'
            }
        )

        # Target endpoint (RDS PostgreSQL)
        target_endpoint = self.dms_client.create_endpoint(
            EndpointIdentifier='postgres-rds-target',
            EndpointType='target',
            EngineName='postgres',
            Username=self.target_config['username'],
            Password=self.target_config['password'],
            ServerName=self.target_config['host'],
            Port=self.target_config['port'],
            DatabaseName=self.target_config['database'],
            PostgreSQLSettings={
                'AfterConnectScript': 'SET search_path TO public;',
                'DatabaseName': self.target_config['database'],
                'Password': self.target_config['password'],
                'Port': self.target_config['port'],
                'ServerName': self.target_config['host'],
                'Username': self.target_config['username']
            }
        )

        return source_endpoint, target_endpoint

    def create_migration_task(self, replication_instance_arn: str, 
                            source_endpoint_arn: str, target_endpoint_arn: str):
        """Create and start the migration task"""

        table_mappings = {
            "rules": [
                {
                    "rule-type": "selection",
                    "rule-id": "1",
                    "rule-name": "1",
                    "object-locator": {
                        "schema-name": "public",
                        "table-name": "%"
                    },
                    "rule-action": "include",
                    "filters": []
                },
                {
                    "rule-type": "transformation",
                    "rule-id": "2",
                    "rule-name": "2",
                    "rule-target": "schema",
                    "object-locator": {
                        "schema-name": "public"
                    },
                    "rule-action": "rename",
                    "value": "public"
                }
            ]
        }

        migration_task = self.dms_client.create_replication_task(
            ReplicationTaskIdentifier='postgres-migration-full-load-cdc',
            SourceEndpointArn=source_endpoint_arn,
            TargetEndpointArn=target_endpoint_arn,
            ReplicationInstanceArn=replication_instance_arn,
            MigrationType='full-load-and-cdc',
            TableMappings=json.dumps(table_mappings),
            ReplicationTaskSettings=json.dumps({
                "TargetMetadata": {
                    "TargetSchema": "",
                    "SupportLobs": True,
                    "FullLobMode": False,
                    "LobChunkSize": 0,
                    "LimitedSizeLobMode": True,
                    "LobMaxSize": 32,
                    "InlineLobMaxSize": 0,
                    "LoadMaxFileSize": 0,
                    "ParallelLoadThreads": 0,
                    "ParallelLoadBufferSize": 0,
                    "BatchApplyEnabled": True,
                    "TaskRecoveryTableEnabled": False,
                    "ParallelApplyThreads": 0,
                    "ParallelApplyBufferSize": 0,
                    "ParallelApplyQueuesPerThread": 0
                },
                "FullLoadSettings": {
                    "TargetTablePrepMode": "DROP_AND_CREATE",
                    "CreatePkAfterFullLoad": False,
                    "StopTaskCachedChangesApplied": False,
                    "StopTaskCachedChangesNotApplied": False,
                    "MaxFullLoadSubTasks": 8,
                    "TransactionConsistencyTimeout": 600,
                    "CommitRate": 10000
                },
                "Logging": {
                    "EnableLogging": True,
                    "LogComponents": [
                        {
                            "Id": "SOURCE_UNLOAD",
                            "Severity": "LOGGER_SEVERITY_DEFAULT"
                        },
                        {
                            "Id": "TARGET_LOAD",
                            "Severity": "LOGGER_SEVERITY_DEFAULT"
                        },
                        {
                            "Id": "SOURCE_CAPTURE",
                            "Severity": "LOGGER_SEVERITY_DEFAULT"
                        },
                        {
                            "Id": "TARGET_APPLY",
                            "Severity": "LOGGER_SEVERITY_DEFAULT"
                        }
                    ]
                },
                "ControlTablesSettings": {
                    "historyTimeslotInMinutes": 5,
                    "ControlSchema": "",
                    "HistoryTimeslotInMinutes": 5,
                    "HistoryTableEnabled": True,
                    "SuspendedTablesTableEnabled": True,
                    "StatusTableEnabled": True
                },
                "StreamBufferSettings": {
                    "StreamBufferCount": 3,
                    "StreamBufferSizeInMB": 8,
                    "CtrlStreamBufferSizeInMB": 5
                },
                "ChangeProcessingDdlHandlingPolicy": {
                    "HandleSourceTableDropped": True,
                    "HandleSourceTableTruncated": True,
                    "HandleSourceTableAltered": True
                },
                "ErrorBehavior": {
                    "DataErrorPolicy": "LOG_ERROR",
                    "DataTruncationErrorPolicy": "LOG_ERROR",
                    "DataErrorEscalationPolicy": "SUSPEND_TABLE",
                    "DataErrorEscalationCount": 0,
                    "TableErrorPolicy": "SUSPEND_TABLE",
                    "TableErrorEscalationPolicy": "STOP_TASK",
                    "TableErrorEscalationCount": 0,
                    "RecoverableErrorCount": -1,
                    "RecoverableErrorInterval": 5,
                    "RecoverableErrorThrottling": True,
                    "RecoverableErrorThrottlingMax": 1800,
                    "RecoverableErrorStopRetryAfterThrottlingMax": True,
                    "ApplyErrorDeletePolicy": "IGNORE_RECORD",
                    "ApplyErrorInsertPolicy": "LOG_ERROR",
                    "ApplyErrorUpdatePolicy": "LOG_ERROR",
                    "ApplyErrorEscalationPolicy": "LOG_ERROR",
                    "ApplyErrorEscalationCount": 0,
                    "ApplyErrorFailOnTruncationDdl": False,
                    "FullLoadIgnoreConflicts": True,
                    "FailOnTransactionConsistencyBreached": False,
                    "FailOnNoTablesCaptured": True
                }
            })
        )

        # Start the migration task
        self.dms_client.start_replication_task(
            ReplicationTaskArn=migration_task['ReplicationTask']['ReplicationTaskArn'],
            StartReplicationTaskType='start-replication'
        )

        return migration_task['ReplicationTask']['ReplicationTaskArn']

    def monitor_migration_progress(self, task_arn: str):
        """Monitor the migration task progress"""
        import time  # used below to pace the status polling

        while True:
            response = self.dms_client.describe_replication_tasks(
                ReplicationTaskArns=[task_arn]
            )

            task = response['ReplicationTasks'][0]
            status = task['Status']

            if 'ReplicationTaskStats' in task:
                stats = task['ReplicationTaskStats']
                self.logger.info(f"""
                Migration Progress:
                Status: {status}
                Full Load Progress: {stats.get('FullLoadProgressPercent', 0)}%
                Tables Loaded: {stats.get('TablesLoaded', 0)}
                Tables Loading: {stats.get('TablesLoading', 0)}
                Tables Queued: {stats.get('TablesQueued', 0)}
                Tables Errored: {stats.get('TablesErrored', 0)}
                CDC Start Time: {stats.get('StartTime', 'N/A')}
                """)

            if status in ['stopped', 'failed', 'ready']:
                break

            time.sleep(30)  # Check every 30 seconds

        return status

    def validate_migration(self):
        """Validate the migration results"""
        # The config dicts use 'username'/'database', while psycopg2 expects 'user'/'dbname'
        def pg_connect(cfg):
            return psycopg2.connect(
                host=cfg['host'], port=cfg['port'], dbname=cfg['database'],
                user=cfg['username'], password=cfg['password']
            )

        source_conn = pg_connect(self.source_config)
        target_conn = pg_connect(self.target_config)

        validation_results = {}

        try:
            # Get list of tables to validate
            source_cursor = source_conn.cursor()
            source_cursor.execute("""
                SELECT table_name 
                FROM information_schema.tables 
                WHERE table_schema = 'public' 
                AND table_type = 'BASE TABLE'
                ORDER BY table_name
            """)
            tables = [row[0] for row in source_cursor.fetchall()]

            for table in tables:
                # Count validation
                source_cursor.execute(f"SELECT COUNT(*) FROM {table}")
                source_count = source_cursor.fetchone()[0]

                target_cursor = target_conn.cursor()
                target_cursor.execute(f"SELECT COUNT(*) FROM {table}")
                target_count = target_cursor.fetchone()[0]

                validation_results[table] = {
                    'source_count': source_count,
                    'target_count': target_count,
                    'count_match': source_count == target_count
                }

                # Sample data validation (ORDER BY makes the two row sets comparable;
                # LIMIT without ORDER BY returns rows in an undefined order)
                source_cursor.execute(f"SELECT * FROM {table} ORDER BY 1 LIMIT 5")
                source_sample = source_cursor.fetchall()

                target_cursor.execute(f"SELECT * FROM {table} ORDER BY 1 LIMIT 5")
                target_sample = target_cursor.fetchall()

                validation_results[table]['sample_match'] = source_sample == target_sample

        finally:
            source_conn.close()
            target_conn.close()

        return validation_results

def main():
    # Configuration
    source_config = {
        'host': 'onprem-postgres.company.local',
        'port': 5432,
        'database': 'production',
        'username': 'postgres',
        'password': 'source_password'
    }

    target_config = {
        'host': 'postgres-migration-target.cluster-xyz.us-east-1.rds.amazonaws.com',
        'port': 5432,
        'database': 'production',
        'username': 'postgres',
        'password': 'target_password'  # Retrieved from AWS Secrets Manager
    }

    migrator = OnPremToRDSMigrator(source_config, target_config)

    try:
        # Setup migration infrastructure
        print("Creating DMS replication instance...")
        replication_instance_arn = migrator.create_dms_replication_instance()

        print("Creating DMS endpoints...")
        source_endpoint, target_endpoint = migrator.create_dms_endpoints()

        print("Creating and starting migration task...")
        task_arn = migrator.create_migration_task(
            replication_instance_arn,
            source_endpoint['Endpoint']['EndpointArn'],
            target_endpoint['Endpoint']['EndpointArn']
        )

        print("Monitoring migration progress...")
        final_status = migrator.monitor_migration_progress(task_arn)

        if final_status == 'ready':
            print("Migration completed successfully!")

            print("Validating migration results...")
            validation_results = migrator.validate_migration()

            for table, results in validation_results.items():
                print(f"Table {table}: Count match = {results['count_match']}, Sample match = {results['sample_match']}")
        else:
            print(f"Migration failed with status: {final_status}")

    except Exception as e:
        print(f"Migration failed: {e}")
        raise

if __name__ == "__main__":
    main()
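The sample configuration above hard-codes passwords for readability, while the comment on the target password points at AWS Secrets Manager. A minimal sketch of that lookup (the secret name and its JSON layout are assumptions) could look like this, with the returned value replacing the hard-coded 'target_password' string:

import boto3
import json

def get_db_password(secret_id: str) -> str:
    """Fetch a database password stored as a JSON secret in AWS Secrets Manager."""
    client = boto3.client('secretsmanager')
    response = client.get_secret_value(SecretId=secret_id)
    # Assumes the secret body is JSON shaped like {"password": "..."}
    return json.loads(response['SecretString'])['password']

# Hypothetical secret name -- substitute the identifier used in your account
target_password = get_db_password('prod/postgres-migration-target')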

Phase 3: Application Cutover Strategy

import boto3
import json
from typing import Dict, List
import time

class ApplicationCutoverManager:
    def __init__(self):
        self.route53_client = boto3.client('route53')
        self.elbv2_client = boto3.client('elbv2')
        self.ecs_client = boto3.client('ecs')

    def create_weighted_routing_policy(self, hosted_zone_id: str, 
                                     record_name: str, onprem_ip: str, rds_endpoint: str):
        """Create weighted routing to gradually shift traffic"""

        # Create record for on-premises (weight 100 initially)
        self.route53_client.change_resource_record_sets(
            HostedZoneId=hosted_zone_id,
            ChangeBatch={
                'Changes': [
                    {
                        'Action': 'CREATE',
                        'ResourceRecordSet': {
                            'Name': f'db-onprem.{record_name}',
                            'Type': 'CNAME',
                            'SetIdentifier': 'onprem-db',
                            'Weight': 100,
                            'TTL': 60,
                            'ResourceRecords': [
                                {'Value': onprem_ip}
                            ]
                        }
                    },
                    {
                        'Action': 'CREATE',
                        'ResourceRecordSet': {
                            'Name': f'db-rds.{record_name}',
                            'Type': 'CNAME',
                            'SetIdentifier': 'rds-db',
                            'Weight': 0,  # Start with 0% traffic
                            'TTL': 60,
                            'ResourceRecords': [
                                {'Value': rds_endpoint}
                            ]
                        }
                    }
                ]
            }
        )

    def update_traffic_weights(self, hosted_zone_id: str, record_name: str,
                              onprem_weight: int, rds_weight: int):
        """Update traffic distribution weights"""

        self.route53_client.change_resource_record_sets(
            HostedZoneId=hosted_zone_id,
            ChangeBatch={
                'Changes': [
                    {
                        'Action': 'UPSERT',
                        'ResourceRecordSet': {
                            'Name': f'db-onprem.{record_name}',
                            'Type': 'CNAME',
                            'SetIdentifier': 'onprem-db',
                            'Weight': onprem_weight,
                            'TTL': 60,
                            'ResourceRecords': [
                                {'Value': 'onprem-postgres.company.local'}
                            ]
                        }
                    },
                    {
                        'Action': 'UPSERT',
                        'ResourceRecordSet': {
                            'Name': f'db-rds.{record_name}',
                            'Type': 'CNAME',
                            'SetIdentifier': 'rds-db',
                            'Weight': rds_weight,
                            'TTL': 60,
                            'ResourceRecords': [
                                {'Value': 'postgres-migration-target.cluster-xyz.us-east-1.rds.amazonaws.com'}
                            ]
                        }
                    }
                ]
            }
        )

    def gradual_cutover(self, hosted_zone_id: str, record_name: str, 
                       validation_function, rollback_function):
        """Perform gradual traffic cutover with validation"""

        cutover_phases = [
            (95, 5),    # 5% to RDS
            (80, 20),   # 20% to RDS
            (50, 50),   # 50% to RDS
            (20, 80),   # 80% to RDS
            (0, 100)    # 100% to RDS
        ]

        for onprem_weight, rds_weight in cutover_phases:
            print(f"Shifting traffic: {rds_weight}% to RDS")

            # Update weights
            self.update_traffic_weights(hosted_zone_id, record_name, 
                                      onprem_weight, rds_weight)

            # Wait for DNS propagation
            time.sleep(180)  # 3 minutes

            # Validate system health
            if validation_function():
                print(f"✅ Phase validated: {rds_weight}% traffic to RDS")
                time.sleep(600)  # Wait 10 minutes before next phase
            else:
                print(f"❌ Validation failed at {rds_weight}% traffic")
                print("Initiating rollback...")
                rollback_function()
                return False

        return True

    def emergency_rollback(self, hosted_zone_id: str, record_name: str):
        """Immediately route all traffic back to on-premises"""
        print("🚨 EMERGENCY ROLLBACK: Routing all traffic to on-premises")

        self.update_traffic_weights(hosted_zone_id, record_name, 100, 0)

        # Also update application configuration via Parameter Store
        ssm_client = boto3.client('ssm')
        ssm_client.put_parameter(
            Name='/app/database/endpoint',
            Value='onprem-postgres.company.local:5432',
            Type='String',
            Overwrite=True
        )

        # Restart ECS services to pick up new configuration
        self.restart_ecs_services()

    def restart_ecs_services(self):
        """Restart ECS services to pick up configuration changes"""
        clusters = self.ecs_client.list_clusters()['clusterArns']

        for cluster_arn in clusters:
            services = self.ecs_client.list_services(cluster=cluster_arn)['serviceArns']

            for service_arn in services:
                self.ecs_client.update_service(
                    cluster=cluster_arn,
                    service=service_arn,
                    forceNewDeployment=True
                )

def application_health_check() -> bool:
    """Validate application health during migration"""
    import requests

    try:
        # Check application endpoints
        health_endpoints = [
            'https://api.company.com/health',
            'https://app.company.com/api/status'
        ]

        for endpoint in health_endpoints:
            response = requests.get(endpoint, timeout=10)
            if response.status_code != 200:
                return False

        # Check database connection pool health
        # This would be implemented based on your specific monitoring system
        return True

    except Exception as e:
        print(f"Health check failed: {e}")
        return False

# Usage example
if __name__ == "__main__":
    cutover_manager = ApplicationCutoverManager()

    # Perform gradual cutover
    success = cutover_manager.gradual_cutover(
        hosted_zone_id='Z123456789',
        record_name='company.com',
        validation_function=application_health_check,
        rollback_function=lambda: cutover_manager.emergency_rollback('Z123456789', 'company.com')
    )

    if success:
        print("🎉 Migration completed successfully!")
    else:
        print("💥 Migration failed and rolled back")
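One practical wrinkle with DNS-based cutover: pooled database connections are long-lived, so weight changes only affect connections opened after the pool recycles them. A minimal sketch, assuming the application uses SQLAlchemy (adapt the idea to whatever client library you actually run), of keeping the pool aligned with the 60-second TTL used above:

from sqlalchemy import create_engine

# Recycle pooled connections at least as often as the Route 53 record TTL (60s)
# so traffic actually follows the weighted DNS records; pre_ping discards
# connections whose backend went away during a shift.
engine = create_engine(
    "postgresql://app_user:app_password@db.company.com:5432/production",
    pool_recycle=60,
    pool_pre_ping=True,
)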

Scenario 5: Homogeneous Upgrade (MySQL 5.7 → 8.0)

Business Case

Why Upgrade?

  • 🔒 Security Enhancements: SHA-2 authentication, improved SSL/TLS support
  • 🚀 Performance Improvements: Up to 2x faster for read/write workloads
  • 🆕 New Features: Window functions, CTEs, JSON improvements, descending indexes
  • 📊 Better Analytics: Enhanced optimizer, histograms, invisible indexes

Upgrade Strategy

Architectural Diagram for Homogeneous Upgrade

Pre-Upgrade Compatibility Assessment

#!/bin/bash
# MySQL 5.7 to 8.0 Compatibility Check Script

echo "🔍 MySQL 5.7 → 8.0 Compatibility Assessment"
echo "============================================="

# Download MySQL Shell for upgrade checker
wget https://dev.mysql.com/get/Downloads/MySQL-Shell/mysql-shell-8.0.35-linux-glibc2.12-x86-64bit.tar.gz
tar -xzf mysql-shell-8.0.35-linux-glibc2.12-x86-64bit.tar.gz
export PATH=$PATH:$(pwd)/mysql-shell-8.0.35-linux-glibc2.12-x86-64bit/bin

# Run MySQL upgrade checker
mysqlsh --uri mysql://root:password@mysql57-host:3306 \
  --js -e "util.checkForServerUpgrade()"

# Manual compatibility checks
echo "
📋 Manual Compatibility Checks:
===============================

1. Check for reserved words conflicts:
"
mysql -h mysql57-host -u root -p -e "
SELECT DISTINCT table_name 
FROM information_schema.tables 
WHERE table_name IN ('rank', 'dense_rank', 'percent_rank', 'cume_dist', 'ntile', 'lag', 'lead', 'first_value', 'last_value', 'nth_value')
  AND table_schema NOT IN ('information_schema', 'performance_schema', 'mysql', 'sys');
"

echo "
2. Check for deprecated SQL modes:
"
mysql -h mysql57-host -u root -p -e "
SELECT @@sql_mode;
SHOW VARIABLES LIKE '%sql_mode%';
"

echo "
3. Check for deprecated authentication methods:
"
mysql -h mysql57-host -u root -p -e "
SELECT user, host, plugin 
FROM mysql.user 
WHERE plugin IN ('mysql_old_password', '');
"

echo "
4. Check for deprecated data types:
"
mysql -h mysql57-host -u root -p -e "
SELECT DISTINCT table_schema, table_name, column_name, column_type
FROM information_schema.columns
WHERE (column_type LIKE 'year(2)%'
   OR (data_type IN ('float', 'double') AND column_type LIKE '%(%,%)%'))
  AND table_schema NOT IN ('information_schema', 'performance_schema', 'mysql', 'sys');
"

echo "
5. Check for deprecated functions:
"
# This would require parsing application code or stored procedures
# for functions like PASSWORD(), OLD_PASSWORD(), etc.

echo "
6. Check foreign key constraint name length (must be ≤ 64 chars in 8.0):
"
mysql -h mysql57-host -u root -p -e "
SELECT constraint_schema, constraint_name, LENGTH(constraint_name) as name_length
FROM information_schema.table_constraints 
WHERE constraint_type = 'FOREIGN KEY' 
  AND LENGTH(constraint_name) > 64
  AND constraint_schema NOT IN ('information_schema', 'performance_schema', 'mysql', 'sys');
"

echo "✅ Compatibility assessment complete!"

Automated Upgrade Implementation

import pymysql
import subprocess
import time
import logging
from typing import Dict, List, Tuple
import json

class MySQL57to80Upgrader:
    def __init__(self, source_config: Dict, target_config: Dict):
        self.source_config = source_config
        self.target_config = target_config
        self.logger = logging.getLogger(__name__)

    def setup_mysql80_replica(self) -> bool:
        """Set up MySQL 8.0 as a replica of MySQL 5.7"""

        try:
            # Connect to source MySQL 5.7
            source_conn = pymysql.connect(**self.source_config)
            source_cursor = source_conn.cursor()

            # Get current master status
            source_cursor.execute("SHOW MASTER STATUS")
            master_status = source_cursor.fetchone()
            binlog_file = master_status[0]
            binlog_pos = master_status[1]

            self.logger.info(f"Master status: {binlog_file}:{binlog_pos}")

            # Connect to target MySQL 8.0
            target_conn = pymysql.connect(**self.target_config)
            target_cursor = target_conn.cursor()

            # Configure replication
            replication_sql = f"""
            CHANGE MASTER TO
                MASTER_HOST='{self.source_config['host']}',
                MASTER_USER='replication_user',
                MASTER_PASSWORD='replication_password',
                MASTER_LOG_FILE='{binlog_file}',
                MASTER_LOG_POS={binlog_pos},
                MASTER_CONNECT_RETRY=10,
                MASTER_RETRY_COUNT=3600
            """

            target_cursor.execute(replication_sql)
            target_cursor.execute("START SLAVE")

            # Verify replication status
            time.sleep(5)
            target_cursor.execute("SHOW SLAVE STATUS")
            slave_status = target_cursor.fetchone()

            if slave_status and slave_status[10] == 'Yes' and slave_status[11] == 'Yes':
                self.logger.info("✅ Replication established successfully")
                return True
            else:
                self.logger.error("❌ Replication setup failed")
                return False

        except Exception as e:
            self.logger.error(f"Replication setup error: {e}")
            return False
        finally:
            if 'source_conn' in locals():
                source_conn.close()
            if 'target_conn' in locals():
                target_conn.close()

    def wait_for_replication_sync(self, max_lag_seconds: int = 1) -> bool:
        """Wait for replication to catch up"""

        target_conn = pymysql.connect(**self.target_config)
        target_cursor = target_conn.cursor()

        try:
            for attempt in range(300):  # 5 minutes max wait
                target_cursor.execute("SHOW SLAVE STATUS")
                slave_status = target_cursor.fetchone()

                if not slave_status:
                    self.logger.error("No slave status available")
                    return False

                seconds_behind_master = slave_status[32]  # Seconds_Behind_Master column

                if seconds_behind_master is None:
                    self.logger.warning("Seconds_Behind_Master is NULL - checking IO/SQL threads")
                    io_running = slave_status[10]  # Slave_IO_Running
                    sql_running = slave_status[11]  # Slave_SQL_Running

                    if io_running != 'Yes' or sql_running != 'Yes':
                        self.logger.error(f"Replication threads not running: IO={io_running}, SQL={sql_running}")
                        return False
                    continue

                if seconds_behind_master <= max_lag_seconds:
                    self.logger.info(f"✅ Replication synchronized (lag: {seconds_behind_master}s)")
                    return True

                self.logger.info(f"Waiting for sync... (lag: {seconds_behind_master}s)")
                time.sleep(1)

            self.logger.error("❌ Replication sync timeout")
            return False

        finally:
            target_conn.close()

    def perform_cutover(self, maintenance_window_minutes: int = 5) -> bool:
        """Perform the actual cutover to MySQL 8.0"""

        try:
            # Step 1: Stop application writes
            self.logger.info("🚦 Stopping application writes...")
            self.stop_application_writes()

            # Step 2: Wait for final replication sync
            self.logger.info("⏳ Waiting for final replication sync...")
            if not self.wait_for_replication_sync(max_lag_seconds=0):
                self.logger.error("Failed to achieve final sync")
                self.start_application_writes()  # Rollback
                return False

            # Step 3: Stop replication on MySQL 8.0
            target_conn = pymysql.connect(**self.target_config)
            target_cursor = target_conn.cursor()
            target_cursor.execute("STOP SLAVE")
            target_cursor.execute("RESET SLAVE ALL")

            # Step 4: Run mysql_upgrade on MySQL 8.0
            # (mysql_upgrade is deprecated as of 8.0.16; newer servers upgrade the
            # system tables themselves at startup, so this may be a harmless no-op)
            self.logger.info("🔧 Running mysql_upgrade...")
            upgrade_result = subprocess.run([
                'mysql_upgrade',
                f'--host={self.target_config["host"]}',
                f'--user={self.target_config["user"]}',
                f'--password={self.target_config["password"]}',
                '--upgrade-system-tables',
                '--verbose'
            ], capture_output=True, text=True)

            if upgrade_result.returncode != 0:
                self.logger.error(f"mysql_upgrade failed: {upgrade_result.stderr}")
                return False

            # Step 5: Update application configuration to use MySQL 8.0
            self.logger.info("🔄 Updating application configuration...")
            self.update_application_config()

            # Step 6: Restart applications
            self.logger.info("🚀 Restarting applications...")
            self.restart_applications()

            # Step 7: Validate functionality
            self.logger.info("✅ Validating application functionality...")
            if not self.validate_application_health():
                self.logger.error("Application validation failed")
                return False

            self.logger.info("🎉 MySQL 8.0 upgrade completed successfully!")
            return True

        except Exception as e:
            self.logger.error(f"Cutover failed: {e}")
            return False

    def stop_application_writes(self):
        """Stop application writes using various methods"""

        # Method 1: Update load balancer to mark database as read-only
        subprocess.run([
            'curl', '-X', 'PUT',
            'http://load-balancer-api/config/database/readonly',
            '-d', '{"readonly": true}'
        ])

        # Method 2: Set MySQL to read-only mode
        source_conn = pymysql.connect(**self.source_config)
        source_cursor = source_conn.cursor()
        source_cursor.execute("SET GLOBAL read_only = ON")
        source_cursor.execute("SET GLOBAL super_read_only = ON")
        source_conn.close()

        # Method 3: Use ProxySQL to redirect writes (if using ProxySQL)
        # This would be implemented based on your proxy configuration

    def start_application_writes(self):
        """Re-enable application writes (rollback scenario)"""

        # Re-enable writes on source MySQL 5.7
        source_conn = pymysql.connect(**self.source_config)
        source_cursor = source_conn.cursor()
        source_cursor.execute("SET GLOBAL read_only = OFF")
        source_cursor.execute("SET GLOBAL super_read_only = OFF")
        source_conn.close()

        # Update load balancer
        subprocess.run([
            'curl', '-X', 'PUT',
            'http://load-balancer-api/config/database/readonly',
            '-d', '{"readonly": false}'
        ])

    def update_application_config(self):
        """Update application configuration to point to MySQL 8.0"""

        # Example: Update Kubernetes ConfigMap
        config_update = {
            "apiVersion": "v1",
            "kind": "ConfigMap",
            "metadata": {"name": "app-config"},
            "data": {
                "database_host": self.target_config["host"],
                "database_port": str(self.target_config["port"])
            }
        }

        with open('/tmp/config-update.yaml', 'w') as f:
            import yaml
            yaml.dump(config_update, f)

        subprocess.run(['kubectl', 'apply', '-f', '/tmp/config-update.yaml'])

    def restart_applications(self):
        """Restart applications to pick up new configuration"""

        # Example: Rolling restart of Kubernetes deployments
        deployments = [
            'web-app',
            'api-service',
            'background-worker'
        ]

        for deployment in deployments:
            subprocess.run([
                'kubectl', 'rollout', 'restart', 
                f'deployment/{deployment}'
            ])

            # Wait for rollout to complete
            subprocess.run([
                'kubectl', 'rollout', 'status',
                f'deployment/{deployment}',
                '--timeout=300s'
            ])

    def validate_application_health(self) -> bool:
        """Validate that applications are working correctly with MySQL 8.0"""

        import requests

        health_checks = [
            'http://web-app/health',
            'http://api-service/health',
            'http://admin-panel/health'
        ]

        for endpoint in health_checks:
            try:
                response = requests.get(endpoint, timeout=10)
                if response.status_code != 200:
                    self.logger.error(f"Health check failed for {endpoint}: {response.status_code}")
                    return False
            except requests.RequestException as e:
                self.logger.error(f"Health check error for {endpoint}: {e}")
                return False

        # Test database connectivity and basic operations
        try:
            target_conn = pymysql.connect(**self.target_config)
            target_cursor = target_conn.cursor()

            # Test basic operations
            target_cursor.execute("SELECT 1")
            result = target_cursor.fetchone()

            if result[0] != 1:
                return False

            # Test a sample business query
            target_cursor.execute("SELECT COUNT(*) FROM users LIMIT 1")
            user_count = target_cursor.fetchone()[0]

            if user_count < 0:  # Basic sanity check
                return False

            target_conn.close()
            return True

        except Exception as e:
            self.logger.error(f"Database validation failed: {e}")
            return False

# Advanced MySQL 8.0 Feature Utilization
class MySQL80FeatureOptimizer:
    def __init__(self, connection_config: Dict):
        self.connection_config = connection_config
        self.logger = logging.getLogger(__name__)  # enable_new_features() and friends log through this

    def enable_new_features(self):
        """Enable and configure new MySQL 8.0 features"""

        conn = pymysql.connect(**self.connection_config)
        cursor = conn.cursor()

        try:
            # Enable histogram statistics for better query optimization
            self.logger.info("📊 Enabling histogram statistics...")
            # SET persist (not persist_only) so the new limit applies to the
            # ANALYZE ... UPDATE HISTOGRAM statements below without a server restart
            cursor.execute("SET persist histogram_generation_max_mem_size = 20971520")

            # Create histograms for frequently queried columns
            tables_to_analyze = [
                ('users', 'created_at'),
                ('orders', 'order_date'),
                ('products', 'category_id'),
                ('sessions', 'user_id')
            ]

            for table, column in tables_to_analyze:
                cursor.execute(f"ANALYZE TABLE {table} UPDATE HISTOGRAM ON {column} WITH 100 BUCKETS")
                self.logger.info(f"Created histogram for {table}.{column}")

            # Enable invisible indexes for testing
            self.logger.info("🔍 Creating invisible indexes for testing...")
            invisible_indexes = [
                "CREATE INDEX idx_users_email_invisible ON users(email) INVISIBLE",
                "CREATE INDEX idx_orders_status_invisible ON orders(status) INVISIBLE"
            ]

            for index_sql in invisible_indexes:
                try:
                    cursor.execute(index_sql)
                    self.logger.info(f"Created invisible index: {index_sql}")
                except pymysql.Error as e:
                    if "already exists" not in str(e):
                        self.logger.warning(f"Failed to create invisible index: {e}")

            # Configure new authentication (default_authentication_plugin is read-only at
            # runtime, so persist_only stages it to take effect at the next restart)
            self.logger.info("🔐 Configuring caching_sha2_password authentication...")
            cursor.execute("SET persist_only default_authentication_plugin = 'caching_sha2_password'")

            # Enable JSON improvements
            self.logger.info("📄 Optimizing JSON columns...")
            cursor.execute("""
                SELECT table_schema, table_name, column_name 
                FROM information_schema.columns 
                WHERE data_type = 'json' 
                AND table_schema NOT IN ('information_schema', 'performance_schema', 'mysql', 'sys')
            """)

            json_columns = cursor.fetchall()
            for schema, table, column in json_columns:
                # Create functional indexes on JSON paths
                try:
                    cursor.execute(f"""
                        CREATE INDEX idx_{table}_{column}_path 
                        ON {schema}.{table} ((CAST({column}->'$.id' AS UNSIGNED)))
                    """)
                    self.logger.info(f"Created JSON functional index on {schema}.{table}.{column}")
                except pymysql.Error as e:
                    if "already exists" not in str(e):
                        self.logger.warning(f"Failed to create JSON index: {e}")

            # Configure improved InnoDB settings
            self.logger.info("⚙️  Configuring InnoDB optimizations...")
            innodb_settings = [
                # innodb_dedicated_server is not dynamic; persist_only stages it for the next restart
                "SET persist_only innodb_dedicated_server = ON",
                "SET persist innodb_redo_log_capacity = 2147483648",  # 2GB
                "SET persist innodb_parallel_read_threads = 4",
                "SET persist innodb_fsync_threshold = 0"  # Disable fsync threshold for better performance
            ]

            for setting in innodb_settings:
                try:
                    cursor.execute(setting)
                    self.logger.info(f"Applied setting: {setting}")
                except pymysql.Error as e:
                    self.logger.warning(f"Failed to apply setting {setting}: {e}")

            conn.commit()
            self.logger.info("✅ MySQL 8.0 feature optimization complete!")

        finally:
            conn.close()

    def create_sample_ctes_and_window_functions(self):
        """Create sample views using new MySQL 8.0 features"""

        conn = pymysql.connect(**self.connection_config)
        cursor = conn.cursor()

        try:
            # Create a view using CTEs (Common Table Expressions)
            cte_view = """
            CREATE OR REPLACE VIEW user_order_analytics AS
            WITH monthly_orders AS (
                SELECT 
                    user_id,
                    DATE_FORMAT(order_date, '%Y-%m') as order_month,
                    COUNT(*) as order_count,
                    SUM(total_amount) as monthly_total
                FROM orders
                WHERE order_date >= DATE_SUB(NOW(), INTERVAL 12 MONTH)
                GROUP BY user_id, DATE_FORMAT(order_date, '%Y-%m')
            ),
            user_stats AS (
                SELECT 
                    user_id,
                    AVG(order_count) as avg_monthly_orders,
                    AVG(monthly_total) as avg_monthly_spending,
                    MIN(order_month) as first_order_month,
                    MAX(order_month) as last_order_month
                FROM monthly_orders
                GROUP BY user_id
            )
            SELECT 
                u.user_id,
                u.first_name,
                u.last_name,
                us.avg_monthly_orders,
                us.avg_monthly_spending,
                us.first_order_month,
                us.last_order_month,
                DATEDIFF(
                    STR_TO_DATE(CONCAT(us.last_order_month, '-01'), '%Y-%m-%d'),
                    STR_TO_DATE(CONCAT(us.first_order_month, '-01'), '%Y-%m-%d')
                ) / 30 as customer_lifetime_months
            FROM users u
            JOIN user_stats us ON u.user_id = us.user_id
            """

            cursor.execute(cte_view)

            # Create a view using window functions
            window_function_view = """
            CREATE OR REPLACE VIEW user_purchase_patterns AS
            SELECT 
                o.user_id,
                o.order_id,
                o.order_date,
                o.total_amount,
                ROW_NUMBER() OVER (PARTITION BY o.user_id ORDER BY o.order_date) as order_sequence,
                LAG(o.order_date) OVER (PARTITION BY o.user_id ORDER BY o.order_date) as previous_order_date,
                LEAD(o.order_date) OVER (PARTITION BY o.user_id ORDER BY o.order_date) as next_order_date,
                SUM(o.total_amount) OVER (PARTITION BY o.user_id ORDER BY o.order_date ROWS UNBOUNDED PRECEDING) as running_total,
                AVG(o.total_amount) OVER (PARTITION BY o.user_id ORDER BY o.order_date ROWS 2 PRECEDING) as moving_avg_3_orders,
                PERCENT_RANK() OVER (PARTITION BY DATE_FORMAT(o.order_date, '%Y-%m') ORDER BY o.total_amount) as monthly_spending_percentile,
                NTILE(10) OVER (PARTITION BY o.user_id ORDER BY o.total_amount DESC) as user_order_decile
            FROM orders o
            WHERE o.order_date >= DATE_SUB(NOW(), INTERVAL 12 MONTH)
            """

            cursor.execute(window_function_view)

            self.logger.info("✅ Created advanced views using MySQL 8.0 features")

        finally:
            conn.close()

# Usage example
def main():
    source_config = {
        'host': 'mysql57-master.company.local',
        'port': 3306,
        'user': 'root',
        'password': 'mysql57_password',
        'database': 'production'
    }

    target_config = {
        'host': 'mysql80-replica.company.local',
        'port': 3306,
        'user': 'root',
        'password': 'mysql80_password',
        'database': 'production'
    }

    upgrader = MySQL57to80Upgrader(source_config, target_config)

    try:
        # Phase 1: Set up replication
        print("Phase 1: Setting up MySQL 8.0 as replica...")
        if not upgrader.setup_mysql80_replica():
            print("❌ Failed to set up replication")
            return

        # Phase 2: Wait for sync and monitor
        print("Phase 2: Monitoring replication sync...")
        if not upgrader.wait_for_replication_sync():
            print("❌ Replication sync failed")
            return

        # Phase 3: Perform cutover
        print("Phase 3: Performing cutover to MySQL 8.0...")
        if not upgrader.perform_cutover():
            print("❌ Cutover failed")
            return

        # Phase 4: Optimize for MySQL 8.0 features
        print("Phase 4: Enabling MySQL 8.0 advanced features...")
        optimizer = MySQL80FeatureOptimizer(target_config)
        optimizer.enable_new_features()
        optimizer.create_sample_ctes_and_window_functions()

        print("🎉 MySQL 5.7 → 8.0 upgrade completed successfully!")

    except Exception as e:
        print(f"💥 Upgrade failed: {e}")

if __name__ == "__main__":
    main()

Scenario 6: Database Consolidation

Business Case

Why Consolidate?

  • 🔄 Post-Acquisition Integration: Merge systems from acquired companies
  • 📊 Single Source of Truth: Eliminate data silos and inconsistencies
  • 🛠️ Operational Simplification: Reduce infrastructure complexity and costs
  • 🔍 Enhanced Analytics: Enable cross-system reporting and insights

Consolidation Architecture

Architectural Diagram for Database Consolidation

Comprehensive Data Consolidation Pipeline

import pandas as pd
import numpy as np
from sqlalchemy import create_engine, MetaData, Table
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator
from datetime import datetime, timedelta
import hashlib
import logging
from typing import Dict, List, Tuple, Any
from dataclasses import dataclass
import great_expectations as ge

@dataclass
class ConsolidationMapping:
    source_table: str
    target_table: str
    column_mappings: Dict[str, str]
    transformation_rules: Dict[str, str]
    deduplication_keys: List[str]
    data_quality_rules: List[str]

class DatabaseConsolidator:
    def __init__(self, source_configs: Dict[str, Dict], target_config: Dict):
        self.source_engines = {}
        for name, config in source_configs.items():
            # The acquired systems run different engines, so each source config carries
            # its own SQLAlchemy dialect (postgresql, mysql+pymysql, oracle+cx_oracle, ...)
            dialect = config.get('dialect', 'postgresql')
            self.source_engines[name] = create_engine(
                f"{dialect}://{config['user']}:{config['password']}@{config['host']}:{config['port']}/{config['database']}"
            )

        self.target_engine = create_engine(
            f"postgresql://{target_config['user']}:{target_config['password']}@{target_config['host']}:{target_config['port']}/{target_config['database']}"
        )

        self.logger = logging.getLogger(__name__)

        # Define consolidation mappings
        self.consolidation_mappings = self.define_consolidation_mappings()

    def define_consolidation_mappings(self) -> List[ConsolidationMapping]:
        """Define how tables from different sources map to consolidated schema"""

        return [
            ConsolidationMapping(
                source_table="customers",
                target_table="unified_customers",
                column_mappings={
                    # Company A (PostgreSQL)
                    'company_a.customer_id': 'source_customer_id',
                    'company_a.first_name': 'first_name',
                    'company_a.last_name': 'last_name',
                    'company_a.email': 'email',
                    'company_a.phone': 'phone',
                    # Company B (MySQL) - different column names
                    'company_b.cust_id': 'source_customer_id',
                    'company_b.fname': 'first_name',
                    'company_b.lname': 'last_name',
                    'company_b.email_address': 'email',
                    'company_b.phone_number': 'phone',
                    # Company C (Oracle) - different structure
                    'company_c.customer_number': 'source_customer_id',
                    'company_c.full_name': 'full_name_to_split',
                    'company_c.contact_email': 'email',
                    'company_c.tel_number': 'phone'
                },
                transformation_rules={
                    'unified_customer_id': 'generate_uuid_from_source',
                    'source_system': 'extract_from_table_name',
                    'full_name_to_split': 'split_into_first_last_name',
                    'email': 'normalize_email',
                    'phone': 'normalize_phone_number',
                    'created_at': 'convert_to_utc'
                },
                deduplication_keys=['email', 'phone'],
                data_quality_rules=[
                    'email_format_valid',
                    'phone_format_valid',
                    'name_not_empty',
                    'no_test_data'
                ]
            ),

            ConsolidationMapping(
                source_table="products",
                target_table="unified_products",
                column_mappings={
                    'company_a.product_id': 'source_product_id',
                    'company_a.product_name': 'product_name',
                    'company_a.category': 'category',
                    'company_a.price': 'price',
                    'company_b.prod_id': 'source_product_id',
                    'company_b.name': 'product_name',
                    'company_b.cat': 'category',
                    'company_b.unit_price': 'price',
                    'company_c.item_code': 'source_product_id',
                    'company_c.description': 'product_name',
                    'company_c.product_category': 'category',
                    'company_c.selling_price': 'price'
                },
                transformation_rules={
                    'unified_product_id': 'generate_uuid_from_source',
                    'source_system': 'extract_from_table_name',
                    'category': 'normalize_category',
                    'price': 'convert_to_usd',
                    'product_name': 'clean_product_name'
                },
                deduplication_keys=['product_name', 'category', 'price'],
                data_quality_rules=[
                    'price_positive',
                    'product_name_not_empty',
                    'category_valid'
                ]
            )
        ]

    def extract_from_source(self, source_name: str, table_name: str) -> pd.DataFrame:
        """Extract data from source database"""

        try:
            query = f"SELECT * FROM {table_name}"
            df = pd.read_sql(query, self.source_engines[source_name])

            # Add metadata columns
            df['_source_system'] = source_name
            df['_extracted_at'] = datetime.utcnow()
            df['_source_table'] = table_name

            self.logger.info(f"Extracted {len(df)} rows from {source_name}.{table_name}")
            return df

        except Exception as e:
            self.logger.error(f"Failed to extract from {source_name}.{table_name}: {e}")
            raise

    def transform_data(self, df: pd.DataFrame, mapping: ConsolidationMapping) -> pd.DataFrame:
        """Apply transformations based on mapping rules"""

        transformed_df = df.copy()

        # Apply column mappings
        for source_col, target_col in mapping.column_mappings.items():
            if source_col in df.columns:
                transformed_df[target_col] = df[source_col]

        # Apply transformation rules
        for target_col, rule in mapping.transformation_rules.items():
            transformed_df[target_col] = self.apply_transformation_rule(transformed_df, rule, target_col)

        # Remove original columns that were mapped
        cols_to_remove = [col for col in mapping.column_mappings.keys() if col in transformed_df.columns]
        transformed_df = transformed_df.drop(columns=cols_to_remove, errors='ignore')

        return transformed_df

    def apply_transformation_rule(self, df: pd.DataFrame, rule: str, target_col: str) -> pd.Series:
        """Apply specific transformation rules"""

        if rule == 'generate_uuid_from_source':
            return df.apply(lambda row: self.generate_deterministic_uuid(
                row['_source_system'], row.get('source_customer_id', row.get('source_product_id', ''))
            ), axis=1)

        elif rule == 'extract_from_table_name':
            return df['_source_system']

        elif rule == 'split_into_first_last_name':
            # Handle full name splitting
            if 'full_name_to_split' in df.columns:
                names = df['full_name_to_split'].str.split(' ', n=1, expand=True)
                if target_col == 'first_name':
                    return names[0] if 0 in names.columns else ''
                elif target_col == 'last_name':
                    return names[1] if 1 in names.columns else ''

        elif rule == 'normalize_email':
            if 'email' in df.columns:
                return df['email'].str.lower().str.strip()

        elif rule == 'normalize_phone_number':
            if 'phone' in df.columns:
                # Remove non-digits and format as +1XXXXXXXXXX
                cleaned = df['phone'].astype(str).str.replace(r'[^\d]', '', regex=True)
                return cleaned.apply(lambda x: f"+1{x[-10:]}" if len(x) >= 10 else x)

        elif rule == 'normalize_category':
            if 'category' in df.columns:
                category_mappings = {
                    'electronics': 'Electronics',
                    'electronic': 'Electronics',
                    'tech': 'Electronics',
                    'books': 'Books',
                    'book': 'Books',
                    'clothing': 'Apparel',
                    'clothes': 'Apparel',
                    'apparel': 'Apparel'
                }
                return df['category'].str.lower().map(category_mappings).fillna(df['category'])

        elif rule == 'convert_to_usd':
            if 'price' in df.columns:
                # Implement currency conversion logic
                # This is simplified - in reality, you'd use exchange rates
                currency_multipliers = {
                    'company_a': 1.0,  # Already USD
                    'company_b': 1.0,  # Already USD  
                    'company_c': 0.85  # EUR to USD (example rate)
                }
                source_system = df['_source_system'].iloc[0] if len(df) > 0 else 'company_a'
                multiplier = currency_multipliers.get(source_system, 1.0)
                return df['price'].astype(float) * multiplier

        elif rule == 'clean_product_name':
            if 'product_name' in df.columns:
                return df['product_name'].str.strip().str.title()

        elif rule == 'convert_to_utc':
            if 'created_at' in df.columns:
                return pd.to_datetime(df['created_at']).dt.tz_localize(None)

        # Default case
        return pd.Series([''] * len(df))

    def generate_deterministic_uuid(self, source_system: str, source_id: str) -> str:
        """Generate consistent UUID from source system and ID"""
        combined = f"{source_system}:{source_id}"
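        # Note: this formats a SHA-256 digest to look like a UUID rather than producing a
        # strict RFC 4122 value; uuid.uuid5(uuid.NAMESPACE_URL, combined) would be a
        # deterministic, standards-compliant alternative if that matters downstream.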
        hash_object = hashlib.sha256(combined.encode())
        hex_dig = hash_object.hexdigest()
        # Format as UUID
        return f"{hex_dig[:8]}-{hex_dig[8:12]}-{hex_dig[12:16]}-{hex_dig[16:20]}-{hex_dig[20:32]}"

    def deduplicate_records(self, df: pd.DataFrame, dedup_keys: List[str]) -> pd.DataFrame:
        """Remove duplicate records based on deduplication keys"""

        if not dedup_keys or df.empty:
            return df

        # Create composite key for deduplication
        df['_dedup_key'] = df[dedup_keys].fillna('').astype(str).agg('|'.join, axis=1)

        # Sort by source system preference and extraction time to keep best record
        source_priority = {'company_a': 1, 'company_b': 2, 'company_c': 3}
        df['_source_priority'] = df['_source_system'].map(source_priority).fillna(99)

        df_sorted = df.sort_values(['_dedup_key', '_source_priority', '_extracted_at'])
        df_deduped = df_sorted.drop_duplicates(subset=['_dedup_key'], keep='first')

        # Clean up temporary columns
        df_deduped = df_deduped.drop(columns=['_dedup_key', '_source_priority'])

        duplicates_removed = len(df) - len(df_deduped)
        if duplicates_removed > 0:
            self.logger.info(f"Removed {duplicates_removed} duplicate records")

        return df_deduped

    def validate_data_quality(self, df: pd.DataFrame, quality_rules: List[str]) -> Tuple[pd.DataFrame, Dict]:
        """Validate data quality using Great Expectations"""

        validation_results = {}
        cleaned_df = df.copy()

        # Convert to Great Expectations dataset
        ge_df = ge.from_pandas(df)

        for rule in quality_rules:
            if rule == 'email_format_valid':
                # result_format="COMPLETE" ensures unexpected_list (used below) is
                # fully populated rather than a truncated sample
                result = ge_df.expect_column_values_to_match_regex(
                    'email',
                    r'^[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}$',
                    result_format="COMPLETE"
                )
                if not result['success']:
                    invalid_emails = result['result']['unexpected_list']
                    self.logger.warning(f"Invalid emails found: {len(invalid_emails)}")
                    # Optionally filter out invalid emails
                    cleaned_df = cleaned_df[~cleaned_df['email'].isin(invalid_emails)]

                validation_results['email_format'] = result['success']

            elif rule == 'phone_format_valid':
                result = ge_df.expect_column_values_to_match_regex(
                    'phone',
                    r'^\+1\d{10}$'
                )
                validation_results['phone_format'] = result['success']

            elif rule == 'name_not_empty':
                result = ge_df.expect_column_values_to_not_be_null('first_name')
                validation_results['first_name_not_null'] = result['success']

                result = ge_df.expect_column_values_to_not_be_null('last_name') 
                validation_results['last_name_not_null'] = result['success']

            elif rule == 'no_test_data':
                # Filter out obvious test data
                test_patterns = ['test', 'demo', 'sample', 'example']
                for pattern in test_patterns:
                    if 'email' in cleaned_df.columns:
                        mask = cleaned_df['email'].str.contains(pattern, case=False, na=False)
                        test_records = mask.sum()
                        if test_records > 0:
                            self.logger.warning(f"Filtering {test_records} test records containing '{pattern}'")
                            cleaned_df = cleaned_df[~mask]

                validation_results['test_data_filtered'] = True

            elif rule == 'price_positive':
                if 'price' in cleaned_df.columns:
                    result = ge_df.expect_column_values_to_be_between('price', min_value=0, max_value=999999)
                    validation_results['price_positive'] = result['success']

                    # Filter out invalid prices
                    invalid_prices = cleaned_df['price'] <= 0
                    if invalid_prices.sum() > 0:
                        self.logger.warning(f"Filtering {invalid_prices.sum()} records with invalid prices")
                        cleaned_df = cleaned_df[~invalid_prices]

            elif rule == 'product_name_not_empty':
                if 'product_name' in cleaned_df.columns:
                    result = ge_df.expect_column_values_to_not_be_null('product_name')
                    validation_results['product_name_not_null'] = result['success']

            elif rule == 'category_valid':
                if 'category' in cleaned_df.columns:
                    valid_categories = ['Electronics', 'Books', 'Apparel', 'Home & Garden', 'Sports', 'Other']
                    result = ge_df.expect_column_values_to_be_in_set('category', valid_categories)
                    validation_results['category_valid'] = result['success']

        return cleaned_df, validation_results

    def load_to_target(self, df: pd.DataFrame, table_name: str):
        """Load transformed and validated data to target database"""

        try:
            # Use upsert logic to handle potential duplicates
            df.to_sql(
                table_name, 
                self.target_engine, 
                if_exists='append', 
                index=False,
                method='multi',
                chunksize=1000
            )

            self.logger.info(f"Loaded {len(df)} records to {table_name}")

        except Exception as e:
            self.logger.error(f"Failed to load data to {table_name}: {e}")
            raise

    def consolidate_table(self, mapping: ConsolidationMapping):
        """Consolidate a specific table from all sources"""

        consolidated_data = []

        for source_name in self.source_engines.keys():
            try:
                # Extract from source
                source_df = self.extract_from_source(source_name, mapping.source_table)

                if source_df.empty:
                    self.logger.info(f"No data found in {source_name}.{mapping.source_table}")
                    continue

                # Transform data
                transformed_df = self.transform_data(source_df, mapping)

                consolidated_data.append(transformed_df)

            except Exception as e:
                self.logger.warning(f"Failed to process {source_name}.{mapping.source_table}: {e}")
                continue

        if not consolidated_data:
            self.logger.warning(f"No data to consolidate for {mapping.target_table}")
            return

        # Combine all source data
        combined_df = pd.concat(consolidated_data, ignore_index=True)

        # Deduplicate records
        deduped_df = self.deduplicate_records(combined_df, mapping.deduplication_keys)

        # Validate data quality
        validated_df, quality_results = self.validate_data_quality(deduped_df, mapping.data_quality_rules)

        self.logger.info(f"Data quality results: {quality_results}")

        # Load to target
        self.load_to_target(validated_df, mapping.target_table)

        return {
            'source_records': len(combined_df),
            'after_deduplication': len(deduped_df),
            'final_records': len(validated_df),
            'quality_results': quality_results
        }

# Airflow DAG for orchestrating consolidation
def create_consolidation_dag():
    """Create Airflow DAG for database consolidation"""

    default_args = {
        'owner': 'data-engineering',
        'depends_on_past': False,
        'start_date': datetime(2024, 1, 1),
        'email_on_failure': True,
        'email_on_retry': False,
        'retries': 2,
        'retry_delay': timedelta(minutes=5)
    }

    dag = DAG(
        'database_consolidation',
        default_args=default_args,
        description='Consolidate databases from acquired companies',
        schedule_interval='0 2 * * *',  # Daily at 2 AM
        max_active_runs=1,
        catchup=False,  # catchup is a DAG argument, not a task-level default_arg
        tags=['consolidation', 'etl', 'migration']
    )

    def run_consolidation(**context):
        source_configs = {
            'company_a': {
                'dialect': 'postgresql',
                'host': 'postgres-a.company.local',
                'port': 5432,
                'database': 'production',
                'user': 'readonly_user',
                'password': 'password_a'
            },
            'company_b': {
                'dialect': 'mysql+pymysql',
                'host': 'mysql-b.company.local',
                'port': 3306,
                'database': 'ecommerce',
                'user': 'readonly_user',
                'password': 'password_b'
            },
            'company_c': {
                'dialect': 'oracle+cx_oracle',
                'host': 'oracle-c.company.local',
                'port': 1521,
                'database': 'ORCL',
                'user': 'readonly_user',
                'password': 'password_c'
            }
        }

        target_config = {
            'host': 'consolidated-db.company.local',
            'port': 5432,
            'database': 'consolidated',
            'user': 'consolidation_user',
            'password': 'target_password'
        }

        consolidator = DatabaseConsolidator(source_configs, target_config)

        results = {}
        for mapping in consolidator.consolidation_mappings:
            result = consolidator.consolidate_table(mapping)
            results[mapping.target_table] = result

        # Log summary
        logging.info(f"Consolidation completed: {results}")

        return results

    # Define tasks
    consolidation_task = PythonOperator(
        task_id='run_consolidation',
        python_callable=run_consolidation,
        dag=dag
    )

    # Data quality checks
    quality_check_task = BashOperator(
        task_id='run_quality_checks',
        bash_command='python /opt/airflow/dags/scripts/quality_checks.py',
        dag=dag
    )

    # Update search index
    update_search_task = BashOperator(
        task_id='update_search_index',
        bash_command='python /opt/airflow/dags/scripts/update_elasticsearch.py',
        dag=dag
    )

    # Set task dependencies
    consolidation_task >> quality_check_task >> update_search_task

    return dag

# Create the DAG
consolidation_dag = create_consolidation_dag()

# Advanced consolidation with conflict resolution
class ConflictResolver:
    """Handle data conflicts during consolidation"""

    def __init__(self, priority_rules: Dict[str, List[str]]):
        self.priority_rules = priority_rules  # field -> [source1, source2, ...] in priority order

    def resolve_conflicts(self, records: List[Dict], conflict_fields: List[str]) -> Dict:
        """Resolve conflicts between multiple records of the same entity"""

        if len(records) <= 1:
            return records[0] if records else {}

        resolved_record = {}

        for field in conflict_fields:
            values = [(record.get(field), record.get('_source_system')) for record in records if record.get(field)]

            if not values:
                resolved_record[field] = None
                continue

            # Apply priority rules
            if field in self.priority_rules:
                priority_sources = self.priority_rules[field]
                for priority_source in priority_sources:
                    for value, source in values:
                        if source == priority_source:
                            resolved_record[field] = value
                            break
                    if field in resolved_record:
                        break
                else:
                    # No priority source found, use first available
                    resolved_record[field] = values[0][0]
            else:
                # No priority rule defined, fall back to the first available value
                resolved_record[field] = values[0][0]

        # Copy non-conflicting fields from first record
        base_record = records[0]
        for key, value in base_record.items():
            if key not in conflict_fields and key not in resolved_record:
                resolved_record[key] = value

        return resolved_record
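To make the priority rules concrete, here is a short usage sketch of the ConflictResolver above; the field names and source-system labels are illustrative.

# Hypothetical priorities: company_a's CRM wins for emails,
# company_b's billing system wins for phone numbers
resolver = ConflictResolver(priority_rules={
    'email': ['company_a', 'company_b', 'company_c'],
    'phone': ['company_b', 'company_a', 'company_c'],
})

duplicates = [
    {'customer_id': 42, 'email': 'a@example.com', 'phone': None, '_source_system': 'company_a'},
    {'customer_id': 42, 'email': 'b@example.com', 'phone': '+1-555-0100', '_source_system': 'company_b'},
]

merged = resolver.resolve_conflicts(duplicates, conflict_fields=['email', 'phone'])
# -> email from company_a, phone from company_b, other fields copied from the first record
print(merged)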

πŸ” Observability & Monitoring

Zero-downtime migrations succeed through continuous visibility. Without proper observability, you're flying blind during the most critical phases.

Core Monitoring Stack

import prometheus_client
from prometheus_client import Counter, Histogram, Gauge
import time
import logging
from typing import Dict, List, Any
import psutil
import threading

class MigrationMetrics:
    def __init__(self):
        # Replication lag metrics
        self.replication_lag = Gauge(
            'migration_replication_lag_seconds',
            'Replication lag in seconds',
            ['source_db', 'target_db']
        )

        # Data validation metrics
        self.validation_errors = Counter(
            'migration_validation_errors_total',
            'Total number of data validation errors',
            ['table', 'error_type']
        )

        # Migration progress
        self.migration_progress = Gauge(
            'migration_progress_percent',
            'Migration progress percentage',
            ['phase', 'table']
        )

        # Performance metrics
        self.query_duration = Histogram(
            'migration_query_duration_seconds',
            'Query execution time',
            ['database', 'operation']
        )

        # CDC metrics
        self.cdc_events_processed = Counter(
            'migration_cdc_events_total',
            'Total CDC events processed',
            ['source_table', 'event_type']
        )

        # Error rates
        self.error_rate = Counter(
            'migration_errors_total',
            'Total migration errors',
            ['component', 'error_category']
        )

        # System resources
        self.cpu_usage = Gauge('migration_cpu_usage_percent', 'CPU usage percentage')
        self.memory_usage = Gauge('migration_memory_usage_bytes', 'Memory usage in bytes')
        self.disk_io = Counter('migration_disk_io_bytes_total', 'Total disk I/O bytes', ['direction'])

        # Start system monitoring thread
        self.monitoring_thread = threading.Thread(target=self._monitor_system_resources, daemon=True)
        self.monitoring_thread.start()

    def _monitor_system_resources(self):
        """Monitor system resources continuously"""
        last_disk_io = psutil.disk_io_counters()

        while True:
            try:
                # CPU usage
                cpu_percent = psutil.cpu_percent(interval=1)
                self.cpu_usage.set(cpu_percent)

                # Memory usage
                memory = psutil.virtual_memory()
                self.memory_usage.set(memory.used)

                # Disk I/O: psutil counters are cumulative, so publish only the delta
                disk_io = psutil.disk_io_counters()
                if disk_io and last_disk_io:
                    self.disk_io.labels(direction='read').inc(disk_io.read_bytes - last_disk_io.read_bytes)
                    self.disk_io.labels(direction='write').inc(disk_io.write_bytes - last_disk_io.write_bytes)
                last_disk_io = disk_io

                time.sleep(10)  # Monitor every 10 seconds

            except Exception as e:
                logging.error(f"System monitoring error: {e}")
                time.sleep(30)  # Wait longer on error

class MigrationValidator:
    """Comprehensive data validation during migration"""

    def __init__(self, source_config: Dict, target_config: Dict, metrics: MigrationMetrics):
        self.source_config = source_config
        self.target_config = target_config
        self.metrics = metrics
        self.logger = logging.getLogger(__name__)

    def validate_row_counts(self, tables: List[str]) -> Dict[str, Dict[str, int]]:
        """Compare row counts between source and target"""

        validation_results = {}

        source_conn = self._get_connection(self.source_config)
        target_conn = self._get_connection(self.target_config)

        try:
            for table in tables:
                with self.metrics.query_duration.labels(database='source', operation='count').time():
                    source_cursor = source_conn.cursor()
                    source_cursor.execute(f"SELECT COUNT(*) FROM {table}")
                    source_count = source_cursor.fetchone()[0]

                with self.metrics.query_duration.labels(database='target', operation='count').time():
                    target_cursor = target_conn.cursor()
                    target_cursor.execute(f"SELECT COUNT(*) FROM {table}")
                    target_count = target_cursor.fetchone()[0]

                validation_results[table] = {
                    'source_count': source_count,
                    'target_count': target_count,
                    'difference': abs(source_count - target_count),
                    'match': source_count == target_count
                }

                if not validation_results[table]['match']:
                    self.metrics.validation_errors.labels(
                        table=table, 
                        error_type='count_mismatch'
                    ).inc()
                    self.logger.error(
                        f"Row count mismatch for {table}: source={source_count}, target={target_count}"
                    )

        finally:
            source_conn.close()
            target_conn.close()

        return validation_results

    def validate_data_integrity(self, table: str, primary_key: str, sample_size: int = 1000) -> Dict[str, Any]:
        """Compare random sample of records between source and target"""

        source_conn = self._get_connection(self.source_config)
        target_conn = self._get_connection(self.target_config)

        try:
            # Get random sample of primary keys
            source_cursor = source_conn.cursor()
            source_cursor.execute(f"""
                SELECT {primary_key} FROM {table} 
                ORDER BY RANDOM() 
                LIMIT {sample_size}
            """)
            sample_keys = [row[0] for row in source_cursor.fetchall()]

            mismatches = []
            matches = 0

            for key in sample_keys:
                # Get source record
                source_cursor.execute(f"SELECT * FROM {table} WHERE {primary_key} = %s", (key,))
                source_record = source_cursor.fetchone()

                # Get target record
                target_cursor = target_conn.cursor()
                target_cursor.execute(f"SELECT * FROM {table} WHERE {primary_key} = %s", (key,))
                target_record = target_cursor.fetchone()

                if source_record != target_record:
                    mismatches.append({
                        'key': key,
                        'source': source_record,
                        'target': target_record
                    })
                else:
                    matches += 1

            integrity_score = (matches / len(sample_keys)) * 100 if sample_keys else 100

            if mismatches:
                self.metrics.validation_errors.labels(
                    table=table,
                    error_type='data_mismatch'
                ).inc(len(mismatches))

                self.logger.warning(f"Data integrity issues in {table}: {len(mismatches)} mismatches out of {len(sample_keys)} samples")

            return {
                'table': table,
                'sample_size': len(sample_keys),
                'matches': matches,
                'mismatches': len(mismatches),
                'integrity_score': integrity_score,
                'mismatch_details': mismatches[:10]  # First 10 mismatches for debugging
            }

        finally:
            source_conn.close()
            target_conn.close()

    def validate_schema_compatibility(self) -> Dict[str, Any]:
        """Validate schema compatibility between source and target"""

        source_conn = self._get_connection(self.source_config)
        target_conn = self._get_connection(self.target_config)

        try:
            schema_issues = []

            # Get table structures
            source_cursor = source_conn.cursor()
            source_cursor.execute("""
                SELECT table_name, column_name, data_type, is_nullable
                FROM information_schema.columns
                WHERE table_schema = 'public'
                ORDER BY table_name, ordinal_position
            """)
            source_schema = source_cursor.fetchall()

            target_cursor = target_conn.cursor()
            target_cursor.execute("""
                SELECT table_name, column_name, data_type, is_nullable
                FROM information_schema.columns
                WHERE table_schema = 'public'
                ORDER BY table_name, ordinal_position
            """)
            target_schema = target_cursor.fetchall()

            # Convert to dictionaries for comparison
            source_dict = {(row[0], row[1]): {'data_type': row[2], 'nullable': row[3]} for row in source_schema}
            target_dict = {(row[0], row[1]): {'data_type': row[2], 'nullable': row[3]} for row in target_schema}

            # Find missing columns in target
            missing_in_target = set(source_dict.keys()) - set(target_dict.keys())
            for table, column in missing_in_target:
                schema_issues.append({
                    'type': 'missing_column_in_target',
                    'table': table,
                    'column': column,
                    'source_type': source_dict[(table, column)]['data_type']
                })

            # Find extra columns in target
            extra_in_target = set(target_dict.keys()) - set(source_dict.keys())
            for table, column in extra_in_target:
                schema_issues.append({
                    'type': 'extra_column_in_target',
                    'table': table,
                    'column': column,
                    'target_type': target_dict[(table, column)]['data_type']
                })

            # Find data type mismatches
            common_columns = set(source_dict.keys()) & set(target_dict.keys())
            for table, column in common_columns:
                source_type = source_dict[(table, column)]['data_type']
                target_type = target_dict[(table, column)]['data_type']

                if source_type != target_type:
                    schema_issues.append({
                        'type': 'data_type_mismatch',
                        'table': table,
                        'column': column,
                        'source_type': source_type,
                        'target_type': target_type
                    })

            return {
                'compatible': len(schema_issues) == 0,
                'issues': schema_issues,
                'source_tables': len(set(row[0] for row in source_schema)),
                'target_tables': len(set(row[0] for row in target_schema))
            }

        finally:
            source_conn.close()
            target_conn.close()

    def _get_connection(self, config: Dict):
        """Get database connection based on config"""
        import psycopg2
        return psycopg2.connect(
            host=config['host'],
            port=config['port'],
            database=config['database'],
            user=config['user'],
            password=config['password']
        )

# Real-time monitoring dashboard with Grafana
class GrafanaDashboardExporter:
    """Export migration metrics to Grafana dashboard"""

    @staticmethod
    def generate_dashboard_json() -> Dict[str, Any]:
        """Generate Grafana dashboard configuration"""

        return {
            "dashboard": {
                "id": None,
                "title": "Database Migration Monitoring",
                "tags": ["migration", "database"],
                "timezone": "browser",
                "panels": [
                    {
                        "id": 1,
                        "title": "Replication Lag",
                        "type": "graph",
                        "targets": [
                            {
                                "expr": "migration_replication_lag_seconds",
                                "legendFormat": "{{source_db}} -> {{target_db}}"
                            }
                        ],
                        "yAxes": [
                            {
                                "label": "Seconds",
                                "min": 0
                            }
                        ],
                        "alert": {
                            "conditions": [
                                {
                                    "query": {"params": ["A", "5m", "now"]},
                                    "reducer": {"params": [], "type": "last"},
                                    "evaluator": {"params": [30], "type": "gt"}
                                }
                            ],
                            "executionErrorState": "alerting",
                            "for": "5m",
                            "frequency": "10s",
                            "handler": 1,
                            "name": "High Replication Lag",
                            "noDataState": "no_data",
                            "notifications": []
                        }
                    },
                    {
                        "id": 2,
                        "title": "Migration Progress",
                        "type": "singlestat",
                        "targets": [
                            {
                                "expr": "migration_progress_percent",
                                "legendFormat": "{{phase}}"
                            }
                        ],
                        "valueName": "current",
                        "format": "percent",
                        "thresholds": "50,80"
                    },
                    {
                        "id": 3,
                        "title": "Validation Errors",
                        "type": "graph",
                        "targets": [
                            {
                                "expr": "rate(migration_validation_errors_total[5m])",
                                "legendFormat": "{{table}} - {{error_type}}"
                            }
                        ]
                    },
                    {
                        "id": 4,
                        "title": "CDC Events Rate",
                        "type": "graph",
                        "targets": [
                            {
                                "expr": "rate(migration_cdc_events_total[1m])",
                                "legendFormat": "{{source_table}} - {{event_type}}"
                            }
                        ]
                    },
                    {
                        "id": 5,
                        "title": "System Resources",
                        "type": "graph",
                        "targets": [
                            {
                                "expr": "migration_cpu_usage_percent",
                                "legendFormat": "CPU Usage %"
                            },
                            {
                                "expr": "migration_memory_usage_bytes / 1024 / 1024 / 1024",
                                "legendFormat": "Memory Usage GB"
                            }
                        ]
                    }
                ],
                "time": {
                    "from": "now-1h",
                    "to": "now"
                },
                "refresh": "5s"
            }
        }
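To tie this together end to end, here is a minimal wiring sketch: expose the metrics over HTTP for Prometheus to scrape and feed the replication-lag gauge from the replica. It assumes PostgreSQL streaming replication; the hostnames, labels, and polling interval are placeholders.

import time
import psycopg2
from prometheus_client import start_http_server

# Expose metrics for Prometheus to scrape (port is arbitrary)
start_http_server(8000)
metrics = MigrationMetrics()

def update_replication_lag(replica_config: dict):
    """Poll the replica and publish its lag to the Prometheus gauge."""
    conn = psycopg2.connect(**replica_config)
    try:
        with conn.cursor() as cursor:
            cursor.execute(
                "SELECT COALESCE(EXTRACT(EPOCH FROM (now() - pg_last_xact_replay_timestamp())), 0)"
            )
            lag_seconds = cursor.fetchone()[0]
        metrics.replication_lag.labels(
            source_db='source-postgres', target_db='target-postgres'  # placeholder labels
        ).set(float(lag_seconds))
    finally:
        conn.close()

# Sidecar-style polling loop
while True:
    update_replication_lag({'host': 'target-db', 'dbname': 'production', 'user': 'monitor', 'password': '...'})
    time.sleep(10)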

πŸ›‘οΈ Risk Mitigation & Rollback Strategies

Every migration is a high-stakes operation. Having bulletproof rollback strategies isn't optionalβ€”it's mission-critical.

Rollback Strategy Matrix

| Migration Phase | Primary Rollback Method | Secondary Method | RTO Target |
|---|---|---|---|
| Schema Migration | Database restore from backup | Manual DDL rollback scripts | 15 minutes |
| Bulk Data Load | Stop ETL, restore target DB | Truncate target tables | 30 minutes |
| CDC Phase | Stop CDC, resume from checkpoint | Replay from backup | 10 minutes |
| Dual Write | Disable target writes via feature flag | DNS failover to source | 2 minutes |
| Read Traffic | Feature flag flip (0% target reads) | Load balancer reconfiguration | 30 seconds |
| Full Cutover | DNS failover + feature flags | Emergency database swap | 5 minutes |
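Both the dual-write and read-traffic rows depend on applications honouring a feature flag within seconds. A minimal sketch of an application-side flag reader follows; it assumes the flags live in AWS SSM Parameter Store under the same /migration/ paths used by the automated rollback code below, and the cache TTL is an assumption.

import time
import boto3

_ssm = boto3.client('ssm')
_flag_cache = {}  # flag name -> (value, fetched_at)
FLAG_CACHE_TTL_SECONDS = 5  # assumption: how stale a flag may be before re-reading

def flag_enabled(name: str, default: bool = False) -> bool:
    """Read a boolean feature flag from SSM Parameter Store with a short-lived cache."""
    now = time.time()
    cached = _flag_cache.get(name)
    if cached and now - cached[1] < FLAG_CACHE_TTL_SECONDS:
        return cached[0]

    try:
        response = _ssm.get_parameter(Name=name)
        value = response['Parameter']['Value'].lower() == 'true'
    except Exception:
        value = default  # fail safe: fall back to the default if SSM is unreachable

    _flag_cache[name] = (value, now)
    return value

# Example: the write path checks the flag on every request
if flag_enabled('/migration/dual_write_enabled'):
    pass  # also write to the target database

Because the cache expires within seconds, flipping the parameter (as the rollback steps below do) propagates across the fleet almost immediately, which is what makes the 2-minute and 30-second RTO targets realistic.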

Automated Rollback Implementation

import time
import boto3
import logging
from enum import Enum
from typing import Dict, List, Optional, Any
from dataclasses import dataclass
from datetime import datetime
import json

class RollbackTrigger(Enum):
    MANUAL = "manual"
    ERROR_RATE_HIGH = "error_rate_high"
    REPLICATION_LAG_HIGH = "replication_lag_high"
    VALIDATION_FAILED = "validation_failed"
    PERFORMANCE_DEGRADED = "performance_degraded"

@dataclass
class RollbackPlan:
    trigger: RollbackTrigger
    steps: List[str]
    estimated_time_seconds: int
    prerequisites: List[str]
    validation_checks: List[str]

class EmergencyRollbackSystem:
    """Automated rollback system with multiple trigger conditions"""

    def __init__(self, config: Dict[str, Any]):
        self.config = config
        self.logger = logging.getLogger(__name__)
        self.route53_client = boto3.client('route53')
        self.elbv2_client = boto3.client('elbv2')
        self.ssm_client = boto3.client('ssm')

        # Define rollback plans
        self.rollback_plans = {
            RollbackTrigger.ERROR_RATE_HIGH: RollbackPlan(
                trigger=RollbackTrigger.ERROR_RATE_HIGH,
                steps=[
                    "disable_dual_writes",
                    "route_reads_to_source", 
                    "stop_cdc_pipeline",
                    "notify_stakeholders"
                ],
                estimated_time_seconds=120,
                prerequisites=["source_db_healthy", "dns_records_ready"],
                validation_checks=["error_rate_normal", "source_db_responsive"]
            ),

            RollbackTrigger.REPLICATION_LAG_HIGH: RollbackPlan(
                trigger=RollbackTrigger.REPLICATION_LAG_HIGH,
                steps=[
                    "pause_application_writes",
                    "wait_for_replication_catchup",
                    "resume_or_rollback_decision"
                ],
                estimated_time_seconds=300,
                prerequisites=["cdc_pipeline_running"],
                validation_checks=["replication_lag_acceptable"]
            ),

            RollbackTrigger.MANUAL: RollbackPlan(
                trigger=RollbackTrigger.MANUAL,
                steps=[
                    "confirm_rollback_decision",
                    "execute_complete_rollback",
                    "validate_source_functionality",
                    "cleanup_target_resources"
                ],
                estimated_time_seconds=600,
                prerequisites=["admin_confirmation"],
                validation_checks=["all_systems_operational"]
            )
        }

    def execute_rollback(self, trigger: RollbackTrigger, context: Dict[str, Any] = None):
        """Execute rollback based on trigger type"""

        rollback_plan = self.rollback_plans.get(trigger)
        if not rollback_plan:
            raise ValueError(f"No rollback plan defined for trigger: {trigger}")

        self.logger.critical(f"🚨 EXECUTING ROLLBACK: {trigger.value}")
        self.logger.info(f"Estimated rollback time: {rollback_plan.estimated_time_seconds} seconds")

        # Check prerequisites
        for prerequisite in rollback_plan.prerequisites:
            if not self._check_prerequisite(prerequisite):
                self.logger.error(f"Prerequisite failed: {prerequisite}")
                raise Exception(f"Cannot execute rollback - prerequisite failed: {prerequisite}")

        rollback_start = time.time()

        try:
            # Execute rollback steps
            for step in rollback_plan.steps:
                self.logger.info(f"Executing rollback step: {step}")
                self._execute_rollback_step(step, context)

            # Validate rollback success
            for validation in rollback_plan.validation_checks:
                if not self._validate_rollback_step(validation):
                    self.logger.error(f"Rollback validation failed: {validation}")
                    raise Exception(f"Rollback validation failed: {validation}")

            rollback_duration = time.time() - rollback_start
            self.logger.info(f"βœ… Rollback completed successfully in {rollback_duration:.1f} seconds")

            return {
                'success': True,
                'duration_seconds': rollback_duration,
                'trigger': trigger.value,
                'steps_executed': rollback_plan.steps
            }

        except Exception as e:
            rollback_duration = time.time() - rollback_start
            self.logger.error(f"❌ Rollback failed after {rollback_duration:.1f} seconds: {e}")

            # Attempt emergency procedures
            self._execute_emergency_procedures()

            return {
                'success': False,
                'duration_seconds': rollback_duration,
                'error': str(e),
                'trigger': trigger.value
            }

    def _check_prerequisite(self, prerequisite: str) -> bool:
        """Check if prerequisite is satisfied"""

        if prerequisite == "source_db_healthy":
            return self._check_database_health(self.config['source_db'])

        elif prerequisite == "dns_records_ready":
            return self._check_dns_records()

        elif prerequisite == "cdc_pipeline_running":
            return self._check_cdc_pipeline_status()

        elif prerequisite == "admin_confirmation":
            # In real implementation, this would check for admin approval
            return True

        return False

    def _execute_rollback_step(self, step: str, context: Dict[str, Any] = None):
        """Execute individual rollback step"""

        if step == "disable_dual_writes":
            # Set feature flag to disable writes to target database
            self.ssm_client.put_parameter(
                Name='/migration/dual_write_enabled',
                Value='false',
                Type='String',
                Overwrite=True
            )
            time.sleep(5)  # Wait for applications to pick up change

        elif step == "route_reads_to_source":
            # Update Route53 weighted routing to send 100% traffic to source
            self.route53_client.change_resource_record_sets(
                HostedZoneId=self.config['hosted_zone_id'],
                ChangeBatch={
                    'Changes': [
                        {
                            'Action': 'UPSERT',
                            'ResourceRecordSet': {
                                'Name': self.config['db_endpoint_name'],
                                'Type': 'CNAME',
                                'SetIdentifier': 'source-db',
                                'Weight': 100,
                                'TTL': 30,  # Short TTL for fast propagation
                                'ResourceRecords': [
                                    {'Value': self.config['source_db']['endpoint']}
                                ]
                            }
                        },
                        {
                            'Action': 'UPSERT',
                            'ResourceRecordSet': {
                                'Name': self.config['db_endpoint_name'],
                                'Type': 'CNAME',
                                'SetIdentifier': 'target-db',
                                'Weight': 0,  # Zero weight
                                'TTL': 30,
                                'ResourceRecords': [
                                    {'Value': self.config['target_db']['endpoint']}
                                ]
                            }
                        }
                    ]
                }
            )

        elif step == "stop_cdc_pipeline":
            # Stop CDC pipeline (implementation depends on CDC tool)
            if self.config.get('cdc_type') == 'debezium':
                self._stop_debezium_connector()
            elif self.config.get('cdc_type') == 'aws_dms':
                self._stop_dms_task()

        elif step == "notify_stakeholders":
            # Send notifications to stakeholders
            self._send_rollback_notification(context)

        elif step == "pause_application_writes":
            # Temporarily pause application writes
            self.ssm_client.put_parameter(
                Name='/migration/writes_enabled',
                Value='false',
                Type='String',
                Overwrite=True
            )

        elif step == "wait_for_replication_catchup":
            # Wait for replication to catch up
            timeout = 300  # 5 minutes
            start_time = time.time()

            while time.time() - start_time < timeout:
                lag = self._get_replication_lag()
                if lag < 1:  # Less than 1 second lag
                    break
                time.sleep(5)
            else:
                raise Exception("Replication failed to catch up within timeout")

        elif step == "resume_or_rollback_decision":
            # In real implementation, this would involve human decision or automated logic
            lag = self._get_replication_lag()
            if lag > 5:
                raise Exception("Replication lag still too high, continuing with rollback")

        # Add more rollback steps as needed

    def _validate_rollback_step(self, validation: str) -> bool:
        """Validate that rollback step was successful"""

        if validation == "error_rate_normal":
            # Check if error rate has returned to normal
            current_error_rate = self._get_current_error_rate()
            return current_error_rate < self.config.get('error_rate_threshold', 0.01)

        elif validation == "source_db_responsive":
            return self._check_database_health(self.config['source_db'])

        elif validation == "replication_lag_acceptable":
            lag = self._get_replication_lag()
            return lag < self.config.get('max_acceptable_lag', 10)

        elif validation == "all_systems_operational":
            return (self._check_database_health(self.config['source_db']) and
                   self._check_application_health())

        return True

    def _execute_emergency_procedures(self):
        """Execute emergency procedures when normal rollback fails"""

        self.logger.critical("πŸ†˜ EXECUTING EMERGENCY PROCEDURES")

        try:
            # Emergency DNS failover
            self.route53_client.change_resource_record_sets(
                HostedZoneId=self.config['hosted_zone_id'],
                ChangeBatch={
                    'Changes': [
                        {
                            'Action': 'UPSERT',
                            'ResourceRecordSet': {
                                'Name': self.config['emergency_failover_record'],
                                'Type': 'A',
                                'TTL': 60,
                                'ResourceRecords': [
                                    {'Value': self.config['emergency_ip']}
                                ]
                            }
                        }
                    ]
                }
            )

            # Set all systems to read-only mode
            self.ssm_client.put_parameter(
                Name='/emergency/read_only_mode',
                Value='true',
                Type='String',
                Overwrite=True
            )

            # Send emergency alerts
            self._send_emergency_alert()

        except Exception as e:
            self.logger.critical(f"Emergency procedures failed: {e}")

    def _get_current_error_rate(self) -> float:
        """Get current application error rate"""
        # Implementation would query monitoring system
        # This is a placeholder
        return 0.005

    def _get_replication_lag(self) -> float:
        """Get current replication lag in seconds"""
        # Implementation would query replication metrics
        # This is a placeholder
        return 2.3

    def _check_database_health(self, db_config: Dict) -> bool:
        """Check if database is healthy and responsive"""
        try:
            import psycopg2
            conn = psycopg2.connect(
                host=db_config['host'],
                port=db_config['port'],
                database=db_config['database'],
                user=db_config['health_check_user'],
                password=db_config['health_check_password'],
                connect_timeout=10
            )
            cursor = conn.cursor()
            cursor.execute("SELECT 1")
            result = cursor.fetchone()
            conn.close()
            return result[0] == 1
        except Exception as e:
            self.logger.error(f"Database health check failed: {e}")
            return False

    def _check_application_health(self) -> bool:
        """Check if applications are healthy"""
        import requests

        health_endpoints = self.config.get('health_check_endpoints', [])

        for endpoint in health_endpoints:
            try:
                response = requests.get(endpoint, timeout=10)
                if response.status_code != 200:
                    return False
            except requests.RequestException:
                return False

        return True

    def _send_rollback_notification(self, context: Dict[str, Any] = None):
        """Send rollback notification to stakeholders"""

        message = {
            'severity': 'HIGH',
            'event': 'DATABASE_MIGRATION_ROLLBACK',
            'timestamp': datetime.utcnow().isoformat(),
            'context': context or {},
            'next_steps': 'Engineering team investigating. All systems operational on source database.'
        }

        # Send to Slack, PagerDuty, email, etc.
        self.logger.info(f"Rollback notification sent: {message}")

# Automated monitoring and triggering system
class MigrationMonitor:
    """Continuously monitor migration health and trigger rollbacks if needed"""

    def __init__(self, rollback_system: EmergencyRollbackSystem, thresholds: Dict[str, float]):
        self.rollback_system = rollback_system
        self.thresholds = thresholds
        self.monitoring_active = True
        self.logger = logging.getLogger(__name__)

    def start_monitoring(self):
        """Start continuous monitoring"""

        import threading

        monitor_thread = threading.Thread(target=self._monitoring_loop, daemon=True)
        monitor_thread.start()

        self.logger.info("Migration monitoring started")

    def _monitoring_loop(self):
        """Main monitoring loop"""

        while self.monitoring_active:
            try:
                # Check error rate
                error_rate = self._get_error_rate()
                if error_rate > self.thresholds.get('error_rate', 0.05):
                    self.logger.warning(f"High error rate detected: {error_rate}")
                    if error_rate > self.thresholds.get('critical_error_rate', 0.10):
                        self._trigger_rollback(RollbackTrigger.ERROR_RATE_HIGH, {'error_rate': error_rate})

                # Check replication lag
                lag = self._get_replication_lag()
                if lag > self.thresholds.get('replication_lag', 30):
                    self.logger.warning(f"High replication lag detected: {lag}s")
                    if lag > self.thresholds.get('critical_replication_lag', 120):
                        self._trigger_rollback(RollbackTrigger.REPLICATION_LAG_HIGH, {'lag_seconds': lag})

                # Check data validation failures
                validation_failures = self._get_validation_failures()
                if validation_failures > self.thresholds.get('validation_failures', 10):
                    self.logger.warning(f"High validation failures: {validation_failures}")
                    if validation_failures > self.thresholds.get('critical_validation_failures', 50):
                        self._trigger_rollback(RollbackTrigger.VALIDATION_FAILED, {'failures': validation_failures})

                time.sleep(10)  # Check every 10 seconds

            except Exception as e:
                self.logger.error(f"Monitoring error: {e}")
                time.sleep(30)  # Longer wait on error

    def _trigger_rollback(self, trigger: RollbackTrigger, context: Dict[str, Any]):
        """Trigger automated rollback"""

        self.logger.critical(f"AUTOMATED ROLLBACK TRIGGERED: {trigger.value}")

        # Stop monitoring to prevent multiple rollback attempts
        self.monitoring_active = False

        try:
            result = self.rollback_system.execute_rollback(trigger, context)
            if result['success']:
                self.logger.info("Automated rollback completed successfully")
            else:
                self.logger.error("Automated rollback failed - manual intervention required")
        except Exception as e:
            self.logger.critical(f"Automated rollback execution failed: {e}")

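A short usage sketch wiring the monitor to the rollback system; the config keys mirror the ones the classes above read, and every concrete value (zone ID, hostnames, thresholds) is a placeholder to align with your own environment and SLAs.

rollback_config = {
    'hosted_zone_id': 'Z123EXAMPLE',                  # placeholder
    'db_endpoint_name': 'db.internal.example.com',
    'source_db': {'host': 'source-db', 'port': 5432, 'database': 'production',
                  'health_check_user': 'monitor', 'health_check_password': '...',
                  'endpoint': 'source-db.internal.example.com'},
    'target_db': {'endpoint': 'target-db.internal.example.com'},
    'cdc_type': 'debezium',
    'error_rate_threshold': 0.01,
    'max_acceptable_lag': 10,
    'health_check_endpoints': ['http://app-health-check/status'],
    'emergency_failover_record': 'failover.internal.example.com',
    'emergency_ip': '10.0.0.10',
}

rollback_system = EmergencyRollbackSystem(rollback_config)

monitor = MigrationMonitor(
    rollback_system,
    thresholds={
        'error_rate': 0.05,
        'critical_error_rate': 0.10,
        'replication_lag': 30,
        'critical_replication_lag': 120,
        'validation_failures': 10,
        'critical_validation_failures': 50,
    },
)
monitor.start_monitoring()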

πŸ“š Operational Playbook

Pre-Migration Checklist

Infrastructure Readiness

  • [ ] Source database backup completed and verified
  • [ ] Target database provisioned with appropriate sizing
  • [ ] Network connectivity established (VPN/private links)
  • [ ] Security groups and firewall rules configured
  • [ ] Monitoring and alerting systems deployed
  • [ ] Rollback infrastructure tested and ready

Application Readiness

  • [ ] Feature flags implemented for traffic routing
  • [ ] Database connection pools configured for both databases
  • [ ] Application health checks updated
  • [ ] Load balancer configuration prepared
  • [ ] Circuit breakers configured for database failures

Team Readiness

  • [ ] Migration runbook reviewed by all team members
  • [ ] Rollback procedures tested in staging environment
  • [ ] Communication channels established (Slack, conference bridge)
  • [ ] On-call schedules arranged for migration window
  • [ ] Stakeholders notified of migration timeline

Migration Day Execution

T-2 Hours: Final Preparations

# Final backup of source database
pg_dump -h source-db -U postgres -d production > final_backup_$(date +%Y%m%d_%H%M%S).sql

# Verify target database connectivity
psql -h target-db -U postgres -d production -c "SELECT version();"

# Check replication lag on the replica (should be < 1 second)
psql -h target-db -U postgres -d production -c "SELECT EXTRACT(EPOCH FROM (now() - pg_last_xact_replay_timestamp())) AS lag_seconds;"

# Verify application health
curl -f http://app-health-check/status

# Start enhanced monitoring
kubectl apply -f migration-monitoring.yaml

T-30 Minutes: Begin Migration

# Enable dual-write mode
aws ssm put-parameter --name "/migration/dual_write_enabled" --value "true" --overwrite

# Wait for applications to pick up configuration
sleep 60

# Verify dual writes are working
tail -f /var/log/application.log | grep "dual_write"
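The playbook commands flip the flag, but what the application does once dual writes are enabled is worth spelling out. Below is a minimal sketch of a dual-write wrapper; the source_db/target_db objects and their insert method are placeholders for your data-access layer, and the flag value is assumed to come from the /migration/dual_write_enabled parameter (for example via the cached SSM helper sketched in the rollback section).

import logging

logger = logging.getLogger('dual_write')

def save_order(order: dict, source_db, target_db, dual_write_enabled: bool) -> None:
    """Write to the source (system of record) and, if enabled, mirror to the target."""
    # The source write is authoritative; failures here must propagate to the caller
    source_db.insert('orders', order)

    if dual_write_enabled:
        try:
            target_db.insert('orders', order)
        except Exception as exc:
            # Never fail the user request because the target write failed;
            # log it so the record can be reconciled by the validation jobs
            logger.error("dual-write to target failed for order %s: %s", order.get('id'), exc)

Any target writes missed this way are exactly what the validation jobs in the observability section exist to catch and reconcile.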

T-15 Minutes: Start Traffic Shift

# Shift 10% of read traffic to target
python migration_scripts/update_traffic_weights.py --read-target-percent 10

# Monitor for 5 minutes
python migration_scripts/monitor_health.py --duration 300

# Shift to 50% if healthy
python migration_scripts/update_traffic_weights.py --read-target-percent 50

T-5 Minutes: Final Cutover

# Stop writes to source database
aws ssm put-parameter --name "/migration/source_writes_enabled" --value "false" --overwrite

# Wait for final replication sync
python migration_scripts/wait_for_sync.py --max-lag-seconds 1

# Shift 100% traffic to target
python migration_scripts/update_traffic_weights.py --read-target-percent 100 --write-target-percent 100

# Verify migration success
python migration_scripts/validate_migration.py --full-check
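The wait_for_sync.py script referenced above isn't shown in this guide; a plausible minimal implementation, assuming PostgreSQL streaming replication and polling the replica directly (the --replica-dsn default is an assumption), could look like this:

import argparse
import sys
import time
import psycopg2

LAG_QUERY = "SELECT COALESCE(EXTRACT(EPOCH FROM (now() - pg_last_xact_replay_timestamp())), 0)"

def wait_for_sync(replica_dsn: str, max_lag_seconds: float, timeout_seconds: int = 300) -> bool:
    """Block until replication lag on the replica drops below the threshold."""
    conn = psycopg2.connect(replica_dsn)
    conn.autocommit = True  # keep now() fresh between polls
    try:
        deadline = time.time() + timeout_seconds
        while time.time() < deadline:
            with conn.cursor() as cursor:
                cursor.execute(LAG_QUERY)
                lag = float(cursor.fetchone()[0])
            if lag <= max_lag_seconds:
                print(f"Replication lag {lag:.2f}s is within threshold")
                return True
            print(f"Replication lag {lag:.2f}s, waiting...")
            time.sleep(2)
        return False
    finally:
        conn.close()

if __name__ == '__main__':
    parser = argparse.ArgumentParser()
    parser.add_argument('--max-lag-seconds', type=float, default=1.0)
    parser.add_argument('--replica-dsn', default='host=target-db dbname=production user=monitor')
    args = parser.parse_args()
    sys.exit(0 if wait_for_sync(args.replica_dsn, args.max_lag_seconds) else 1)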

Post-Migration Activities

Immediate (T+1 Hour)

  • [ ] Run comprehensive data validation
  • [ ] Monitor application error rates and performance
  • [ ] Verify all critical business functions
  • [ ] Update DNS records to point to target database
  • [ ] Document any issues encountered and resolutions

Short-term (T+24 Hours)

  • [ ] Optimize target database configuration and indexes
  • [ ] Review and tune application connection pools
  • [ ] Analyze query performance and identify slow queries
  • [ ] Update backup and maintenance schedules
  • [ ] Conduct retrospective with migration team

Long-term (T+1 Week)

  • [ ] Decommission source database infrastructure
  • [ ] Update disaster recovery procedures
  • [ ] Archive migration logs and documentation
  • [ ] Update monitoring dashboards and alerts
  • [ ] Plan for next migration (if applicable)

🎯 Best Practices & Lessons Learned

The Golden Rules of Zero-Downtime Migration

1. Trust but Verify Everything

  • Assume every component can fail
  • Validate data at every stage
  • Test rollback procedures before you need them
  • Monitor everything that can be monitored

2. Gradual is Better than Big Bang

  • Bulk load β†’ CDC β†’ Dual writes β†’ Gradual cutover
  • Start with read-only workloads
  • Use percentage-based traffic shifting
  • Canary releases for critical features

3. Observability is Non-Negotiable

  • Real-time replication lag monitoring
  • Application-level health checks
  • Business metrics validation (revenue, user activity)
  • Automated alerting with clear escalation paths

4. Plan for Failure Scenarios

  • Source database failure during migration
  • Network partitions between databases
  • Application bugs exposed by new database
  • Partial data corruption or inconsistency

Common Pitfalls and How to Avoid Them

Schema Evolution During Migration

-- Problem: Schema changes applied to source but not target
ALTER TABLE users ADD COLUMN created_by_ip INET;

-- Solution: Coordinate schema changes across both databases
-- Always apply schema changes to target first, then source

Character Encoding Issues

# Problem: Different character encodings between databases
# e.g. UTF-8 bytes that were stored and read as latin1 on the source (mojibake)

# Solution: Handle encoding conversion explicitly
def convert_encoding(text):
    if isinstance(text, str):
        # Re-encode the mis-decoded string back to its raw bytes, then decode as UTF-8
        return text.encode('latin1').decode('utf8', errors='replace')
    return text

Time Zone Confusion

-- Problem: Timestamps stored differently across databases
-- Source: America/New_York, Target: UTC

-- Solution: Normalize all timestamps to UTC
UPDATE migrated_table 
SET created_at = created_at AT TIME ZONE 'America/New_York' AT TIME ZONE 'UTC';

Foreign Key Constraint Violations

-- Problem: Data loaded out of order causes FK violations

-- Solution: Disable FK checks during bulk load, re-enable after
SET foreign_key_checks = 0;
-- Load data
SET foreign_key_checks = 1;
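The SET foreign_key_checks statements above are MySQL syntax. If the target is PostgreSQL, a rough equivalent (shown here from Python, and assuming the loading role is allowed to change session_replication_role) is to put the session into replica mode so constraint triggers are skipped during the bulk load:

import psycopg2

conn = psycopg2.connect(host='target-db', dbname='production', user='migration_user', password='...')
conn.autocommit = True
cursor = conn.cursor()

# Skip FK/constraint triggers for this session only
# (requires superuser or a role permitted to change session_replication_role)
cursor.execute("SET session_replication_role = 'replica';")

# ... run the bulk load on this connection ...

# Restore normal constraint enforcement
cursor.execute("SET session_replication_role = 'origin';")
conn.close()

Unlike dropping and re-creating the constraints, this does not re-validate rows after the load, so the row-count and sampling checks from the observability section matter even more here.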

Performance Optimization Techniques

Parallel Processing

# Use multiple workers for bulk data migration
import pandas as pd
from concurrent.futures import ThreadPoolExecutor
from sqlalchemy import create_engine

def migrate_table_chunk(table_name, offset, limit):
    source_engine = create_engine(SOURCE_DB_URL, pool_size=20)
    target_engine = create_engine(TARGET_DB_URL, pool_size=20)

    # Note: OFFSET pagination slows down on very large tables;
    # keyset pagination (WHERE id > last_id) scales better
    df = pd.read_sql(f"""
        SELECT * FROM {table_name}
        ORDER BY id OFFSET {offset} LIMIT {limit}
    """, source_engine)

    df.to_sql(table_name, target_engine, if_exists='append', index=False)

# Process large tables in parallel chunks
with ThreadPoolExecutor(max_workers=8) as executor:
    chunk_size = 10000
    total_rows = get_table_row_count('large_table')

    futures = []
    for offset in range(0, total_rows, chunk_size):
        future = executor.submit(migrate_table_chunk, 'large_table', offset, chunk_size)
        futures.append(future)

    # Wait for all chunks to complete
    for future in futures:
        future.result()

Index Management

-- Drop indexes before bulk load, recreate after
DROP INDEX CONCURRENTLY idx_users_email;
DROP INDEX CONCURRENTLY idx_users_created_at;

-- Bulk load data
COPY users FROM '/path/to/data.csv' WITH CSV HEADER;

-- Recreate indexes
CREATE INDEX CONCURRENTLY idx_users_email ON users(email);
CREATE INDEX CONCURRENTLY idx_users_created_at ON users(created_at);

-- Analyze tables for query planner
ANALYZE users;

Connection Pooling

# Optimize connection pooling for migration workload
from sqlalchemy import create_engine
from sqlalchemy.pool import QueuePool

migration_engine = create_engine(
    DATABASE_URL,
    poolclass=QueuePool,
    pool_size=50,          # Larger pool for migration
    max_overflow=100,      # Allow bursts
    pool_pre_ping=True,    # Validate connections
    pool_recycle=3600      # Recycle connections hourly
)

Real-World War Stories

The Case of the Vanishing Microseconds

PostgreSQL stores timestamps with microsecond precision, but MySQL's DATETIME defaults to whole-second precision unless a fractional-seconds precision is declared. During a PostgreSQL β†’ MySQL migration, we discovered that high-frequency trading timestamps were being truncated, causing order processing logic to fail.

Solution: Used MySQL 8.0's DATETIME(6) type for microsecond precision and added explicit precision handling in the ETL pipeline.

The Unicode Normalization Nightmare

Migrating user-generated content from MySQL (utf8) to PostgreSQL (utf8) should have been straightforward. However, some emoji and special characters were stored using different Unicode normalization forms, causing duplicate key violations on unique constraints.

Solution: Implemented Unicode normalization (NFC) during the transformation phase and added data quality checks for non-ASCII characters.

The Phantom Read Replica Lag

During a critical migration, replication lag suddenly spiked to 10 minutes despite no apparent load increase. The culprit was a single analytical query running on the source database that created a massive transaction, blocking replication.

Solution: Implemented query monitoring on source databases and automatic query killing for long-running transactions during migration windows.
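A sketch of what that automated query killing might look like, assuming PostgreSQL on the source and a hypothetical 5-minute transaction cutoff during migration windows:

import psycopg2

KILL_CANDIDATES_QUERY = """
    SELECT pid, now() - xact_start AS xact_age, query
    FROM pg_stat_activity
    WHERE state <> 'idle'
      AND xact_start IS NOT NULL
      AND now() - xact_start > interval '5 minutes'
      AND backend_type = 'client backend'
"""

def kill_long_running_transactions(dsn: str) -> None:
    """Terminate client transactions older than the cutoff to protect replication."""
    conn = psycopg2.connect(dsn)
    conn.autocommit = True
    try:
        with conn.cursor() as cursor:
            cursor.execute(KILL_CANDIDATES_QUERY)
            for pid, age, query in cursor.fetchall():
                print(f"Terminating pid={pid} (age={age}): {query[:80]}")
                cursor.execute("SELECT pg_terminate_backend(%s)", (pid,))
    finally:
        conn.close()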


πŸ” Observability & Validation

No migration succeeds without visibility. You need:

  • Lag monitoring β†’ replication delay, CDC offsets
  • Data validation jobs β†’ nightly row counts, hash comparisons (a hash-comparison sketch follows this list)
  • Application metrics β†’ error rates, query latency
  • User-level canaries β†’ test user accounts to verify correctness
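Row counts are already covered by the validator earlier. For hash comparisons, here is a minimal sketch assuming PostgreSQL on both sides and a shared primary key; the table name and connection strings are placeholders.

import hashlib
import psycopg2

def table_fingerprint(dsn: str, table: str, key_column: str) -> str:
    """Hash all rows in primary-key order to get a deterministic table fingerprint."""
    digest = hashlib.sha256()
    conn = psycopg2.connect(dsn)
    try:
        # Server-side (named) cursor so rows are streamed instead of loaded at once
        with conn.cursor(name='fingerprint_cursor') as cursor:
            cursor.itersize = 10_000
            cursor.execute(f"SELECT * FROM {table} ORDER BY {key_column}")
            for row in cursor:
                # Type/formatting differences between engines would cause false
                # mismatches, which is why this sketch assumes PostgreSQL on both sides
                digest.update(repr(row).encode('utf-8'))
    finally:
        conn.close()
    return digest.hexdigest()

source_digest = table_fingerprint('host=source-db dbname=production user=monitor', 'orders', 'id')
target_digest = table_fingerprint('host=target-db dbname=production user=monitor', 'orders', 'id')
print('MATCH' if source_digest == target_digest else 'MISMATCH')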

Advanced Validation Strategies

Business Logic Validation

def validate_business_metrics():
    """Validate critical business metrics across databases"""

    # Compare daily revenue calculations
    source_revenue = get_daily_revenue(source_db, date.today())
    target_revenue = get_daily_revenue(target_db, date.today())

    revenue_diff = abs(source_revenue - target_revenue)
    if revenue_diff > 100:  # $100 threshold
        alert_finance_team(f"Revenue calculation mismatch: ${revenue_diff}")

    # Compare user activity metrics
    source_active_users = get_active_user_count(source_db)
    target_active_users = get_active_user_count(target_db)

    if abs(source_active_users - target_active_users) > 10:
        alert_product_team("Active user count mismatch")

Automated Regression Testing

class MigrationRegressionTests:
    """Automated tests to run during migration phases"""

    def test_user_authentication(self):
        """Test user login functionality"""
        test_users = get_test_user_accounts()

        for user in test_users:
            response = authenticate_user(user.email, user.password)
            assert response.success, f"Authentication failed for {user.email}"

    def test_order_processing(self):
        """Test complete order processing workflow"""
        test_order = create_test_order()

        # Place order
        order_response = place_order(test_order)
        assert order_response.success

        # Verify in database
        db_order = get_order_by_id(order_response.order_id)
        assert db_order.status == 'pending'

        # Process payment
        payment_response = process_payment(order_response.order_id)
        assert payment_response.success

        # Verify order completion
        final_order = get_order_by_id(order_response.order_id)
        assert final_order.status == 'completed'

πŸ›‘οΈ Rollback Strategies

Always assume failure. Options:

  1. Read-Only Freeze β†’ stop new writes, replay missed events, retry
  2. Feature-Flag Flip β†’ route reads/writes back to old DB instantly
  3. Dual DB Window β†’ keep old DB warm for X days before decommission

Decision Matrix for Rollback Triggers

| Condition | Automatic Rollback | Manual Decision | Continue |
|---|---|---|---|
| Error rate > 10% | βœ… Immediate | | |
| Replication lag > 2 minutes | | βœ… Assess cause | |
| Data validation failures > 100 | βœ… Within 5 minutes | | |
| Performance degradation > 50% | | βœ… Business decision | |
| Single user report | | | βœ… Investigate |

🎯 Final Thoughts

Zero-downtime migration is not a one-night project. It's an iterative engineering effort involving:

  • Bulk + incremental sync (ETL + CDC)
  • Dual writes for safety
  • Feature flags for cutover
  • Continuous validation
  • A tested rollback plan

The actual migration isn't the hard part β€” the hard part is:

  • Handling edge cases (schema mismatch, nulls, encoding issues)
  • Keeping the business running while you operate two DBs at once
  • Convincing management to invest in testing and observability

But done right β†’ your users won't even notice that your backend just swapped its heart out mid-flight.

Key Success Factors

Technical Excellence

  • Comprehensive testing in staging environments
  • Robust monitoring and alerting systems
  • Well-defined rollback procedures
  • Gradual, percentage-based cutover strategies

Organizational Alignment

  • Executive sponsorship and clear success metrics
  • Cross-functional team with defined roles and responsibilities
  • Regular stakeholder communication and status updates
  • Post-migration retrospectives and knowledge sharing

Risk Management

  • Conservative timelines with built-in buffer periods
  • Multiple validation checkpoints throughout the process
  • Automated rollback triggers for critical failure scenarios
  • Comprehensive disaster recovery and business continuity planning

Remember: Every successful migration starts with acknowledging that something will go wrong. The teams that succeed are those who prepare for failure, not those who assume success.


This guide represents battle-tested patterns from dozens of production migrations across various industries and scales. Each migration is unique, but the fundamental principles remain constant: prepare thoroughly, monitor continuously, and always have a way back.
