Building distributed systems requires mastering several critical techniques that ensure components work together seamlessly across network boundaries. I've discovered through extensive hands-on experience that six key Python techniques form the foundation of robust distributed architectures.
## Service Discovery and Registration
Service discovery solves the fundamental problem of how components find and communicate with each other in dynamic environments. When services start, move, or scale, other components need a reliable way to locate them without hardcoded addresses.
The python-consul library provides an excellent foundation for service registration with built-in health checking. I've found this approach particularly effective because it automatically removes unhealthy services from the registry, preventing requests to failing instances.
```python
import consul
import logging
import time

import requests


class ServiceRegistry:
    def __init__(self, service_name, service_port, consul_host='localhost', consul_port=8500):
        self.service_name = service_name
        self.service_port = service_port
        self.consul = consul.Consul(host=consul_host, port=consul_port)
        self.service_id = f"{service_name}-{service_port}-{int(time.time())}"
        self.logger = logging.getLogger(__name__)

    def register(self):
        try:
            self.consul.agent.service.register(
                name=self.service_name,
                service_id=self.service_id,
                address='localhost',
                port=self.service_port,
                tags=['python', 'web-service'],
                check=consul.Check.http(
                    f"http://localhost:{self.service_port}/health",
                    interval="10s",
                    timeout="5s"
                )
            )
            self.logger.info(f"Service {self.service_id} registered successfully")
        except Exception as e:
            self.logger.error(f"Failed to register service: {e}")

    def deregister(self):
        try:
            self.consul.agent.service.deregister(self.service_id)
            self.logger.info(f"Service {self.service_id} deregistered")
        except Exception as e:
            self.logger.error(f"Failed to deregister service: {e}")

    def discover_service(self, service_name):
        try:
            # health.service returns (index, nodes); keep only passing instances
            services = self.consul.health.service(service_name, passing=True)[1]
            endpoints = [(s['Service']['Address'], s['Service']['Port']) for s in services]
            self.logger.info(f"Discovered {len(endpoints)} instances of {service_name}")
            return endpoints
        except Exception as e:
            self.logger.error(f"Service discovery failed: {e}")
            return []


class ServiceClient:
    def __init__(self, registry, target_service):
        self.registry = registry
        self.target_service = target_service
        self.current_endpoints = []
        self.current_index = 0

    def get_endpoint(self):
        # Simple round-robin load balancing
        endpoints = self.registry.discover_service(self.target_service)
        if endpoints:
            self.current_endpoints = endpoints
        # Fall back to the most recently discovered endpoints if discovery fails
        if self.current_endpoints:
            endpoint = self.current_endpoints[self.current_index % len(self.current_endpoints)]
            self.current_index += 1
            return f"http://{endpoint[0]}:{endpoint[1]}"
        return None

    def make_request(self, path, **kwargs):
        endpoint = self.get_endpoint()
        if endpoint:
            try:
                response = requests.get(f"{endpoint}{path}", timeout=5, **kwargs)
                return response.json() if response.status_code == 200 else None
            except requests.exceptions.RequestException as e:
                logging.error(f"Request failed: {e}")
        return None
```
This implementation demonstrates how services register themselves with health checks and how clients can discover available services dynamically. I've included round-robin load balancing to distribute requests across multiple service instances.
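To make the flow concrete, here is a minimal sketch of the two halves working together, assuming a Consul agent on localhost:8500 — the service name, port, and route are illustrative:

```python
import atexit

from flask import Flask, jsonify

app = Flask(__name__)
registry = ServiceRegistry("user-service", 5000)

@app.route('/health')
def health():
    # Consul's HTTP check polls this endpoint every 10 seconds
    return jsonify({"status": "ok"})

if __name__ == "__main__":
    registry.register()
    atexit.register(registry.deregister)  # remove the instance on clean shutdown
    app.run(port=5000)
```

A consumer then resolves instances at request time: `client = ServiceClient(registry, "user-service")` followed by `client.make_request("/users/42")` rotates through whichever instances currently pass their health checks.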
## Circuit Breaker Pattern
Circuit breakers prevent cascading failures by monitoring service health and blocking requests to failing dependencies. This technique protects both upstream and downstream services during outages or high error rates.
```python
import functools
import threading
import time
from collections import deque
from enum import Enum

import requests


class CircuitState(Enum):
    CLOSED = "closed"
    OPEN = "open"
    HALF_OPEN = "half_open"


class CircuitBreaker:
    def __init__(self, failure_threshold=5, recovery_timeout=60, expected_exception=Exception):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.expected_exception = expected_exception
        self.failure_count = 0
        self.last_failure_time = None
        self.state = CircuitState.CLOSED
        self.lock = threading.Lock()
        # Track recent calls for monitoring
        self.call_history = deque(maxlen=100)

    def _record_success(self):
        with self.lock:
            self.failure_count = 0
            self.state = CircuitState.CLOSED
            self.call_history.append(('success', time.time()))

    def _record_failure(self):
        with self.lock:
            self.failure_count += 1
            self.last_failure_time = time.time()
            self.call_history.append(('failure', time.time()))
            if self.failure_count >= self.failure_threshold:
                self.state = CircuitState.OPEN

    def _can_attempt_call(self):
        with self.lock:
            if self.state == CircuitState.CLOSED:
                return True
            if self.state == CircuitState.OPEN:
                if time.time() - self.last_failure_time >= self.recovery_timeout:
                    self.state = CircuitState.HALF_OPEN
                    return True
                return False
            return True  # HALF_OPEN: allow a trial request

    def __call__(self, func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            if not self._can_attempt_call():
                raise Exception(f"Circuit breaker is {self.state.value}")
            try:
                result = func(*args, **kwargs)
                self._record_success()
                return result
            except self.expected_exception:
                self._record_failure()
                raise
        return wrapper

    def get_stats(self):
        with self.lock:
            recent_calls = list(self.call_history)
            success_count = sum(1 for call in recent_calls if call[0] == 'success')
            return {
                'state': self.state.value,
                'failure_count': self.failure_count,
                'recent_success_rate': success_count / len(recent_calls) if recent_calls else 0,
                'recent_calls': len(recent_calls)
            }


# Usage example with HTTP requests
class ServiceClient:
    def __init__(self, base_url):
        self.base_url = base_url
        self.circuit_breaker = CircuitBreaker(
            failure_threshold=3,
            recovery_timeout=30,
            expected_exception=requests.exceptions.RequestException
        )

    def get_user_data(self, user_id):
        # Wrap the raw HTTP call with the shared circuit breaker instance
        @self.circuit_breaker
        def _get_user_data():
            response = requests.get(f"{self.base_url}/users/{user_id}", timeout=5)
            if response.status_code != 200:
                raise requests.exceptions.RequestException(f"HTTP {response.status_code}")
            return response.json()
        return _get_user_data()

    def fetch_user_with_fallback(self, user_id):
        try:
            return self.get_user_data(user_id)
        except Exception:
            # Return cached or default data when the circuit is open
            return {
                'id': user_id,
                'name': 'Unknown User',
                'error': 'Service temporarily unavailable'
            }
```
The circuit breaker monitors failure rates and automatically opens when thresholds are exceeded. During the open state, requests fail fast without overwhelming the struggling service. After a timeout period, it transitions to half-open to test if the service has recovered.
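Here is a brief sketch of the breaker's lifecycle against a dead dependency — the base URL is hypothetical:

```python
client = ServiceClient("http://user-service:8080")  # hypothetical endpoint

# With the service down, the first three calls fail and trip the breaker;
# the remaining calls fail fast without touching the network
for attempt in range(5):
    user = client.fetch_user_with_fallback(42)
    print(attempt, user['name'], client.circuit_breaker.get_stats()['state'])

# After recovery_timeout (30s here), the next call runs in half-open mode:
# one real request is allowed through, and a success closes the circuit.
```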
## Message Queue Integration
Message queues enable asynchronous communication between components, providing loose coupling and reliable message delivery. I prefer using Redis for simple pub/sub scenarios and RabbitMQ for more complex routing requirements.
```python
import json
import logging
import threading
import time
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Callable, Dict

import pika
import redis


class RedisMessageBroker:
    def __init__(self, host='localhost', port=6379, db=0):
        self.redis_client = redis.Redis(host=host, port=port, db=db, decode_responses=True)
        self.pubsub = self.redis_client.pubsub()
        self.subscribers = {}
        self.executor = ThreadPoolExecutor(max_workers=10)
        self.logger = logging.getLogger(__name__)

    def publish(self, channel: str, message: Dict[str, Any]):
        try:
            serialized_message = json.dumps(message)
            self.redis_client.publish(channel, serialized_message)
            self.logger.info(f"Published message to {channel}")
        except Exception as e:
            self.logger.error(f"Failed to publish message: {e}")

    def subscribe(self, channel: str, callback: Callable):
        self.subscribers[channel] = callback
        self.pubsub.subscribe(channel)

    def start_listening(self):
        def listen():
            for message in self.pubsub.listen():
                if message['type'] == 'message':
                    channel = message['channel']
                    if channel in self.subscribers:
                        try:
                            data = json.loads(message['data'])
                            # Process in a thread pool to avoid blocking the listener
                            self.executor.submit(self.subscribers[channel], data)
                        except Exception as e:
                            self.logger.error(f"Error processing message: {e}")

        thread = threading.Thread(target=listen, daemon=True)
        thread.start()
        return thread


class RabbitMQMessageBroker:
    def __init__(self, host='localhost', port=5672, username='guest', password='guest'):
        self.connection_params = pika.ConnectionParameters(
            host=host, port=port,
            credentials=pika.PlainCredentials(username, password)
        )
        self.connection = None
        self.channel = None
        self.logger = logging.getLogger(__name__)

    def connect(self):
        try:
            self.connection = pika.BlockingConnection(self.connection_params)
            self.channel = self.connection.channel()
            self.logger.info("Connected to RabbitMQ")
        except Exception as e:
            self.logger.error(f"Failed to connect to RabbitMQ: {e}")

    def declare_queue(self, queue_name: str, durable=True):
        if self.channel:
            self.channel.queue_declare(queue=queue_name, durable=durable)

    def publish_to_queue(self, queue_name: str, message: Dict[str, Any], persistent=True):
        if not self.channel:
            self.connect()
        try:
            properties = pika.BasicProperties(
                delivery_mode=2 if persistent else 1  # 2 = persist the message to disk
            )
            self.channel.basic_publish(
                exchange='',
                routing_key=queue_name,
                body=json.dumps(message),
                properties=properties
            )
            self.logger.info(f"Published message to queue {queue_name}")
        except Exception as e:
            self.logger.error(f"Failed to publish message: {e}")

    def consume_from_queue(self, queue_name: str, callback: Callable, auto_ack=False):
        def wrapper(ch, method, properties, body):
            try:
                message = json.loads(body.decode('utf-8'))
                callback(message)
                if not auto_ack:
                    ch.basic_ack(delivery_tag=method.delivery_tag)
            except Exception as e:
                self.logger.error(f"Error processing message: {e}")
                if not auto_ack:
                    ch.basic_nack(delivery_tag=method.delivery_tag, requeue=True)

        if not self.channel:
            self.connect()
        self.channel.basic_qos(prefetch_count=1)  # Fair dispatch: one unacked message per consumer
        self.channel.basic_consume(
            queue=queue_name,
            on_message_callback=wrapper,
            auto_ack=auto_ack
        )

    def start_consuming(self):
        if self.channel:
            self.logger.info("Started consuming messages")
            self.channel.start_consuming()


# Example usage in a distributed order processing system
class OrderProcessor:
    def __init__(self):
        self.redis_broker = RedisMessageBroker()
        self.rabbitmq_broker = RabbitMQMessageBroker()
        self.rabbitmq_broker.connect()
        self.rabbitmq_broker.declare_queue('order_processing')

    def process_order(self, order_data):
        # Process the order
        order_id = order_data.get('order_id')
        logging.info(f"Processing order {order_id}")
        # Simulate processing time
        time.sleep(2)
        # Publish completion event
        completion_event = {
            'order_id': order_id,
            'status': 'completed',
            'timestamp': time.time()
        }
        self.redis_broker.publish('order_completed', completion_event)

    def start_processing(self):
        self.rabbitmq_broker.consume_from_queue('order_processing', self.process_order)
        self.rabbitmq_broker.start_consuming()
```
This implementation shows both Redis pub/sub for event broadcasting and RabbitMQ for reliable work queues. The broker wrappers take care of serialization, acknowledgements, and basic error recovery, so application code deals only in plain dictionaries.
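A sketch of wiring the two sides together: a producer enqueues orders through RabbitMQ while a separate listener reacts to completion events on Redis. The queue and channel names match the OrderProcessor above; the order payload is illustrative:

```python
import logging

logging.basicConfig(level=logging.INFO)

# Producer side: enqueue an order for reliable, acknowledged processing
producer = RabbitMQMessageBroker()
producer.connect()
producer.declare_queue('order_processing')
producer.publish_to_queue('order_processing', {'order_id': 'ord-123', 'items': 3})

# Notification side: react to completion events broadcast over Redis
def on_order_completed(event):
    logging.info(f"Order {event['order_id']} finished at {event['timestamp']}")

notifier = RedisMessageBroker()
notifier.subscribe('order_completed', on_order_completed)
notifier.start_listening()  # daemon thread; keep the process alive in a real worker

# Worker side (separate process): OrderProcessor().start_processing()
```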
## Distributed Configuration Management
Distributed configuration management centralizes application settings and enables dynamic updates without service restarts. I've found etcd particularly reliable for this purpose due to its strong consistency guarantees.
```python
import json
import logging
import threading
from typing import Any, Callable

import etcd3


class DistributedConfig:
    def __init__(self, etcd_host='localhost', etcd_port=2379, config_prefix='/app/config/'):
        self.etcd_client = etcd3.client(host=etcd_host, port=etcd_port)
        self.config_prefix = config_prefix
        self.local_cache = {}
        self.watchers = {}
        self.callbacks = {}
        self.logger = logging.getLogger(__name__)
        self.lock = threading.Lock()

    def set_config(self, key: str, value: Any):
        try:
            full_key = f"{self.config_prefix}{key}"
            serialized_value = json.dumps(value)
            self.etcd_client.put(full_key, serialized_value)
            with self.lock:
                self.local_cache[key] = value
            self.logger.info(f"Set config {key} = {value}")
        except Exception as e:
            self.logger.error(f"Failed to set config {key}: {e}")

    def get_config(self, key: str, default=None):
        # Try the local cache first
        with self.lock:
            if key in self.local_cache:
                return self.local_cache[key]
        # Fall back to etcd
        try:
            full_key = f"{self.config_prefix}{key}"
            value, metadata = self.etcd_client.get(full_key)
            if value is not None:
                deserialized_value = json.loads(value.decode('utf-8'))
                with self.lock:
                    self.local_cache[key] = deserialized_value
                return deserialized_value
            return default
        except Exception as e:
            self.logger.error(f"Failed to get config {key}: {e}")
            return default

    def watch_config(self, key: str, callback: Callable[[str, Any], None]):
        full_key = f"{self.config_prefix}{key}"

        def watch_callback(response):
            # python-etcd3 invokes the callback with a WatchResponse holding one or
            # more events; handle a bare event too, for older library versions
            for event in getattr(response, 'events', [response]):
                try:
                    if isinstance(event, etcd3.events.PutEvent):
                        new_value = json.loads(event.value.decode('utf-8'))
                        with self.lock:
                            old_value = self.local_cache.get(key)
                            self.local_cache[key] = new_value
                        if new_value != old_value:
                            callback(key, new_value)
                            self.logger.info(f"Config {key} changed to {new_value}")
                except Exception as e:
                    self.logger.error(f"Error in config watcher for {key}: {e}")

        watch_id = self.etcd_client.add_watch_callback(full_key, watch_callback)
        self.watchers[key] = watch_id
        self.callbacks[key] = callback

    def get_all_configs(self, prefix=''):
        try:
            search_prefix = f"{self.config_prefix}{prefix}"
            configs = {}
            for value, metadata in self.etcd_client.get_prefix(search_prefix):
                key = metadata.key.decode('utf-8').replace(self.config_prefix, '')
                configs[key] = json.loads(value.decode('utf-8'))
            return configs
        except Exception as e:
            self.logger.error(f"Failed to get configs with prefix {prefix}: {e}")
            return {}

    def remove_config(self, key: str):
        try:
            full_key = f"{self.config_prefix}{key}"
            self.etcd_client.delete(full_key)
            with self.lock:
                self.local_cache.pop(key, None)
            self.logger.info(f"Removed config {key}")
        except Exception as e:
            self.logger.error(f"Failed to remove config {key}: {e}")


class ConfigurableService:
    def __init__(self, service_name):
        self.service_name = service_name
        self.config = DistributedConfig()
        self.settings = {}
        self.logger = logging.getLogger(__name__)
        # Load initial configuration
        self._load_configuration()
        # Watch for configuration changes
        self._setup_config_watchers()

    def _load_configuration(self):
        # Service-specific configuration
        service_configs = self.config.get_all_configs(f"{self.service_name}/")
        self.settings.update(service_configs)
        # Global configuration
        global_configs = self.config.get_all_configs("global/")
        self.settings.update(global_configs)
        self.logger.info(f"Loaded configuration: {self.settings}")

    def _setup_config_watchers(self):
        def config_changed(key, new_value):
            short_key = key.replace(f"{self.service_name}/", "").replace("global/", "")
            self.settings[short_key] = new_value
            self.logger.info(f"Configuration updated: {key} = {new_value}")
            self._on_config_change(key, new_value)

        # Watch service-specific configs
        self.config.watch_config(f"{self.service_name}/database_url", config_changed)
        self.config.watch_config(f"{self.service_name}/max_connections", config_changed)
        # Watch global configs
        self.config.watch_config("global/log_level", config_changed)
        self.config.watch_config("global/feature_flags", config_changed)

    def _on_config_change(self, key, new_value):
        # React to specific configuration changes
        if 'log_level' in key:
            self._update_log_level(new_value)
        elif 'database_url' in key:
            self._reconnect_database(new_value)

    def _update_log_level(self, log_level):
        logging.getLogger().setLevel(getattr(logging, log_level.upper()))

    def _reconnect_database(self, database_url):
        # Database reconnection logic would go here
        self.logger.info(f"Reconnecting to database: {database_url}")

    def get_setting(self, key, default=None):
        return self.settings.get(key, default)
```
This configuration management system provides automatic reloading of settings when they change in etcd. Services can react to configuration changes without requiring restarts, enabling zero-downtime updates of critical settings.
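For example, an operator can flip a setting from any machine with etcd access and every watching service picks it up without a restart. A short sketch, assuming etcd at localhost:2379 and the key names registered by the watchers above (the service name is illustrative):

```python
# Run from an admin script or CI job
admin = DistributedConfig()

# Every ConfigurableService watching global/log_level will call
# _update_log_level("DEBUG") as soon as the watch event arrives
admin.set_config("global/log_level", "DEBUG")

# Service-scoped settings use the service name as a prefix
admin.set_config("payment-service/max_connections", 50)

print(admin.get_all_configs())  # inspect the full configuration tree
```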
## Leader Election
Leader election ensures only one instance performs critical operations in distributed scenarios. This technique prevents conflicts when multiple service instances attempt to execute the same scheduled tasks or manage shared resources.
```python
import logging
import threading
import time
from typing import Callable

from kazoo.client import KazooClient
from kazoo.exceptions import NoNodeError


class LeaderElector:
    def __init__(self, zk_hosts='localhost:2181', election_path='/election', node_name=None):
        self.zk_hosts = zk_hosts
        self.election_path = election_path
        self.node_name = node_name or f"node-{int(time.time())}"
        self.zk_client = None
        self.is_leader = False
        self.leader_callbacks = []
        self.follower_callbacks = []
        self.logger = logging.getLogger(__name__)
        self.election_lock = threading.Lock()

    def connect(self):
        try:
            self.zk_client = KazooClient(hosts=self.zk_hosts)
            self.zk_client.start()
            # Ensure the election path exists
            self.zk_client.ensure_path(self.election_path)
            self.logger.info("Connected to ZooKeeper")
        except Exception as e:
            self.logger.error(f"Failed to connect to ZooKeeper: {e}")
            raise

    def start_election(self):
        if not self.zk_client:
            self.connect()

        def election_worker():
            try:
                # Create a sequential ephemeral node
                node_path = f"{self.election_path}/{self.node_name}-"
                actual_path = self.zk_client.create(
                    node_path,
                    value=self.node_name.encode('utf-8'),
                    ephemeral=True,
                    sequence=True
                )
                self.logger.info(f"Created election node: {actual_path}")
                our_node = actual_path.split('/')[-1]

                while True:
                    try:
                        # Get all candidates, sorted by sequence number
                        children = sorted(self.zk_client.get_children(self.election_path))
                        if our_node not in children:
                            # Our ephemeral node vanished (e.g. session expired); step down
                            self.logger.error("Election node lost; leaving election loop")
                            self._become_follower()
                            return
                        our_index = children.index(our_node)

                        if our_index == 0:
                            # Lowest sequence number wins the election
                            self._become_leader()
                            # Hold leadership until the session drops; re-check
                            # periodically instead of spinning on ZooKeeper
                            time.sleep(5)
                        else:
                            # Watch only the node directly before ours to avoid
                            # a thundering herd when the leader disappears
                            predecessor = children[our_index - 1]
                            predecessor_path = f"{self.election_path}/{predecessor}"
                            self._become_follower()
                            event = threading.Event()

                            @self.zk_client.DataWatch(predecessor_path)
                            def watch_predecessor(data, stat):
                                if stat is None:  # Predecessor node was deleted
                                    event.set()
                                    return False  # Remove the watch
                                return True

                            # Block until the predecessor disappears, then re-run the loop
                            event.wait()
                    except NoNodeError:
                        # Node was deleted; retry shortly
                        time.sleep(1)
                    except Exception as e:
                        self.logger.error(f"Election error: {e}")
                        time.sleep(5)
            except Exception as e:
                self.logger.error(f"Election worker failed: {e}")

        thread = threading.Thread(target=election_worker, daemon=True)
        thread.start()
        return thread

    def _become_leader(self):
        with self.election_lock:
            if not self.is_leader:
                self.is_leader = True
                self.logger.info(f"{self.node_name} became leader")
                for callback in self.leader_callbacks:
                    try:
                        callback()
                    except Exception as e:
                        self.logger.error(f"Leader callback failed: {e}")

    def _become_follower(self):
        with self.election_lock:
            if self.is_leader:
                self.is_leader = False
                self.logger.info(f"{self.node_name} became follower")
                for callback in self.follower_callbacks:
                    try:
                        callback()
                    except Exception as e:
                        self.logger.error(f"Follower callback failed: {e}")

    def add_leader_callback(self, callback: Callable[[], None]):
        self.leader_callbacks.append(callback)

    def add_follower_callback(self, callback: Callable[[], None]):
        self.follower_callbacks.append(callback)

    def is_current_leader(self) -> bool:
        return self.is_leader


class DistributedScheduler:
    def __init__(self, node_name):
        self.node_name = node_name
        self.elector = LeaderElector(node_name=node_name)
        self.scheduled_tasks = {}
        self.task_threads = {}
        self.running = False
        self.logger = logging.getLogger(__name__)
        # Set up leader election callbacks
        self.elector.add_leader_callback(self._on_become_leader)
        self.elector.add_follower_callback(self._on_become_follower)

    def _on_become_leader(self):
        self.logger.info("Starting scheduled tasks as leader")
        for task_name, (interval, func) in self.scheduled_tasks.items():
            self._start_task(task_name, interval, func)

    def _on_become_follower(self):
        self.logger.info("Stopping scheduled tasks as follower")
        self._stop_all_tasks()

    def _start_task(self, task_name: str, interval: int, func: Callable):
        def task_runner():
            # Runner threads exit on their own once leadership is lost
            while self.running and self.elector.is_current_leader():
                try:
                    func()
                    time.sleep(interval)
                except Exception as e:
                    self.logger.error(f"Task {task_name} failed: {e}")
                    time.sleep(interval)

        thread = threading.Thread(target=task_runner, daemon=True)
        self.task_threads[task_name] = thread
        thread.start()

    def _stop_all_tasks(self):
        for task_name in list(self.task_threads.keys()):
            self.task_threads.pop(task_name, None)

    def schedule_task(self, task_name: str, interval: int, func: Callable):
        self.scheduled_tasks[task_name] = (interval, func)
        # If we're already the leader, start the task immediately
        if self.elector.is_current_leader():
            self._start_task(task_name, interval, func)

    def start(self):
        self.running = True
        self.elector.start_election()

    def stop(self):
        self.running = False
        self._stop_all_tasks()


# Example usage
def cleanup_old_logs():
    logging.info("Cleaning up old log files...")
    # Log cleanup logic would go here


def send_daily_reports():
    logging.info("Sending daily reports...")
    # Report generation and sending would go here


if __name__ == "__main__":
    scheduler = DistributedScheduler("scheduler-node-1")
    scheduler.schedule_task("log_cleanup", 3600, cleanup_old_logs)       # Every hour
    scheduler.schedule_task("daily_reports", 86400, send_daily_reports)  # Every day
    scheduler.start()
    while True:  # keep the process alive; all worker threads are daemons
        time.sleep(60)
```
This leader election implementation ensures that only one instance runs scheduled tasks, preventing duplicate work across multiple service instances. The system automatically handles leader failures by promoting followers.
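Failover is easy to observe locally: run the same scheduler on two nodes, then kill the leader — the follower's watch on its predecessor fires and it promotes itself within the ZooKeeper session timeout. A sketch, assuming ZooKeeper at localhost:2181 and node names passed on the command line:

```python
# Run the same script in two terminals with different node names
import sys
import time

node_name = sys.argv[1] if len(sys.argv) > 1 else "node-a"
scheduler = DistributedScheduler(node_name)
scheduler.schedule_task("heartbeat", 10, lambda: print(f"{node_name}: leader tick"))
scheduler.start()

# Only the elected leader prints ticks; Ctrl+C the leader and the other
# node takes over once the leader's ephemeral znode disappears
while True:
    time.sleep(1)
```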
## Distributed Tracing
Distributed tracing tracks requests across multiple services using OpenTelemetry. This technique provides visibility into complex request flows and helps identify performance bottlenecks in distributed systems.
```python
import functools
import time

from opentelemetry import trace
from opentelemetry.exporter.jaeger.thrift import JaegerExporter
from opentelemetry.instrumentation.requests import RequestsInstrumentor
from opentelemetry.sdk.resources import Resource
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.trace import Status, StatusCode


class DistributedTracer:
    def __init__(self, service_name, jaeger_endpoint='http://localhost:14268/api/traces'):
        self.service_name = service_name
        # Configure the tracer provider; the service name travels as a resource attribute
        trace.set_tracer_provider(
            TracerProvider(resource=Resource.create({"service.name": service_name}))
        )
        tracer_provider = trace.get_tracer_provider()
        # Configure the Jaeger exporter (thrift over HTTP to the collector)
        jaeger_exporter = JaegerExporter(collector_endpoint=jaeger_endpoint)
        span_processor = BatchSpanProcessor(jaeger_exporter)
        tracer_provider.add_span_processor(span_processor)
        self.tracer = trace.get_tracer(__name__)
        # Auto-instrument outgoing HTTP requests so trace context propagates with them
        RequestsInstrumentor().instrument()

    def trace_function(self, operation_name=None):
        def decorator(func):
            @functools.wraps(func)
            def wrapper(*args, **kwargs):
                span_name = operation_name or func.__name__
                with self.tracer.start_as_current_span(span_name) as span:
                    # Add function metadata
                    span.set_attribute("function.name", func.__name__)
                    span.set_attribute("service.name", self.service_name)
                    # Record only argument counts (be careful with sensitive data)
                    if args:
                        span.set_attribute("function.args_count", len(args))
                    if kwargs:
                        span.set_attribute("function.kwargs_count", len(kwargs))
                    try:
                        start_time = time.time()
                        result = func(*args, **kwargs)
                        # Record success and timing
                        span.set_attribute("function.duration_ms",
                                           (time.time() - start_time) * 1000)
                        span.set_status(Status(StatusCode.OK))
                        return result
                    except Exception as e:
                        # The original snippet breaks off here; recording the exception
                        # and marking the span as errored is the standard OpenTelemetry
                        # pattern for this branch
                        span.record_exception(e)
                        span.set_status(Status(StatusCode.ERROR, str(e)))
                        raise
            return wrapper
        return decorator
```
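Each traced call becomes a span in Jaeger, so a slow hop in a multi-service request chain shows up immediately on the trace timeline. The RequestsInstrumentor call above already injects trace headers into outgoing requests automatically; when that is not available — a custom transport, say, or an incoming request on the server side — context can be carried by hand. A minimal sketch using OpenTelemetry's inject and extract, where the Flask route and downstream URL are illustrative:

```python
import requests
from flask import Flask, request
from opentelemetry import trace
from opentelemetry.propagate import extract, inject

app = Flask(__name__)
tracer = trace.get_tracer(__name__)

def call_downstream(url):
    headers = {}
    inject(headers)  # writes the current trace context (traceparent header) into the dict
    return requests.get(url, headers=headers, timeout=5)

@app.route('/orders')
def orders():
    # Continue the caller's trace instead of starting a fresh one
    ctx = extract(request.headers)
    with tracer.start_as_current_span("handle_orders", context=ctx):
        call_downstream("http://inventory-service:8080/stock")  # hypothetical service
        return {"status": "ok"}
```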