As a best-selling author, I invite you to explore my books on Amazon. Don't forget to follow me on Medium and show your support. Thank you! Your support means the world!
Building production-ready microservices in Python requires mastering specific techniques that address the complexities of distributed systems. I've spent considerable time refining these approaches through real-world deployments, and I can share nine essential patterns that consistently deliver reliable, maintainable services.
Health Check Implementation
Health checks form the foundation of reliable microservice monitoring. I've found that comprehensive health checks must validate both internal service state and external dependencies to provide meaningful status information to orchestration platforms.
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
import asyncio
import aioredis
import asyncpg
from datetime import datetime
from typing import Dict, Any
import psutil
import logging
app = FastAPI()

class HealthStatus(BaseModel):
    """Response schema for the /health endpoint."""
    # Overall status: "healthy" | "degraded" | "unhealthy" | "unknown"
    status: str
    # Time the health snapshot was taken
    timestamp: datetime
    # Per-dependency check results (database / redis / system)
    checks: Dict[str, Any]
    # Static service metadata (uptime, version, environment)
    service_info: Dict[str, str]
class HealthChecker:
    """Aggregates dependency (Postgres, Redis) and system health probes."""

    def __init__(self):
        self.checks = {}
        self.redis_pool = None
        self.db_pool = None
        # Recorded once at construction; used for the uptime report.
        self.service_start_time = datetime.now()

    async def initialize(self):
        """Create the Redis and Postgres pools.

        Failures are logged rather than raised so the service can still start
        and report itself unhealthy through /health.
        """
        try:
            # NOTE(review): connection strings are hard-coded for the example;
            # production code should take them from configuration/secrets.
            self.redis_pool = aioredis.from_url("redis://localhost:6379")
            self.db_pool = await asyncpg.create_pool(
                "postgresql://user:pass@localhost/db",
                min_size=1,
                max_size=5
            )
        except Exception as e:
            logging.error(f"Failed to initialize health checker: {e}")

    async def check_database(self) -> Dict[str, Any]:
        """Round-trip a trivial query; report latency and pool size."""
        import time  # local import: this module does not import time at top level
        try:
            start = time.perf_counter()  # monotonic clock, immune to wall-clock jumps
            async with self.db_pool.acquire() as conn:
                await conn.fetchval("SELECT version()")
            elapsed = time.perf_counter() - start
            return {
                "status": "healthy",
                "response_time_ms": round(elapsed * 1000, 2),
                "active_connections": self.db_pool.get_size()
            }
        except Exception as e:
            return {
                "status": "unhealthy",
                "error": str(e),
                "error_type": type(e).__name__
            }

    async def check_redis(self) -> Dict[str, Any]:
        """Ping Redis via INFO; report latency and a few server stats."""
        import time
        try:
            start = time.perf_counter()
            info = await self.redis_pool.info()
            elapsed = time.perf_counter() - start
            return {
                "status": "healthy",
                "response_time_ms": round(elapsed * 1000, 2),
                "connected_clients": info.get("connected_clients", 0),
                "used_memory_human": info.get("used_memory_human", "unknown")
            }
        except Exception as e:
            return {
                "status": "unhealthy",
                "error": str(e),
                "error_type": type(e).__name__
            }

    async def check_system_resources(self) -> Dict[str, Any]:
        """Sample CPU/memory/disk and map the readings onto health thresholds."""
        try:
            # psutil.cpu_percent(interval=0.1) blocks ~100 ms; run the sample
            # in a worker thread so the event loop stays responsive.
            cpu_percent = await asyncio.to_thread(psutil.cpu_percent, 0.1)
            memory = psutil.virtual_memory()
            disk = psutil.disk_usage('/')
            status = "healthy"
            warnings = []
            if cpu_percent > 80:
                warnings.append("High CPU usage")
                status = "degraded"
            if memory.percent > 85:
                warnings.append("High memory usage")
                status = "degraded"
            if (disk.used / disk.total) * 100 > 90:
                warnings.append("Low disk space")
                status = "unhealthy"
            return {
                "status": status,
                "cpu_percent": round(cpu_percent, 2),
                "memory_percent": round(memory.percent, 2),
                "disk_percent": round((disk.used / disk.total) * 100, 2),
                "warnings": warnings
            }
        except Exception as e:
            return {
                "status": "unhealthy",
                "error": str(e)
            }
# Module-level singleton shared by the startup hook and the /health endpoint.
health_checker = HealthChecker()

