Mission Critical: Migrating a production database is like replacing the engine of a plane mid-flight: users continue using your application while you swap the core infrastructure beneath it. One mistake can result in downtime, data corruption, or significant revenue loss.
The gold standard: Zero Downtime Migration (ZDM).
Table of Contents
- Why Migrate Databases?
- The Universal 5-Phase Migration Framework
- Migration Scenario Deep Dives
- Observability & Monitoring
- Risk Mitigation & Rollback Strategies
- Operational Playbook
- Best Practices & Lessons Learned
Why Migrate Databases?
Database migrations aren't undertaken lightly. Here are the primary business and technical drivers:
Business Drivers
- Cost Optimization: Oracle licenses can cost millions annually
- Scalability Requirements: Handle billions of records across distributed systems
- Performance Improvements: Modern databases offer superior query performance
- Vendor Independence: Avoid lock-in with proprietary solutions
- Cloud Migration: Leverage managed services and elastic scaling
Technical Drivers
- Security & Compliance: Modern encryption, audit trails, regulatory requirements
- Feature Requirements: ACID transactions, complex queries, or flexible schemas
- System Consolidation: Merge databases after acquisitions or reorganizations
- Operational Excellence: Automated backups, patching, monitoring
The Universal 5-Phase Migration Framework
Every successful zero-downtime migration follows this battle-tested pattern, regardless of source and target database types:
Phase 1: Preparation & Planning
The foundation of any successful migration. Poor planning is the #1 cause of migration failures.
Component | Description | Tools/Techniques | Duration |
---|---|---|---|
Schema Mapping | Analyze source vs target structure compatibility | AWS SCT, ora2pg, custom analysis scripts | 1-2 weeks |
Tool Selection | Choose CDC, replication, and ETL technologies | Debezium, DMS, GoldenGate, Kafka Connect | 3-5 days |
Rollback Strategy | Design failure recovery procedures | Feature flags, DNS switching, backup restoration | 1 week |
SLA Definition | Define acceptable latency and downtime thresholds | Business stakeholder meetings | 2-3 days |
Critical Planning Questions:
- What is the maximum acceptable replication lag?
- How will schema differences be handled?
- What is the rollback time requirement?
- Which application components need updates?
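To make the schema-mapping step concrete, here is a minimal gap-check sketch for a SQL-to-SQL migration, assuming psycopg2 and placeholder connection strings (for heterogeneous targets the same idea applies against the target's own metadata API):

# Minimal schema-gap check: list source columns that have no counterpart in the
# target. Connection strings, schema, and table names are illustrative.
import psycopg2

def fetch_columns(dsn: str, schema: str = "public") -> dict:
    """Return {table: {column, ...}} from information_schema."""
    query = """
        SELECT table_name, column_name
        FROM information_schema.columns
        WHERE table_schema = %s
    """
    columns = {}
    with psycopg2.connect(dsn) as conn, conn.cursor() as cur:
        cur.execute(query, (schema,))
        for table, column in cur.fetchall():
            columns.setdefault(table, set()).add(column)
    return columns

def report_schema_gaps(source_dsn: str, target_dsn: str) -> None:
    source_cols = fetch_columns(source_dsn)
    target_cols = fetch_columns(target_dsn)
    for table, cols in source_cols.items():
        missing = cols - target_cols.get(table, set())
        if missing:
            print(f"{table}: missing in target -> {sorted(missing)}")

if __name__ == "__main__":
    report_schema_gaps(
        "postgresql://migration_user:password@pg-source:5432/production",
        "postgresql://migration_user:password@pg-target:5432/production",
    )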
Phase 2: Bulk Load (Historical Data)
Transfer existing data from source to target database. This is typically the longest phase.
Implementation Strategies by Migration Type:
Migration Type | Recommended Approach | Tools | Typical Duration |
---|---|---|---|
PostgreSQL → Cassandra | Spark with JDBC + Cassandra connectors | Spark, DataStax drivers | 2-7 days |
MongoDB → MySQL | mongoexport + custom ETL scripts | Python, Pandas, SQLAlchemy | 1-3 days |
Oracle → PostgreSQL | pgloader or AWS DMS | pgloader, DMS, ora2pg | 3-10 days |
On-Prem → Cloud | Native dump/restore + parallel processing | pg_dump, aws s3 cp, parallel | 1-5 days |
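Whatever tool performs the bulk load, it usually streams data in primary-key-ordered chunks so the job can be resumed after a failure. A minimal keyset-pagination sketch, assuming psycopg2 and placeholder connection, table, and key names:

# Keyset-paginated bulk copy: read fixed-size chunks ordered by primary key and
# write them to the target, remembering the last key so the job can resume.
import psycopg2
from psycopg2.extras import execute_values

CHUNK_SIZE = 5000

def copy_table_in_chunks(source_dsn, target_dsn, table="users", pk="user_id"):
    src = psycopg2.connect(source_dsn)
    dst = psycopg2.connect(target_dsn)
    last_key = 0
    try:
        while True:
            with src.cursor() as cur:
                # Table and key names are fixed strings here, not user input.
                cur.execute(
                    f"SELECT * FROM {table} WHERE {pk} > %s ORDER BY {pk} LIMIT %s",
                    (last_key, CHUNK_SIZE),
                )
                rows = cur.fetchall()
                col_names = [d[0] for d in cur.description]
            if not rows:
                break
            with dst.cursor() as cur:
                execute_values(
                    cur,
                    f"INSERT INTO {table} ({', '.join(col_names)}) VALUES %s",
                    rows,
                )
            dst.commit()
            last_key = rows[-1][col_names.index(pk)]
            print(f"Copied up to {pk}={last_key}")
    finally:
        src.close()
        dst.close()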
Phase 3: Change Data Capture (CDC)
Real-time synchronization of ongoing changes while the bulk load completes and during the dual-write phase.
CDC Tool Selection Matrix:
Source DB | Target DB | Recommended Tool | Complexity | Latency | Cost |
---|---|---|---|---|---|
PostgreSQL | Cassandra | Debezium + Kafka + Custom Sink | Medium | <1s | $ |
MySQL | MongoDB | Debezium + Kafka + MongoDB Connector | Low | <2s | $ |
Oracle | PostgreSQL | Oracle GoldenGate + DMS | High | <5s | $$$ |
SQL Server | Azure SQL | Native CDC + Azure Data Factory | Low | <3s | $$ |
MongoDB | Elasticsearch | MongoDB Change Streams + Logstash | Medium | <2s | $ |
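Downstream of any of these tools, the consumer sees the same Debezium-style envelope with op, before, and after fields. A minimal consumer sketch using kafka-python (the topic name, broker address, and apply_to_target stub are placeholders):

# Minimal CDC consumer: read Debezium-style change events from Kafka and route
# them by operation type. Topic and broker names are illustrative.
import json
from kafka import KafkaConsumer

def apply_to_target(table: str, op: str, row: dict) -> None:
    # Placeholder: translate the change into an upsert/delete on the target DB.
    print(f"{op} on {table}: {row}")

consumer = KafkaConsumer(
    "postgres.events.users",                      # illustrative topic name
    bootstrap_servers=["kafka-cluster:9092"],
    value_deserializer=lambda v: json.loads(v) if v else None,
    auto_offset_reset="earliest",
    enable_auto_commit=False,
)

for message in consumer:
    if message.value is None:                     # tombstone record, nothing to apply
        continue
    envelope = message.value.get("payload", message.value)
    op = envelope.get("op")                       # c=create, u=update, d=delete, r=snapshot read
    if op in ("c", "u", "r"):
        apply_to_target("users", op, envelope["after"])
    elif op == "d":
        apply_to_target("users", op, envelope["before"])
    consumer.commit()                             # commit offsets only after a successful apply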
Phase 4: Dual Writes (Safety Layer)
Write to both databases simultaneously to ensure data consistency and provide a safety net during cutover.
import logging
from typing import Any, Dict, Optional
from dataclasses import dataclass
from enum import Enum
class WriteResult(Enum):
SUCCESS = "success"
SOURCE_FAILED = "source_failed"
TARGET_FAILED = "target_failed"
BOTH_FAILED = "both_failed"
@dataclass
class DatabaseOperation:
operation_type: str
table: str
data: Dict[str, Any]
primary_key: Optional[str] = None
class DatabaseMigrationProxy:
def __init__(self, source_db, target_db, feature_flag_service, metrics_service):
self.source_db = source_db
self.target_db = target_db
self.feature_flags = feature_flag_service
self.metrics = metrics_service
self.logger = logging.getLogger(__name__)
def execute_write(self, operation: DatabaseOperation) -> WriteResult:
"""Execute write operation with dual-write support and comprehensive error handling"""
source_success = False
target_success = False
try:
# Primary write (source database - source of truth)
self.logger.debug(f"Executing {operation.operation_type} on source DB")
source_result = self._execute_on_source(operation)
source_success = True
self.metrics.increment('source_write_success')
except Exception as e:
self.logger.error(f"Source write failed: {e}")
self.metrics.increment('source_write_failure')
return WriteResult.SOURCE_FAILED
# Secondary write (target database) - only if enabled via feature flag
if self.feature_flags.is_enabled('dual_write_mode'):
try:
self.logger.debug(f"Executing {operation.operation_type} on target DB")
transformed_operation = self._transform_for_target(operation)
target_result = self._execute_on_target(transformed_operation)
target_success = True
self.metrics.increment('target_write_success')
except Exception as e:
self.logger.warning(f"Target write failed (non-blocking): {e}")
self.metrics.increment('target_write_failure')
# Target failure is non-blocking during dual-write phase
self._enqueue_for_retry(operation)
# Log dual-write results for monitoring
if source_success and target_success:
self.metrics.increment('dual_write_success')
return WriteResult.SUCCESS
elif source_success:
return WriteResult.SUCCESS # Target failure is acceptable during migration
else:
return WriteResult.SOURCE_FAILED
def execute_read(self, query: str, params: Dict[str, Any] = None):
"""Route read operations based on migration phase"""
read_percentage = self.feature_flags.get_percentage('read_from_target')
if self._should_read_from_target(read_percentage):
try:
self.logger.debug("Reading from target database")
result = self.target_db.execute(query, params)
self.metrics.increment('target_read_success')
return result
except Exception as e:
self.logger.error(f"Target read failed, falling back to source: {e}")
self.metrics.increment('target_read_failure')
# Fallback to source on target read failure
self.logger.debug("Reading from source database")
result = self.source_db.execute(query, params)
self.metrics.increment('source_read_success')
return result
def _execute_on_source(self, operation: DatabaseOperation):
"""Execute operation on source database"""
if operation.operation_type == 'INSERT':
return self.source_db.insert(operation.table, operation.data)
elif operation.operation_type == 'UPDATE':
return self.source_db.update(operation.table, operation.data, operation.primary_key)
elif operation.operation_type == 'DELETE':
return self.source_db.delete(operation.table, operation.primary_key)
def _execute_on_target(self, operation: DatabaseOperation):
"""Execute operation on target database with schema transformation"""
if operation.operation_type == 'INSERT':
return self.target_db.insert(operation.table, operation.data)
elif operation.operation_type == 'UPDATE':
return self.target_db.update(operation.table, operation.data, operation.primary_key)
elif operation.operation_type == 'DELETE':
return self.target_db.delete(operation.table, operation.primary_key)
def _transform_for_target(self, operation: DatabaseOperation) -> DatabaseOperation:
"""Transform operation for target database schema"""
# Example transformation logic (customize based on your schema differences)
transformed_data = operation.data.copy()
# Handle column name changes
column_mappings = {
'user_id': 'id',
'created_date': 'created_at',
'modified_date': 'updated_at'
}
for old_col, new_col in column_mappings.items():
if old_col in transformed_data:
transformed_data[new_col] = transformed_data.pop(old_col)
# Handle data type conversions
if 'created_at' in transformed_data and isinstance(transformed_data['created_at'], str):
from datetime import datetime
transformed_data['created_at'] = datetime.fromisoformat(transformed_data['created_at'])
return DatabaseOperation(
operation_type=operation.operation_type,
table=operation.table,
data=transformed_data,
primary_key=operation.primary_key
)
def _should_read_from_target(self, percentage: int) -> bool:
"""Determine if read should be routed to target based on percentage rollout"""
import random
return random.randint(1, 100) <= percentage
def _enqueue_for_retry(self, operation: DatabaseOperation):
"""Enqueue failed target writes for retry processing"""
# Implementation would depend on your retry mechanism
# Could use Redis queue, database table, or message queue
self.logger.info(f"Enqueued operation for retry: {operation}")
Phase 5: Cutover & Verification
In this final phase, traffic is gradually shifted to the target database, with comprehensive validation at every step.
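Before each traffic increase, verification typically compares row counts (and, where practical, per-table checksums) between source and target. A minimal count-drift check, assuming both sides are reachable with psycopg2 and using placeholder table names:

# Pre-cutover sanity check: compare row counts per table on source and target
# and refuse to proceed if any table drifts beyond a tolerance.
import psycopg2

TABLES = ["users", "orders", "order_items"]   # illustrative table list
TOLERANCE = 0.001                             # 0.1% drift allowed while CDC catches up

def row_count(dsn: str, table: str) -> int:
    with psycopg2.connect(dsn) as conn, conn.cursor() as cur:
        cur.execute(f"SELECT COUNT(*) FROM {table}")
        return cur.fetchone()[0]

def safe_to_cut_over(source_dsn: str, target_dsn: str) -> bool:
    for table in TABLES:
        src, dst = row_count(source_dsn, table), row_count(target_dsn, table)
        drift = abs(src - dst) / max(src, 1)
        print(f"{table}: source={src} target={dst} drift={drift:.4%}")
        if drift > TOLERANCE:
            return False
    return True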
Migration Scenario Deep Dives
Scenario 1: SQL → NoSQL (PostgreSQL to Cassandra)
Business Case
Why Migrate?
- Horizontal Scalability: Handle billions of IoT sensor readings across distributed commodity hardware
- High Write Throughput: Support 100K+ writes/second for real-time analytics
- Flexible Schema Evolution: Dynamic user profiles and metadata that change frequently
- Global Distribution: Multi-region deployment with local read/write capabilities
Architecture Overview
Implementation Deep Dive
Step 1: Schema Design & Transformation
PostgreSQL normalized tables need to be denormalized for Cassandra's wide-column model:
-- PostgreSQL Source Schema
CREATE TABLE users (
user_id SERIAL PRIMARY KEY,
email VARCHAR(255) UNIQUE NOT NULL,
name VARCHAR(255) NOT NULL,
created_at TIMESTAMP DEFAULT NOW()
);
CREATE TABLE user_sessions (
session_id UUID PRIMARY KEY,
user_id INTEGER REFERENCES users(user_id),
device_info JSONB,
created_at TIMESTAMP DEFAULT NOW(),
last_activity TIMESTAMP DEFAULT NOW()
);
CREATE TABLE user_events (
event_id UUID PRIMARY KEY,
user_id INTEGER REFERENCES users(user_id),
event_type VARCHAR(50),
event_data JSONB,
created_at TIMESTAMP DEFAULT NOW()
);
-- Cassandra Target Schema (Denormalized)
CREATE KEYSPACE user_data WITH REPLICATION = {
'class': 'NetworkTopologyStrategy',
'datacenter1': 3,
'datacenter2': 3
};
-- User profile table (optimized for user lookup)
CREATE TABLE user_data.users (
user_id BIGINT,
email TEXT,
name TEXT,
created_at TIMESTAMP,
session_count BIGINT, -- a counter column cannot share a table with regular columns, so use a plain bigint
last_login TIMESTAMP,
PRIMARY KEY (user_id)
);
-- User events table (optimized for time-series queries)
CREATE TABLE user_data.user_events_by_user (
user_id BIGINT,
event_date DATE,
event_time TIMESTAMP,
event_id UUID,
event_type TEXT,
event_data TEXT,
device_info TEXT,
PRIMARY KEY ((user_id, event_date), event_time, event_id)
) WITH CLUSTERING ORDER BY (event_time DESC);
-- Events by type table (optimized for analytics)
CREATE TABLE user_data.events_by_type (
event_type TEXT,
event_date DATE,
event_time TIMESTAMP,
user_id BIGINT,
event_id UUID,
event_data TEXT,
PRIMARY KEY ((event_type, event_date), event_time, event_id)
) WITH CLUSTERING ORDER BY (event_time DESC);
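The payoff of this denormalization is that both hot read paths become single-partition queries. A short read sketch with the DataStax Python driver (contact points and credentials are placeholders):

# Query the denormalized tables: both access patterns hit exactly one partition.
from datetime import date
from cassandra.cluster import Cluster
from cassandra.auth import PlainTextAuthProvider

auth = PlainTextAuthProvider(username="cassandra_user", password="secure_password")
cluster = Cluster(["cassandra-cluster-1"], auth_provider=auth)
session = cluster.connect("user_data")

# All of one user's events for a given day, newest first (partition: user_id + event_date).
rows = session.execute(
    "SELECT event_time, event_type, event_data "
    "FROM user_events_by_user WHERE user_id = %s AND event_date = %s",
    (12345, date(2024, 3, 15)),
)
for row in rows:
    print(row.event_time, row.event_type)

# All 'login' events for a given day across users (partition: event_type + event_date).
rows = session.execute(
    "SELECT user_id, event_time FROM events_by_type "
    "WHERE event_type = %s AND event_date = %s",
    ("login", date(2024, 3, 15)),
)
cluster.shutdown()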
Step 2: Bulk Migration with Spark
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when, lit, current_timestamp, date_format
from pyspark.sql.types import *
import json
def create_spark_session():
return SparkSession.builder \
.appName("PostgreSQL_to_Cassandra_Migration") \
.config("spark.cassandra.connection.host", "cassandra-cluster-1,cassandra-cluster-2") \
.config("spark.cassandra.connection.port", "9042") \
.config("spark.cassandra.auth.username", "cassandra_user") \
.config("spark.cassandra.auth.password", "secure_password") \
.config("spark.sql.adaptive.enabled", "true") \
.config("spark.sql.adaptive.coalescePartitions.enabled", "true") \
.getOrCreate()
def migrate_users_table(spark):
"""Migrate users table with data transformation"""
# Extract from PostgreSQL with optimized partitioning
users_df = spark.read \
.format("jdbc") \
.option("url", "jdbc:postgresql://pg-primary:5432/production") \
.option("dbtable", "users") \
.option("user", "migration_user") \
.option("password", "secure_password") \
.option("driver", "org.postgresql.Driver") \
.option("numPartitions", "10") \
.option("partitionColumn", "user_id") \
.option("lowerBound", "1") \
.option("upperBound", "1000000") \
.load()
# Transform for Cassandra
cassandra_users_df = users_df.select(
col("user_id").cast(LongType()).alias("user_id"),
col("email").alias("email"),
col("name").alias("name"),
col("created_at").alias("created_at"),
lit(0).cast(LongType()).alias("session_count"),  # initialize session_count
lit(None).cast(TimestampType()).alias("last_login")
)
# Write to Cassandra with batch optimization
cassandra_users_df.write \
.format("org.apache.spark.sql.cassandra") \
.options(table="users", keyspace="user_data") \
.option("spark.cassandra.output.batch.size.rows", "1000") \
.option("spark.cassandra.output.concurrent.writes", "10") \
.mode("append") \
.save()
return users_df.count()
def migrate_events_with_denormalization(spark):
"""Migrate events with denormalization for time-series optimization"""
# Join users and events for denormalization
query = """
(SELECT
e.event_id,
e.user_id,
u.email,
u.name,
e.event_type,
e.event_data,
s.device_info,
e.created_at
FROM user_events e
JOIN users u ON e.user_id = u.user_id
LEFT JOIN user_sessions s ON s.user_id = e.user_id
ORDER BY e.created_at) as events_with_user_data
"""
events_df = spark.read \
.format("jdbc") \
.option("url", "jdbc:postgresql://pg-primary:5432/production") \
.option("dbtable", query) \
.option("user", "migration_user") \
.option("password", "secure_password") \
.load()
# Transform for Cassandra time-series tables
events_by_user_df = events_df.select(
col("user_id").cast(LongType()),
date_format(col("created_at"), "yyyy-MM-dd").cast(DateType()).alias("event_date"),
col("created_at").alias("event_time"),
col("event_id"),
col("event_type"),
col("event_data").cast(StringType()),
when(col("device_info").isNotNull(), col("device_info").cast(StringType())).alias("device_info")
)
# Write to user-centric events table
events_by_user_df.write \
.format("org.apache.spark.sql.cassandra") \
.options(table="user_events_by_user", keyspace="user_data") \
.option("spark.cassandra.output.batch.size.rows", "500") \
.mode("append") \
.save()
# Write to type-centric events table for analytics
events_by_type_df = events_df.select(
col("event_type"),
date_format(col("created_at"), "yyyy-MM-dd").cast(DateType()).alias("event_date"),
col("created_at").alias("event_time"),
col("user_id").cast(LongType()),
col("event_id"),
col("event_data").cast(StringType())
)
events_by_type_df.write \
.format("org.apache.spark.sql.cassandra") \
.options(table="events_by_type", keyspace="user_data") \
.option("spark.cassandra.output.batch.size.rows", "500") \
.mode("append") \
.save()
def main():
spark = create_spark_session()
try:
print("Starting PostgreSQL to Cassandra migration...")
# Migrate users table
user_count = migrate_users_table(spark)
print(f"Migrated {user_count} users")
# Migrate events with denormalization
migrate_events_with_denormalization(spark)
print("Events migration completed")
print("Bulk migration completed successfully!")
except Exception as e:
print(f"Migration failed: {e}")
raise
finally:
spark.stop()
if __name__ == "__main__":
main()
Step 3: Real-time CDC with Debezium
# Debezium PostgreSQL Connector Configuration
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
name: postgres-cassandra-connector
labels:
strimzi.io/cluster: migration-cluster
spec:
class: io.debezium.connector.postgresql.PostgresConnector
tasksMax: 3
config:
database.hostname: pg-primary.production.local
database.port: 5432
database.user: debezium_user
database.password: secure_password
database.dbname: production
database.server.name: production-postgres
# Table filtering
table.include.list: public.users,public.user_sessions,public.user_events
# Change event routing
transforms: route
transforms.route.type: io.debezium.transforms.ByLogicalTableRouter
transforms.route.topic.regex: production-postgres.public.(.*)
transforms.route.topic.replacement: postgres.events.$1
# Snapshot configuration
snapshot.mode: initial
slot.name: debezium_migration_slot
# Performance tuning
max.batch.size: 2048
max.queue.size: 8192
# Schema evolution
include.schema.changes: true
schema.history.internal.kafka.topic: schema-changes.production-postgres
schema.history.internal.kafka.bootstrap.servers: kafka-cluster:9092
Step 4: Custom Kafka → Cassandra Sink Connector
@Component
public class CassandraSinkConnector extends SinkConnector {
private Map<String, String> configProps;
@Override
public void start(Map<String, String> props) {
this.configProps = props;
}
@Override
public Class<? extends Task> taskClass() {
return CassandraSinkTask.class;
}
@Override
public List<Map<String, String>> taskConfigs(int maxTasks) {
List<Map<String, String>> taskConfigs = new ArrayList<>();
for (int i = 0; i < maxTasks; i++) {
taskConfigs.add(new HashMap<>(configProps));
}
return taskConfigs;
}
}
@Component
public class CassandraSinkTask extends SinkTask {
private CqlSession cassandraSession;
private Map<String, PreparedStatement> preparedStatements;
@Override
public void start(Map<String, String> props) {
// Initialize Cassandra connection
cassandraSession = CqlSession.builder()
.addContactPoint(new InetSocketAddress(props.get("cassandra.host"), 9042))
.withAuthCredentials(props.get("cassandra.username"), props.get("cassandra.password"))
.withKeyspace("user_data")
.build();
// Prepare statements for each table
prepareStatements();
}
@Override
public void put(Collection<SinkRecord> records) {
for (SinkRecord record : records) {
try {
processRecord(record);
} catch (Exception e) {
log.error("Failed to process record: {}", record, e);
// Implement retry logic or DLQ
handleFailedRecord(record, e);
}
}
}
private void processRecord(SinkRecord record) {
String tableName = extractTableName(record.topic());
Struct payload = (Struct) record.value();
String operation = payload.getString("op"); // c=create, u=update, d=delete
switch (operation) {
case "c":
case "u":
handleUpsert(tableName, payload.getStruct("after"));
break;
case "d":
handleDelete(tableName, payload.getStruct("before"));
break;
}
}
private void handleUpsert(String tableName, Struct data) {
switch (tableName) {
case "users":
upsertUser(data);
break;
case "user_events":
insertUserEvent(data);
break;
case "user_sessions":
upsertUserSession(data);
break;
}
}
private void upsertUser(Struct userData) {
PreparedStatement stmt = preparedStatements.get("upsert_user");
cassandraSession.execute(stmt.bind(
userData.getInt64("user_id"),
userData.getString("email"),
userData.getString("name"),
Instant.ofEpochMilli(userData.getInt64("created_at"))
));
}
private void insertUserEvent(Struct eventData) {
// Insert into both user-centric and type-centric tables for denormalization
Long userId = eventData.getInt64("user_id");
String eventType = eventData.getString("event_type");
Instant eventTime = Instant.ofEpochMilli(eventData.getInt64("created_at"));
LocalDate eventDate = eventTime.atZone(ZoneOffset.UTC).toLocalDate();
// Insert into user_events_by_user table
PreparedStatement userEventStmt = preparedStatements.get("insert_user_event");
cassandraSession.execute(userEventStmt.bind(
userId,
eventDate,
eventTime,
UUID.fromString(eventData.getString("event_id")),
eventType,
eventData.getString("event_data"),
eventData.getString("device_info")
));
// Insert into events_by_type table for analytics
PreparedStatement typeEventStmt = preparedStatements.get("insert_event_by_type");
cassandraSession.execute(typeEventStmt.bind(
eventType,
eventDate,
eventTime,
userId,
UUID.fromString(eventData.getString("event_id")),
eventData.getString("event_data")
));
}
private void prepareStatements() {
preparedStatements = new HashMap<>();
preparedStatements.put("upsert_user",
cassandraSession.prepare(
"INSERT INTO users (user_id, email, name, created_at) VALUES (?, ?, ?, ?)"
));
preparedStatements.put("insert_user_event",
cassandraSession.prepare(
"INSERT INTO user_events_by_user (user_id, event_date, event_time, event_id, event_type, event_data, device_info) VALUES (?, ?, ?, ?, ?, ?, ?)"
));
preparedStatements.put("insert_event_by_type",
cassandraSession.prepare(
"INSERT INTO events_by_type (event_type, event_date, event_time, user_id, event_id, event_data) VALUES (?, ?, ?, ?, ?, ?)"
));
}
}
Scenario 2: NoSQL → SQL (MongoDB to MySQL)
Business Case
Why Migrate?
- ACID Transactions: Critical for financial operations, order processing, payment systems
- Complex Analytical Queries: JOINs, aggregations, window functions for business intelligence
- Data Integrity Enforcement: Foreign key constraints, check constraints, schema validation
- Mature Ecosystem: Better tooling for reporting, data warehousing, and analytics
Schema Transformation Strategy
MongoDB's document-oriented structure needs careful normalization for relational databases:
// MongoDB Document Structure (Before)
{
"_id": ObjectId("507f1f77bcf86cd799439011"),
"customer_id": 12345,
"profile": {
"first_name": "Alice",
"last_name": "Johnson",
"email": "alice@example.com",
"phone": "+1-555-0123",
"address": {
"street": "123 Main St",
"city": "San Francisco",
"state": "CA",
"zip_code": "94105"
},
"preferences": {
"marketing_emails": true,
"sms_notifications": false,
"theme": "dark"
}
},
"orders": [
{
"order_id": 1001,
"order_date": ISODate("2024-03-15T10:30:00Z"),
"total_amount": 299.99,
"status": "completed",
"items": [
{
"product_id": "PROD123",
"product_name": "Wireless Headphones",
"quantity": 1,
"unit_price": 199.99,
"category": "Electronics"
},
{
"product_id": "PROD456",
"product_name": "Phone Case",
"quantity": 2,
"unit_price": 50.00,
"category": "Accessories"
}
],
"shipping": {
"method": "express",
"cost": 15.99,
"tracking_number": "TRACK123456"
}
}
],
"payment_methods": [
{
"method_id": "pm_001",
"type": "credit_card",
"last_four": "4242",
"expires": "12/25",
"is_default": true
}
]
}
-- MySQL Normalized Schema (After)
-- Customer core information
CREATE TABLE customers (
customer_id BIGINT PRIMARY KEY,
first_name VARCHAR(100) NOT NULL,
last_name VARCHAR(100) NOT NULL,
email VARCHAR(255) UNIQUE NOT NULL,
phone VARCHAR(20),
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
INDEX idx_email (email),
INDEX idx_phone (phone)
) ENGINE=InnoDB;
-- Customer addresses (normalized)
CREATE TABLE customer_addresses (
address_id BIGINT AUTO_INCREMENT PRIMARY KEY,
customer_id BIGINT NOT NULL,
address_type ENUM('billing', 'shipping', 'both') DEFAULT 'both',
street VARCHAR(255) NOT NULL,
city VARCHAR(100) NOT NULL,
state VARCHAR(50) NOT NULL,
zip_code VARCHAR(10) NOT NULL,
country VARCHAR(50) DEFAULT 'US',
is_active BOOLEAN DEFAULT TRUE,
FOREIGN KEY (customer_id) REFERENCES customers(customer_id) ON DELETE CASCADE,
INDEX idx_customer_addresses (customer_id)
) ENGINE=InnoDB;
-- Customer preferences (JSON for flexibility)
CREATE TABLE customer_preferences (
customer_id BIGINT PRIMARY KEY,
marketing_emails BOOLEAN DEFAULT FALSE,
sms_notifications BOOLEAN DEFAULT FALSE,
theme ENUM('light', 'dark', 'auto') DEFAULT 'light',
preferences_json JSON, -- Store additional preferences as JSON
FOREIGN KEY (customer_id) REFERENCES customers(customer_id) ON DELETE CASCADE
) ENGINE=InnoDB;
-- Orders table
CREATE TABLE orders (
order_id BIGINT PRIMARY KEY,
customer_id BIGINT NOT NULL,
order_date TIMESTAMP NOT NULL,
total_amount DECIMAL(12, 2) NOT NULL,
status ENUM('pending', 'processing', 'shipped', 'completed', 'cancelled') NOT NULL,
shipping_method VARCHAR(50),
shipping_cost DECIMAL(8, 2),
tracking_number VARCHAR(100),
FOREIGN KEY (customer_id) REFERENCES customers(customer_id),
INDEX idx_customer_orders (customer_id, order_date),
INDEX idx_order_date (order_date),
INDEX idx_order_status (status)
) ENGINE=InnoDB;
-- Order items table
CREATE TABLE order_items (
item_id BIGINT AUTO_INCREMENT PRIMARY KEY,
order_id BIGINT NOT NULL,
product_id VARCHAR(50) NOT NULL,
product_name VARCHAR(255) NOT NULL,
category VARCHAR(100),
quantity INT NOT NULL,
unit_price DECIMAL(10, 2) NOT NULL,
total_price DECIMAL(10, 2) GENERATED ALWAYS AS (quantity * unit_price) STORED,
FOREIGN KEY (order_id) REFERENCES orders(order_id) ON DELETE CASCADE,
INDEX idx_order_items (order_id),
INDEX idx_product (product_id),
INDEX idx_category (category)
) ENGINE=InnoDB;
-- Payment methods table
CREATE TABLE payment_methods (
method_id VARCHAR(50) PRIMARY KEY,
customer_id BIGINT NOT NULL,
payment_type ENUM('credit_card', 'debit_card', 'paypal', 'apple_pay') NOT NULL,
last_four VARCHAR(4),
expires VARCHAR(7), -- MM/YY format
is_default BOOLEAN DEFAULT FALSE,
is_active BOOLEAN DEFAULT TRUE,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (customer_id) REFERENCES customers(customer_id) ON DELETE CASCADE,
INDEX idx_customer_payment_methods (customer_id)
) ENGINE=InnoDB;
-- Ensure only one default payment method per customer.
-- MySQL does not allow a trigger to UPDATE the table it fires on, so instead of a
-- BEFORE INSERT trigger we enforce this with a generated column plus a unique index:
-- the column is 1 for the default method and NULL otherwise, and NULLs never collide.
-- The application demotes the previous default before inserting a new one.
ALTER TABLE payment_methods
    ADD COLUMN default_flag TINYINT
        GENERATED ALWAYS AS (CASE WHEN is_default THEN 1 ELSE NULL END) STORED,
    ADD UNIQUE INDEX uq_customer_default_payment (customer_id, default_flag);
Advanced ETL Pipeline
import pymongo
import mysql.connector
from mysql.connector import Error
import json
import logging
from datetime import datetime
from typing import Dict, List, Any, Optional
import pandas as pd
from dataclasses import dataclass
@dataclass
class MigrationStats:
customers_processed: int = 0
orders_processed: int = 0
items_processed: int = 0
errors: int = 0
class MongoToMySQLMigrator:
def __init__(self, mongo_config: Dict, mysql_config: Dict):
self.mongo_client = pymongo.MongoClient(mongo_config['connection_string'])
self.mongo_db = self.mongo_client[mongo_config['database']]
self.mysql_conn = mysql.connector.connect(**mysql_config)
self.mysql_cursor = self.mysql_conn.cursor()
self.stats = MigrationStats()
self.logger = logging.getLogger(__name__)
# Prepare SQL statements
self.prepare_sql_statements()
def prepare_sql_statements(self):
"""Prepare all SQL statements for better performance"""
self.sql_statements = {
'insert_customer': """
INSERT INTO customers (customer_id, first_name, last_name, email, phone)
VALUES (%s, %s, %s, %s, %s)
ON DUPLICATE KEY UPDATE
first_name = VALUES(first_name),
last_name = VALUES(last_name),
email = VALUES(email),
phone = VALUES(phone)
""",
'insert_address': """
INSERT INTO customer_addresses
(customer_id, address_type, street, city, state, zip_code, country)
VALUES (%s, %s, %s, %s, %s, %s, %s)
""",
'insert_preferences': """
INSERT INTO customer_preferences
(customer_id, marketing_emails, sms_notifications, theme, preferences_json)
VALUES (%s, %s, %s, %s, %s)
ON DUPLICATE KEY UPDATE
marketing_emails = VALUES(marketing_emails),
sms_notifications = VALUES(sms_notifications),
theme = VALUES(theme),
preferences_json = VALUES(preferences_json)
""",
'insert_order': """
INSERT INTO orders
(order_id, customer_id, order_date, total_amount, status,
shipping_method, shipping_cost, tracking_number)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s)
ON DUPLICATE KEY UPDATE
total_amount = VALUES(total_amount),
status = VALUES(status)
""",
'insert_order_item': """
INSERT INTO order_items
(order_id, product_id, product_name, category, quantity, unit_price)
VALUES (%s, %s, %s, %s, %s, %s)
""",
'insert_payment_method': """
INSERT INTO payment_methods
(method_id, customer_id, payment_type, last_four, expires, is_default)
VALUES (%s, %s, %s, %s, %s, %s)
ON DUPLICATE KEY UPDATE
is_default = VALUES(is_default),
is_active = VALUES(is_active)
"""
}
def migrate_customers(self, batch_size: int = 1000):
"""Migrate customers with batch processing"""
collection = self.mongo_db.customers
total_docs = collection.count_documents({})
self.logger.info(f"Starting migration of {total_docs} customer documents")
batch = []
processed = 0
try:
for doc in collection.find().batch_size(batch_size):
try:
batch.append(doc)
if len(batch) >= batch_size:
self.process_customer_batch(batch)
processed += len(batch)
self.logger.info(f"Processed {processed}/{total_docs} customers")
batch = []
except Exception as e:
self.logger.error(f"Error processing customer {doc.get('customer_id')}: {e}")
self.stats.errors += 1
# Process remaining batch
if batch:
self.process_customer_batch(batch)
processed += len(batch)
self.mysql_conn.commit()
self.logger.info(f"Customer migration completed. Processed: {processed}, Errors: {self.stats.errors}")
except Exception as e:
self.mysql_conn.rollback()
self.logger.error(f"Batch processing failed: {e}")
raise
def process_customer_batch(self, customer_batch: List[Dict]):
"""Process a batch of customers with all related data"""
for doc in customer_batch:
try:
# Extract customer data
customer_id = doc['customer_id']
profile = doc.get('profile', {})
# Insert customer
self.mysql_cursor.execute(self.sql_statements['insert_customer'], (
customer_id,
profile.get('first_name'),
profile.get('last_name'),
profile.get('email'),
profile.get('phone')
))
# Insert address if present
if 'address' in profile:
address = profile['address']
self.mysql_cursor.execute(self.sql_statements['insert_address'], (
customer_id,
'both', # default address type
address.get('street'),
address.get('city'),
address.get('state'),
address.get('zip_code'),
address.get('country', 'US')
))
# Insert preferences
preferences = profile.get('preferences', {})
self.mysql_cursor.execute(self.sql_statements['insert_preferences'], (
customer_id,
preferences.get('marketing_emails', False),
preferences.get('sms_notifications', False),
preferences.get('theme', 'light'),
json.dumps(preferences) if preferences else None
))
# Process orders
orders = doc.get('orders', [])
for order in orders:
self.process_order(customer_id, order)
# Process payment methods
payment_methods = doc.get('payment_methods', [])
for pm in payment_methods:
self.mysql_cursor.execute(self.sql_statements['insert_payment_method'], (
pm.get('method_id'),
customer_id,
pm.get('type'),
pm.get('last_four'),
pm.get('expires'),
pm.get('is_default', False)
))
self.stats.customers_processed += 1
except Exception as e:
self.logger.error(f"Error processing customer {customer_id}: {e}")
self.stats.errors += 1
def process_order(self, customer_id: int, order: Dict):
"""Process individual order with items"""
order_id = order['order_id']
# Parse order date
order_date = order['order_date']
if isinstance(order_date, str):
order_date = datetime.fromisoformat(order_date.replace('Z', '+00:00'))
# Insert order
shipping = order.get('shipping', {})
self.mysql_cursor.execute(self.sql_statements['insert_order'], (
order_id,
customer_id,
order_date,
order['total_amount'],
order['status'],
shipping.get('method'),
shipping.get('cost'),
shipping.get('tracking_number')
))
# Insert order items
items = order.get('items', [])
for item in items:
self.mysql_cursor.execute(self.sql_statements['insert_order_item'], (
order_id,
item['product_id'],
item['product_name'],
item.get('category'),
item['quantity'],
item['unit_price']
))
self.stats.items_processed += 1
self.stats.orders_processed += 1
def validate_migration(self) -> Dict[str, Any]:
"""Validate migration results"""
validation_results = {}
# Count validation
mongo_customer_count = self.mongo_db.customers.count_documents({})
self.mysql_cursor.execute("SELECT COUNT(*) FROM customers")
mysql_customer_count = self.mysql_cursor.fetchone()[0]
validation_results['customer_count_match'] = mongo_customer_count == mysql_customer_count
validation_results['mongo_customers'] = mongo_customer_count
validation_results['mysql_customers'] = mysql_customer_count
# Sample data validation
self.mysql_cursor.execute("""
SELECT c.customer_id, c.email, COUNT(o.order_id) as order_count
FROM customers c
LEFT JOIN orders o ON c.customer_id = o.customer_id
GROUP BY c.customer_id, c.email
LIMIT 10
""")
sample_customers = self.mysql_cursor.fetchall()
validation_results['sample_customers'] = sample_customers
return validation_results
def close_connections(self):
"""Clean up database connections"""
if self.mysql_cursor:
self.mysql_cursor.close()
if self.mysql_conn:
self.mysql_conn.close()
if self.mongo_client:
self.mongo_client.close()
# Usage example
def main():
mongo_config = {
'connection_string': 'mongodb://mongo-cluster:27017',
'database': 'ecommerce'
}
mysql_config = {
'host': 'mysql-primary',
'user': 'migration_user',
'password': 'secure_password',
'database': 'ecommerce_normalized',
'autocommit': False,
'use_unicode': True,
'charset': 'utf8mb4'
}
migrator = MongoToMySQLMigrator(mongo_config, mysql_config)
try:
# Run migration
migrator.migrate_customers(batch_size=500)
# Validate results
results = migrator.validate_migration()
print(f"Migration validation: {results}")
finally:
migrator.close_connections()
if __name__ == "__main__":
main()
Scenario 3: SQL → SQL (Oracle to PostgreSQL)
Business Case
Why Migrate?
- Cost Reduction: Oracle licenses can cost $47,500 per processor plus annual support fees
- Modern Features: Advanced JSON support, full-text search, extensions ecosystem
- Vendor Independence: Avoid Oracle's aggressive licensing audits and lock-in
- Cloud Native: Better integration with Kubernetes and cloud platforms
Migration Architecture
Schema Conversion Deep Dive
Oracle vs PostgreSQL Data Type Mapping:
Oracle Type | PostgreSQL Type | Conversion Notes |
---|---|---|
VARCHAR2(n) | VARCHAR(n) | Direct mapping |
NUMBER | NUMERIC | Precision may need adjustment |
NUMBER(*) | BIGINT | For integer values |
DATE | TIMESTAMP | Oracle DATE includes time |
CLOB | TEXT | Large object handling |
BLOB | BYTEA | Binary data storage |
RAW(n) | BYTEA | Binary string data |
XMLTYPE | XML | Native XML support |
Advanced Schema Conversion Script:
-- Oracle source schema example
CREATE TABLE CUSTOMERS (
CUSTOMER_ID NUMBER(10) PRIMARY KEY,
FIRST_NAME VARCHAR2(50) NOT NULL,
LAST_NAME VARCHAR2(100) NOT NULL,
EMAIL VARCHAR2(255) UNIQUE,
PHONE_NUMBER VARCHAR2(20),
BIRTH_DATE DATE,
CREATED_DATE DATE DEFAULT SYSDATE,
ACCOUNT_BALANCE NUMBER(15,2) DEFAULT 0,
CUSTOMER_DATA CLOB,
PROFILE_IMAGE BLOB,
STATUS NUMBER(1) CHECK (STATUS IN (0,1,2)),
LAST_LOGIN_IP RAW(16)
);
-- Create sequence for auto-increment
CREATE SEQUENCE customer_seq START WITH 1 INCREMENT BY 1;
-- Create trigger for auto-increment
CREATE OR REPLACE TRIGGER customer_trigger
BEFORE INSERT ON CUSTOMERS
FOR EACH ROW
BEGIN
IF :NEW.CUSTOMER_ID IS NULL THEN
:NEW.CUSTOMER_ID := customer_seq.NEXTVAL;
END IF;
END;
-- Oracle partitioned table
CREATE TABLE ORDER_HISTORY (
ORDER_ID NUMBER(15) PRIMARY KEY,
CUSTOMER_ID NUMBER(10) REFERENCES CUSTOMERS(CUSTOMER_ID),
ORDER_DATE DATE NOT NULL,
TOTAL_AMOUNT NUMBER(12,2),
ORDER_STATUS VARCHAR2(20)
)
PARTITION BY RANGE (ORDER_DATE)
(
PARTITION orders_2023 VALUES LESS THAN (DATE '2024-01-01'),
PARTITION orders_2024 VALUES LESS THAN (DATE '2025-01-01'),
PARTITION orders_future VALUES LESS THAN (MAXVALUE)
);
-- Converted PostgreSQL schema
CREATE TABLE customers (
customer_id BIGINT GENERATED ALWAYS AS IDENTITY PRIMARY KEY,
first_name VARCHAR(50) NOT NULL,
last_name VARCHAR(100) NOT NULL,
email VARCHAR(255) UNIQUE,
phone_number VARCHAR(20),
birth_date DATE,
created_date TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
account_balance NUMERIC(15,2) DEFAULT 0,
customer_data TEXT,
profile_image BYTEA,
status SMALLINT CHECK (status IN (0,1,2)),
last_login_ip INET, -- PostgreSQL native IP type
-- PostgreSQL-specific validation
CONSTRAINT customers_email_check CHECK (email ~* '^[A-Za-z0-9._%-]+@[A-Za-z0-9.-]+[.][A-Za-z]{2,}$')
);
-- PostgreSQL native partitioning (v10+)
CREATE TABLE order_history (
order_id BIGINT GENERATED ALWAYS AS IDENTITY,
customer_id BIGINT REFERENCES customers(customer_id),
order_date DATE NOT NULL,
total_amount NUMERIC(12,2),
order_status VARCHAR(20),
PRIMARY KEY (order_id, order_date) -- Include partition key in PK
) PARTITION BY RANGE (order_date);
-- Create partitions
CREATE TABLE order_history_2023 PARTITION OF order_history
FOR VALUES FROM ('2023-01-01') TO ('2024-01-01');
CREATE TABLE order_history_2024 PARTITION OF order_history
FOR VALUES FROM ('2024-01-01') TO ('2025-01-01');
CREATE TABLE order_history_future PARTITION OF order_history
FOR VALUES FROM ('2025-01-01') TO (MAXVALUE);
-- PostgreSQL specific optimizations
CREATE INDEX CONCURRENTLY idx_customers_email_hash ON customers USING HASH (email);
CREATE INDEX CONCURRENTLY idx_customers_created_date_brin ON customers USING BRIN (created_date);
CREATE INDEX idx_order_history_customer_date ON order_history (customer_id, order_date DESC); -- CONCURRENTLY is not supported on partitioned tables
-- Enable row-level security (PostgreSQL feature)
ALTER TABLE customers ENABLE ROW LEVEL SECURITY;
-- Create materialized view for analytics (PostgreSQL feature)
CREATE MATERIALIZED VIEW customer_order_summary AS
SELECT
c.customer_id,
c.first_name,
c.last_name,
COUNT(oh.order_id) as total_orders,
SUM(oh.total_amount) as total_spent,
MAX(oh.order_date) as last_order_date
FROM customers c
LEFT JOIN order_history oh ON c.customer_id = oh.customer_id
GROUP BY c.customer_id, c.first_name, c.last_name;
-- Auto-refresh materialized view
CREATE OR REPLACE FUNCTION refresh_customer_summary()
RETURNS TRIGGER AS $$
BEGIN
-- CONCURRENTLY requires a unique index on the materialized view
REFRESH MATERIALIZED VIEW CONCURRENTLY customer_order_summary;
RETURN NULL;
END;
$$ LANGUAGE plpgsql;
CREATE TRIGGER refresh_customer_summary_trigger
AFTER INSERT OR UPDATE OR DELETE ON order_history
FOR EACH STATEMENT
EXECUTE FUNCTION refresh_customer_summary();
pgloader Configuration
LOAD DATABASE
FROM oracle://oracle_user:password@oracle-host:1521/ORCL
INTO postgresql://pg_user:password@postgres-host:5432/migrated_db
WITH
include drop, -- Drop existing tables
create tables, -- Create table structure
create indexes, -- Recreate indexes
reset sequences, -- Reset identity sequences
workers = 8, -- Parallel workers
concurrency = 4 -- Concurrent connections per worker
SET
work_mem to '256MB',
maintenance_work_mem to '512MB'
CAST
type datetime to timestamptz drop default drop not null using zero-dates-to-null,
type date drop not null drop default using zero-dates-to-null,
column CUSTOMERS.CREATED_DATE to timestamptz using zero-dates-to-null,
column ORDER_HISTORY.ORDER_DATE to date using zero-dates-to-null
EXCLUDING TABLE NAMES MATCHING ~/TEMP_/, ~/^BIN/
INCLUDING ONLY TABLE NAMES MATCHING
~/CUSTOMERS/, ~/ORDER_HISTORY/, ~/PRODUCTS/, ~/CATEGORIES/
BEFORE LOAD DO
$$ CREATE SCHEMA IF NOT EXISTS migrated_schema; $$,
$$ SET search_path TO migrated_schema, public; $$
AFTER LOAD DO
$$ CREATE INDEX idx_customers_email ON customers(email); $$,
$$ CREATE INDEX idx_orders_date ON order_history USING BRIN(order_date); $$,
$$ ANALYZE; $$;
Scenario 4: On-Premises → Cloud (PostgreSQL to AWS RDS)
Business Case
Why Migrate?
- Elastic Scalability: Auto-scaling based on demand, no hardware procurement delays
- Managed Operations: Automated backups, patching, monitoring, high availability
- OPEX vs CAPEX: Pay-as-you-go model vs large upfront hardware investments
- Global Reach: Deploy in multiple regions with low-latency access
- Enhanced Security: AWS security services, encryption at rest/in transit
Migration Architecture
Step-by-Step Migration Process
Phase 1: Infrastructure Setup & Network Configuration
#!/bin/bash
# AWS Infrastructure Setup Script
# Create VPC for secure migration
aws ec2 create-vpc --cidr-block 10.0.0.0/16 --tag-specifications 'ResourceType=vpc,Tags=[{Key=Name,Value=migration-vpc}]'
# Create subnets for Multi-AZ RDS deployment
aws ec2 create-subnet --vpc-id vpc-12345678 --cidr-block 10.0.1.0/24 --availability-zone us-east-1a
aws ec2 create-subnet --vpc-id vpc-12345678 --cidr-block 10.0.2.0/24 --availability-zone us-east-1b
# Create DB subnet group
aws rds create-db-subnet-group \
--db-subnet-group-name migration-subnet-group \
--db-subnet-group-description "Subnet group for PostgreSQL migration" \
--subnet-ids subnet-12345678 subnet-87654321
# Create security group for RDS
aws ec2 create-security-group \
--group-name rds-migration-sg \
--description "Security group for PostgreSQL RDS migration"
# Allow PostgreSQL port from on-premises network
aws ec2 authorize-security-group-ingress \
--group-id sg-12345678 \
--protocol tcp \
--port 5432 \
--cidr 192.168.0.0/16 # Your on-premises CIDR
# Set up Site-to-Site VPN
aws ec2 create-vpn-gateway --type ipsec.1
aws ec2 create-customer-gateway \
--type ipsec.1 \
--public-ip YOUR_PUBLIC_IP \
--bgp-asn 65000
# Create RDS instance with appropriate configuration
aws rds create-db-instance \
--db-instance-identifier postgres-migration-target \
--db-instance-class db.r5.2xlarge \
--engine postgres \
--engine-version 15.4 \
--master-username postgres \
--master-user-password $(aws secretsmanager get-random-password --password-length 32 --exclude-characters "\"@/\\" --output text --query RandomPassword) \
--allocated-storage 1000 \
--storage-type gp3 \
--storage-encrypted \
--kms-key-id alias/aws/rds \
--vpc-security-group-ids sg-12345678 \
--db-subnet-group-name migration-subnet-group \
--multi-az \
--backup-retention-period 35 \
--preferred-backup-window "03:00-04:00" \
--preferred-maintenance-window "sun:04:00-sun:05:00" \
--enable-performance-insights \
--performance-insights-retention-period 7 \
--monitoring-interval 60 \
--monitoring-role-arn arn:aws:iam::ACCOUNT:role/rds-monitoring-role \
--enable-cloudwatch-logs-exports postgresql \
--deletion-protection
Phase 2: Schema and Initial Data Migration
import boto3
import psycopg2
import subprocess
import logging
from datetime import datetime, timedelta
import json
from typing import Dict, List
class OnPremToRDSMigrator:
def __init__(self, source_config: Dict, target_config: Dict):
self.source_config = source_config
self.target_config = target_config
self.dms_client = boto3.client('dms')
self.s3_client = boto3.client('s3')
self.logger = logging.getLogger(__name__)
def create_dms_replication_instance(self):
"""Create DMS replication instance for migration"""
response = self.dms_client.create_replication_instance(
ReplicationInstanceIdentifier='postgres-migration-instance',
ReplicationInstanceClass='dms.r5.xlarge',
AllocatedStorage=100,
VpcSecurityGroupIds=['sg-12345678'],
ReplicationSubnetGroupIdentifier='migration-subnet-group',
MultiAZ=True,
EngineVersion='3.4.7',
PubliclyAccessible=False,
Tags=[
{
'Key': 'Project',
'Value': 'DatabaseMigration'
},
{
'Key': 'Environment',
'Value': 'Production'
}
]
)
return response['ReplicationInstance']['ReplicationInstanceArn']
def create_dms_endpoints(self):
"""Create source and target endpoints for DMS"""
# Source endpoint (on-premises PostgreSQL)
source_endpoint = self.dms_client.create_endpoint(
EndpointIdentifier='postgres-onprem-source',
EndpointType='source',
EngineName='postgres',
Username=self.source_config['username'],
Password=self.source_config['password'],
ServerName=self.source_config['host'],
Port=self.source_config['port'],
DatabaseName=self.source_config['database'],
PostgreSQLSettings={
'AfterConnectScript': 'SET search_path TO public;',
'CaptureDdls': True,
'MaxFileSize': 32768,
'DatabaseName': self.source_config['database'],
'DdlArtifactsSchema': 'public',
'ExecuteTimeout': 60,
'FailTasksOnLobTruncation': True,
'HeartbeatEnable': True,
'HeartbeatSchema': 'public',
'HeartbeatFrequency': 5000,
'Password': self.source_config['password'],
'Port': self.source_config['port'],
'ServerName': self.source_config['host'],
'Username': self.source_config['username'],
'SlotName': 'dms_migration_slot'
}
)
# Target endpoint (RDS PostgreSQL)
target_endpoint = self.dms_client.create_endpoint(
EndpointIdentifier='postgres-rds-target',
EndpointType='target',
EngineName='postgres',
Username=self.target_config['username'],
Password=self.target_config['password'],
ServerName=self.target_config['host'],
Port=self.target_config['port'],
DatabaseName=self.target_config['database'],
PostgreSQLSettings={
'AfterConnectScript': 'SET search_path TO public;',
'DatabaseName': self.target_config['database'],
'Password': self.target_config['password'],
'Port': self.target_config['port'],
'ServerName': self.target_config['host'],
'Username': self.target_config['username']
}
)
return source_endpoint, target_endpoint
def create_migration_task(self, replication_instance_arn: str,
source_endpoint_arn: str, target_endpoint_arn: str):
"""Create and start the migration task"""
table_mappings = {
"rules": [
{
"rule-type": "selection",
"rule-id": "1",
"rule-name": "1",
"object-locator": {
"schema-name": "public",
"table-name": "%"
},
"rule-action": "include",
"filters": []
},
{
"rule-type": "transformation",
"rule-id": "2",
"rule-name": "2",
"rule-target": "schema",
"object-locator": {
"schema-name": "public"
},
"rule-action": "rename",
"value": "public"
}
]
}
migration_task = self.dms_client.create_replication_task(
ReplicationTaskIdentifier='postgres-migration-full-load-cdc',
SourceEndpointArn=source_endpoint_arn,
TargetEndpointArn=target_endpoint_arn,
ReplicationInstanceArn=replication_instance_arn,
MigrationType='full-load-and-cdc',
TableMappings=json.dumps(table_mappings),
ReplicationTaskSettings=json.dumps({
"TargetMetadata": {
"TargetSchema": "",
"SupportLobs": True,
"FullLobMode": False,
"LobChunkSize": 0,
"LimitedSizeLobMode": True,
"LobMaxSize": 32,
"InlineLobMaxSize": 0,
"LoadMaxFileSize": 0,
"ParallelLoadThreads": 0,
"ParallelLoadBufferSize": 0,
"BatchApplyEnabled": True,
"TaskRecoveryTableEnabled": False,
"ParallelApplyThreads": 0,
"ParallelApplyBufferSize": 0,
"ParallelApplyQueuesPerThread": 0
},
"FullLoadSettings": {
"TargetTablePrepMode": "DROP_AND_CREATE",
"CreatePkAfterFullLoad": False,
"StopTaskCachedChangesApplied": False,
"StopTaskCachedChangesNotApplied": False,
"MaxFullLoadSubTasks": 8,
"TransactionConsistencyTimeout": 600,
"CommitRate": 10000
},
"Logging": {
"EnableLogging": True,
"LogComponents": [
{
"Id": "SOURCE_UNLOAD",
"Severity": "LOGGER_SEVERITY_DEFAULT"
},
{
"Id": "TARGET_LOAD",
"Severity": "LOGGER_SEVERITY_DEFAULT"
},
{
"Id": "SOURCE_CAPTURE",
"Severity": "LOGGER_SEVERITY_DEFAULT"
},
{
"Id": "TARGET_APPLY",
"Severity": "LOGGER_SEVERITY_DEFAULT"
}
]
},
"ControlTablesSettings": {
"historyTimeslotInMinutes": 5,
"ControlSchema": "",
"HistoryTimeslotInMinutes": 5,
"HistoryTableEnabled": True,
"SuspendedTablesTableEnabled": True,
"StatusTableEnabled": True
},
"StreamBufferSettings": {
"StreamBufferCount": 3,
"StreamBufferSizeInMB": 8,
"CtrlStreamBufferSizeInMB": 5
},
"ChangeProcessingDdlHandlingPolicy": {
"HandleSourceTableDropped": True,
"HandleSourceTableTruncated": True,
"HandleSourceTableAltered": True
},
"ErrorBehavior": {
"DataErrorPolicy": "LOG_ERROR",
"DataTruncationErrorPolicy": "LOG_ERROR",
"DataErrorEscalationPolicy": "SUSPEND_TABLE",
"DataErrorEscalationCount": 0,
"TableErrorPolicy": "SUSPEND_TABLE",
"TableErrorEscalationPolicy": "STOP_TASK",
"TableErrorEscalationCount": 0,
"RecoverableErrorCount": -1,
"RecoverableErrorInterval": 5,
"RecoverableErrorThrottling": True,
"RecoverableErrorThrottlingMax": 1800,
"RecoverableErrorStopRetryAfterThrottlingMax": True,
"ApplyErrorDeletePolicy": "IGNORE_RECORD",
"ApplyErrorInsertPolicy": "LOG_ERROR",
"ApplyErrorUpdatePolicy": "LOG_ERROR",
"ApplyErrorEscalationPolicy": "LOG_ERROR",
"ApplyErrorEscalationCount": 0,
"ApplyErrorFailOnTruncationDdl": False,
"FullLoadIgnoreConflicts": True,
"FailOnTransactionConsistencyBreached": False,
"FailOnNoTablesCaptured": True
}
})
)
# Start the migration task
self.dms_client.start_replication_task(
ReplicationTaskArn=migration_task['ReplicationTask']['ReplicationTaskArn'],
StartReplicationTaskType='start-replication'
)
return migration_task['ReplicationTask']['ReplicationTaskArn']
def monitor_migration_progress(self, task_arn: str):
"""Monitor the migration task progress"""
while True:
response = self.dms_client.describe_replication_tasks(
ReplicationTaskArns=[task_arn]
)
task = response['ReplicationTasks'][0]
status = task['Status']
if 'ReplicationTaskStats' in task:
stats = task['ReplicationTaskStats']
self.logger.info(f"""
Migration Progress:
Status: {status}
Full Load Progress: {stats.get('FullLoadProgressPercent', 0)}%
Tables Loaded: {stats.get('TablesLoaded', 0)}
Tables Loading: {stats.get('TablesLoading', 0)}
Tables Queued: {stats.get('TablesQueued', 0)}
Tables Errored: {stats.get('TablesErrored', 0)}
CDC Start Time: {stats.get('StartTime', 'N/A')}
""")
if status in ['stopped', 'failed', 'ready']:
break
import time
time.sleep(30) # Check every 30 seconds
return status
def validate_migration(self):
"""Validate the migration results"""
source_conn = psycopg2.connect(**self.source_config)
target_conn = psycopg2.connect(**self.target_config)
validation_results = {}
try:
# Get list of tables to validate
source_cursor = source_conn.cursor()
source_cursor.execute("""
SELECT table_name
FROM information_schema.tables
WHERE table_schema = 'public'
AND table_type = 'BASE TABLE'
ORDER BY table_name
""")
tables = [row[0] for row in source_cursor.fetchall()]
for table in tables:
# Count validation
source_cursor.execute(f"SELECT COUNT(*) FROM {table}")
source_count = source_cursor.fetchone()[0]
target_cursor = target_conn.cursor()
target_cursor.execute(f"SELECT COUNT(*) FROM {table}")
target_count = target_cursor.fetchone()[0]
validation_results[table] = {
'source_count': source_count,
'target_count': target_count,
'count_match': source_count == target_count
}
# Sample data validation
source_cursor.execute(f"SELECT * FROM {table} LIMIT 5")
source_sample = source_cursor.fetchall()
target_cursor.execute(f"SELECT * FROM {table} LIMIT 5")
target_sample = target_cursor.fetchall()
validation_results[table]['sample_match'] = source_sample == target_sample
finally:
source_conn.close()
target_conn.close()
return validation_results
def main():
# Configuration
source_config = {
'host': 'onprem-postgres.company.local',
'port': 5432,
'database': 'production',
'username': 'postgres',
'password': 'source_password'
}
target_config = {
'host': 'postgres-migration-target.cluster-xyz.us-east-1.rds.amazonaws.com',
'port': 5432,
'database': 'production',
'username': 'postgres',
'password': 'target_password' # Retrieved from AWS Secrets Manager
}
migrator = OnPremToRDSMigrator(source_config, target_config)
try:
# Setup migration infrastructure
print("Creating DMS replication instance...")
replication_instance_arn = migrator.create_dms_replication_instance()
print("Creating DMS endpoints...")
source_endpoint, target_endpoint = migrator.create_dms_endpoints()
print("Creating and starting migration task...")
task_arn = migrator.create_migration_task(
replication_instance_arn,
source_endpoint['Endpoint']['EndpointArn'],
target_endpoint['Endpoint']['EndpointArn']
)
print("Monitoring migration progress...")
final_status = migrator.monitor_migration_progress(task_arn)
if final_status == 'ready':
print("Migration completed successfully!")
print("Validating migration results...")
validation_results = migrator.validate_migration()
for table, results in validation_results.items():
print(f"Table {table}: Count match = {results['count_match']}, Sample match = {results['sample_match']}")
else:
print(f"Migration failed with status: {final_status}")
except Exception as e:
print(f"Migration failed: {e}")
raise
if __name__ == "__main__":
main()
Phase 3: Application Cutover Strategy
import boto3
import json
from typing import Dict, List
import time
class ApplicationCutoverManager:
def __init__(self):
self.route53_client = boto3.client('route53')
self.elbv2_client = boto3.client('elbv2')
self.ecs_client = boto3.client('ecs')
def create_weighted_routing_policy(self, hosted_zone_id: str,
record_name: str, onprem_ip: str, rds_endpoint: str):
"""Create weighted routing to gradually shift traffic"""
# Create record for on-premises (weight 100 initially)
self.route53_client.change_resource_record_sets(
HostedZoneId=hosted_zone_id,
ChangeBatch={
'Changes': [
{
'Action': 'CREATE',
'ResourceRecordSet': {
'Name': f'db-onprem.{record_name}',
'Type': 'CNAME',
'SetIdentifier': 'onprem-db',
'Weight': 100,
'TTL': 60,
'ResourceRecords': [
{'Value': onprem_ip}
]
}
},
{
'Action': 'CREATE',
'ResourceRecordSet': {
'Name': f'db-rds.{record_name}',
'Type': 'CNAME',
'SetIdentifier': 'rds-db',
'Weight': 0, # Start with 0% traffic
'TTL': 60,
'ResourceRecords': [
{'Value': rds_endpoint}
]
}
}
]
}
)
def update_traffic_weights(self, hosted_zone_id: str, record_name: str,
onprem_weight: int, rds_weight: int):
"""Update traffic distribution weights"""
self.route53_client.change_resource_record_sets(
HostedZoneId=hosted_zone_id,
ChangeBatch={
'Changes': [
{
'Action': 'UPSERT',
'ResourceRecordSet': {
'Name': f'db-onprem.{record_name}',
'Type': 'CNAME',
'SetIdentifier': 'onprem-db',
'Weight': onprem_weight,
'TTL': 60,
'ResourceRecords': [
{'Value': 'onprem-postgres.company.local'}
]
}
},
{
'Action': 'UPSERT',
'ResourceRecordSet': {
'Name': f'db-rds.{record_name}',
'Type': 'CNAME',
'SetIdentifier': 'rds-db',
'Weight': rds_weight,
'TTL': 60,
'ResourceRecords': [
{'Value': 'postgres-migration-target.cluster-xyz.us-east-1.rds.amazonaws.com'}
]
}
}
]
}
)
def gradual_cutover(self, hosted_zone_id: str, record_name: str,
validation_function, rollback_function):
"""Perform gradual traffic cutover with validation"""
cutover_phases = [
(95, 5), # 5% to RDS
(80, 20), # 20% to RDS
(50, 50), # 50% to RDS
(20, 80), # 80% to RDS
(0, 100) # 100% to RDS
]
for onprem_weight, rds_weight in cutover_phases:
print(f"Shifting traffic: {rds_weight}% to RDS")
# Update weights
self.update_traffic_weights(hosted_zone_id, record_name,
onprem_weight, rds_weight)
# Wait for DNS propagation
time.sleep(180) # 3 minutes
# Validate system health
if validation_function():
print(f"β
Phase validated: {rds_weight}% traffic to RDS")
time.sleep(600) # Wait 10 minutes before next phase
else:
print(f"β Validation failed at {rds_weight}% traffic")
print("Initiating rollback...")
rollback_function()
return False
return True
def emergency_rollback(self, hosted_zone_id: str, record_name: str):
"""Immediately route all traffic back to on-premises"""
print("π¨ EMERGENCY ROLLBACK: Routing all traffic to on-premises")
self.update_traffic_weights(hosted_zone_id, record_name, 100, 0)
# Also update application configuration via Parameter Store
ssm_client = boto3.client('ssm')
ssm_client.put_parameter(
Name='/app/database/endpoint',
Value='onprem-postgres.company.local:5432',
Type='String',
Overwrite=True
)
# Restart ECS services to pick up new configuration
self.restart_ecs_services()
def restart_ecs_services(self):
"""Restart ECS services to pick up configuration changes"""
clusters = self.ecs_client.list_clusters()['clusterArns']
for cluster_arn in clusters:
services = self.ecs_client.list_services(cluster=cluster_arn)['serviceArns']
for service_arn in services:
self.ecs_client.update_service(
cluster=cluster_arn,
service=service_arn,
forceNewDeployment=True
)
def application_health_check() -> bool:
"""Validate application health during migration"""
import requests
try:
# Check application endpoints
health_endpoints = [
'https://api.company.com/health',
'https://app.company.com/api/status'
]
for endpoint in health_endpoints:
response = requests.get(endpoint, timeout=10)
if response.status_code != 200:
return False
# Check database connection pool health
# This would be implemented based on your specific monitoring system
return True
except Exception as e:
print(f"Health check failed: {e}")
return False
# Usage example
if __name__ == "__main__":
cutover_manager = ApplicationCutoverManager()
# Perform gradual cutover
success = cutover_manager.gradual_cutover(
hosted_zone_id='Z123456789',
record_name='company.com',
validation_function=application_health_check,
rollback_function=lambda: cutover_manager.emergency_rollback('Z123456789', 'company.com')
)
if success:
print("π Migration completed successfully!")
else:
print("π₯ Migration failed and rolled back")
Scenario 5: Homogeneous Upgrade (MySQL 5.7 → 8.0)
Business Case
Why Upgrade?
- Security Enhancements: SHA-2 authentication, improved SSL/TLS support
- Performance Improvements: Up to 2x faster for read/write workloads
- New Features: Window functions, CTEs, JSON improvements, descending indexes
- Better Analytics: Enhanced optimizer, histograms, invisible indexes
Upgrade Strategy
Pre-Upgrade Compatibility Assessment
#!/bin/bash
# MySQL 5.7 to 8.0 Compatibility Check Script
echo "π MySQL 5.7 β 8.0 Compatibility Assessment"
echo "============================================="
# Download MySQL Shell for upgrade checker
wget https://dev.mysql.com/get/Downloads/MySQL-Shell/mysql-shell-8.0.35-linux-glibc2.12-x86-64bit.tar.gz
tar -xzf mysql-shell-8.0.35-linux-glibc2.12-x86-64bit.tar.gz
export PATH=$PATH:$(pwd)/mysql-shell-8.0.35-linux-glibc2.12-x86-64bit/bin
# Run MySQL upgrade checker
mysqlsh --uri mysql://root:password@mysql57-host:3306 \
--js -e "util.checkForServerUpgrade()"
# Manual compatibility checks
echo "
π Manual Compatibility Checks:
===============================
1. Check for reserved words conflicts:
"
mysql -h mysql57-host -u root -p -e "
SELECT DISTINCT table_name
FROM information_schema.tables
WHERE table_name IN ('rank', 'dense_rank', 'percent_rank', 'cume_dist', 'ntile', 'lag', 'lead', 'first_value', 'last_value', 'nth_value')
AND table_schema NOT IN ('information_schema', 'performance_schema', 'mysql', 'sys');
"
echo "
2. Check for deprecated SQL modes:
"
mysql -h mysql57-host -u root -p -e "
SELECT @@sql_mode;
SHOW VARIABLES LIKE '%sql_mode%';
"
echo "
3. Check for deprecated authentication methods:
"
mysql -h mysql57-host -u root -p -e "
SELECT user, host, plugin
FROM mysql.user
WHERE plugin IN ('mysql_old_password', '');
"
echo "
4. Check for deprecated data types:
"
mysql -h mysql57-host -u root -p -e "
SELECT DISTINCT table_schema, table_name, column_name, column_type
FROM information_schema.columns
WHERE (column_type LIKE 'year(2)%' OR column_type LIKE 'float(%,%)' OR column_type LIKE 'double(%,%)')
AND table_schema NOT IN ('information_schema', 'performance_schema', 'mysql', 'sys');
"
echo "
5. Check for deprecated functions:
"
# This would require parsing application code or stored procedures
# for functions like PASSWORD(), OLD_PASSWORD(), etc.
echo "
6. Check foreign key constraint name length (must be 64 characters or fewer in 8.0):
"
mysql -h mysql57-host -u root -p -e "
SELECT constraint_schema, constraint_name, LENGTH(constraint_name) as name_length
FROM information_schema.table_constraints
WHERE constraint_type = 'FOREIGN KEY'
AND LENGTH(constraint_name) > 64
AND constraint_schema NOT IN ('information_schema', 'performance_schema', 'mysql', 'sys');
"
echo "β
Compatibility assessment complete!"
Automated Upgrade Implementation
import pymysql
import subprocess
import time
import logging
from typing import Dict, List, Tuple
import json
class MySQL57to80Upgrader:
def __init__(self, source_config: Dict, target_config: Dict):
self.source_config = source_config
self.target_config = target_config
self.logger = logging.getLogger(__name__)
def setup_mysql80_replica(self) -> bool:
"""Set up MySQL 8.0 as a replica of MySQL 5.7"""
try:
# Connect to source MySQL 5.7
source_conn = pymysql.connect(**self.source_config)
source_cursor = source_conn.cursor()
# Get current master status
source_cursor.execute("SHOW MASTER STATUS")
master_status = source_cursor.fetchone()
binlog_file = master_status[0]
binlog_pos = master_status[1]
self.logger.info(f"Master status: {binlog_file}:{binlog_pos}")
# Connect to target MySQL 8.0
target_conn = pymysql.connect(**self.target_config)
target_cursor = target_conn.cursor()
# Configure replication
replication_sql = f"""
CHANGE MASTER TO
MASTER_HOST='{self.source_config['host']}',
MASTER_USER='replication_user',
MASTER_PASSWORD='replication_password',
MASTER_LOG_FILE='{binlog_file}',
MASTER_LOG_POS={binlog_pos},
MASTER_CONNECT_RETRY=10,
MASTER_RETRY_COUNT=3600
"""
target_cursor.execute(replication_sql)
target_cursor.execute("START SLAVE")
# Verify replication status
time.sleep(5)
target_cursor.execute("SHOW SLAVE STATUS")
slave_status = target_cursor.fetchone()
if slave_status and slave_status[10] == 'Yes' and slave_status[11] == 'Yes':
self.logger.info("β
Replication established successfully")
return True
else:
self.logger.error("β Replication setup failed")
return False
except Exception as e:
self.logger.error(f"Replication setup error: {e}")
return False
finally:
if 'source_conn' in locals():
source_conn.close()
if 'target_conn' in locals():
target_conn.close()
def wait_for_replication_sync(self, max_lag_seconds: int = 1) -> bool:
"""Wait for replication to catch up"""
target_conn = pymysql.connect(**self.target_config)
target_cursor = target_conn.cursor()
try:
for attempt in range(300): # 5 minutes max wait
target_cursor.execute("SHOW SLAVE STATUS")
slave_status = target_cursor.fetchone()
if not slave_status:
self.logger.error("No slave status available")
return False
seconds_behind_master = slave_status[32] # Seconds_Behind_Master column
if seconds_behind_master is None:
self.logger.warning("Seconds_Behind_Master is NULL - checking IO/SQL threads")
io_running = slave_status[10] # Slave_IO_Running
sql_running = slave_status[11] # Slave_SQL_Running
if io_running != 'Yes' or sql_running != 'Yes':
self.logger.error(f"Replication threads not running: IO={io_running}, SQL={sql_running}")
return False
continue
if seconds_behind_master <= max_lag_seconds:
self.logger.info(f"β
Replication synchronized (lag: {seconds_behind_master}s)")
return True
self.logger.info(f"Waiting for sync... (lag: {seconds_behind_master}s)")
time.sleep(1)
self.logger.error("β Replication sync timeout")
return False
finally:
target_conn.close()
def perform_cutover(self, maintenance_window_minutes: int = 5) -> bool:
"""Perform the actual cutover to MySQL 8.0"""
try:
# Step 1: Stop application writes
self.logger.info("π¦ Stopping application writes...")
self.stop_application_writes()
# Step 2: Wait for final replication sync
self.logger.info("β³ Waiting for final replication sync...")
if not self.wait_for_replication_sync(max_lag_seconds=0):
self.logger.error("Failed to achieve final sync")
self.start_application_writes() # Rollback
return False
# Step 3: Stop replication on MySQL 8.0
target_conn = pymysql.connect(**self.target_config)
target_cursor = target_conn.cursor()
target_cursor.execute("STOP SLAVE")
target_cursor.execute("RESET SLAVE ALL")
# Step 4: Run mysql_upgrade on MySQL 8.0
self.logger.info("π§ Running mysql_upgrade...")
upgrade_result = subprocess.run([
'mysql_upgrade',
f'--host={self.target_config["host"]}',
f'--user={self.target_config["user"]}',
f'--password={self.target_config["password"]}',
'--upgrade-system-tables',
'--verbose'
], capture_output=True, text=True)
if upgrade_result.returncode != 0:
self.logger.error(f"mysql_upgrade failed: {upgrade_result.stderr}")
return False
# Step 5: Update application configuration to use MySQL 8.0
self.logger.info("π Updating application configuration...")
self.update_application_config()
# Step 6: Restart applications
self.logger.info("π Restarting applications...")
self.restart_applications()
# Step 7: Validate functionality
self.logger.info("β
Validating application functionality...")
if not self.validate_application_health():
self.logger.error("Application validation failed")
return False
self.logger.info("π MySQL 8.0 upgrade completed successfully!")
return True
except Exception as e:
self.logger.error(f"Cutover failed: {e}")
return False
def stop_application_writes(self):
"""Stop application writes using various methods"""
# Method 1: Update load balancer to mark database as read-only
subprocess.run([
'curl', '-X', 'PUT',
'http://load-balancer-api/config/database/readonly',
'-d', '{"readonly": true}'
])
# Method 2: Set MySQL to read-only mode
source_conn = pymysql.connect(**self.source_config)
source_cursor = source_conn.cursor()
source_cursor.execute("SET GLOBAL read_only = ON")
source_cursor.execute("SET GLOBAL super_read_only = ON")
source_conn.close()
# Method 3: Use ProxySQL to redirect writes (if using ProxySQL)
# This would be implemented based on your proxy configuration
def start_application_writes(self):
"""Re-enable application writes (rollback scenario)"""
# Re-enable writes on source MySQL 5.7
source_conn = pymysql.connect(**self.source_config)
source_cursor = source_conn.cursor()
source_cursor.execute("SET GLOBAL read_only = OFF")
source_cursor.execute("SET GLOBAL super_read_only = OFF")
source_conn.close()
# Update load balancer
subprocess.run([
'curl', '-X', 'PUT',
'http://load-balancer-api/config/database/readonly',
'-d', '{"readonly": false}'
])
def update_application_config(self):
"""Update application configuration to point to MySQL 8.0"""
# Example: Update Kubernetes ConfigMap
config_update = {
"apiVersion": "v1",
"kind": "ConfigMap",
"metadata": {"name": "app-config"},
"data": {
"database_host": self.target_config["host"],
"database_port": str(self.target_config["port"])
}
}
import yaml
with open('/tmp/config-update.yaml', 'w') as f:
yaml.dump(config_update, f)
subprocess.run(['kubectl', 'apply', '-f', '/tmp/config-update.yaml'])
def restart_applications(self):
"""Restart applications to pick up new configuration"""
# Example: Rolling restart of Kubernetes deployments
deployments = [
'web-app',
'api-service',
'background-worker'
]
for deployment in deployments:
subprocess.run([
'kubectl', 'rollout', 'restart',
f'deployment/{deployment}'
])
# Wait for rollout to complete
subprocess.run([
'kubectl', 'rollout', 'status',
f'deployment/{deployment}',
'--timeout=300s'
])
def validate_application_health(self) -> bool:
"""Validate that applications are working correctly with MySQL 8.0"""
import requests
health_checks = [
'http://web-app/health',
'http://api-service/health',
'http://admin-panel/health'
]
for endpoint in health_checks:
try:
response = requests.get(endpoint, timeout=10)
if response.status_code != 200:
self.logger.error(f"Health check failed for {endpoint}: {response.status_code}")
return False
except requests.RequestException as e:
self.logger.error(f"Health check error for {endpoint}: {e}")
return False
# Test database connectivity and basic operations
try:
target_conn = pymysql.connect(**self.target_config)
target_cursor = target_conn.cursor()
# Test basic operations
target_cursor.execute("SELECT 1")
result = target_cursor.fetchone()
if result[0] != 1:
return False
# Test a sample business query
target_cursor.execute("SELECT COUNT(*) FROM users LIMIT 1")
user_count = target_cursor.fetchone()[0]
if user_count < 0: # Basic sanity check
return False
target_conn.close()
return True
except Exception as e:
self.logger.error(f"Database validation failed: {e}")
return False
# Advanced MySQL 8.0 Feature Utilization
class MySQL80FeatureOptimizer:
def __init__(self, connection_config: Dict):
self.connection_config = connection_config
self.logger = logging.getLogger(__name__)
def enable_new_features(self):
"""Enable and configure new MySQL 8.0 features"""
conn = pymysql.connect(**self.connection_config)
cursor = conn.cursor()
try:
# Enable histogram statistics for better query optimization
self.logger.info("π Enabling histogram statistics...")
cursor.execute("SET persist_only histogram_generation_max_mem_size = 20971520")
# Create histograms for frequently queried columns
tables_to_analyze = [
('users', 'created_at'),
('orders', 'order_date'),
('products', 'category_id'),
('sessions', 'user_id')
]
for table, column in tables_to_analyze:
cursor.execute(f"ANALYZE TABLE {table} UPDATE HISTOGRAM ON {column} WITH 100 BUCKETS")
self.logger.info(f"Created histogram for {table}.{column}")
# Enable invisible indexes for testing
self.logger.info("π Creating invisible indexes for testing...")
invisible_indexes = [
"CREATE INDEX idx_users_email_invisible ON users(email) INVISIBLE",
"CREATE INDEX idx_orders_status_invisible ON orders(status) INVISIBLE"
]
for index_sql in invisible_indexes:
try:
cursor.execute(index_sql)
self.logger.info(f"Created invisible index: {index_sql}")
except pymysql.Error as e:
if "already exists" not in str(e):
self.logger.warning(f"Failed to create invisible index: {e}")
# Configure new authentication
self.logger.info("π Configuring caching_sha2_password authentication...")
cursor.execute("SET persist default_authentication_plugin = 'caching_sha2_password'")
# Enable JSON improvements
self.logger.info("π Optimizing JSON columns...")
cursor.execute("""
SELECT table_schema, table_name, column_name
FROM information_schema.columns
WHERE data_type = 'json'
AND table_schema NOT IN ('information_schema', 'performance_schema', 'mysql', 'sys')
""")
json_columns = cursor.fetchall()
for schema, table, column in json_columns:
# Create functional indexes on JSON paths
try:
cursor.execute(f"""
CREATE INDEX idx_{table}_{column}_path
ON {schema}.{table} ((CAST({column}->'$.id' AS UNSIGNED)))
""")
self.logger.info(f"Created JSON functional index on {schema}.{table}.{column}")
except pymysql.Error as e:
if "already exists" not in str(e):
self.logger.warning(f"Failed to create JSON index: {e}")
# Configure improved InnoDB settings
self.logger.info("βοΈ Configuring InnoDB optimizations...")
innodb_settings = [
"SET persist innodb_dedicated_server = ON",
"SET persist innodb_redo_log_capacity = 2147483648", # 2GB
"SET persist innodb_parallel_read_threads = 4",
"SET persist innodb_fsync_threshold = 0" # Disable fsync threshold for better performance
]
for setting in innodb_settings:
try:
cursor.execute(setting)
self.logger.info(f"Applied setting: {setting}")
except pymysql.Error as e:
self.logger.warning(f"Failed to apply setting {setting}: {e}")
conn.commit()
self.logger.info("β
MySQL 8.0 feature optimization complete!")
finally:
conn.close()
def create_sample_ctes_and_window_functions(self):
"""Create sample views using new MySQL 8.0 features"""
conn = pymysql.connect(**self.connection_config)
cursor = conn.cursor()
try:
# Create a view using CTEs (Common Table Expressions)
cte_view = """
CREATE OR REPLACE VIEW user_order_analytics AS
WITH monthly_orders AS (
SELECT
user_id,
DATE_FORMAT(order_date, '%Y-%m') as order_month,
COUNT(*) as order_count,
SUM(total_amount) as monthly_total
FROM orders
WHERE order_date >= DATE_SUB(NOW(), INTERVAL 12 MONTH)
GROUP BY user_id, DATE_FORMAT(order_date, '%Y-%m')
),
user_stats AS (
SELECT
user_id,
AVG(order_count) as avg_monthly_orders,
AVG(monthly_total) as avg_monthly_spending,
MIN(order_month) as first_order_month,
MAX(order_month) as last_order_month
FROM monthly_orders
GROUP BY user_id
)
SELECT
u.user_id,
u.first_name,
u.last_name,
us.avg_monthly_orders,
us.avg_monthly_spending,
us.first_order_month,
us.last_order_month,
DATEDIFF(
STR_TO_DATE(CONCAT(us.last_order_month, '-01'), '%Y-%m-%d'),
STR_TO_DATE(CONCAT(us.first_order_month, '-01'), '%Y-%m-%d')
) / 30 as customer_lifetime_months
FROM users u
JOIN user_stats us ON u.user_id = us.user_id
"""
cursor.execute(cte_view)
# Create a view using window functions
window_function_view = """
CREATE OR REPLACE VIEW user_purchase_patterns AS
SELECT
o.user_id,
o.order_id,
o.order_date,
o.total_amount,
ROW_NUMBER() OVER (PARTITION BY o.user_id ORDER BY o.order_date) as order_sequence,
LAG(o.order_date) OVER (PARTITION BY o.user_id ORDER BY o.order_date) as previous_order_date,
LEAD(o.order_date) OVER (PARTITION BY o.user_id ORDER BY o.order_date) as next_order_date,
SUM(o.total_amount) OVER (PARTITION BY o.user_id ORDER BY o.order_date ROWS UNBOUNDED PRECEDING) as running_total,
AVG(o.total_amount) OVER (PARTITION BY o.user_id ORDER BY o.order_date ROWS 2 PRECEDING) as moving_avg_3_orders,
PERCENT_RANK() OVER (PARTITION BY DATE_FORMAT(o.order_date, '%Y-%m') ORDER BY o.total_amount) as monthly_spending_percentile,
NTILE(10) OVER (PARTITION BY o.user_id ORDER BY o.total_amount DESC) as user_order_decile
FROM orders o
WHERE o.order_date >= DATE_SUB(NOW(), INTERVAL 12 MONTH)
"""
cursor.execute(window_function_view)
self.logger.info("β
Created advanced views using MySQL 8.0 features")
finally:
conn.close()
# Usage example
def main():
source_config = {
'host': 'mysql57-master.company.local',
'port': 3306,
'user': 'root',
'password': 'mysql57_password',
'database': 'production'
}
target_config = {
'host': 'mysql80-replica.company.local',
'port': 3306,
'user': 'root',
'password': 'mysql80_password',
'database': 'production'
}
upgrader = MySQL57to80Upgrader(source_config, target_config)
try:
# Phase 1: Set up replication
print("Phase 1: Setting up MySQL 8.0 as replica...")
if not upgrader.setup_mysql80_replica():
print("β Failed to set up replication")
return
# Phase 2: Wait for sync and monitor
print("Phase 2: Monitoring replication sync...")
if not upgrader.wait_for_replication_sync():
print("β Replication sync failed")
return
# Phase 3: Perform cutover
print("Phase 3: Performing cutover to MySQL 8.0...")
if not upgrader.perform_cutover():
print("β Cutover failed")
return
# Phase 4: Optimize for MySQL 8.0 features
print("Phase 4: Enabling MySQL 8.0 advanced features...")
optimizer = MySQL80FeatureOptimizer(target_config)
optimizer.enable_new_features()
optimizer.create_sample_ctes_and_window_functions()
print("π MySQL 5.7 β 8.0 upgrade completed successfully!")
except Exception as e:
print(f"π₯ Upgrade failed: {e}")
if __name__ == "__main__":
main()
Scenario 6: Database Consolidation
Business Case
Why Consolidate?
- π Post-Acquisition Integration: Merge systems from acquired companies
- π Single Source of Truth: Eliminate data silos and inconsistencies
- π οΈ Operational Simplification: Reduce infrastructure complexity and costs
- π Enhanced Analytics: Enable cross-system reporting and insights
Consolidation Architecture
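The consolidated target is a single PostgreSQL database exposing unified tables. As a rough sketch, the target schema implied by the mappings defined further down could be declared with SQLAlchemy Core; table and column names mirror the ConsolidationMapping definitions, while the column sizes and types here are assumptions.
from sqlalchemy import MetaData, Table, Column, String, Numeric, DateTime, create_engine

metadata = MetaData()

# Unified customer table fed by company_a/b/c customer sources
unified_customers = Table(
    "unified_customers", metadata,
    Column("unified_customer_id", String(36), primary_key=True),  # deterministic UUID
    Column("source_system", String(32), nullable=False),          # company_a / company_b / company_c
    Column("source_customer_id", String(64), nullable=False),
    Column("first_name", String(128)),
    Column("last_name", String(128)),
    Column("email", String(255)),
    Column("phone", String(32)),
    Column("created_at", DateTime),
)

# Unified product table fed by the three product sources
unified_products = Table(
    "unified_products", metadata,
    Column("unified_product_id", String(36), primary_key=True),
    Column("source_system", String(32), nullable=False),
    Column("source_product_id", String(64), nullable=False),
    Column("product_name", String(255)),
    Column("category", String(64)),
    Column("price", Numeric(12, 2)),   # normalized to USD
)

if __name__ == "__main__":
    # Placeholder connection string for the consolidated PostgreSQL target
    engine = create_engine("postgresql://consolidation_user:target_password@consolidated-db.company.local:5432/consolidated")
    metadata.create_all(engine)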
Comprehensive Data Consolidation Pipeline
import pandas as pd
import numpy as np
from sqlalchemy import create_engine, MetaData, Table
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator
from datetime import datetime, timedelta
import hashlib
import logging
from typing import Dict, List, Tuple, Any
from dataclasses import dataclass
import great_expectations as ge
@dataclass
class ConsolidationMapping:
source_table: str
target_table: str
column_mappings: Dict[str, str]
transformation_rules: Dict[str, str]
deduplication_keys: List[str]
data_quality_rules: List[str]
class DatabaseConsolidator:
def __init__(self, source_configs: Dict[str, Dict], target_config: Dict):
self.source_engines = {}
for name, config in source_configs.items():
self.source_engines[name] = create_engine(
f"postgresql://{config['user']}:{config['password']}@{config['host']}:{config['port']}/{config['database']}"
)
self.target_engine = create_engine(
f"postgresql://{target_config['user']}:{target_config['password']}@{target_config['host']}:{target_config['port']}/{target_config['database']}"
)
self.logger = logging.getLogger(__name__)
# Define consolidation mappings
self.consolidation_mappings = self.define_consolidation_mappings()
def define_consolidation_mappings(self) -> List[ConsolidationMapping]:
"""Define how tables from different sources map to consolidated schema"""
return [
ConsolidationMapping(
source_table="customers",
target_table="unified_customers",
column_mappings={
# Company A (PostgreSQL)
'company_a.customer_id': 'source_customer_id',
'company_a.first_name': 'first_name',
'company_a.last_name': 'last_name',
'company_a.email': 'email',
'company_a.phone': 'phone',
# Company B (MySQL) - different column names
'company_b.cust_id': 'source_customer_id',
'company_b.fname': 'first_name',
'company_b.lname': 'last_name',
'company_b.email_address': 'email',
'company_b.phone_number': 'phone',
# Company C (Oracle) - different structure
'company_c.customer_number': 'source_customer_id',
'company_c.full_name': 'full_name_to_split',
'company_c.contact_email': 'email',
'company_c.tel_number': 'phone'
},
transformation_rules={
'unified_customer_id': 'generate_uuid_from_source',
'source_system': 'extract_from_table_name',
'full_name_to_split': 'split_into_first_last_name',
'email': 'normalize_email',
'phone': 'normalize_phone_number',
'created_at': 'convert_to_utc'
},
deduplication_keys=['email', 'phone'],
data_quality_rules=[
'email_format_valid',
'phone_format_valid',
'name_not_empty',
'no_test_data'
]
),
ConsolidationMapping(
source_table="products",
target_table="unified_products",
column_mappings={
'company_a.product_id': 'source_product_id',
'company_a.product_name': 'product_name',
'company_a.category': 'category',
'company_a.price': 'price',
'company_b.prod_id': 'source_product_id',
'company_b.name': 'product_name',
'company_b.cat': 'category',
'company_b.unit_price': 'price',
'company_c.item_code': 'source_product_id',
'company_c.description': 'product_name',
'company_c.product_category': 'category',
'company_c.selling_price': 'price'
},
transformation_rules={
'unified_product_id': 'generate_uuid_from_source',
'source_system': 'extract_from_table_name',
'category': 'normalize_category',
'price': 'convert_to_usd',
'product_name': 'clean_product_name'
},
deduplication_keys=['product_name', 'category', 'price'],
data_quality_rules=[
'price_positive',
'product_name_not_empty',
'category_valid'
]
)
]
def extract_from_source(self, source_name: str, table_name: str) -> pd.DataFrame:
"""Extract data from source database"""
try:
query = f"SELECT * FROM {table_name}"
df = pd.read_sql(query, self.source_engines[source_name])
# Add metadata columns
df['_source_system'] = source_name
df['_extracted_at'] = datetime.utcnow()
df['_source_table'] = table_name
self.logger.info(f"Extracted {len(df)} rows from {source_name}.{table_name}")
return df
except Exception as e:
self.logger.error(f"Failed to extract from {source_name}.{table_name}: {e}")
raise
def transform_data(self, df: pd.DataFrame, mapping: ConsolidationMapping) -> pd.DataFrame:
"""Apply transformations based on mapping rules"""
transformed_df = df.copy()
# Apply column mappings
for source_col, target_col in mapping.column_mappings.items():
if source_col in df.columns:
transformed_df[target_col] = df[source_col]
# Apply transformation rules
for target_col, rule in mapping.transformation_rules.items():
transformed_df[target_col] = self.apply_transformation_rule(transformed_df, rule, target_col)
# Remove original columns that were mapped
cols_to_remove = [col for col in mapping.column_mappings.keys() if col in transformed_df.columns]
transformed_df = transformed_df.drop(columns=cols_to_remove, errors='ignore')
return transformed_df
def apply_transformation_rule(self, df: pd.DataFrame, rule: str, target_col: str) -> pd.Series:
"""Apply specific transformation rules"""
if rule == 'generate_uuid_from_source':
return df.apply(lambda row: self.generate_deterministic_uuid(
row['_source_system'], row.get('source_customer_id', row.get('source_product_id', ''))
), axis=1)
elif rule == 'extract_from_table_name':
return df['_source_system']
elif rule == 'split_into_first_last_name':
# Handle full name splitting
if 'full_name_to_split' in df.columns:
names = df['full_name_to_split'].str.split(' ', n=1, expand=True)
if target_col == 'first_name':
return names[0] if 0 in names.columns else ''
elif target_col == 'last_name':
return names[1] if 1 in names.columns else ''
elif rule == 'normalize_email':
if 'email' in df.columns:
return df['email'].str.lower().str.strip()
elif rule == 'normalize_phone_number':
if 'phone' in df.columns:
# Remove non-digits and format as +1XXXXXXXXXX
cleaned = df['phone'].astype(str).str.replace(r'[^\d]', '', regex=True)
return cleaned.apply(lambda x: f"+1{x[-10:]}" if len(x) >= 10 else x)
elif rule == 'normalize_category':
if 'category' in df.columns:
category_mappings = {
'electronics': 'Electronics',
'electronic': 'Electronics',
'tech': 'Electronics',
'books': 'Books',
'book': 'Books',
'clothing': 'Apparel',
'clothes': 'Apparel',
'apparel': 'Apparel'
}
return df['category'].str.lower().map(category_mappings).fillna(df['category'])
elif rule == 'convert_to_usd':
if 'price' in df.columns:
# Implement currency conversion logic
# This is simplified - in reality, you'd use exchange rates
currency_multipliers = {
'company_a': 1.0, # Already USD
'company_b': 1.0, # Already USD
'company_c': 0.85 # EUR to USD (example rate)
}
source_system = df['_source_system'].iloc[0] if len(df) > 0 else 'company_a'
multiplier = currency_multipliers.get(source_system, 1.0)
return df['price'].astype(float) * multiplier
elif rule == 'clean_product_name':
if 'product_name' in df.columns:
return df['product_name'].str.strip().str.title()
elif rule == 'convert_to_utc':
if 'created_at' in df.columns:
return pd.to_datetime(df['created_at']).dt.tz_localize(None)
# Default case
return pd.Series([''] * len(df))
def generate_deterministic_uuid(self, source_system: str, source_id: str) -> str:
"""Generate consistent UUID from source system and ID"""
combined = f"{source_system}:{source_id}"
hash_object = hashlib.sha256(combined.encode())
hex_dig = hash_object.hexdigest()
# Format as UUID
return f"{hex_dig[:8]}-{hex_dig[8:12]}-{hex_dig[12:16]}-{hex_dig[16:20]}-{hex_dig[20:32]}"
def deduplicate_records(self, df: pd.DataFrame, dedup_keys: List[str]) -> pd.DataFrame:
"""Remove duplicate records based on deduplication keys"""
if not dedup_keys or df.empty:
return df
# Create composite key for deduplication
df['_dedup_key'] = df[dedup_keys].fillna('').astype(str).agg('|'.join, axis=1)
# Sort by source system preference and extraction time to keep best record
source_priority = {'company_a': 1, 'company_b': 2, 'company_c': 3}
df['_source_priority'] = df['_source_system'].map(source_priority).fillna(99)
df_sorted = df.sort_values(['_dedup_key', '_source_priority', '_extracted_at'])
df_deduped = df_sorted.drop_duplicates(subset=['_dedup_key'], keep='first')
# Clean up temporary columns
df_deduped = df_deduped.drop(columns=['_dedup_key', '_source_priority'])
duplicates_removed = len(df) - len(df_deduped)
if duplicates_removed > 0:
self.logger.info(f"Removed {duplicates_removed} duplicate records")
return df_deduped
def validate_data_quality(self, df: pd.DataFrame, quality_rules: List[str]) -> Tuple[pd.DataFrame, Dict]:
"""Validate data quality using Great Expectations"""
validation_results = {}
cleaned_df = df.copy()
# Convert to Great Expectations dataset
ge_df = ge.from_pandas(df)
for rule in quality_rules:
if rule == 'email_format_valid':
result = ge_df.expect_column_values_to_match_regex(
'email',
r'^[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}$'
)
if not result['success']:
invalid_emails = result['result']['unexpected_list']
self.logger.warning(f"Invalid emails found: {len(invalid_emails)}")
# Optionally filter out invalid emails
cleaned_df = cleaned_df[~cleaned_df['email'].isin(invalid_emails)]
validation_results['email_format'] = result['success']
elif rule == 'phone_format_valid':
result = ge_df.expect_column_values_to_match_regex(
'phone',
r'^\+1\d{10}$'
)
validation_results['phone_format'] = result['success']
elif rule == 'name_not_empty':
result = ge_df.expect_column_values_to_not_be_null('first_name')
validation_results['first_name_not_null'] = result['success']
result = ge_df.expect_column_values_to_not_be_null('last_name')
validation_results['last_name_not_null'] = result['success']
elif rule == 'no_test_data':
# Filter out obvious test data
test_patterns = ['test', 'demo', 'sample', 'example']
for pattern in test_patterns:
if 'email' in cleaned_df.columns:
mask = cleaned_df['email'].str.contains(pattern, case=False, na=False)
test_records = mask.sum()
if test_records > 0:
self.logger.warning(f"Filtering {test_records} test records containing '{pattern}'")
cleaned_df = cleaned_df[~mask]
validation_results['test_data_filtered'] = True
elif rule == 'price_positive':
if 'price' in cleaned_df.columns:
result = ge_df.expect_column_values_to_be_between('price', min_value=0, max_value=999999)
validation_results['price_positive'] = result['success']
# Filter out invalid prices
invalid_prices = cleaned_df['price'] <= 0
if invalid_prices.sum() > 0:
self.logger.warning(f"Filtering {invalid_prices.sum()} records with invalid prices")
cleaned_df = cleaned_df[~invalid_prices]
elif rule == 'product_name_not_empty':
if 'product_name' in cleaned_df.columns:
result = ge_df.expect_column_values_to_not_be_null('product_name')
validation_results['product_name_not_null'] = result['success']
elif rule == 'category_valid':
if 'category' in cleaned_df.columns:
valid_categories = ['Electronics', 'Books', 'Apparel', 'Home & Garden', 'Sports', 'Other']
result = ge_df.expect_column_values_to_be_in_set('category', valid_categories)
validation_results['category_valid'] = result['success']
return cleaned_df, validation_results
def load_to_target(self, df: pd.DataFrame, table_name: str):
"""Load transformed and validated data to target database"""
try:
# Append in chunks; duplicates were already removed upstream during deduplication
df.to_sql(
table_name,
self.target_engine,
if_exists='append',
index=False,
method='multi',
chunksize=1000
)
self.logger.info(f"Loaded {len(df)} records to {table_name}")
except Exception as e:
self.logger.error(f"Failed to load data to {table_name}: {e}")
raise
def consolidate_table(self, mapping: ConsolidationMapping):
"""Consolidate a specific table from all sources"""
consolidated_data = []
for source_name in self.source_engines.keys():
try:
# Extract from source
source_df = self.extract_from_source(source_name, mapping.source_table)
if source_df.empty:
self.logger.info(f"No data found in {source_name}.{mapping.source_table}")
continue
# Transform data
transformed_df = self.transform_data(source_df, mapping)
consolidated_data.append(transformed_df)
except Exception as e:
self.logger.warning(f"Failed to process {source_name}.{mapping.source_table}: {e}")
continue
if not consolidated_data:
self.logger.warning(f"No data to consolidate for {mapping.target_table}")
return
# Combine all source data
combined_df = pd.concat(consolidated_data, ignore_index=True)
# Deduplicate records
deduped_df = self.deduplicate_records(combined_df, mapping.deduplication_keys)
# Validate data quality
validated_df, quality_results = self.validate_data_quality(deduped_df, mapping.data_quality_rules)
self.logger.info(f"Data quality results: {quality_results}")
# Load to target
self.load_to_target(validated_df, mapping.target_table)
return {
'source_records': len(combined_df),
'after_deduplication': len(deduped_df),
'final_records': len(validated_df),
'quality_results': quality_results
}
# Airflow DAG for orchestrating consolidation
def create_consolidation_dag():
"""Create Airflow DAG for database consolidation"""
default_args = {
'owner': 'data-engineering',
'depends_on_past': False,
'start_date': datetime(2024, 1, 1),
'email_on_failure': True,
'email_on_retry': False,
'retries': 2,
'retry_delay': timedelta(minutes=5)
}
dag = DAG(
'database_consolidation',
default_args=default_args,
description='Consolidate databases from acquired companies',
schedule_interval='0 2 * * *', # Daily at 2 AM
catchup=False, # catchup is a DAG argument, not an operator default
max_active_runs=1,
tags=['consolidation', 'etl', 'migration']
)
def run_consolidation(**context):
source_configs = {
'company_a': {
'host': 'postgres-a.company.local',
'port': 5432,
'database': 'production',
'user': 'readonly_user',
'password': 'password_a'
},
'company_b': {
'host': 'mysql-b.company.local',
'port': 3306,
'database': 'ecommerce',
'user': 'readonly_user',
'password': 'password_b'
},
'company_c': {
'host': 'oracle-c.company.local',
'port': 1521,
'database': 'ORCL',
'user': 'readonly_user',
'password': 'password_c'
}
}
target_config = {
'host': 'consolidated-db.company.local',
'port': 5432,
'database': 'consolidated',
'user': 'consolidation_user',
'password': 'target_password'
}
consolidator = DatabaseConsolidator(source_configs, target_config)
results = {}
for mapping in consolidator.consolidation_mappings:
result = consolidator.consolidate_table(mapping)
results[mapping.target_table] = result
# Log summary
logging.info(f"Consolidation completed: {results}")
return results
# Define tasks
consolidation_task = PythonOperator(
task_id='run_consolidation',
python_callable=run_consolidation,
dag=dag
)
# Data quality checks
quality_check_task = BashOperator(
task_id='run_quality_checks',
bash_command='python /opt/airflow/dags/scripts/quality_checks.py',
dag=dag
)
# Update search index
update_search_task = BashOperator(
task_id='update_search_index',
bash_command='python /opt/airflow/dags/scripts/update_elasticsearch.py',
dag=dag
)
# Set task dependencies
consolidation_task >> quality_check_task >> update_search_task
return dag
# Create the DAG
consolidation_dag = create_consolidation_dag()
# Advanced consolidation with conflict resolution
class ConflictResolver:
"""Handle data conflicts during consolidation"""
def __init__(self, priority_rules: Dict[str, List[str]]):
self.priority_rules = priority_rules # field -> [source1, source2, ...] in priority order
def resolve_conflicts(self, records: List[Dict], conflict_fields: List[str]) -> Dict:
"""Resolve conflicts between multiple records of the same entity"""
if len(records) <= 1:
return records[0] if records else {}
resolved_record = {}
for field in conflict_fields:
values = [(record.get(field), record.get('_source_system')) for record in records if record.get(field)]
if not values:
resolved_record[field] = None
continue
# Apply priority rules
if field in self.priority_rules:
priority_sources = self.priority_rules[field]
for priority_source in priority_sources:
for value, source in values:
if source == priority_source:
resolved_record[field] = value
break
if field in resolved_record:
break
else:
# No priority source found, use first available
resolved_record[field] = values[0][0]
else:
# No priority rule, use most recent or first available
resolved_record[field] = values[0][0]
# Copy non-conflicting fields from first record
base_record = records[0]
for key, value in base_record.items():
if key not in conflict_fields and key not in resolved_record:
resolved_record[key] = value
return resolved_record
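To make the resolver concrete, here is a small, hypothetical usage example; the priority rules and sample records are illustrative only:
resolver = ConflictResolver(priority_rules={
    'email': ['company_a', 'company_b', 'company_c'],   # trust company_a's contact data first
    'phone': ['company_a', 'company_c', 'company_b'],
})

duplicate_records = [
    {'email': 'jane@acme.com', 'phone': '+15551234567', '_source_system': 'company_b'},
    {'email': 'jane.doe@acme.com', 'phone': None, '_source_system': 'company_a'},
]

merged = resolver.resolve_conflicts(duplicate_records, conflict_fields=['email', 'phone'])
# email comes from company_a (the highest-priority source that has a value);
# phone comes from company_b because it is the only record with a phone number.
print(merged)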
π Observability & Monitoring
Zero-downtime migrations succeed through continuous visibility. Without proper observability, you're flying blind during the most critical phases.
Core Monitoring Stack
import prometheus_client
from prometheus_client import Counter, Histogram, Gauge
import time
import logging
from typing import Dict, List, Any
import psutil
import threading
class MigrationMetrics:
def __init__(self):
# Replication lag metrics
self.replication_lag = Gauge(
'migration_replication_lag_seconds',
'Replication lag in seconds',
['source_db', 'target_db']
)
# Data validation metrics
self.validation_errors = Counter(
'migration_validation_errors_total',
'Total number of data validation errors',
['table', 'error_type']
)
# Migration progress
self.migration_progress = Gauge(
'migration_progress_percent',
'Migration progress percentage',
['phase', 'table']
)
# Performance metrics
self.query_duration = Histogram(
'migration_query_duration_seconds',
'Query execution time',
['database', 'operation']
)
# CDC metrics
self.cdc_events_processed = Counter(
'migration_cdc_events_total',
'Total CDC events processed',
['source_table', 'event_type']
)
# Error rates
self.error_rate = Counter(
'migration_errors_total',
'Total migration errors',
['component', 'error_category']
)
# System resources
self.cpu_usage = Gauge('migration_cpu_usage_percent', 'CPU usage percentage')
self.memory_usage = Gauge('migration_memory_usage_bytes', 'Memory usage in bytes')
self.disk_io = Counter('migration_disk_io_bytes_total', 'Total disk I/O bytes', ['direction'])
# Start system monitoring thread
self.monitoring_thread = threading.Thread(target=self._monitor_system_resources, daemon=True)
self.monitoring_thread.start()
def _monitor_system_resources(self):
"""Monitor system resources continuously"""
while True:
try:
# CPU usage
cpu_percent = psutil.cpu_percent(interval=1)
self.cpu_usage.set(cpu_percent)
# Memory usage
memory = psutil.virtual_memory()
self.memory_usage.set(memory.used)
# Disk I/O (psutil counters are cumulative; record deltas between samples)
disk_io = psutil.disk_io_counters()
if disk_io:
last_io = getattr(self, '_last_disk_io', None)
if last_io:
self.disk_io.labels(direction='read').inc(max(0, disk_io.read_bytes - last_io.read_bytes))
self.disk_io.labels(direction='write').inc(max(0, disk_io.write_bytes - last_io.write_bytes))
self._last_disk_io = disk_io
time.sleep(10) # Monitor every 10 seconds
except Exception as e:
logging.error(f"System monitoring error: {e}")
time.sleep(30) # Wait longer on error
class MigrationValidator:
"""Comprehensive data validation during migration"""
def __init__(self, source_config: Dict, target_config: Dict, metrics: MigrationMetrics):
self.source_config = source_config
self.target_config = target_config
self.metrics = metrics
self.logger = logging.getLogger(__name__)
def validate_row_counts(self, tables: List[str]) -> Dict[str, Dict[str, int]]:
"""Compare row counts between source and target"""
validation_results = {}
source_conn = self._get_connection(self.source_config)
target_conn = self._get_connection(self.target_config)
try:
for table in tables:
with self.metrics.query_duration.labels(database='source', operation='count').time():
source_cursor = source_conn.cursor()
source_cursor.execute(f"SELECT COUNT(*) FROM {table}")
source_count = source_cursor.fetchone()[0]
with self.metrics.query_duration.labels(database='target', operation='count').time():
target_cursor = target_conn.cursor()
target_cursor.execute(f"SELECT COUNT(*) FROM {table}")
target_count = target_cursor.fetchone()[0]
validation_results[table] = {
'source_count': source_count,
'target_count': target_count,
'difference': abs(source_count - target_count),
'match': source_count == target_count
}
if not validation_results[table]['match']:
self.metrics.validation_errors.labels(
table=table,
error_type='count_mismatch'
).inc()
self.logger.error(
f"Row count mismatch for {table}: source={source_count}, target={target_count}"
)
finally:
source_conn.close()
target_conn.close()
return validation_results
def validate_data_integrity(self, table: str, primary_key: str, sample_size: int = 1000) -> Dict[str, Any]:
"""Compare random sample of records between source and target"""
source_conn = self._get_connection(self.source_config)
target_conn = self._get_connection(self.target_config)
try:
# Get random sample of primary keys
source_cursor = source_conn.cursor()
source_cursor.execute(f"""
SELECT {primary_key} FROM {table}
ORDER BY RANDOM()
LIMIT {sample_size}
""")
sample_keys = [row[0] for row in source_cursor.fetchall()]
mismatches = []
matches = 0
for key in sample_keys:
# Get source record
source_cursor.execute(f"SELECT * FROM {table} WHERE {primary_key} = %s", (key,))
source_record = source_cursor.fetchone()
# Get target record
target_cursor = target_conn.cursor()
target_cursor.execute(f"SELECT * FROM {table} WHERE {primary_key} = %s", (key,))
target_record = target_cursor.fetchone()
if source_record != target_record:
mismatches.append({
'key': key,
'source': source_record,
'target': target_record
})
else:
matches += 1
integrity_score = (matches / len(sample_keys)) * 100 if sample_keys else 100
if mismatches:
self.metrics.validation_errors.labels(
table=table,
error_type='data_mismatch'
).inc(len(mismatches))
self.logger.warning(f"Data integrity issues in {table}: {len(mismatches)} mismatches out of {len(sample_keys)} samples")
return {
'table': table,
'sample_size': len(sample_keys),
'matches': matches,
'mismatches': len(mismatches),
'integrity_score': integrity_score,
'mismatch_details': mismatches[:10] # First 10 mismatches for debugging
}
finally:
source_conn.close()
target_conn.close()
def validate_schema_compatibility(self) -> Dict[str, Any]:
"""Validate schema compatibility between source and target"""
source_conn = self._get_connection(self.source_config)
target_conn = self._get_connection(self.target_config)
try:
schema_issues = []
# Get table structures
source_cursor = source_conn.cursor()
source_cursor.execute("""
SELECT table_name, column_name, data_type, is_nullable
FROM information_schema.columns
WHERE table_schema = 'public'
ORDER BY table_name, ordinal_position
""")
source_schema = source_cursor.fetchall()
target_cursor = target_conn.cursor()
target_cursor.execute("""
SELECT table_name, column_name, data_type, is_nullable
FROM information_schema.columns
WHERE table_schema = 'public'
ORDER BY table_name, ordinal_position
""")
target_schema = target_cursor.fetchall()
# Convert to dictionaries for comparison
source_dict = {(row[0], row[1]): {'data_type': row[2], 'nullable': row[3]} for row in source_schema}
target_dict = {(row[0], row[1]): {'data_type': row[2], 'nullable': row[3]} for row in target_schema}
# Find missing columns in target
missing_in_target = set(source_dict.keys()) - set(target_dict.keys())
for table, column in missing_in_target:
schema_issues.append({
'type': 'missing_column_in_target',
'table': table,
'column': column,
'source_type': source_dict[(table, column)]['data_type']
})
# Find extra columns in target
extra_in_target = set(target_dict.keys()) - set(source_dict.keys())
for table, column in extra_in_target:
schema_issues.append({
'type': 'extra_column_in_target',
'table': table,
'column': column,
'target_type': target_dict[(table, column)]['data_type']
})
# Find data type mismatches
common_columns = set(source_dict.keys()) & set(target_dict.keys())
for table, column in common_columns:
source_type = source_dict[(table, column)]['data_type']
target_type = target_dict[(table, column)]['data_type']
if source_type != target_type:
schema_issues.append({
'type': 'data_type_mismatch',
'table': table,
'column': column,
'source_type': source_type,
'target_type': target_type
})
return {
'compatible': len(schema_issues) == 0,
'issues': schema_issues,
'source_tables': len(set(row[0] for row in source_schema)),
'target_tables': len(set(row[0] for row in target_schema))
}
finally:
source_conn.close()
target_conn.close()
def _get_connection(self, config: Dict):
"""Get database connection based on config"""
import psycopg2
return psycopg2.connect(
host=config['host'],
port=config['port'],
database=config['database'],
user=config['user'],
password=config['password']
)
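A minimal wiring sketch for the validator and metrics above; the hostnames, credentials, table names, and port are placeholders, and prometheus_client's built-in HTTP server is used as the scrape endpoint:
from prometheus_client import start_http_server

if __name__ == "__main__":
    start_http_server(9100)  # expose /metrics for Prometheus to scrape
    metrics = MigrationMetrics()
    validator = MigrationValidator(
        source_config={'host': 'source-db.company.local', 'port': 5432,
                       'database': 'production', 'user': 'validator', 'password': 'secret'},
        target_config={'host': 'target-db.company.local', 'port': 5432,
                       'database': 'production', 'user': 'validator', 'password': 'secret'},
        metrics=metrics,
    )
    counts = validator.validate_row_counts(['users', 'orders', 'products'])
    integrity = validator.validate_data_integrity('orders', primary_key='order_id', sample_size=500)
    print(counts, integrity['integrity_score'])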
# Real-time monitoring dashboard with Grafana
class GrafanaDashboardExporter:
"""Export migration metrics to Grafana dashboard"""
@staticmethod
def generate_dashboard_json() -> Dict[str, Any]:
"""Generate Grafana dashboard configuration"""
return {
"dashboard": {
"id": None,
"title": "Database Migration Monitoring",
"tags": ["migration", "database"],
"timezone": "browser",
"panels": [
{
"id": 1,
"title": "Replication Lag",
"type": "graph",
"targets": [
{
"expr": "migration_replication_lag_seconds",
"legendFormat": "{{source_db}} -> {{target_db}}"
}
],
"yAxes": [
{
"label": "Seconds",
"min": 0
}
],
"alert": {
"conditions": [
{
"query": {"params": ["A", "5m", "now"]},
"reducer": {"params": [], "type": "last"},
"evaluator": {"params": [30], "type": "gt"}
}
],
"executionErrorState": "alerting",
"for": "5m",
"frequency": "10s",
"handler": 1,
"name": "High Replication Lag",
"noDataState": "no_data",
"notifications": []
}
},
{
"id": 2,
"title": "Migration Progress",
"type": "singlestat",
"targets": [
{
"expr": "migration_progress_percent",
"legendFormat": "{{phase}}"
}
],
"valueName": "current",
"format": "percent",
"thresholds": "50,80"
},
{
"id": 3,
"title": "Validation Errors",
"type": "graph",
"targets": [
{
"expr": "rate(migration_validation_errors_total[5m])",
"legendFormat": "{{table}} - {{error_type}}"
}
]
},
{
"id": 4,
"title": "CDC Events Rate",
"type": "graph",
"targets": [
{
"expr": "rate(migration_cdc_events_total[1m])",
"legendFormat": "{{source_table}} - {{event_type}}"
}
]
},
{
"id": 5,
"title": "System Resources",
"type": "graph",
"targets": [
{
"expr": "migration_cpu_usage_percent",
"legendFormat": "CPU Usage %"
},
{
"expr": "migration_memory_usage_bytes / 1024 / 1024 / 1024",
"legendFormat": "Memory Usage GB"
}
]
}
],
"time": {
"from": "now-1h",
"to": "now"
},
"refresh": "5s"
}
}
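One way to publish this dashboard is Grafana's HTTP API; a short sketch, assuming an API token with dashboard write permissions (the Grafana URL and token below are placeholders):
import requests

def publish_dashboard(grafana_url: str, api_token: str) -> None:
    # generate_dashboard_json() already wraps the panels in a top-level "dashboard" key,
    # which matches the payload expected by POST /api/dashboards/db
    payload = GrafanaDashboardExporter.generate_dashboard_json()
    payload['overwrite'] = True  # replace any existing dashboard with the same title
    response = requests.post(
        f"{grafana_url}/api/dashboards/db",
        json=payload,
        headers={'Authorization': f'Bearer {api_token}'},
        timeout=10,
    )
    response.raise_for_status()

# publish_dashboard('http://grafana.company.local:3000', '<api-token>')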
π‘οΈ Risk Mitigation & Rollback Strategies
Every migration is a high-stakes operation. Having bulletproof rollback strategies isn't optional; it's mission-critical.
Rollback Strategy Matrix
Migration Phase | Primary Rollback Method | Secondary Method | RTO Target |
---|---|---|---|
Schema Migration | Database restore from backup | Manual DDL rollback scripts | 15 minutes |
Bulk Data Load | Stop ETL, restore target DB | Truncate target tables | 30 minutes |
CDC Phase | Stop CDC, resume from checkpoint | Replay from backup | 10 minutes |
Dual Write | Disable target writes via feature flag | DNS failover to source | 2 minutes |
Read Traffic | Feature flag flip (0% target reads) | Load balancer reconfiguration | 30 seconds |
Full Cutover | DNS failover + feature flags | Emergency database swap | 5 minutes |
Automated Rollback Implementation
import time
import boto3
import logging
from enum import Enum
from typing import Dict, List, Optional, Any
from dataclasses import dataclass
from datetime import datetime
import json
class RollbackTrigger(Enum):
MANUAL = "manual"
ERROR_RATE_HIGH = "error_rate_high"
REPLICATION_LAG_HIGH = "replication_lag_high"
VALIDATION_FAILED = "validation_failed"
PERFORMANCE_DEGRADED = "performance_degraded"
@dataclass
class RollbackPlan:
trigger: RollbackTrigger
steps: List[str]
estimated_time_seconds: int
prerequisites: List[str]
validation_checks: List[str]
class EmergencyRollbackSystem:
"""Automated rollback system with multiple trigger conditions"""
def __init__(self, config: Dict[str, Any]):
self.config = config
self.logger = logging.getLogger(__name__)
self.route53_client = boto3.client('route53')
self.elbv2_client = boto3.client('elbv2')
self.ssm_client = boto3.client('ssm')
# Define rollback plans
self.rollback_plans = {
RollbackTrigger.ERROR_RATE_HIGH: RollbackPlan(
trigger=RollbackTrigger.ERROR_RATE_HIGH,
steps=[
"disable_dual_writes",
"route_reads_to_source",
"stop_cdc_pipeline",
"notify_stakeholders"
],
estimated_time_seconds=120,
prerequisites=["source_db_healthy", "dns_records_ready"],
validation_checks=["error_rate_normal", "source_db_responsive"]
),
RollbackTrigger.REPLICATION_LAG_HIGH: RollbackPlan(
trigger=RollbackTrigger.REPLICATION_LAG_HIGH,
steps=[
"pause_application_writes",
"wait_for_replication_catchup",
"resume_or_rollback_decision"
],
estimated_time_seconds=300,
prerequisites=["cdc_pipeline_running"],
validation_checks=["replication_lag_acceptable"]
),
RollbackTrigger.MANUAL: RollbackPlan(
trigger=RollbackTrigger.MANUAL,
steps=[
"confirm_rollback_decision",
"execute_complete_rollback",
"validate_source_functionality",
"cleanup_target_resources"
],
estimated_time_seconds=600,
prerequisites=["admin_confirmation"],
validation_checks=["all_systems_operational"]
)
}
def execute_rollback(self, trigger: RollbackTrigger, context: Dict[str, Any] = None):
"""Execute rollback based on trigger type"""
rollback_plan = self.rollback_plans.get(trigger)
if not rollback_plan:
raise ValueError(f"No rollback plan defined for trigger: {trigger}")
self.logger.critical(f"π¨ EXECUTING ROLLBACK: {trigger.value}")
self.logger.info(f"Estimated rollback time: {rollback_plan.estimated_time_seconds} seconds")
# Check prerequisites
for prerequisite in rollback_plan.prerequisites:
if not self._check_prerequisite(prerequisite):
self.logger.error(f"Prerequisite failed: {prerequisite}")
raise Exception(f"Cannot execute rollback - prerequisite failed: {prerequisite}")
rollback_start = time.time()
try:
# Execute rollback steps
for step in rollback_plan.steps:
self.logger.info(f"Executing rollback step: {step}")
self._execute_rollback_step(step, context)
# Validate rollback success
for validation in rollback_plan.validation_checks:
if not self._validate_rollback_step(validation):
self.logger.error(f"Rollback validation failed: {validation}")
raise Exception(f"Rollback validation failed: {validation}")
rollback_duration = time.time() - rollback_start
self.logger.info(f"β
Rollback completed successfully in {rollback_duration:.1f} seconds")
return {
'success': True,
'duration_seconds': rollback_duration,
'trigger': trigger.value,
'steps_executed': rollback_plan.steps
}
except Exception as e:
rollback_duration = time.time() - rollback_start
self.logger.error(f"β Rollback failed after {rollback_duration:.1f} seconds: {e}")
# Attempt emergency procedures
self._execute_emergency_procedures()
return {
'success': False,
'duration_seconds': rollback_duration,
'error': str(e),
'trigger': trigger.value
}
def _check_prerequisite(self, prerequisite: str) -> bool:
"""Check if prerequisite is satisfied"""
if prerequisite == "source_db_healthy":
return self._check_database_health(self.config['source_db'])
elif prerequisite == "dns_records_ready":
return self._check_dns_records()
elif prerequisite == "cdc_pipeline_running":
return self._check_cdc_pipeline_status()
elif prerequisite == "admin_confirmation":
# In real implementation, this would check for admin approval
return True
return False
def _execute_rollback_step(self, step: str, context: Dict[str, Any] = None):
"""Execute individual rollback step"""
if step == "disable_dual_writes":
# Set feature flag to disable writes to target database
self.ssm_client.put_parameter(
Name='/migration/dual_write_enabled',
Value='false',
Type='String',
Overwrite=True
)
time.sleep(5) # Wait for applications to pick up change
elif step == "route_reads_to_source":
# Update Route53 weighted routing to send 100% traffic to source
self.route53_client.change_resource_record_sets(
HostedZoneId=self.config['hosted_zone_id'],
ChangeBatch={
'Changes': [
{
'Action': 'UPSERT',
'ResourceRecordSet': {
'Name': self.config['db_endpoint_name'],
'Type': 'CNAME',
'SetIdentifier': 'source-db',
'Weight': 100,
'TTL': 30, # Short TTL for fast propagation
'ResourceRecords': [
{'Value': self.config['source_db']['endpoint']}
]
}
},
{
'Action': 'UPSERT',
'ResourceRecordSet': {
'Name': self.config['db_endpoint_name'],
'Type': 'CNAME',
'SetIdentifier': 'target-db',
'Weight': 0, # Zero weight
'TTL': 30,
'ResourceRecords': [
{'Value': self.config['target_db']['endpoint']}
]
}
}
]
}
)
elif step == "stop_cdc_pipeline":
# Stop CDC pipeline (implementation depends on CDC tool)
if self.config.get('cdc_type') == 'debezium':
self._stop_debezium_connector()
elif self.config.get('cdc_type') == 'aws_dms':
self._stop_dms_task()
elif step == "notify_stakeholders":
# Send notifications to stakeholders
self._send_rollback_notification(context)
elif step == "pause_application_writes":
# Temporarily pause application writes
self.ssm_client.put_parameter(
Name='/migration/writes_enabled',
Value='false',
Type='String',
Overwrite=True
)
elif step == "wait_for_replication_catchup":
# Wait for replication to catch up
timeout = 300 # 5 minutes
start_time = time.time()
while time.time() - start_time < timeout:
lag = self._get_replication_lag()
if lag < 1: # Less than 1 second lag
break
time.sleep(5)
else:
raise Exception("Replication failed to catch up within timeout")
elif step == "resume_or_rollback_decision":
# In real implementation, this would involve human decision or automated logic
lag = self._get_replication_lag()
if lag > 5:
raise Exception("Replication lag still too high, continuing with rollback")
# Add more rollback steps as needed
def _validate_rollback_step(self, validation: str) -> bool:
"""Validate that rollback step was successful"""
if validation == "error_rate_normal":
# Check if error rate has returned to normal
current_error_rate = self._get_current_error_rate()
return current_error_rate < self.config.get('error_rate_threshold', 0.01)
elif validation == "source_db_responsive":
return self._check_database_health(self.config['source_db'])
elif validation == "replication_lag_acceptable":
lag = self._get_replication_lag()
return lag < self.config.get('max_acceptable_lag', 10)
elif validation == "all_systems_operational":
return (self._check_database_health(self.config['source_db']) and
self._check_application_health())
return True
def _execute_emergency_procedures(self):
"""Execute emergency procedures when normal rollback fails"""
self.logger.critical("π EXECUTING EMERGENCY PROCEDURES")
try:
# Emergency DNS failover
self.route53_client.change_resource_record_sets(
HostedZoneId=self.config['hosted_zone_id'],
ChangeBatch={
'Changes': [
{
'Action': 'UPSERT',
'ResourceRecordSet': {
'Name': self.config['emergency_failover_record'],
'Type': 'A',
'TTL': 60,
'ResourceRecords': [
{'Value': self.config['emergency_ip']}
]
}
}
]
}
)
# Set all systems to read-only mode
self.ssm_client.put_parameter(
Name='/emergency/read_only_mode',
Value='true',
Type='String',
Overwrite=True
)
# Send emergency alerts
self._send_emergency_alert()
except Exception as e:
self.logger.critical(f"Emergency procedures failed: {e}")
def _get_current_error_rate(self) -> float:
"""Get current application error rate"""
# Implementation would query monitoring system
# This is a placeholder
return 0.005
def _get_replication_lag(self) -> float:
"""Get current replication lag in seconds"""
# Implementation would query replication metrics
# This is a placeholder
return 2.3
def _check_database_health(self, db_config: Dict) -> bool:
"""Check if database is healthy and responsive"""
try:
import psycopg2
conn = psycopg2.connect(
host=db_config['host'],
port=db_config['port'],
database=db_config['database'],
user=db_config['health_check_user'],
password=db_config['health_check_password'],
connect_timeout=10
)
cursor = conn.cursor()
cursor.execute("SELECT 1")
result = cursor.fetchone()
conn.close()
return result[0] == 1
except Exception as e:
self.logger.error(f"Database health check failed: {e}")
return False
def _check_application_health(self) -> bool:
"""Check if applications are healthy"""
import requests
health_endpoints = self.config.get('health_check_endpoints', [])
for endpoint in health_endpoints:
try:
response = requests.get(endpoint, timeout=10)
if response.status_code != 200:
return False
except requests.RequestException:
return False
return True
def _send_rollback_notification(self, context: Dict[str, Any] = None):
"""Send rollback notification to stakeholders"""
message = {
'severity': 'HIGH',
'event': 'DATABASE_MIGRATION_ROLLBACK',
'timestamp': datetime.utcnow().isoformat(),
'context': context or {},
'next_steps': 'Engineering team investigating. All systems operational on source database.'
}
# Send to Slack, PagerDuty, email, etc.
self.logger.info(f"Rollback notification sent: {message}")
# Automated monitoring and triggering system
class MigrationMonitor:
"""Continuously monitor migration health and trigger rollbacks if needed"""
def __init__(self, rollback_system: EmergencyRollbackSystem, thresholds: Dict[str, float]):
self.rollback_system = rollback_system
self.thresholds = thresholds
self.monitoring_active = True
self.logger = logging.getLogger(__name__)
def start_monitoring(self):
"""Start continuous monitoring"""
import threading
monitor_thread = threading.Thread(target=self._monitoring_loop, daemon=True)
monitor_thread.start()
self.logger.info("Migration monitoring started")
def _monitoring_loop(self):
"""Main monitoring loop"""
while self.monitoring_active:
try:
# Check error rate
error_rate = self._get_error_rate()
if error_rate > self.thresholds.get('error_rate', 0.05):
self.logger.warning(f"High error rate detected: {error_rate}")
if error_rate > self.thresholds.get('critical_error_rate', 0.10):
self._trigger_rollback(RollbackTrigger.ERROR_RATE_HIGH, {'error_rate': error_rate})
# Check replication lag
lag = self._get_replication_lag()
if lag > self.thresholds.get('replication_lag', 30):
self.logger.warning(f"High replication lag detected: {lag}s")
if lag > self.thresholds.get('critical_replication_lag', 120):
self._trigger_rollback(RollbackTrigger.REPLICATION_LAG_HIGH, {'lag_seconds': lag})
# Check data validation failures
validation_failures = self._get_validation_failures()
if validation_failures > self.thresholds.get('validation_failures', 10):
self.logger.warning(f"High validation failures: {validation_failures}")
if validation_failures > self.thresholds.get('critical_validation_failures', 50):
self._trigger_rollback(RollbackTrigger.VALIDATION_FAILED, {'failures': validation_failures})
time.sleep(10) # Check every 10 seconds
except Exception as e:
self.logger.error(f"Monitoring error: {e}")
time.sleep(30) # Longer wait on error
def _trigger_rollback(self, trigger: RollbackTrigger, context: Dict[str, Any]):
"""Trigger automated rollback"""
self.logger.critical(f"AUTOMATED ROLLBACK TRIGGERED: {trigger.value}")
# Stop monitoring to prevent multiple rollback attempts
self.monitoring_active = False
try:
result = self.rollback_system.execute_rollback(trigger, context)
if result['success']:
self.logger.info("Automated rollback completed successfully")
else:
self.logger.error("Automated rollback failed - manual intervention required")
except Exception as e:
self.logger.critical(f"Automated rollback execution failed: {e}")
π Operational Playbook
Pre-Migration Checklist
Infrastructure Readiness
- [ ] Source database backup completed and verified
- [ ] Target database provisioned with appropriate sizing
- [ ] Network connectivity established (VPN/private links)
- [ ] Security groups and firewall rules configured
- [ ] Monitoring and alerting systems deployed
- [ ] Rollback infrastructure tested and ready
Application Readiness
- [ ] Feature flags implemented for traffic routing (see the routing sketch after this checklist)
- [ ] Database connection pools configured for both databases
- [ ] Application health checks updated
- [ ] Load balancer configuration prepared
- [ ] Circuit breakers configured for database failures
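As a sketch of what the feature-flag and dual-connection-pool items above can look like in application code: the hosts, credentials, and the /migration/read_target_percent parameter name are assumptions, while /migration/dual_write_enabled reuses the flag used elsewhere in this guide.
import random
import boto3
import psycopg2.pool

ssm = boto3.client('ssm')

# One pool per database so traffic can be shifted without redeploying
source_pool = psycopg2.pool.ThreadedConnectionPool(2, 20, host='source-db', dbname='production', user='app', password='secret')
target_pool = psycopg2.pool.ThreadedConnectionPool(2, 20, host='target-db', dbname='production', user='app', password='secret')

def flag(name: str, default: str = 'false') -> str:
    """Read a migration feature flag from SSM Parameter Store."""
    try:
        return ssm.get_parameter(Name=name)['Parameter']['Value']
    except ssm.exceptions.ParameterNotFound:
        return default

def get_read_connection():
    """Route a configurable percentage of reads to the target database."""
    read_target_percent = int(flag('/migration/read_target_percent', '0'))  # hypothetical flag name
    pool = target_pool if random.randint(1, 100) <= read_target_percent else source_pool
    return pool.getconn()

def write(sql: str, params=None):
    """Write to source, and mirror the write to target while dual-write is enabled."""
    pools = [source_pool]
    if flag('/migration/dual_write_enabled') == 'true':
        pools.append(target_pool)
    for pool in pools:
        conn = pool.getconn()
        try:
            with conn.cursor() as cur:
                cur.execute(sql, params)
            conn.commit()
        finally:
            pool.putconn(conn)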
Team Readiness
- [ ] Migration runbook reviewed by all team members
- [ ] Rollback procedures tested in staging environment
- [ ] Communication channels established (Slack, conference bridge)
- [ ] On-call schedules arranged for migration window
- [ ] Stakeholders notified of migration timeline
Migration Day Execution
T-2 Hours: Final Preparations
# Final backup of source database
pg_dump -h source-db -U postgres -d production > final_backup_$(date +%Y%m%d_%H%M%S).sql
# Verify target database connectivity
psql -h target-db -U postgres -d production -c "SELECT version();"
# Check replication lag (should be < 1 second)
psql -h target-db -U postgres -d production -c "SELECT EXTRACT(EPOCH FROM (now() - pg_last_xact_replay_timestamp())) AS lag_seconds;"
# Verify application health
curl -f http://app-health-check/status
# Start enhanced monitoring
kubectl apply -f migration-monitoring.yaml
T-30 Minutes: Begin Migration
# Enable dual-write mode
aws ssm put-parameter --name "/migration/dual_write_enabled" --value "true" --overwrite
# Wait for applications to pick up configuration
sleep 60
# Verify dual writes are working
tail -f /var/log/application.log | grep "dual_write"
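The parameter flipped above only matters if application code actually checks it before writing. A hedged sketch of that path, reading the same SSM parameter with `boto3`; the write helpers are placeholders for your own data-access layer.

```python
# Sketch of the application-side dual-write path gated by the SSM flag set above.
# write_to_source / write_to_target are placeholders for your own data-access code.
import logging

import boto3

ssm = boto3.client("ssm")
logger = logging.getLogger("dual_write")


def dual_write_enabled() -> bool:
    # In production you would cache this value for a few seconds instead of
    # calling SSM on every write.
    response = ssm.get_parameter(Name="/migration/dual_write_enabled")
    return response["Parameter"]["Value"].lower() == "true"


def write_to_source(order: dict) -> None:
    ...  # existing write path (placeholder)


def write_to_target(order: dict) -> None:
    ...  # new write path (placeholder)


def save_order(order: dict) -> None:
    write_to_source(order)  # the source stays the system of record for now
    if dual_write_enabled():
        try:
            write_to_target(order)
        except Exception as exc:
            # Never fail the user request because of the shadow write; log it so
            # the validation jobs can reconcile the row later.
            logger.error("dual_write_failed order_id=%s error=%s", order.get("id"), exc)
```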
T-15 Minutes: Start Traffic Shift
# Shift 10% of read traffic to target
python migration_scripts/update_traffic_weights.py --read-target-percent 10
# Monitor for 5 minutes
python migration_scripts/monitor_health.py --duration 300
# Shift to 50% if healthy
python migration_scripts/update_traffic_weights.py --read-target-percent 50
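`update_traffic_weights.py` is referenced above but not shown; one common way to honor a `--read-target-percent` value on the application side is to hash a stable key such as the user id into a bucket. A sketch under that assumption:

```python
# Sketch of deterministic, percentage-based read routing. The percentage would
# come from whatever flag store update_traffic_weights.py writes to.
import hashlib


def route_read_to_target(user_id: str, read_target_percent: int) -> bool:
    """Return True if this user's reads should hit the target database.

    Hashing the user id keeps routing sticky: a given user always lands on the
    same side for a given percentage, which makes debugging far easier.
    """
    digest = hashlib.sha256(user_id.encode("utf-8")).hexdigest()
    bucket = int(digest[:8], 16) % 100  # stable bucket in the range 0-99
    return bucket < read_target_percent


# Example: at --read-target-percent 10, roughly 10% of users read from the target.
print(route_read_to_target("user-42", 10))
```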
T-5 Minutes: Final Cutover
# Stop writes to source database
aws ssm put-parameter --name "/migration/source_writes_enabled" --value "false" --overwrite
# Wait for final replication sync
python migration_scripts/wait_for_sync.py --max-lag-seconds 1
# Shift 100% traffic to target
python migration_scripts/update_traffic_weights.py --read-target-percent 100 --write-target-percent 100
# Verify migration success
python migration_scripts/validate_migration.py --full-check
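`wait_for_sync.py` used above is also not shown. If the target is a PostgreSQL physical replica, its core loop could look roughly like this hypothetical sketch (connection settings are placeholders):

```python
# Hypothetical sketch of a wait-for-sync helper, assuming a PostgreSQL replica.
import sys
import time

import psycopg2

LAG_QUERY = """
    SELECT COALESCE(
        EXTRACT(EPOCH FROM (now() - pg_last_xact_replay_timestamp())), 0
    ) AS lag_seconds;
"""


def wait_for_sync(dsn: str, max_lag_seconds: float = 1.0, timeout_seconds: int = 600) -> bool:
    conn = psycopg2.connect(dsn)
    conn.autocommit = True  # each poll runs in its own transaction, so now() stays fresh
    try:
        with conn.cursor() as cur:
            deadline = time.monotonic() + timeout_seconds
            while time.monotonic() < deadline:
                cur.execute(LAG_QUERY)
                lag = float(cur.fetchone()[0])
                if lag <= max_lag_seconds:
                    return True
                time.sleep(2)
        return False
    finally:
        conn.close()


if __name__ == "__main__":
    ok = wait_for_sync("host=target-db dbname=production user=postgres")
    sys.exit(0 if ok else 1)
```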
Post-Migration Activities
Immediate (T+1 Hour)
- [ ] Run comprehensive data validation
- [ ] Monitor application error rates and performance
- [ ] Verify all critical business functions
- [ ] Update DNS records to point to target database
- [ ] Document any issues encountered and resolutions
Short-term (T+24 Hours)
- [ ] Optimize target database configuration and indexes
- [ ] Review and tune application connection pools
- [ ] Analyze query performance and identify slow queries
- [ ] Update backup and maintenance schedules
- [ ] Conduct retrospective with migration team
Long-term (T+1 Week)
- [ ] Decommission source database infrastructure
- [ ] Update disaster recovery procedures
- [ ] Archive migration logs and documentation
- [ ] Update monitoring dashboards and alerts
- [ ] Plan for next migration (if applicable)
🎯 Best Practices & Lessons Learned
The Golden Rules of Zero-Downtime Migration
1. Trust but Verify Everything
- Assume every component can fail
- Validate data at every stage
- Test rollback procedures before you need them
- Monitor everything that can be monitored
2. Gradual is Better than Big Bang
- Bulk load → CDC → Dual writes → Gradual cutover
- Start with read-only workloads
- Use percentage-based traffic shifting
- Canary releases for critical features
3. Observability is Non-Negotiable
- Real-time replication lag monitoring
- Application-level health checks
- Business metrics validation (revenue, user activity)
- Automated alerting with clear escalation paths
4. Plan for Failure Scenarios
- Source database failure during migration
- Network partitions between databases
- Application bugs exposed by new database
- Partial data corruption or inconsistency
Common Pitfalls and How to Avoid Them
Schema Evolution During Migration
-- Problem: Schema changes applied to source but not target
ALTER TABLE users ADD COLUMN created_by_ip INET;
-- Solution: Coordinate schema changes across both databases
-- Always apply schema changes to target first, then source
Character Encoding Issues
# Problem: Different character encodings between databases
# Source: latin1, Target: utf8
# Solution: Handle encoding conversion explicitly
def convert_encoding(text):
    # Strings that were read over a latin1 connection but actually contain UTF-8
    # bytes can be recovered by round-tripping: encode back to the raw latin1
    # bytes, then decode them as UTF-8 (replacing anything unrecoverable).
    if isinstance(text, str):
        return text.encode('latin1').decode('utf8', errors='replace')
    return text
Time Zone Confusion
-- Problem: Timestamps stored differently across databases
-- Source: America/New_York, Target: UTC
-- Solution: Normalize all timestamps to UTC
UPDATE migrated_table
SET created_at = created_at AT TIME ZONE 'America/New_York' AT TIME ZONE 'UTC';
Foreign Key Constraint Violations
-- Problem: Data loaded out of order causes FK violations
-- Solution: Disable FK checks during bulk load, re-enable after
-- (MySQL syntax shown; in PostgreSQL, SET session_replication_role = replica;
--  has a similar session-level effect)
SET foreign_key_checks = 0;
-- Load data
SET foreign_key_checks = 1;
Performance Optimization Techniques
Parallel Processing
# Use multiple workers for bulk data migration
from concurrent.futures import ThreadPoolExecutor

import pandas as pd
from sqlalchemy import create_engine


def migrate_table_chunk(table_name, offset, limit):
    # Each worker uses its own engines; SOURCE_DB_URL / TARGET_DB_URL are your
    # connection strings.
    source_engine = create_engine(SOURCE_DB_URL, pool_size=20)
    target_engine = create_engine(TARGET_DB_URL, pool_size=20)
    df = pd.read_sql(
        f"SELECT * FROM {table_name} ORDER BY id LIMIT {limit} OFFSET {offset}",
        source_engine,
    )
    df.to_sql(table_name, target_engine, if_exists='append', index=False)


# Process large tables in parallel chunks
with ThreadPoolExecutor(max_workers=8) as executor:
    chunk_size = 10000
    total_rows = get_table_row_count('large_table')  # helper defined elsewhere
    futures = []
    for offset in range(0, total_rows, chunk_size):
        future = executor.submit(migrate_table_chunk, 'large_table', offset, chunk_size)
        futures.append(future)

    # Wait for all chunks to complete (result() re-raises any worker exception)
    for future in futures:
        future.result()
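One caveat with the OFFSET-based chunks above: the source must scan and discard every skipped row, so later chunks get progressively slower on very large tables. If the table has a monotonically increasing, indexed id column, keyset (id-range) pagination avoids that. A sketch under that assumption, reusing the same placeholder connection strings:

```python
# Keyset-pagination variant: each worker copies a half-open id range, so the
# source never has to skip rows. Assumes an integer, indexed `id` column.
import pandas as pd
from sqlalchemy import create_engine, text


def migrate_id_range(table_name: str, start_id: int, end_id: int) -> None:
    source_engine = create_engine(SOURCE_DB_URL)
    target_engine = create_engine(TARGET_DB_URL)
    query = text(f"SELECT * FROM {table_name} WHERE id >= :start AND id < :end ORDER BY id")
    with source_engine.connect() as conn:
        df = pd.read_sql(query, conn, params={"start": start_id, "end": end_id})
    if not df.empty:
        df.to_sql(table_name, target_engine, if_exists="append", index=False)


# The driver would submit (start_id, end_id) ranges derived from
# SELECT min(id), max(id) FROM large_table, e.g. in steps of 10,000 ids.
```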
Index Management
-- Drop indexes before bulk load, recreate after
-- (CONCURRENTLY avoids long-held locks but cannot run inside a transaction block)
DROP INDEX CONCURRENTLY idx_users_email;
DROP INDEX CONCURRENTLY idx_users_created_at;
-- Bulk load data
COPY users FROM '/path/to/data.csv' WITH CSV HEADER;
-- Recreate indexes
CREATE INDEX CONCURRENTLY idx_users_email ON users(email);
CREATE INDEX CONCURRENTLY idx_users_created_at ON users(created_at);
-- Analyze tables for query planner
ANALYZE users;
Connection Pooling
# Optimize connection pooling for migration workload
from sqlalchemy import create_engine
from sqlalchemy.pool import QueuePool
migration_engine = create_engine(
DATABASE_URL,
poolclass=QueuePool,
pool_size=50, # Larger pool for migration
max_overflow=100, # Allow bursts
pool_pre_ping=True, # Validate connections
pool_recycle=3600 # Recycle connections hourly
)
Real-World War Stories
The Case of the Vanishing Microseconds
PostgreSQL stores timestamps with microsecond precision, while MySQL's DATETIME and TIMESTAMP columns default to whole-second precision unless a fractional-seconds width is declared. During a PostgreSQL → MySQL migration, we discovered that high-frequency trading timestamps were being silently truncated, causing order processing logic to fail.
Solution: Used MySQL 8.0's DATETIME(6) type for microsecond precision and added explicit precision handling in the ETL pipeline.
The Unicode Normalization Nightmare
Migrating user-generated content from MySQL (utf8) to PostgreSQL (utf8) should have been straightforward. However, some emoji and special characters were stored using different Unicode normalization forms, causing duplicate key violations on unique constraints.
Solution: Implemented Unicode normalization (NFC) during the transformation phase and added data quality checks for non-ASCII characters.
The Phantom Read Replica Lag
During a critical migration, replication lag suddenly spiked to 10 minutes despite no apparent load increase. The culprit was a single analytical query running on the source database that created a massive transaction, blocking replication.
Solution: Implemented query monitoring on source databases and automatic query killing for long-running transactions during migration windows.
🔍 Observability & Validation
No migration succeeds without visibility. You need:
- Lag monitoring: replication delay, CDC offsets
- Data validation jobs: nightly row counts, hash comparisons (see the validation sketch below)
- Application metrics: error rates, query latency
- User-level canaries: test user accounts to verify correctness
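For the row-count and hash comparisons mentioned in the list above, a minimal nightly job might look like the following sketch. The table list, DSNs, and the choice to checksum only primary keys are simplifying assumptions; a real job would also sample column values and chunk very large tables.

```python
# A minimal nightly validation sketch: compare row counts and a client-side
# checksum of primary keys per table. Table names and DSNs are assumptions.
import hashlib

from sqlalchemy import create_engine, text

TABLES = ["users", "orders", "payments"]  # illustrative


def row_count(engine, table: str) -> int:
    with engine.connect() as conn:
        return conn.execute(text(f"SELECT COUNT(*) FROM {table}")).scalar()


def pk_checksum(engine, table: str) -> str:
    """Hash sorted primary keys client-side so the check is portable across engines."""
    digest = hashlib.sha256()
    with engine.connect() as conn:
        for (pk,) in conn.execute(text(f"SELECT id FROM {table} ORDER BY id")):
            digest.update(str(pk).encode("utf-8"))
    return digest.hexdigest()


def validate(source_url: str, target_url: str) -> None:
    source, target = create_engine(source_url), create_engine(target_url)
    for table in TABLES:
        counts_match = row_count(source, table) == row_count(target, table)
        hashes_match = pk_checksum(source, table) == pk_checksum(target, table)
        status = "OK" if counts_match and hashes_match else "MISMATCH"
        print(f"{table}: counts_match={counts_match} hashes_match={hashes_match} -> {status}")
```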
Advanced Validation Strategies
Business Logic Validation
# get_daily_revenue, get_active_user_count and the alert_* helpers are
# placeholders for your own reporting and alerting hooks.
from datetime import date


def validate_business_metrics():
    """Validate critical business metrics across databases"""
    # Compare daily revenue calculations
    source_revenue = get_daily_revenue(source_db, date.today())
    target_revenue = get_daily_revenue(target_db, date.today())
    revenue_diff = abs(source_revenue - target_revenue)
    if revenue_diff > 100:  # $100 threshold
        alert_finance_team(f"Revenue calculation mismatch: ${revenue_diff}")

    # Compare user activity metrics
    source_active_users = get_active_user_count(source_db)
    target_active_users = get_active_user_count(target_db)
    if abs(source_active_users - target_active_users) > 10:
        alert_product_team("Active user count mismatch")
Automated Regression Testing
class MigrationRegressionTests:
    """Automated tests to run during migration phases"""

    def test_user_authentication(self):
        """Test user login functionality"""
        test_users = get_test_user_accounts()
        for user in test_users:
            response = authenticate_user(user.email, user.password)
            assert response.success, f"Authentication failed for {user.email}"

    def test_order_processing(self):
        """Test complete order processing workflow"""
        test_order = create_test_order()

        # Place order
        order_response = place_order(test_order)
        assert order_response.success

        # Verify in database
        db_order = get_order_by_id(order_response.order_id)
        assert db_order.status == 'pending'

        # Process payment
        payment_response = process_payment(order_response.order_id)
        assert payment_response.success

        # Verify order completion
        final_order = get_order_by_id(order_response.order_id)
        assert final_order.status == 'completed'
🛡️ Rollback Strategies
Always assume failure. Options:
- Read-Only Freeze: stop new writes, replay missed events, retry
- Feature-Flag Flip: route reads/writes back to the old DB instantly (see the sketch below)
- Dual DB Window: keep the old DB warm for X days before decommission
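The feature-flag flip is only "instant" if the flip itself is a rehearsed one-liner. A sketch that rewrites the same SSM parameters used in the runbook above; `/migration/read_target_percent` is an assumed parameter name, while the other two appear earlier.

```python
# Sketch of the "feature-flag flip" rollback: point reads and writes back at the
# old database by rewriting the SSM parameters used during cutover.
import boto3

ssm = boto3.client("ssm")


def flip_back_to_source() -> None:
    # Re-enable writes on the source database.
    ssm.put_parameter(Name="/migration/source_writes_enabled", Value="true",
                      Type="String", Overwrite=True)
    # Keep dual writes on so the target stays warm for a retry.
    ssm.put_parameter(Name="/migration/dual_write_enabled", Value="true",
                      Type="String", Overwrite=True)
    # Route all read traffic back to the source (assumed parameter name).
    ssm.put_parameter(Name="/migration/read_target_percent", Value="0",
                      Type="String", Overwrite=True)


if __name__ == "__main__":
    flip_back_to_source()
    print("Rollback flags flipped; applications converge within their poll interval.")
```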
Decision Matrix for Rollback Triggers
Condition | Automatic Rollback | Manual Decision | Continue |
---|---|---|---|
Error rate > 10% | ✅ Immediate | | |
Replication lag > 2 minutes | | ✅ Assess cause | |
Data validation failures > 100 | ✅ Within 5 minutes | | |
Performance degradation > 50% | | ✅ Business decision | |
Single user report | | | ✅ Investigate |
🎯 Final Thoughts
Zero-downtime migration is not a one-night project. It's an iterative engineering effort involving:
- Bulk + incremental sync (ETL + CDC)
- Dual writes for safety
- Feature flags for cutover
- Continuous validation
- A tested rollback plan
The actual migration isn't the hard part. The hard part is:
- Handling edge cases (schema mismatch, nulls, encoding issues)
- Keeping the business running while you operate two DBs at once
- Convincing management to invest in testing and observability
But done right, your users won't even notice that your backend just swapped its heart out mid-flight.
Key Success Factors
Technical Excellence
- Comprehensive testing in staging environments
- Robust monitoring and alerting systems
- Well-defined rollback procedures
- Gradual, percentage-based cutover strategies
Organizational Alignment
- Executive sponsorship and clear success metrics
- Cross-functional team with defined roles and responsibilities
- Regular stakeholder communication and status updates
- Post-migration retrospectives and knowledge sharing
Risk Management
- Conservative timelines with built-in buffer periods
- Multiple validation checkpoints throughout the process
- Automated rollback triggers for critical failure scenarios
- Comprehensive disaster recovery and business continuity planning
Remember: Every successful migration starts with acknowledging that something will go wrong. The teams that succeed are those who prepare for failure, not those who assume success.
This guide represents battle-tested patterns from dozens of production migrations across various industries and scales. Each migration is unique, but the fundamental principles remain constant: prepare thoroughly, monitor continuously, and always have a way back.