DEV Community

Cover image for 9 Essential Techniques for Building Production-Ready Python Microservices That Scale
Nithin Bharadwaj
Nithin Bharadwaj

Posted on

9 Essential Techniques for Building Production-Ready Python Microservices That Scale

As a best-selling author, I invite you to explore my books on Amazon. Don't forget to follow me on Medium and show your support. Thank you! Your support means the world!

Building production-ready microservices in Python requires mastering specific techniques that address the complexities of distributed systems. I've spent considerable time refining these approaches through real-world deployments, and I can share nine essential patterns that consistently deliver reliable, maintainable services.

Health Check Implementation

Health checks form the foundation of reliable microservice monitoring. I've found that comprehensive health checks must validate both internal service state and external dependencies to provide meaningful status information to orchestration platforms.

from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
import asyncio
import aioredis
import asyncpg
from datetime import datetime
from typing import Dict, Any
import psutil
import logging

app = FastAPI()

class HealthStatus(BaseModel):
    status: str
    timestamp: datetime
    checks: Dict[str, Any]
    service_info: Dict[str, str]

class HealthChecker:
    def __init__(self):
        self.checks = {}
        self.redis_pool = None
        self.db_pool = None
        self.service_start_time = datetime.now()

    async def initialize(self):
        try:
            self.redis_pool = aioredis.from_url("redis://localhost:6379")
            self.db_pool = await asyncpg.create_pool(
                "postgresql://user:pass@localhost/db",
                min_size=1,
                max_size=5
            )
        except Exception as e:
            logging.error(f"Failed to initialize health checker: {e}")

    async def check_database(self) -> Dict[str, Any]:
        try:
            start_time = datetime.now()
            async with self.db_pool.acquire() as conn:
                result = await conn.fetchval("SELECT version()")
                response_time = (datetime.now() - start_time).total_seconds()
                return {
                    "status": "healthy",
                    "response_time_ms": round(response_time * 1000, 2),
                    "active_connections": self.db_pool.get_size()
                }
        except Exception as e:
            return {
                "status": "unhealthy",
                "error": str(e),
                "error_type": type(e).__name__
            }

    async def check_redis(self) -> Dict[str, Any]:
        try:
            start_time = datetime.now()
            info = await self.redis_pool.info()
            response_time = (datetime.now() - start_time).total_seconds()
            return {
                "status": "healthy",
                "response_time_ms": round(response_time * 1000, 2),
                "connected_clients": info.get("connected_clients", 0),
                "used_memory_human": info.get("used_memory_human", "unknown")
            }
        except Exception as e:
            return {
                "status": "unhealthy",
                "error": str(e),
                "error_type": type(e).__name__
            }

    async def check_system_resources(self) -> Dict[str, Any]:
        try:
            cpu_percent = psutil.cpu_percent(interval=0.1)
            memory = psutil.virtual_memory()
            disk = psutil.disk_usage('/')

            status = "healthy"
            warnings = []

            if cpu_percent > 80:
                warnings.append("High CPU usage")
                status = "degraded"
            if memory.percent > 85:
                warnings.append("High memory usage")
                status = "degraded"
            if (disk.used / disk.total) * 100 > 90:
                warnings.append("Low disk space")
                status = "unhealthy"

            return {
                "status": status,
                "cpu_percent": round(cpu_percent, 2),
                "memory_percent": round(memory.percent, 2),
                "disk_percent": round((disk.used / disk.total) * 100, 2),
                "warnings": warnings
            }
        except Exception as e:
            return {
                "status": "unhealthy",
                "error": str(e)
            }

health_checker = HealthChecker()

@app.on_event("startup")
async def startup():
    await health_checker.initialize()