@app.on_event("startup")
async def startup():
    # Build the dependency pools once at boot.
    # NOTE(review): on_event is deprecated in newer FastAPI releases in favor
    # of lifespan handlers; kept here as-is.
    await health_checker.initialize()
@app.get("/health", response_model=HealthStatus)
async def health_check():
    """Run all health probes concurrently and aggregate an overall status.

    Always returns 200 with a HealthStatus body; individual probe failures
    are reported inside the payload so orchestrators get a parseable snapshot.
    """
    check_names = ["database", "redis", "system"]
    check_tasks = [
        health_checker.check_database(),
        health_checker.check_redis(),
        health_checker.check_system_resources()
    ]
    try:
        checks = await asyncio.wait_for(
            asyncio.gather(*check_tasks, return_exceptions=True),
            timeout=5.0
        )
    except asyncio.TimeoutError:
        # Build a distinct dict per probe — the original `[{...}] * 3` made
        # three references to the same dict object.
        checks = [
            {"status": "timeout", "error": "Health check timeout"}
            for _ in check_names
        ]
    # gather(return_exceptions=True) yields either a result dict or the
    # raised exception object; normalize both into dicts.
    check_results = {
        name: (
            result if not isinstance(result, Exception)
            else {"status": "error", "error": str(result)}
        )
        for name, result in zip(check_names, checks)
    }
    # Worst status wins: unhealthy > degraded > healthy; anything else
    # (timeout / error) maps to "unknown".
    statuses = [check.get("status", "unknown") for check in check_results.values()]
    if "unhealthy" in statuses:
        overall_status = "unhealthy"
    elif "degraded" in statuses:
        overall_status = "degraded"
    elif all(status == "healthy" for status in statuses):
        overall_status = "healthy"
    else:
        overall_status = "unknown"
    uptime = datetime.now() - health_checker.service_start_time
    return HealthStatus(
        status=overall_status,
        timestamp=datetime.now(),
        checks=check_results,
        service_info={
            "uptime_seconds": int(uptime.total_seconds()),
            "version": "1.0.0",
            "environment": "production"
        }
    )
The health check implementation provides multiple levels of validation. I include dependency checks for databases and caches, system resource monitoring, and service-specific validations. This comprehensive approach helps operations teams identify issues before they impact users.
Structured Logging with Correlation IDs
Distributed tracing through correlation IDs enables following requests across service boundaries. I've implemented a logging system that automatically propagates trace identifiers through HTTP headers and internal processing.
import logging
import uuid
from contextvars import ContextVar
from fastapi import FastAPI, Request, Response
from fastapi.middleware.base import BaseHTTPMiddleware
import json
from datetime import datetime
from typing import Any, Dict
# Context variable holding the correlation ID for the current request context
correlation_id: ContextVar[str] = ContextVar('correlation_id', default='')

class CorrelationIdMiddleware(BaseHTTPMiddleware):
    """Propagates an X-Correlation-ID header through request handling."""

    async def dispatch(self, request: Request, call_next):
        # Reuse the caller's ID when the header is present; mint one otherwise.
        incoming = request.headers.get('X-Correlation-ID')
        corr_id = incoming if incoming is not None else str(uuid.uuid4())
        correlation_id.set(corr_id)
        # Echo the ID back so clients and downstream logs can correlate.
        response = await call_next(request)
        response.headers['X-Correlation-ID'] = corr_id
        return response
