Nithin Bharadwaj

6 Essential Python Techniques for Robust Distributed Systems Development

Building distributed systems requires mastering several critical techniques that ensure components work together seamlessly across network boundaries. I've discovered through extensive hands-on experience that six key Python techniques form the foundation of robust distributed architectures.

Service Discovery and Registration

Service discovery solves the fundamental problem of how components find and communicate with each other in dynamic environments. When services start, move, or scale, other components need a reliable way to locate them without hardcoded addresses.

The python-consul library provides an excellent foundation for service registration with built-in health checking. I've found this approach particularly effective because Consul automatically filters unhealthy services out of discovery results, preventing requests to failing instances.

```python
import consul
import logging
import time

import requests

class ServiceRegistry:
    def __init__(self, service_name, service_port, consul_host='localhost', consul_port=8500):
        self.service_name = service_name
        self.service_port = service_port
        self.consul = consul.Consul(host=consul_host, port=consul_port)
        self.service_id = f"{service_name}-{service_port}-{int(time.time())}"
        self.logger = logging.getLogger(__name__)

    def register(self):
        try:
            self.consul.agent.service.register(
                name=self.service_name,
                service_id=self.service_id,
                address='localhost',
                port=self.service_port,
                tags=['python', 'web-service'],
                check=consul.Check.http(
                    f"http://localhost:{self.service_port}/health", 
                    interval="10s",
                    timeout="5s"
                )
            )
            self.logger.info(f"Service {self.service_id} registered successfully")
        except Exception as e:
            self.logger.error(f"Failed to register service: {e}")

    def deregister(self):
        try:
            self.consul.agent.service.deregister(self.service_id)
            self.logger.info(f"Service {self.service_id} deregistered")
        except Exception as e:
            self.logger.error(f"Failed to deregister service: {e}")

    def discover_service(self, service_name):
        try:
            services = self.consul.health.service(service_name, passing=True)[1]
            endpoints = [(s['Service']['Address'], s['Service']['Port']) for s in services]
            self.logger.info(f"Discovered {len(endpoints)} instances of {service_name}")
            return endpoints
        except Exception as e:
            self.logger.error(f"Service discovery failed: {e}")
            return []

class ServiceClient:
    def __init__(self, registry, target_service):
        self.registry = registry
        self.target_service = target_service
        self.current_endpoints = []
        self.current_index = 0

    def get_endpoint(self):
        # Simple round-robin load balancing
        endpoints = self.registry.discover_service(self.target_service)
        if endpoints:
            self.current_endpoints = endpoints
            endpoint = self.current_endpoints[self.current_index % len(self.current_endpoints)]
            self.current_index += 1
            return f"http://{endpoint[0]}:{endpoint[1]}"
        return None

    def make_request(self, path, **kwargs):
        endpoint = self.get_endpoint()
        if endpoint:
            try:
                response = requests.get(f"{endpoint}{path}", timeout=5, **kwargs)
                return response.json() if response.status_code == 200 else None
            except requests.exceptions.RequestException as e:
                logging.error(f"Request failed: {e}")
        return None
```

This implementation demonstrates how services register themselves with health checks and how clients can discover available services dynamically. I've included round-robin load balancing to distribute requests across multiple service instances.
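
The registered health check assumes each service actually exposes a `/health` endpoint. Here is a minimal sketch of what that looks like in Flask, with registration tied to the service's lifecycle (the service name and port are illustrative):

```python
import atexit
from flask import Flask, jsonify

app = Flask(__name__)

@app.route('/health')
def health():
    # Consul polls this every 10 seconds; a non-200 response
    # marks the instance as failing and drops it from discovery
    return jsonify({'status': 'ok'}), 200

if __name__ == '__main__':
    registry = ServiceRegistry('user-service', 5000)
    registry.register()
    # Deregister on clean shutdown so stale entries don't linger
    atexit.register(registry.deregister)
    app.run(port=5000)
```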

Circuit Breaker Pattern

Circuit breakers prevent cascading failures by monitoring service health and blocking requests to failing dependencies. This technique protects both upstream and downstream services during outages or high error rates.

```python
import functools
import threading
import time
from collections import deque
from enum import Enum

import requests

class CircuitState(Enum):
    CLOSED = "closed"
    OPEN = "open"
    HALF_OPEN = "half_open"

class CircuitBreaker:
    def __init__(self, failure_threshold=5, recovery_timeout=60, expected_exception=Exception):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.expected_exception = expected_exception

        self.failure_count = 0
        self.last_failure_time = None
        self.state = CircuitState.CLOSED
        self.lock = threading.Lock()

        # Track recent calls for monitoring
        self.call_history = deque(maxlen=100)

    def _record_success(self):
        with self.lock:
            self.failure_count = 0
            self.state = CircuitState.CLOSED
            self.call_history.append(('success', time.time()))

    def _record_failure(self):
        with self.lock:
            self.failure_count += 1
            self.last_failure_time = time.time()
            self.call_history.append(('failure', time.time()))

            if self.failure_count >= self.failure_threshold:
                self.state = CircuitState.OPEN

    def _can_attempt_call(self):
        with self.lock:
            if self.state == CircuitState.CLOSED:
                return True
            elif self.state == CircuitState.OPEN:
                if time.time() - self.last_failure_time >= self.recovery_timeout:
                    self.state = CircuitState.HALF_OPEN
                    return True
                return False
            else:  # HALF_OPEN
                return True

    def __call__(self, func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            if not self._can_attempt_call():
                raise Exception(f"Circuit breaker is {self.state.value}")

            try:
                result = func(*args, **kwargs)
                self._record_success()
                return result
            except self.expected_exception as e:
                self._record_failure()
                raise e
        return wrapper

    def get_stats(self):
        with self.lock:
            recent_calls = list(self.call_history)
            success_count = sum(1 for call in recent_calls if call[0] == 'success')
            failure_count = len(recent_calls) - success_count

            return {
                'state': self.state.value,
                'failure_count': self.failure_count,
                'recent_success_rate': success_count / len(recent_calls) if recent_calls else 0,
                'recent_calls': len(recent_calls)
            }

# Usage example with HTTP requests
class ServiceClient:
    def __init__(self, base_url):
        self.base_url = base_url
        self.circuit_breaker = CircuitBreaker(
            failure_threshold=3,
            recovery_timeout=30,
            expected_exception=requests.exceptions.RequestException
        )
        # Wrap the raw call once so every request shares the same breaker state
        self.get_user_data = self.circuit_breaker(self._get_user_data)

    def _get_user_data(self, user_id):
        response = requests.get(f"{self.base_url}/users/{user_id}", timeout=5)
        if response.status_code != 200:
            raise requests.exceptions.RequestException(f"HTTP {response.status_code}")
        return response.json()

    def fetch_user_with_fallback(self, user_id):
        try:
            return self.get_user_data(user_id)
        except Exception as e:
            # Return cached or default data when circuit is open
            return {
                'id': user_id,
                'name': 'Unknown User',
                'error': 'Service temporarily unavailable'
            }
```

The circuit breaker monitors failure rates and automatically opens when thresholds are exceeded. During the open state, requests fail fast without overwhelming the struggling service. After a timeout period, it transitions to half-open to test if the service has recovered.
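
A quick way to internalize those transitions is to drive a breaker by hand. This small demo (thresholds and the simulated failure are illustrative) trips the breaker with two failures, shows the fast rejection, then lets the half-open probe close it again:

```python
import time

breaker = CircuitBreaker(failure_threshold=2, recovery_timeout=3)

@breaker
def flaky(fail=True):
    if fail:
        raise Exception("simulated failure")
    return "ok"

for _ in range(2):  # two consecutive failures trip the breaker
    try:
        flaky()
    except Exception as e:
        print(e, "->", breaker.get_stats()['state'])

try:
    flaky(fail=False)  # rejected immediately: the circuit is open
except Exception as e:
    print(e)

time.sleep(3)  # wait out the recovery timeout
print(flaky(fail=False), "->", breaker.get_stats()['state'])  # probe succeeds, circuit closes
```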

Message Queue Integration

Message queues enable asynchronous communication between components, providing loose coupling and reliable message delivery. I prefer using Redis for simple pub/sub scenarios and RabbitMQ for more complex routing requirements.

```python
import json
import logging
import threading
import time
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Callable, Dict

import pika
import redis

class RedisMessageBroker:
    def __init__(self, host='localhost', port=6379, db=0):
        self.redis_client = redis.Redis(host=host, port=port, db=db, decode_responses=True)
        self.pubsub = self.redis_client.pubsub()
        self.subscribers = {}
        self.executor = ThreadPoolExecutor(max_workers=10)
        self.logger = logging.getLogger(__name__)

    def publish(self, channel: str, message: Dict[Any, Any]):
        try:
            serialized_message = json.dumps(message)
            self.redis_client.publish(channel, serialized_message)
            self.logger.info(f"Published message to {channel}")
        except Exception as e:
            self.logger.error(f"Failed to publish message: {e}")

    def subscribe(self, channel: str, callback: Callable):
        self.subscribers[channel] = callback
        self.pubsub.subscribe(channel)

    def start_listening(self):
        def listen():
            for message in self.pubsub.listen():
                if message['type'] == 'message':
                    channel = message['channel']
                    if channel in self.subscribers:
                        try:
                            data = json.loads(message['data'])
                            # Process message in thread pool to avoid blocking
                            self.executor.submit(self.subscribers[channel], data)
                        except Exception as e:
                            self.logger.error(f"Error processing message: {e}")

        thread = threading.Thread(target=listen, daemon=True)
        thread.start()
        return thread

class RabbitMQMessageBroker:
    def __init__(self, host='localhost', port=5672, username='guest', password='guest'):
        self.connection_params = pika.ConnectionParameters(
            host=host, port=port, 
            credentials=pika.PlainCredentials(username, password)
        )
        self.connection = None
        self.channel = None
        self.logger = logging.getLogger(__name__)

    def connect(self):
        try:
            self.connection = pika.BlockingConnection(self.connection_params)
            self.channel = self.connection.channel()
            self.logger.info("Connected to RabbitMQ")
        except Exception as e:
            self.logger.error(f"Failed to connect to RabbitMQ: {e}")

    def declare_queue(self, queue_name: str, durable=True):
        if self.channel:
            self.channel.queue_declare(queue=queue_name, durable=durable)

    def publish_to_queue(self, queue_name: str, message: Dict[Any, Any], persistent=True):
        if not self.channel:
            self.connect()

        try:
            properties = pika.BasicProperties(
                delivery_mode=2 if persistent else 1  # Make message persistent
            )

            self.channel.basic_publish(
                exchange='',
                routing_key=queue_name,
                body=json.dumps(message),
                properties=properties
            )
            self.logger.info(f"Published message to queue {queue_name}")
        except Exception as e:
            self.logger.error(f"Failed to publish message: {e}")

    def consume_from_queue(self, queue_name: str, callback: Callable, auto_ack=False):
        def wrapper(ch, method, properties, body):
            try:
                message = json.loads(body.decode('utf-8'))
                callback(message)
                if not auto_ack:
                    ch.basic_ack(delivery_tag=method.delivery_tag)
            except Exception as e:
                self.logger.error(f"Error processing message: {e}")
                if not auto_ack:
                    ch.basic_nack(delivery_tag=method.delivery_tag, requeue=True)

        if not self.channel:
            self.connect()

        self.channel.basic_qos(prefetch_count=1)  # Fair dispatch
        self.channel.basic_consume(
            queue=queue_name,
            on_message_callback=wrapper,
            auto_ack=auto_ack
        )

    def start_consuming(self):
        if self.channel:
            self.logger.info("Started consuming messages")
            self.channel.start_consuming()

# Example usage in a distributed order processing system
class OrderProcessor:
    def __init__(self):
        self.redis_broker = RedisMessageBroker()
        self.rabbitmq_broker = RabbitMQMessageBroker()
        self.rabbitmq_broker.connect()
        self.rabbitmq_broker.declare_queue('order_processing')

    def process_order(self, order_data):
        # Process the order
        order_id = order_data.get('order_id')
        logging.info(f"Processing order {order_id}")

        # Simulate processing time
        time.sleep(2)

        # Publish completion event
        completion_event = {
            'order_id': order_id,
            'status': 'completed',
            'timestamp': time.time()
        }

        self.redis_broker.publish('order_completed', completion_event)

    def start_processing(self):
        self.rabbitmq_broker.consume_from_queue('order_processing', self.process_order)
        self.rabbitmq_broker.start_consuming()
```

This implementation shows both Redis pub/sub for lightweight event broadcasting and RabbitMQ for reliable work queues. The wrapper classes take care of serialization and acknowledgements, while RabbitMQ's persistent messages and explicit acks supply the delivery guarantees.
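
Putting the two brokers together, a producer enqueues orders into RabbitMQ for reliable processing, while any interested component listens on the Redis channel for completion events. A minimal sketch, reusing the queue and channel names from the example above:

```python
# Producer: enqueue an order for exactly-one-worker processing
producer = RabbitMQMessageBroker()
producer.connect()
producer.declare_queue('order_processing')
producer.publish_to_queue('order_processing', {'order_id': 'ord-42', 'items': 3})

# Listener: react to completion events broadcast over Redis pub/sub
def on_completed(event):
    print(f"Order {event['order_id']} completed at {event['timestamp']}")

notifier = RedisMessageBroker()
notifier.subscribe('order_completed', on_completed)
notifier.start_listening()

# Worker: blocks here, consuming orders and publishing completion events
OrderProcessor().start_processing()
```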

Distributed Configuration Management

Distributed configuration management centralizes application settings and enables dynamic updates without service restarts. I've found etcd particularly reliable for this purpose due to its strong consistency guarantees.

```python
import json
import logging
import threading
from typing import Any, Callable

import etcd3

class DistributedConfig:
    def __init__(self, etcd_host='localhost', etcd_port=2379, config_prefix='/app/config/'):
        self.etcd_client = etcd3.client(host=etcd_host, port=etcd_port)
        self.config_prefix = config_prefix
        self.local_cache = {}
        self.watchers = {}
        self.callbacks = {}
        self.logger = logging.getLogger(__name__)
        self.lock = threading.Lock()

    def set_config(self, key: str, value: Any):
        try:
            full_key = f"{self.config_prefix}{key}"
            serialized_value = json.dumps(value)
            self.etcd_client.put(full_key, serialized_value)

            with self.lock:
                self.local_cache[key] = value

            self.logger.info(f"Set config {key} = {value}")
        except Exception as e:
            self.logger.error(f"Failed to set config {key}: {e}")

    def get_config(self, key: str, default=None):
        # Try local cache first
        with self.lock:
            if key in self.local_cache:
                return self.local_cache[key]

        # Fetch from etcd
        try:
            full_key = f"{self.config_prefix}{key}"
            value, metadata = self.etcd_client.get(full_key)

            if value is not None:
                deserialized_value = json.loads(value.decode('utf-8'))
                with self.lock:
                    self.local_cache[key] = deserialized_value
                return deserialized_value
            else:
                return default
        except Exception as e:
            self.logger.error(f"Failed to get config {key}: {e}")
            return default

    def watch_config(self, key: str, callback: Callable[[str, Any], None]):
        full_key = f"{self.config_prefix}{key}"

        def watch_callback(response):
            try:
                # add_watch_callback delivers a WatchResponse; each event in it
                # is a PutEvent or DeleteEvent
                for event in response.events:
                    if isinstance(event, etcd3.events.PutEvent):
                        new_value = json.loads(event.value.decode('utf-8'))
                        old_value = self.local_cache.get(key)

                        with self.lock:
                            self.local_cache[key] = new_value

                        if new_value != old_value:
                            callback(key, new_value)
                            self.logger.info(f"Config {key} changed to {new_value}")

            except Exception as e:
                self.logger.error(f"Error in config watcher for {key}: {e}")

        watch_id = self.etcd_client.add_watch_callback(full_key, watch_callback)
        self.watchers[key] = watch_id
        self.callbacks[key] = callback

    def get_all_configs(self, prefix=''):
        try:
            search_prefix = f"{self.config_prefix}{prefix}"
            configs = {}

            for value, metadata in self.etcd_client.get_prefix(search_prefix):
                key = metadata.key.decode('utf-8').replace(self.config_prefix, '')
                configs[key] = json.loads(value.decode('utf-8'))

            return configs
        except Exception as e:
            self.logger.error(f"Failed to get configs with prefix {prefix}: {e}")
            return {}

    def remove_config(self, key: str):
        try:
            full_key = f"{self.config_prefix}{key}"
            self.etcd_client.delete(full_key)

            with self.lock:
                self.local_cache.pop(key, None)

            self.logger.info(f"Removed config {key}")
        except Exception as e:
            self.logger.error(f"Failed to remove config {key}: {e}")

class ConfigurableService:
    def __init__(self, service_name):
        self.service_name = service_name
        self.config = DistributedConfig()
        self.settings = {}
        self.logger = logging.getLogger(__name__)

        # Load initial configuration
        self._load_configuration()

        # Watch for configuration changes
        self._setup_config_watchers()

    def _load_configuration(self):
        # Load service-specific configuration, stripping the service prefix
        # so keys match what get_setting() expects
        service_configs = self.config.get_all_configs(f"{self.service_name}/")
        self.settings.update({
            key.replace(f"{self.service_name}/", ""): value
            for key, value in service_configs.items()
        })

        # Load global configuration the same way
        global_configs = self.config.get_all_configs("global/")
        self.settings.update({
            key.replace("global/", ""): value
            for key, value in global_configs.items()
        })

        self.logger.info(f"Loaded configuration: {self.settings}")

    def _setup_config_watchers(self):
        def config_changed(key, new_value):
            self.settings[key.replace(f"{self.service_name}/", "").replace("global/", "")] = new_value
            self.logger.info(f"Configuration updated: {key} = {new_value}")
            self._on_config_change(key, new_value)

        # Watch service-specific configs
        self.config.watch_config(f"{self.service_name}/database_url", config_changed)
        self.config.watch_config(f"{self.service_name}/max_connections", config_changed)

        # Watch global configs
        self.config.watch_config("global/log_level", config_changed)
        self.config.watch_config("global/feature_flags", config_changed)

    def _on_config_change(self, key, new_value):
        # Handle specific configuration changes
        if 'log_level' in key:
            self._update_log_level(new_value)
        elif 'database_url' in key:
            self._reconnect_database(new_value)

    def _update_log_level(self, log_level):
        logging.getLogger().setLevel(getattr(logging, log_level.upper()))

    def _reconnect_database(self, database_url):
        # Implement database reconnection logic
        self.logger.info(f"Reconnecting to database: {database_url}")

    def get_setting(self, key, default=None):
        return self.settings.get(key, default)
```

This configuration management system provides automatic reloading of settings when they change in etcd. Services can react to configuration changes without requiring restarts, enabling zero-downtime updates of critical settings.
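
In practice, settings are usually seeded by a deployment script and consumed by the services. A brief sketch of that flow (the service and key names are illustrative):

```python
# Seed settings -- typically done once by a deploy tool or admin script
config = DistributedConfig()
config.set_config('payment-service/database_url', 'postgres://db:5432/payments')
config.set_config('payment-service/max_connections', 50)
config.set_config('global/log_level', 'INFO')

# A service instance loads them at startup and watches for changes
service = ConfigurableService('payment-service')
print(service.get_setting('max_connections'))  # 50

# Changing a value from any node fires the watcher on every instance,
# here re-leveling loggers across the fleet without a restart
config.set_config('global/log_level', 'DEBUG')
```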

Leader Election

Leader election ensures only one instance performs critical operations in distributed scenarios. This technique prevents conflicts when multiple service instances attempt to execute the same scheduled tasks or manage shared resources.

```python
import logging
import threading
import time
from typing import Callable

from kazoo.client import KazooClient
from kazoo.exceptions import NoNodeError

class LeaderElector:
    def __init__(self, zk_hosts='localhost:2181', election_path='/election', node_name=None):
        self.zk_hosts = zk_hosts
        self.election_path = election_path
        self.node_name = node_name or f"node-{int(time.time())}"
        self.zk_client = None
        self.is_leader = False
        self.leader_callbacks = []
        self.follower_callbacks = []
        self.logger = logging.getLogger(__name__)
        self.election_lock = threading.Lock()

    def connect(self):
        try:
            self.zk_client = KazooClient(hosts=self.zk_hosts)
            self.zk_client.start()

            # Ensure election path exists
            self.zk_client.ensure_path(self.election_path)
            self.logger.info("Connected to ZooKeeper")

        except Exception as e:
            self.logger.error(f"Failed to connect to ZooKeeper: {e}")
            raise

    def start_election(self):
        if not self.zk_client:
            self.connect()

        def election_worker():
            try:
                # Create sequential ephemeral node
                node_path = f"{self.election_path}/{self.node_name}-"
                actual_path = self.zk_client.create(
                    node_path, 
                    value=self.node_name.encode('utf-8'),
                    ephemeral=True, 
                    sequence=True
                )

                self.logger.info(f"Created election node: {actual_path}")

                while True:
                    try:
                        # Get all children and sort them
                        children = sorted(self.zk_client.get_children(self.election_path))

                        if not children:
                            continue

                        # Extract sequence number from our node
                        our_node = actual_path.split('/')[-1]
                        our_index = children.index(our_node)

                        if our_index == 0:
                            # We are the leader; re-check periodically instead
                            # of spinning through the loop
                            self._become_leader()
                            time.sleep(5)
                        else:
                            # Watch the node before us
                            predecessor = children[our_index - 1]
                            predecessor_path = f"{self.election_path}/{predecessor}"

                            self._become_follower()

                            # Set up watch on predecessor
                            event = threading.Event()

                            @self.zk_client.DataWatch(predecessor_path)
                            def watch_predecessor(data, stat):
                                if stat is None:  # Node was deleted
                                    event.set()
                                    return False  # Remove watch
                                return True

                            # Wait for predecessor to disappear
                            event.wait()

                    except NoNodeError:
                        # Node was deleted, restart election
                        time.sleep(1)
                        continue
                    except Exception as e:
                        self.logger.error(f"Election error: {e}")
                        time.sleep(5)

            except Exception as e:
                self.logger.error(f"Election worker failed: {e}")

        thread = threading.Thread(target=election_worker, daemon=True)
        thread.start()
        return thread

    def _become_leader(self):
        with self.election_lock:
            if not self.is_leader:
                self.is_leader = True
                self.logger.info(f"{self.node_name} became leader")

                for callback in self.leader_callbacks:
                    try:
                        callback()
                    except Exception as e:
                        self.logger.error(f"Leader callback failed: {e}")

    def _become_follower(self):
        with self.election_lock:
            if self.is_leader:
                self.is_leader = False
                self.logger.info(f"{self.node_name} became follower")

                for callback in self.follower_callbacks:
                    try:
                        callback()
                    except Exception as e:
                        self.logger.error(f"Follower callback failed: {e}")

    def add_leader_callback(self, callback: Callable[[], None]):
        self.leader_callbacks.append(callback)

    def add_follower_callback(self, callback: Callable[[], None]):
        self.follower_callbacks.append(callback)

    def is_current_leader(self) -> bool:
        return self.is_leader

class DistributedScheduler:
    def __init__(self, node_name):
        self.node_name = node_name
        self.elector = LeaderElector(node_name=node_name)
        self.scheduled_tasks = {}
        self.task_threads = {}
        self.running = False
        self.logger = logging.getLogger(__name__)

        # Set up leader election callbacks
        self.elector.add_leader_callback(self._on_become_leader)
        self.elector.add_follower_callback(self._on_become_follower)

    def _on_become_leader(self):
        self.logger.info("Starting scheduled tasks as leader")
        for task_name, (interval, func) in self.scheduled_tasks.items():
            self._start_task(task_name, interval, func)

    def _on_become_follower(self):
        self.logger.info("Stopping scheduled tasks as follower")
        self._stop_all_tasks()

    def _start_task(self, task_name: str, interval: int, func: Callable):
        def task_runner():
            while self.running and self.elector.is_current_leader():
                try:
                    func()
                    time.sleep(interval)
                except Exception as e:
                    self.logger.error(f"Task {task_name} failed: {e}")
                    time.sleep(interval)

        thread = threading.Thread(target=task_runner, daemon=True)
        self.task_threads[task_name] = thread
        thread.start()

    def _stop_all_tasks(self):
        for task_name in list(self.task_threads.keys()):
            self.task_threads.pop(task_name, None)

    def schedule_task(self, task_name: str, interval: int, func: Callable):
        self.scheduled_tasks[task_name] = (interval, func)

        # If we're already the leader, start the task immediately
        if self.elector.is_current_leader():
            self._start_task(task_name, interval, func)

    def start(self):
        self.running = True
        self.elector.start_election()

    def stop(self):
        self.running = False
        self._stop_all_tasks()

# Example usage
def cleanup_old_logs():
    logging.info("Cleaning up old log files...")
    # Implement log cleanup logic

def send_daily_reports():
    logging.info("Sending daily reports...")
    # Implement report generation and sending

if __name__ == "__main__":
    scheduler = DistributedScheduler("scheduler-node-1")
    scheduler.schedule_task("log_cleanup", 3600, cleanup_old_logs)        # every hour
    scheduler.schedule_task("daily_reports", 86400, send_daily_reports)   # every day
    scheduler.start()

    # Keep the main thread alive: the election and tasks run in daemon
    # threads, which die if the process exits
    while True:
        time.sleep(60)
```

This leader election implementation ensures that only one instance runs scheduled tasks, preventing duplicate work across multiple service instances. The system automatically handles leader failures by promoting followers.
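
It's worth knowing that kazoo also ships a built-in `Election` recipe that hides the sequential-node bookkeeping. When you don't need explicit leader/follower callbacks, a few lines suffice (the path and identifier are illustrative):

```python
from kazoo.client import KazooClient

zk = KazooClient(hosts='localhost:2181')
zk.start()

def lead():
    # Runs only while this node holds leadership;
    # returning from the function releases it
    print("I am the leader now")

# Blocks until this contender wins the election, then invokes lead()
election = zk.Election('/election/scheduler', identifier='scheduler-node-1')
election.run(lead)
```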

Distributed Tracing

Distributed tracing follows a request as it hops across service boundaries. OpenTelemetry is the de facto standard for instrumenting this in Python; it provides visibility into complex request flows and helps identify performance bottlenecks in distributed systems.


```python
import functools
import time

import requests
from opentelemetry import trace
from opentelemetry.exporter.jaeger.thrift import JaegerExporter
from opentelemetry.instrumentation.requests import RequestsInstrumentor
from opentelemetry.sdk.resources import Resource
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor

class DistributedTracer:
    def __init__(self, service_name, jaeger_endpoint='http://localhost:14268/api/traces'):
        self.service_name = service_name

        # Configure the tracer; the service name rides on the SDK Resource,
        # where the Jaeger exporter picks it up
        trace.set_tracer_provider(TracerProvider(
            resource=Resource.create({"service.name": service_name})
        ))
        tracer_provider = trace.get_tracer_provider()

        # Configure the Jaeger exporter (thrift over HTTP to the collector)
        jaeger_exporter = JaegerExporter(collector_endpoint=jaeger_endpoint)

        span_processor = BatchSpanProcessor(jaeger_exporter)
        tracer_provider.add_span_processor(span_processor)

        self.tracer = trace.get_tracer(__name__)

        # Auto-instrument HTTP requests
        RequestsInstrumentor().instrument()

    def trace_function(self, operation_name=None):
        def decorator(func):
            @functools.wraps(func)
            def wrapper(*args, **kwargs):
                span_name = operation_name or func.__name__

                with self.tracer.start_as_current_span(span_name) as span:
                    # Add function metadata
                    span.set_attribute("function.name", func.__name__)
                    span.set_attribute("service.name", self.service_name)

                    # Add arguments as attributes (be careful with sensitive data)
                    if args:
                        span.set_attribute("function.args_count", len(args))
                    if kwargs:
                        span.set_attribute("function.kwargs_count", len(kwargs))

                    try:
                        start_time = time.time()
                        result = func(*args, **kwargs)

                        # Record success
                        span.set_attribute("function.duration_ms", 
                                         (time.time() - start_time) * 1000)
                        span.set_status(trace.Status(trace.StatusCode.OK))
                        return result
                    except Exception as e:
                        # Record the failure on the span before re-raising
                        span.record_exception(e)
                        span.set_status(trace.Status(trace.StatusCode.ERROR, str(e)))
                        raise

            return wrapper
        return decorator
```
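
With the tracer configured, instrumenting a call path is a decorator away, and `RequestsInstrumentor` automatically injects the W3C trace context into outgoing `requests` calls so downstream services record child spans. A short usage sketch (the service names and URL are illustrative); for non-HTTP hops such as queue messages, the context can be carried by hand with `inject`:

```python
from opentelemetry.propagate import inject

tracer = DistributedTracer('order-service')

@tracer.trace_function('fetch_inventory')
def fetch_inventory(item_id):
    # Trace headers are injected automatically by RequestsInstrumentor,
    # so the inventory service continues this trace as a child span
    return requests.get(f"http://inventory-service/items/{item_id}", timeout=5).json()

# Manual propagation, e.g. to carry the context inside a queue message
headers = {}
inject(headers)  # copies the current span context into the dict
# ...attach `headers` to the outgoing message; the consumer extract()s them
```

End to end, this yields a single trace in Jaeger spanning every service a request touched, which is usually the fastest way to see where latency accumulates.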