@app.get("/health", response_model=HealthStatus)
async def health_check():
    check_tasks = [
        health_checker.check_database(),
        health_checker.check_redis(),
        health_checker.check_system_resources()
    ]

    try:
        checks = await asyncio.wait_for(
            asyncio.gather(*check_tasks, return_exceptions=True),
            timeout=5.0
        )
    except asyncio.TimeoutError:
        checks = [{"status": "timeout", "error": "Health check timeout"}] * 3

    check_results = {
        "database": checks[0] if not isinstance(checks[0], Exception) else {"status": "error", "error": str(checks[0])},
        "redis": checks[1] if not isinstance(checks[1], Exception) else {"status": "error", "error": str(checks[1])},
        "system": checks[2] if not isinstance(checks[2], Exception) else {"status": "error", "error": str(checks[2])}
    }

    # Determine overall status
    statuses = [check.get("status", "unknown") for check in check_results.values()]
    if "unhealthy" in statuses:
        overall_status = "unhealthy"
    elif "degraded" in statuses:
        overall_status = "degraded"
    elif all(status == "healthy" for status in statuses):
        overall_status = "healthy"
    else:
        overall_status = "unknown"

    uptime = datetime.now() - health_checker.service_start_time

    return HealthStatus(
        status=overall_status,
        timestamp=datetime.now(),
        checks=check_results,
        service_info={
            "uptime_seconds": int(uptime.total_seconds()),
            "version": "1.0.0",
            "environment": "production"
        }
    )
Enter fullscreen mode Exit fullscreen mode

The health check implementation provides multiple levels of validation. I include dependency checks for databases and caches, system resource monitoring, and service-specific validations. This comprehensive approach helps operations teams identify issues before they impact users.

Structured Logging with Correlation IDs

Distributed tracing through correlation IDs enables following requests across service boundaries. I've implemented a logging system that automatically propagates trace identifiers through HTTP headers and internal processing.

import logging
import uuid
from contextvars import ContextVar
from fastapi import FastAPI, Request, Response
from fastapi.middleware.base import BaseHTTPMiddleware
import json
from datetime import datetime
from typing import Any, Dict

# Context variable to store correlation ID
correlation_id: ContextVar[str] = ContextVar('correlation_id', default='')

class CorrelationIdMiddleware(BaseHTTPMiddleware):
    async def dispatch(self, request: Request, call_next):
        # Extract or generate correlation ID
        corr_id = request.headers.get('X-Correlation-ID', str(uuid.uuid4()))
        correlation_id.set(corr_id)

        # Add to response headers
        response = await call_next(request)
        response.headers['X-Correlation-ID'] = corr_id
        return response

class StructuredLogger:
    def __init__(self, service_name: str):
        self.service_name = service_name
        self.logger = logging.getLogger(service_name)
        self.logger.setLevel(logging.INFO)

        # Create custom formatter
        handler = logging.StreamHandler()
        handler.setFormatter(self.CustomFormatter())
        self.logger.addHandler(handler)

    class CustomFormatter(logging.Formatter):
        def format(self, record):
            log_entry = {
                "timestamp": datetime.utcnow().isoformat() + "Z",
                "level": record.levelname,
                "service": getattr(record, 'service_name', 'unknown'),
                "correlation_id": correlation_id.get(''),
                "message": record.getMessage(),
                "module": record.module,
                "function": record.funcName,
                "line": record.lineno
            }

            # Add extra fields if present
            if hasattr(record, 'extra_fields'):
                log_entry.update(record.extra_fields)

            return json.dumps(log_entry)

    def info(self, message: str, **kwargs):
        extra = {'service_name': self.service_name}
        if kwargs:
            extra['extra_fields'] = kwargs
        self.logger.info(message, extra=extra)

    def error(self, message: str, error: Exception = None, **kwargs):
        extra = {'service_name': self.service_name}
        extra_fields = kwargs.copy()

        if error:
            extra_fields.update({
                'error_type': type(error).__name__,
                'error_message': str(error)
            })

        if extra_fields:
            extra['extra_fields'] = extra_fields

        self.logger.error(message, extra=extra)

    def warning(self, message: str, **kwargs):
        extra = {'service_name': self.service_name}
        if kwargs:
            extra['extra_fields'] = kwargs
        self.logger.warning(message, extra=extra)

# Initialize logger
logger = StructuredLogger("user-service")