class StructuredLogger:
    """JSON-lines logger that injects the service name and correlation ID."""

    def __init__(self, service_name: str):
        self.service_name = service_name
        self.logger = logging.getLogger(service_name)
        self.logger.setLevel(logging.INFO)
        # logging.getLogger returns one shared logger per name, so only attach
        # a handler once — the original attached a new handler on every
        # StructuredLogger(...) call, duplicating every emitted record.
        if not self.logger.handlers:
            handler = logging.StreamHandler()
            handler.setFormatter(self.CustomFormatter())
            self.logger.addHandler(handler)

    class CustomFormatter(logging.Formatter):
        """Formats records as one JSON object per line."""

        def format(self, record):
            # Local import: this module only imports `datetime` from datetime.
            from datetime import timezone
            log_entry = {
                # Timezone-aware UTC; datetime.utcnow() is deprecated (3.12+).
                "timestamp": datetime.now(timezone.utc).isoformat().replace("+00:00", "Z"),
                "level": record.levelname,
                "service": getattr(record, 'service_name', 'unknown'),
                "correlation_id": correlation_id.get(''),
                "message": record.getMessage(),
                "module": record.module,
                "function": record.funcName,
                "line": record.lineno
            }
            # Merge structured keyword context supplied by the helper methods.
            if hasattr(record, 'extra_fields'):
                log_entry.update(record.extra_fields)
            return json.dumps(log_entry)

    def info(self, message: str, **kwargs):
        """Log at INFO with arbitrary structured context."""
        extra = {'service_name': self.service_name}
        if kwargs:
            extra['extra_fields'] = kwargs
        self.logger.info(message, extra=extra)

    def error(self, message: str, error: "Optional[Exception]" = None, **kwargs):
        """Log at ERROR, attaching exception type/message when provided."""
        extra = {'service_name': self.service_name}
        extra_fields = kwargs.copy()
        if error:
            extra_fields.update({
                'error_type': type(error).__name__,
                'error_message': str(error)
            })
        if extra_fields:
            extra['extra_fields'] = extra_fields
        self.logger.error(message, extra=extra)

    def warning(self, message: str, **kwargs):
        """Log at WARNING with arbitrary structured context."""
        extra = {'service_name': self.service_name}
        if kwargs:
            extra['extra_fields'] = kwargs
        self.logger.warning(message, extra=extra)
# Initialize the module-level structured logger for this service
logger = StructuredLogger("user-service")
app = FastAPI()
# Install correlation-ID propagation so log records carry the request's ID.
app.add_middleware(CorrelationIdMiddleware)
@app.middleware("http")
async def log_requests(request: Request, call_next):
    """Log request start/completion/failure with duration and context."""
    start_time = datetime.now()
    # request.client can be None (e.g. some test clients / unix-socket
    # transports); guard before dereferencing .host.
    client_ip = request.client.host if request.client else "unknown"
    logger.info(
        "Request started",
        method=request.method,
        url=str(request.url),
        client_ip=client_ip
    )
    try:
        response = await call_next(request)
        duration = (datetime.now() - start_time).total_seconds()
        logger.info(
            "Request completed",
            method=request.method,
            url=str(request.url),
            status_code=response.status_code,
            duration_ms=round(duration * 1000, 2)
        )
        return response
    except Exception as e:
        duration = (datetime.now() - start_time).total_seconds()
        logger.error(
            "Request failed",
            error=e,
            method=request.method,
            url=str(request.url),
            duration_ms=round(duration * 1000, 2)
        )
        # Re-raise so FastAPI's exception handling still produces a response.
        raise
@app.get("/users/{user_id}")
async def get_user(user_id: int):
    """Fetch a user by ID; 400 for non-positive IDs, 500 on unexpected errors."""
    # Bug fix: HTTPException is never imported in this module (the import at
    # the top only brings in FastAPI/Request/Response), so both error paths
    # below raised NameError instead of returning 400/500.
    from fastapi import HTTPException

    logger.info("Fetching user", user_id=user_id)
    try:
        # Simulate user fetch
        if user_id <= 0:
            raise ValueError("Invalid user ID")
        user_data = {"id": user_id, "name": f"User {user_id}"}
        logger.info("User found", user_id=user_id, user_name=user_data["name"])
        return user_data
    except ValueError as e:
        logger.error("Invalid user request", error=e, user_id=user_id)
        raise HTTPException(status_code=400, detail=str(e))
    except Exception as e:
        logger.error("Failed to fetch user", error=e, user_id=user_id)
        raise HTTPException(status_code=500, detail="Internal server error")
This logging approach creates searchable, structured logs that operations teams can analyze efficiently. I include contextual information like user IDs, operation types, and performance metrics in each log entry.
Configuration Management
Production services require flexible configuration management that separates secrets from code. I use environment-based configuration with validation to ensure all required settings are present at startup.
from pydantic import BaseSettings, validator
from typing import Optional, List
import os
from enum import Enum
class LogLevel(str, Enum):
    """Allowed logging levels for ServiceConfig.log_level."""
    DEBUG = "DEBUG"
    INFO = "INFO"
    WARNING = "WARNING"
    ERROR = "ERROR"
class DatabaseConfig(BaseSettings):
    """PostgreSQL settings, read from DB_-prefixed environment variables.

    Uses the pydantic v1 BaseSettings style (env_prefix via inner Config).
    """
    host: str = "localhost"
    port: int = 5432
    username: str  # required: DB_USERNAME
    password: str  # required: DB_PASSWORD
    database: str  # required: DB_DATABASE
    pool_size: int = 10
    max_overflow: int = 20

    @property
    def url(self) -> str:
        # Assemble a postgresql:// DSN from the individual settings.
        return f"postgresql://{self.username}:{self.password}@{self.host}:{self.port}/{self.database}"

    class Config:
        env_prefix = "DB_"
class RedisConfig(BaseSettings):
    """Redis settings, read from REDIS_-prefixed environment variables."""
    host: str = "localhost"
    port: int = 6379
    password: Optional[str] = None  # no auth when unset
    db: int = 0
    max_connections: int = 20

    @property
    def url(self) -> str:
        # Only embed the auth segment when a password is configured.
        auth = f":{self.password}@" if self.password else ""
        return f"redis://{auth}{self.host}:{self.port}/{self.db}"

    class Config:
        env_prefix = "REDIS_"
class ServiceConfig(BaseSettings):
    """Core service settings (pydantic v1 BaseSettings; reads .env by default)."""
    name: str = "microservice"
    version: str = "1.0.0"
    host: str = "0.0.0.0"  # binds all interfaces — intended for container use
    port: int = 8000
    debug: bool = False
    log_level: LogLevel = LogLevel.INFO
    cors_origins: List[str] = ["*"]  # NOTE(review): wildcard CORS — tighten for production
    api_key: str      # required; validated below to be >= 32 chars
    jwt_secret: str   # required; validated below to be >= 32 chars
    # External service URLs
    auth_service_url: str
    notification_service_url: str
    # Feature flags
    enable_metrics: bool = True
    enable_tracing: bool = True
    enable_rate_limiting: bool = True

    @validator('cors_origins', pre=True)
    def parse_cors_origins(cls, v):
        # Allow a single comma-separated string from the environment.
        if isinstance(v, str):
            return [origin.strip() for origin in v.split(',')]
        return v

    @validator('api_key', 'jwt_secret')
    def validate_secrets(cls, v):
        # Reject missing/short secrets at startup rather than at first use.
        if not v or len(v) < 32:
            raise ValueError('Secret must be at least 32 characters long')
        return v

    class Config:
        env_file = ".env"
        case_sensitive = False
class Config:
    """Aggregates all sub-configurations and validates them at startup."""

    def __init__(self):
        self.service = ServiceConfig()
        self.database = DatabaseConfig()
        self.redis = RedisConfig()

    def validate(self):
        """Validate configuration on startup.

        Raises ValueError listing the *names* of missing settings. The
        original collected the (empty) values instead, which produced a
        useless ['', ''] message — and would have echoed secret values had
        they been merely malformed rather than empty.
        """
        required_configs = {
            "service.api_key": self.service.api_key,
            "service.jwt_secret": self.service.jwt_secret,
            "database.username": self.database.username,
            "database.password": self.database.password,
        }
        missing = [name for name, value in required_configs.items() if not value]
        if missing:
            raise ValueError(f"Missing required configuration: {missing}")
        return True
# Global configuration instance
# NOTE(review): constructed at import time — ServiceConfig() raises here if
# required settings (api_key, jwt_secret, service URLs, ...) are absent.
config = Config()
# Configuration validation decorator
def require_config(func):
    """Decorator to ensure configuration is valid before function execution."""
    from functools import wraps  # preserve the wrapped function's metadata

    # Without @wraps, every decorated function reported __name__ == 'wrapper',
    # breaking introspection, logging, and debugging.
    @wraps(func)
    def wrapper(*args, **kwargs):
        config.validate()
        return func(*args, **kwargs)
    return wrapper

@require_config
def create_database_connection():
    """Return the database connection URL from validated configuration."""
    return config.database.url
# Environment-specific configuration loading
def load_config_for_environment(env: str = None):
    """Load configuration for a specific environment.

    Falls back to the ENVIRONMENT variable, then to "development"; unknown
    environments use the plain .env file.
    """
    selected = env or os.getenv("ENVIRONMENT", "development")
    env_file_by_name = {
        "development": ".env.dev",
        "testing": ".env.test",
        "staging": ".env.staging",
        "production": ".env.prod",
    }
    env_file = env_file_by_name.get(selected, ".env")
    if os.path.exists(env_file):
        # Rebuild each settings object against the environment-specific file.
        config.service = ServiceConfig(_env_file=env_file)
        config.database = DatabaseConfig(_env_file=env_file)
        config.redis = RedisConfig(_env_file=env_file)
    config.validate()
    return config