app = FastAPI()
app.add_middleware(CorrelationIdMiddleware)

@app.middleware("http")
async def log_requests(request: Request, call_next):
    start_time = datetime.now()

    logger.info(
        "Request started",
        method=request.method,
        url=str(request.url),
        client_ip=request.client.host
    )

    try:
        response = await call_next(request)

        duration = (datetime.now() - start_time).total_seconds()
        logger.info(
            "Request completed",
            method=request.method,
            url=str(request.url),
            status_code=response.status_code,
            duration_ms=round(duration * 1000, 2)
        )

        return response

    except Exception as e:
        duration = (datetime.now() - start_time).total_seconds()
        logger.error(
            "Request failed",
            error=e,
            method=request.method,
            url=str(request.url),
            duration_ms=round(duration * 1000, 2)
        )
        raise

@app.get("/users/{user_id}")
async def get_user(user_id: int):
    logger.info("Fetching user", user_id=user_id)

    try:
        # Simulate user fetch
        if user_id <= 0:
            raise ValueError("Invalid user ID")

        user_data = {"id": user_id, "name": f"User {user_id}"}
        logger.info("User found", user_id=user_id, user_name=user_data["name"])
        return user_data

    except ValueError as e:
        logger.error("Invalid user request", error=e, user_id=user_id)
        raise HTTPException(status_code=400, detail=str(e))
    except Exception as e:
        logger.error("Failed to fetch user", error=e, user_id=user_id)
        raise HTTPException(status_code=500, detail="Internal server error")
Enter fullscreen mode Exit fullscreen mode

This logging approach creates searchable, structured logs that operations teams can analyze efficiently. I include contextual information like user IDs, operation types, and performance metrics in each log entry.

Configuration Management

Production services require flexible configuration management that separates secrets from code. I use environment-based configuration with validation to ensure all required settings are present at startup.

from pydantic import BaseSettings, validator
from typing import Optional, List
import os
from enum import Enum

class LogLevel(str, Enum):
    DEBUG = "DEBUG"
    INFO = "INFO"
    WARNING = "WARNING"
    ERROR = "ERROR"

class DatabaseConfig(BaseSettings):
    host: str = "localhost"
    port: int = 5432
    username: str
    password: str
    database: str
    pool_size: int = 10
    max_overflow: int = 20

    @property
    def url(self) -> str:
        return f"postgresql://{self.username}:{self.password}@{self.host}:{self.port}/{self.database}"

    class Config:
        env_prefix = "DB_"

class RedisConfig(BaseSettings):
    host: str = "localhost"
    port: int = 6379
    password: Optional[str] = None
    db: int = 0
    max_connections: int = 20

    @property
    def url(self) -> str:
        auth = f":{self.password}@" if self.password else ""
        return f"redis://{auth}{self.host}:{self.port}/{self.db}"

    class Config:
        env_prefix = "REDIS_"

class ServiceConfig(BaseSettings):
    name: str = "microservice"
    version: str = "1.0.0"
    host: str = "0.0.0.0"
    port: int = 8000
    debug: bool = False
    log_level: LogLevel = LogLevel.INFO
    cors_origins: List[str] = ["*"]
    api_key: str
    jwt_secret: str

    # External service URLs
    auth_service_url: str
    notification_service_url: str

    # Feature flags
    enable_metrics: bool = True
    enable_tracing: bool = True
    enable_rate_limiting: bool = True

    @validator('cors_origins', pre=True)
    def parse_cors_origins(cls, v):
        if isinstance(v, str):
            return [origin.strip() for origin in v.split(',')]
        return v

    @validator('api_key', 'jwt_secret')
    def validate_secrets(cls, v):
        if not v or len(v) < 32:
            raise ValueError('Secret must be at least 32 characters long')
        return v

    class Config:
        env_file = ".env"
        case_sensitive = False

class Config:
    def __init__(self):
        self.service = ServiceConfig()
        self.database = DatabaseConfig()
        self.redis = RedisConfig()

    def validate(self):
        """Validate configuration on startup"""
        required_configs = [
            self.service.api_key,
            self.service.jwt_secret,
            self.database.username,
            self.database.password
        ]

        missing = [config for config in required_configs if not config]
        if missing:
            raise ValueError(f"Missing required configuration: {missing}")

        return True

# Global configuration instance
config = Config()

# Configuration validation decorator
def require_config(func):
    """Decorator to ensure configuration is valid before function execution"""
    def wrapper(*args, **kwargs):
        config.validate()
        return func(*args, **kwargs)
    return wrapper

@require_config
def create_database_connection():
    """Create database connection using validated configuration"""
    return config.database.url

# Environment-specific configuration loading
def load_config_for_environment(env: str = None):
    """Load configuration for specific environment"""
    env = env or os.getenv("ENVIRONMENT", "development")

    env_files = {
        "development": ".env.dev",
        "testing": ".env.test",
        "staging": ".env.staging",
        "production": ".env.prod"
    }

    env_file = env_files.get(env, ".env")
    if os.path.exists(env_file):
        config.service = ServiceConfig(_env_file=env_file)
        config.database = DatabaseConfig(_env_file=env_file)
        config.redis = RedisConfig(_env_file=env_file)

    config.validate()
    return config
Enter fullscreen mode Exit fullscreen mode

This configuration system provides type safety, validation, and environment-specific overrides. I separate database, cache, and service configurations to maintain clarity and enable independent scaling decisions.

Graceful Shutdown Handling

Production services must handle shutdown signals gracefully to avoid data corruption and ensure ongoing requests complete successfully. I implement signal handlers that coordinate cleanup operations.

import signal
import asyncio
import logging
from contextlib import asynccontextmanager
from fastapi import FastAPI
import uvicorn
from typing import Callable, List
import time

class GracefulShutdownManager:
    def __init__(self, app: FastAPI):
        self.app = app
        self.shutdown_handlers: List[Callable] = []
        self.is_shutting_down = False
        self.active_requests = 0
        self.shutdown_timeout = 30  # seconds

    def add_shutdown_handler(self, handler: Callable):
        """Add a cleanup handler to be called during shutdown"""
        self.shutdown_handlers.append(handler)

    async def graceful_shutdown(self, signum, frame):
        """Handle shutdown signals gracefully"""
        logging.info(f"Received signal {signum}, initiating graceful shutdown...")
        self.is_shutting_down = True

        # Wait for active requests to complete
        start_time = time.time()
        while self.active_requests > 0 and (time.time() - start_time) < self.shutdown_timeout:
            logging.info(f"Waiting for {self.active_requests} active requests to complete...")
            await asyncio.sleep(1)

        # Execute shutdown handlers
        for handler in self.shutdown_handlers:
            try:
                if asyncio.iscoroutinefunction(handler):
                    await handler()
                else:
                    handler()
                logging.info(f"Executed shutdown handler: {handler.__name__}")
            except Exception as e:
                logging.error(f"Error in shutdown handler {handler.__name__}: {e}")

        logging.info("Graceful shutdown completed")

    def setup_signal_handlers(self):
        """Setup signal handlers for graceful shutdown"""
        def signal_handler(signum, frame):
            loop = asyncio.get_event_loop()
            loop.create_task(self.graceful_shutdown(signum, frame))

        signal.signal(signal.SIGTERM, signal_handler)
        signal.signal(signal.SIGINT, signal_handler)

# Application with graceful shutdown
@asynccontextmanager
async def lifespan(app: FastAPI):
    # Startup
    logging.info("Starting up service...")

    # Initialize database pool
    db_pool = await create_db_pool()
    app.state.db_pool = db_pool

    # Initialize Redis connection
    redis_client = await create_redis_client()
    app.state.redis = redis_client

    # Setup shutdown manager
    shutdown_manager = GracefulShutdownManager(app)
    app.state.shutdown_manager = shutdown_manager

    # Register cleanup handlers
    shutdown_manager.add_shutdown_handler(cleanup_database)
    shutdown_manager.add_shutdown_handler(cleanup_redis)
    shutdown_manager.add_shutdown_handler(cleanup_background_tasks)

    # Setup signal handlers
    shutdown_manager.setup_signal_handlers()

    logging.info("Service startup completed")

    yield

    # Shutdown
    logging.info("Shutting down service...")
    await shutdown_manager.graceful_shutdown(None, None)