This configuration system provides type safety, validation, and environment-specific overrides. I separate database, cache, and service configurations to maintain clarity and enable independent scaling decisions.
Graceful Shutdown Handling
Production services must handle shutdown signals gracefully to avoid data corruption and ensure ongoing requests complete successfully. I implement signal handlers that coordinate cleanup operations.
import signal
import asyncio
import logging
from contextlib import asynccontextmanager
from fastapi import FastAPI
import uvicorn
from typing import Callable, List
import time
class GracefulShutdownManager:
    """Coordinates draining of in-flight requests and cleanup on SIGTERM/SIGINT.

    Handlers registered via add_shutdown_handler run in registration order
    after active requests drain (or the drain timeout expires).
    """

    def __init__(self, app: FastAPI):
        self.app = app
        self.shutdown_handlers: List[Callable] = []
        self.is_shutting_down = False
        # Incremented/decremented by the request-tracking middleware.
        self.active_requests = 0
        self.shutdown_timeout = 30  # seconds to wait for in-flight requests

    def add_shutdown_handler(self, handler: Callable):
        """Add a cleanup handler (sync or async) to run during shutdown."""
        self.shutdown_handlers.append(handler)

    async def graceful_shutdown(self, signum, frame):
        """Drain active requests, then run all registered cleanup handlers."""
        logging.info(f"Received signal {signum}, initiating graceful shutdown...")
        self.is_shutting_down = True
        # Bounded wait for in-flight requests to finish.
        start_time = time.time()
        while self.active_requests > 0 and (time.time() - start_time) < self.shutdown_timeout:
            logging.info(f"Waiting for {self.active_requests} active requests to complete...")
            await asyncio.sleep(1)
        # Run handlers; one failing handler must not prevent the rest.
        for handler in self.shutdown_handlers:
            # Not every callable has __name__ (e.g. functools.partial).
            name = getattr(handler, '__name__', repr(handler))
            try:
                if asyncio.iscoroutinefunction(handler):
                    await handler()
                else:
                    handler()
                logging.info(f"Executed shutdown handler: {name}")
            except Exception as e:
                logging.error(f"Error in shutdown handler {name}: {e}")
        logging.info("Graceful shutdown completed")

    def setup_signal_handlers(self):
        """Install SIGTERM/SIGINT handlers; call from inside the running loop.

        Captures the running loop here because asyncio.get_event_loop() inside
        a plain signal handler is deprecated and unreliable; the handler
        schedules the async shutdown onto the captured loop thread-safely.
        """
        loop = asyncio.get_running_loop()

        def signal_handler(signum, frame):
            loop.call_soon_threadsafe(
                asyncio.ensure_future, self.graceful_shutdown(signum, frame)
            )

        signal.signal(signal.SIGTERM, signal_handler)
        signal.signal(signal.SIGINT, signal_handler)