app = FastAPI(lifespan=lifespan)

@app.middleware("http")
async def track_requests(request, call_next):
    """Track active requests for graceful shutdown"""
    shutdown_manager = getattr(app.state, 'shutdown_manager', None)

    if shutdown_manager and shutdown_manager.is_shutting_down:
        return {"error": "Service is shutting down"}, 503

    if shutdown_manager:
        shutdown_manager.active_requests += 1

    try:
        response = await call_next(request)
        return response
    finally:
        if shutdown_manager:
            shutdown_manager.active_requests -= 1

# Cleanup handlers
async def cleanup_database():
    """Close database connections"""
    if hasattr(app.state, 'db_pool') and app.state.db_pool:
        await app.state.db_pool.close()
        logging.info("Database connections closed")

async def cleanup_redis():
    """Close Redis connections"""
    if hasattr(app.state, 'redis') and app.state.redis:
        await app.state.redis.close()
        logging.info("Redis connections closed")

async def cleanup_background_tasks():
    """Cancel running background tasks"""
    tasks = [task for task in asyncio.all_tasks() if not task.done()]
    if tasks:
        logging.info(f"Cancelling {len(tasks)} background tasks")
        for task in tasks:
            task.cancel()
        await asyncio.gather(*tasks, return_exceptions=True)

# Helper functions for initialization
async def create_db_pool():
    """Create database connection pool"""
    # Implementation depends on your database driver
    pass

async def create_redis_client():
    """Create Redis client"""
    # Implementation depends on your Redis client
    pass

if __name__ == "__main__":
    uvicorn.run(
        "main:app",
        host="0.0.0.0",
        port=8000,
        reload=False,
        log_config={
            "version": 1,
            "disable_existing_loggers": False,
            "formatters": {
                "default": {
                    "format": "%(asctime)s - %(name)s - %(levelname)s - %(message)s",
                },
            },
            "handlers": {
                "default": {
                    "formatter": "default",
                    "class": "logging.StreamHandler",
                    "stream": "ext://sys.stdout",
                },
            },
            "root": {
                "level": "INFO",
                "handlers": ["default"],
            },
        }
    )
Enter fullscreen mode Exit fullscreen mode

This shutdown system ensures that ongoing requests complete before service termination. I track active requests and provide configurable timeout periods to balance graceful shutdown with deployment speed requirements.

API Versioning Strategy

API versioning enables service evolution while maintaining backward compatibility. I prefer header-based versioning combined with content negotiation for flexibility in client implementations.

from fastapi import FastAPI, Header, HTTPException, Depends
from pydantic import BaseModel
from typing import Optional, Dict, Any
from enum import Enum
import json

class APIVersion(str, Enum):
    V1 = "1.0"
    V2 = "2.0"
    V3 = "3.0"

class VersionedResponse(BaseModel):
    data: Dict[str, Any]
    meta: Dict[str, Any]
    version: str

class APIVersionManager:
    def __init__(self):
        self.supported_versions = {
            APIVersion.V1: "1.0",
            APIVersion.V2: "2.0", 
            APIVersion.V3: "3.0"
        }
        self.default_version = APIVersion.V2
        self.deprecated_versions = {APIVersion.V1}

    def parse_version(self, version_header: Optional[str], accept_header: Optional[str]) -> APIVersion:
        """Parse API version from headers"""
        # Check explicit version header first
        if version_header:
            try:
                requested_version = APIVersion(version_header)
                if requested_version in self.supported_versions:
                    return requested_version
            except ValueError:
                pass

        # Check Accept header for version
        if accept_header:
            if "application/vnd.api.v1+json" in accept_header:
                return APIVersion.V1
            elif "application/vnd.api.v2+json" in accept_header:
                return APIVersion.V2
            elif "application/vnd.api.v3+json" in accept_header:
                return APIVersion.V3

        return self.default_version

    def is_deprecated(self, version: APIVersion) -> bool:
        return version in self.deprecated_versions

version_manager = APIVersionManager()

def get_api_version(
    api_version: Optional[str] = Header(None, alias="X-API-Version"),
    accept: Optional[str] = Header(None)
) -> APIVersion:
    """Dependency to extract API version from request headers"""
    return version_manager.parse_version(api_version, accept)

app = FastAPI()

# Version-specific response models
class UserV1(BaseModel):
    id: int
    name: str
    email: str

class UserV2(BaseModel):
    id: int
    first_name: str
    last_name: str
    email: str
    created_at: str

class UserV3(BaseModel):
    id: int
    profile: Dict[str, str]
    contact: Dict[str, str]
    metadata: Dict[str, Any]

class UserService:
    """Service layer with version-aware transformations"""

    def get_user_data(self, user_id: int) -> Dict[str, Any]:
        """Get raw user data from database"""
        # Simulate database fetch
        return {
            "id": user_id,
            "first_name": "John",
            "last_name": "Doe",
            "email": "john.doe@example.com",
            "created_at": "2023-01-01T00:00:00Z",
            "phone": "+1234567890",
            "address": "123 Main St",
            "preferences": {"theme": "dark", "notifications": True}
        }

    def transform_user_v1(self, user_data: Dict[str, Any]) -> UserV1:
        """Transform user data for API v1"""
        return UserV1(
            id=user_data["id"],
            name=f"{user_data['first_name']} {user_data['last_name']}",
            email=user_data["email"]
        )

    def transform_user_v2(self, user_data: Dict[str, Any]) -> UserV2:
        """Transform user data for API v2"""
        return UserV2(
            id=user_data["id"],
            first_name=user_data["first_name"],
            last_name=user_data["last_name"],
            email=user_data["email"],
            created_at=user_data["created_at"]
        )

    def transform_user_v3(self, user_data: Dict[str, Any]) -> UserV3:
        """Transform user data for API v3"""
        return UserV3(
            id=user_data["id"],
            profile={
                "first_name": user_data["first_name"],
                "last_name": user_data["last_name"]
            },
            contact={
                "email": user_data["email"],
                "phone": user_data["phone"],
                "address": user_data["address"]
            },
            metadata={
                "created_at": user_data["created_at"],
                "preferences": user_data["preferences"]
            }
        )

user_service = UserService()

@app.get("/users/{user_id}")
async def get_user(
    user_id: int,
    version: APIVersion = Depends(get_api_version)
):
    """Get user with version-specific response format"""

    # Add deprecation warning for old versions
    headers = {}
    if version_manager.is_deprecated(version):
        headers["X-API-Deprecated"] = "true"
        headers["X-API-Deprecation-Date"] = "2024-12-31"
        headers["X-API-Replacement"] = "2.0"

    # Fetch user data
    try:
        user_data = user_service.get_user_data(user_id)
    except Exception as e:
        raise HTTPException(status_code=404, detail="User not found")

    # Transform based on version
    if version == APIVersion.V1:
        transformed_user = user_service.transform_user_v1(user_data)
        response_data = transformed_user.dict()
    elif version == APIVersion.V2:
        transformed_user = user_service.transform_user_v2(user_data)
        response_data = transformed_user.dict()
    elif version == APIVersion.V3:
        transformed_user = user_service.transform_user_v3(user_data)
        response_data = transformed_user.dict()
    else:
        raise HTTPException(status_code=400, detail="Unsupported API version")

    # Build versioned response
    response = VersionedResponse(
        data=response_data,
        meta={
            "version": version.value,
            "deprecated": version_manager.is_deprecated(version),
            "timestamp": "2023-01-01T00:00:00Z"
        },
        version=version.value
    )

    return response