# Application with graceful shutdown
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Startup: build pools and register cleanup; shutdown: drain gracefully."""
    # Startup
    logging.info("Starting up service...")
    # Initialize database pool
    db_pool = await create_db_pool()
    app.state.db_pool = db_pool
    # Initialize Redis connection
    redis_client = await create_redis_client()
    app.state.redis = redis_client
    # Setup shutdown manager (middleware reads it from app.state)
    shutdown_manager = GracefulShutdownManager(app)
    app.state.shutdown_manager = shutdown_manager
    # Register cleanup handlers — they run in this order during shutdown
    shutdown_manager.add_shutdown_handler(cleanup_database)
    shutdown_manager.add_shutdown_handler(cleanup_redis)
    shutdown_manager.add_shutdown_handler(cleanup_background_tasks)
    # Setup signal handlers (called from inside the running event loop)
    shutdown_manager.setup_signal_handlers()
    logging.info("Service startup completed")
    yield
    # Shutdown — also runs when no signal was received (normal server exit)
    logging.info("Shutting down service...")
    await shutdown_manager.graceful_shutdown(None, None)

app = FastAPI(lifespan=lifespan)
@app.middleware("http")
async def track_requests(request, call_next):
    """Track active requests for graceful shutdown; reject new ones while draining."""
    shutdown_manager = getattr(app.state, 'shutdown_manager', None)
    if shutdown_manager and shutdown_manager.is_shutting_down:
        # Bug fix: middleware must return a Response object — the original
        # returned a ({...}, 503) tuple, which the ASGI stack cannot serve.
        from fastapi.responses import JSONResponse
        return JSONResponse(
            status_code=503,
            content={"error": "Service is shutting down"},
        )
    if shutdown_manager:
        shutdown_manager.active_requests += 1
    try:
        response = await call_next(request)
        return response
    finally:
        # Decrement even when call_next raises, so the drain loop can finish.
        if shutdown_manager:
            shutdown_manager.active_requests -= 1
# Cleanup handlers
async def cleanup_database():
    """Close the database connection pool if one was created."""
    pool = getattr(app.state, 'db_pool', None)
    if pool:
        await pool.close()
        logging.info("Database connections closed")
async def cleanup_redis():
    """Close the Redis client if one was created."""
    client = getattr(app.state, 'redis', None)
    if client:
        await client.close()
        logging.info("Redis connections closed")
async def cleanup_background_tasks():
    """Cancel running background tasks, sparing the task running this coroutine.

    Bug fix: the original cancelled *every* unfinished task — including the
    one executing this cleanup — so the shutdown sequence cancelled itself.
    """
    current = asyncio.current_task()
    tasks = [
        task for task in asyncio.all_tasks()
        if not task.done() and task is not current
    ]
    if tasks:
        logging.info(f"Cancelling {len(tasks)} background tasks")
        for task in tasks:
            task.cancel()
        # Wait for the cancellations to settle; swallow CancelledError results.
        await asyncio.gather(*tasks, return_exceptions=True)
# Helper functions for initialization
async def create_db_pool():
    """Create database connection pool"""
    # Implementation depends on your database driver
    # NOTE(review): placeholder — returns None, so lifespan stores None on
    # app.state.db_pool and cleanup_database's truthiness check skips it.
    pass

async def create_redis_client():
    """Create Redis client"""
    # Implementation depends on your Redis client
    # NOTE(review): placeholder — returns None; see note above.
    pass
if __name__ == "__main__":
    # Run under uvicorn directly; reload disabled for a production-style start.
    uvicorn.run(
        "main:app",
        host="0.0.0.0",
        port=8000,
        reload=False,
        # Plain-text log configuration for uvicorn's own loggers; application
        # logging is configured separately.
        log_config={
            "version": 1,
            "disable_existing_loggers": False,
            "formatters": {
                "default": {
                    "format": "%(asctime)s - %(name)s - %(levelname)s - %(message)s",
                },
            },
            "handlers": {
                "default": {
                    "formatter": "default",
                    "class": "logging.StreamHandler",
                    "stream": "ext://sys.stdout",
                },
            },
            "root": {
                "level": "INFO",
                "handlers": ["default"],
            },
        }
    )
This shutdown system ensures that ongoing requests complete before service termination. I track active requests and provide configurable timeout periods to balance graceful shutdown with deployment speed requirements.
API Versioning Strategy
API versioning enables service evolution while maintaining backward compatibility. I prefer header-based versioning combined with content negotiation for flexibility in client implementations.
from fastapi import FastAPI, Header, HTTPException, Depends
from pydantic import BaseModel
from typing import Optional, Dict, Any
from enum import Enum
import json
class APIVersion(str, Enum):
    """Supported API versions; str-valued for direct header comparison."""
    V1 = "1.0"
    V2 = "2.0"
    V3 = "3.0"
class VersionedResponse(BaseModel):
    """Envelope wrapping version-specific payloads with version metadata."""
    data: Dict[str, Any]
    meta: Dict[str, Any]  # version, deprecated flag, timestamp
    version: str
class APIVersionManager:
    """Resolves the API version for a request and tracks deprecations."""

    def __init__(self):
        self.supported_versions = {
            APIVersion.V1: "1.0",
            APIVersion.V2: "2.0",
            APIVersion.V3: "3.0"
        }
        self.default_version = APIVersion.V2
        self.deprecated_versions = {APIVersion.V1}

    def parse_version(self, version_header: Optional[str], accept_header: Optional[str]) -> APIVersion:
        """Resolve the version: explicit header first, then Accept media type,
        then the default."""
        if version_header:
            try:
                candidate = APIVersion(version_header)
            except ValueError:
                candidate = None
            if candidate is not None and candidate in self.supported_versions:
                return candidate
        if accept_header:
            # Vendor media types map onto versions; first match wins.
            media_type_versions = [
                ("application/vnd.api.v1+json", APIVersion.V1),
                ("application/vnd.api.v2+json", APIVersion.V2),
                ("application/vnd.api.v3+json", APIVersion.V3),
            ]
            for media_type, mapped in media_type_versions:
                if media_type in accept_header:
                    return mapped
        return self.default_version

    def is_deprecated(self, version: APIVersion) -> bool:
        return version in self.deprecated_versions

version_manager = APIVersionManager()
def get_api_version(
    api_version: Optional[str] = Header(None, alias="X-API-Version"),
    accept: Optional[str] = Header(None)
) -> APIVersion:
    """Dependency to extract API version from request headers.

    Checks X-API-Version first, then vendor media types in Accept;
    falls back to the manager's default version.
    """
    return version_manager.parse_version(api_version, accept)
app = FastAPI()

# Version-specific response models
class UserV1(BaseModel):
    """v1 shape: single concatenated `name` field."""
    id: int
    name: str
    email: str
class UserV2(BaseModel):
    """v2 shape: split first/last name plus created_at."""
    id: int
    first_name: str
    last_name: str
    email: str
    created_at: str  # ISO-8601 timestamp string
class UserV3(BaseModel):
    """v3 shape: nested profile/contact/metadata groups."""
    id: int
    profile: Dict[str, str]   # first_name, last_name
    contact: Dict[str, str]   # email, phone, address
    metadata: Dict[str, Any]  # created_at, preferences
class UserService:
    """Service layer with version-aware response transformations."""

    def get_user_data(self, user_id: int) -> Dict[str, Any]:
        """Return the raw user record (simulated database fetch)."""
        record = {
            "id": user_id,
            "first_name": "John",
            "last_name": "Doe",
            "email": "john.doe@example.com",
            "created_at": "2023-01-01T00:00:00Z",
            "phone": "+1234567890",
            "address": "123 Main St",
            "preferences": {"theme": "dark", "notifications": True}
        }
        return record

    def transform_user_v1(self, user_data: Dict[str, Any]) -> UserV1:
        """Project the raw record onto the v1 shape (single name field)."""
        full_name = f"{user_data['first_name']} {user_data['last_name']}"
        return UserV1(id=user_data["id"], name=full_name, email=user_data["email"])

    def transform_user_v2(self, user_data: Dict[str, Any]) -> UserV2:
        """Project the raw record onto the v2 shape (split name, created_at)."""
        fields = {
            key: user_data[key]
            for key in ("id", "first_name", "last_name", "email", "created_at")
        }
        return UserV2(**fields)

    def transform_user_v3(self, user_data: Dict[str, Any]) -> UserV3:
        """Project the raw record onto the v3 nested shape."""
        profile = {
            "first_name": user_data["first_name"],
            "last_name": user_data["last_name"]
        }
        contact = {
            "email": user_data["email"],
            "phone": user_data["phone"],
            "address": user_data["address"]
        }
        metadata = {
            "created_at": user_data["created_at"],
            "preferences": user_data["preferences"]
        }
        return UserV3(id=user_data["id"], profile=profile, contact=contact, metadata=metadata)

user_service = UserService()
@app.get("/users/{user_id}")
async def get_user(
    user_id: int,
    version: APIVersion = Depends(get_api_version)
):
    """Get a user with a version-specific response format.

    Deprecated versions receive X-API-Deprecated / X-API-Deprecation-Date /
    X-API-Replacement response headers so clients can plan migration.
    """
    # Deprecation headers for old versions.
    headers = {}
    if version_manager.is_deprecated(version):
        headers["X-API-Deprecated"] = "true"
        headers["X-API-Deprecation-Date"] = "2024-12-31"
        headers["X-API-Replacement"] = "2.0"
    # Fetch user data
    try:
        user_data = user_service.get_user_data(user_id)
    except Exception:
        raise HTTPException(status_code=404, detail="User not found")
    # Dispatch table replaces the if/elif chain; unmapped versions get a 400.
    transforms = {
        APIVersion.V1: user_service.transform_user_v1,
        APIVersion.V2: user_service.transform_user_v2,
        APIVersion.V3: user_service.transform_user_v3,
    }
    transform = transforms.get(version)
    if transform is None:
        raise HTTPException(status_code=400, detail="Unsupported API version")
    response_data = transform(user_data).dict()
    # Build versioned response envelope
    payload = VersionedResponse(
        data=response_data,
        meta={
            "version": version.value,
            "deprecated": version_manager.is_deprecated(version),
            "timestamp": "2023-01-01T00:00:00Z"
        },
        version=version.value
    )
    # Bug fix: the deprecation headers above were computed but never attached
    # to the response. Return a JSONResponse so they actually reach clients.
    from fastapi.responses import JSONResponse
    return JSONResponse(content=payload.dict(), headers=headers)
@app.get("/api/versions")
async def get_supported_versions():
    """Report supported, default, and deprecated API versions."""
    manager = version_manager
    deprecated = [entry.value for entry in manager.deprecated_versions]
    return {
        "supported_versions": list(manager.supported_versions.values()),
        "default_version": manager.default_version.value,
        "deprecated_versions": deprecated
    }
# Middleware to add version headers to all responses
@app.middleware("http")
async def add_version_headers(request, call_next):
    """Advertise supported/default API versions on every response."""
    response = await call_next(request)
    supported = version_manager.supported_versions.values()
    response.headers["X-API-Supported-Versions"] = ",".join(supported)
    response.headers["X-API-Default-Version"] = version_manager.default_version.value
    return response
This versioning approach allows clients to specify their preferred API version while providing clear migration paths. I include deprecation warnings and metadata to help clients understand version lifecycles.
Database Migration Management
Database schema changes require careful coordination across environments. I use Alembic for migration management with automated rollback capabilities and environment-specific configurations.
python
from alembic import command
from alembic.config import Config
from alembic.runtime.migration import MigrationContext
from alembic.operations import Operations
from sqlalchemy import create_engine, MetaData, inspect
from sqlalchemy.orm import sessionmaker
import logging
import os
from typing import List, Optional
class MigrationManager:
    """Wraps Alembic operations for programmatic schema-migration control."""

    def __init__(self, database_url: str, alembic_ini_path: str = "alembic.ini"):
        self.database_url = database_url
        self.engine = create_engine(database_url)
        # Point Alembic at the same URL as the SQLAlchemy engine so
        # programmatic and CLI migrations agree on the target database.
        self.alembic_cfg = Config(alembic_ini_path)
        self.alembic_cfg.set_main_option("sqlalchemy.url", database_url)
        # Setup logging
        self.logger = logging.getLogger(__name__)

    def get_current_revision(self) -> Optional[str]:
        """Get current database revision"""
        # Returns None both for an unstamped database and on connection
        # failure — callers cannot distinguish the two cases here.
        try:
            with self.engine.connect() as conn:
                context = MigrationContext.configure(conn)
                return context.get_current_revision()
        except Exception as e:
            self.logger.error(f"Failed to get current revision: {e}")
            return None
def get_pending_migrations(self) -> List[str]:
"""Get list of pending migrations"""
try:
# Create a script directory from the config
from alembic.script import ScriptDirectory
script = ScriptDirectory.from_config(self
---
📘 **Checkout my [latest ebook](https://youtu.be/WpR6F4ky4uM) for free on my channel!**
Be sure to **like**, **share**, **comment**, and **subscribe** to the channel!
---
## 101 Books
**101 Books** is an AI-driven publishing company co-founded by author **Aarav Joshi**. By leveraging advanced AI technology, we keep our publishing costs incredibly low—some books are priced as low as **$4**—making quality knowledge accessible to everyone.
Check out our book **[Golang Clean Code](https://www.amazon.com/dp/B0DQQF9K3Z)** available on Amazon.
Stay tuned for updates and exciting news. When shopping for books, search for **Aarav Joshi** to find more of our titles. Use the provided link to enjoy **special discounts**!
## Our Creations
Be sure to check out our creations:
**[Investor Central](https://www.investorcentral.co.uk/)** | **[Investor Central Spanish](https://spanish.investorcentral.co.uk/)** | **[Investor Central German](https://german.investorcentral.co.uk/)** | **[Smart Living](https://smartliving.investorcentral.co.uk/)** | **[Epochs & Echoes](https://epochsandechoes.com/)** | **[Puzzling Mysteries](https://www.puzzlingmysteries.com/)** | **[Hindutva](http://hindutva.epochsandechoes.com/)** | **[Elite Dev](https://elitedev.in/)** | **[JS Schools](https://jsschools.com/)**
---
### We are on Medium
**[Tech Koala Insights](https://techkoalainsights.com/)** | **[Epochs & Echoes World](https://world.epochsandechoes.com/)** | **[Investor Central Medium](https://medium.investorcentral.co.uk/)** | **[Puzzling Mysteries Medium](https://medium.com/puzzling-mysteries)** | **[Science & Epochs Medium](https://science.epochsandechoes.com/)** | **[Modern Hindutva](https://modernhindutva.substack.com/)**
Top comments (0)