@app.get("/api/versions")
async def get_supported_versions():
    """Return supported API versions"""
    return {
        "supported_versions": list(version_manager.supported_versions.values()),
        "default_version": version_manager.default_version.value,
        "deprecated_versions": [v.value for v in version_manager.deprecated_versions]
    }

# Middleware to add version headers to all responses
@app.middleware("http")
async def add_version_headers(request, call_next):
    response = await call_next(request)
    response.headers["X-API-Supported-Versions"] = ",".join(
        version_manager.supported_versions.values()
    )
    response.headers["X-API-Default-Version"] = version_manager.default_version.value
    return response
Enter fullscreen mode Exit fullscreen mode

This versioning approach allows clients to specify their preferred API version while providing clear migration paths. I include deprecation warnings and metadata to help clients understand version lifecycles.

Database Migration Management

Database schema changes require careful coordination across environments. I use Alembic for migration management with automated rollback capabilities and environment-specific configurations.


python
from alembic import command
from alembic.config import Config
from alembic.runtime.migration import MigrationContext
from alembic.operations import Operations
from sqlalchemy import create_engine, MetaData, inspect
from sqlalchemy.orm import sessionmaker
import logging
import os
from typing import List, Optional

class MigrationManager:
    def __init__(self, database_url: str, alembic_ini_path: str = "alembic.ini"):
        self.database_url = database_url
        self.engine = create_engine(database_url)
        self.alembic_cfg = Config(alembic_ini_path)
        self.alembic_cfg.set_main_option("sqlalchemy.url", database_url)

        # Setup logging
        self.logger = logging.getLogger(__name__)

    def get_current_revision(self) -> Optional[str]:
        """Get current database revision"""
        try:
            with self.engine.connect() as conn:
                context = MigrationContext.configure(conn)
                return context.get_current_revision()
        except Exception as e:
            self.logger.error(f"Failed to get current revision: {e}")
            return None

    def get_pending_migrations(self) -> List[str]:
        """Get list of pending migrations"""
        try:
            # Create a script directory from the config
            from alembic.script import ScriptDirectory
            script = ScriptDirectory.from_config(self
---
📘 **Checkout my [latest ebook](https://youtu.be/WpR6F4ky4uM) for free on my channel!**  
Be sure to **like**, **share**, **comment**, and **subscribe** to the channel!

---
## 101 Books

**101 Books** is an AI-driven publishing company co-founded by author **Aarav Joshi**. By leveraging advanced AI technology, we keep our publishing costs incredibly low—some books are priced as low as **$4**—making quality knowledge accessible to everyone.

Check out our book **[Golang Clean Code](https://www.amazon.com/dp/B0DQQF9K3Z)** available on Amazon. 

Stay tuned for updates and exciting news. When shopping for books, search for **Aarav Joshi** to find more of our titles. Use the provided link to enjoy **special discounts**!

## Our Creations

Be sure to check out our creations:

**[Investor Central](https://www.investorcentral.co.uk/)** | **[Investor Central Spanish](https://spanish.investorcentral.co.uk/)** | **[Investor Central German](https://german.investorcentral.co.uk/)** | **[Smart Living](https://smartliving.investorcentral.co.uk/)** | **[Epochs & Echoes](https://epochsandechoes.com/)** | **[Puzzling Mysteries](https://www.puzzlingmysteries.com/)** | **[Hindutva](http://hindutva.epochsandechoes.com/)** | **[Elite Dev](https://elitedev.in/)** | **[JS Schools](https://jsschools.com/)**

---

### We are on Medium

**[Tech Koala Insights](https://techkoalainsights.com/)** | **[Epochs & Echoes World](https://world.epochsandechoes.com/)** | **[Investor Central Medium](https://medium.investorcentral.co.uk/)** | **[Puzzling Mysteries Medium](https://medium.com/puzzling-mysteries)** | **[Science & Epochs Medium](https://science.epochsandechoes.com/)** | **[Modern Hindutva](https://modernhindutva.substack.com/)**

Enter fullscreen mode Exit fullscreen mode

Top comments (0)