Nithin Bharadwaj

Real-Time Data Processing Techniques for High-Performance Analytics Systems That Scale

I've spent considerable time building real-time analytics systems, and the challenge of processing continuous data streams while maintaining low latency never gets easier. The key lies in selecting the right combination of techniques that match your specific throughput and accuracy requirements.

Time-based Windowing for Stream Aggregation

Working with streaming data requires efficient windowing mechanisms that aggregate events over time intervals. I've found that Python's deque works well for maintaining sliding windows because appends and pops at either end run in constant time.

from collections import deque, defaultdict
import time
import threading
from typing import Dict, Any, List
import json
import statistics

class StreamingWindowProcessor:
    def __init__(self, window_size_seconds: int = 300):
        self.window_size = window_size_seconds
        self.data_points = deque()
        self.aggregates = {}
        self.lock = threading.RLock()
        self.processors = []

    def add_data_point(self, timestamp: float, value: Dict[str, Any]):
        with self.lock:
            self.data_points.append({
                'timestamp': timestamp,
                'value': value
            })
            self._expire_old_data(timestamp)
            self._update_aggregates()

    def _expire_old_data(self, current_timestamp: float):
        cutoff = current_timestamp - self.window_size
        while self.data_points and self.data_points[0]['timestamp'] < cutoff:
            self.data_points.popleft()

    def _update_aggregates(self):
        if not self.data_points:
            self.aggregates = {}
            return

        numeric_values = []
        categorical_counts = defaultdict(int)
        error_count = 0

        for point in self.data_points:
            data = point['value']

            if 'metric_value' in data and isinstance(data['metric_value'], (int, float)):
                numeric_values.append(data['metric_value'])

            if 'category' in data:
                categorical_counts[data['category']] += 1

            if data.get('is_error', False):
                error_count += 1

        self.aggregates = {
            'count': len(self.data_points),
            'error_rate': error_count / len(self.data_points) if self.data_points else 0,
            'category_distribution': dict(categorical_counts)
        }

        if numeric_values:
            self.aggregates.update({
                'mean': statistics.mean(numeric_values),
                'median': statistics.median(numeric_values),
                'std_dev': statistics.stdev(numeric_values) if len(numeric_values) > 1 else 0,
                'min': min(numeric_values),
                'max': max(numeric_values)
            })

    def get_current_stats(self) -> Dict[str, Any]:
        with self.lock:
            return self.aggregates.copy()

# Real-time data simulation
processor = StreamingWindowProcessor(window_size_seconds=60)

import math
import random

def simulate_real_time_data():
    start_time = time.time()

    for i in range(200):
        current_time = time.time()

        # Simulate various data patterns
        base_value = 100 + 20 * math.sin(i * 0.1)  # Sine wave pattern
        noise = random.gauss(0, 5)  # Add noise

        data_point = {
            'metric_value': base_value + noise,
            'category': random.choice(['web', 'api', 'database', 'cache']),
            'is_error': random.random() < 0.05,  # 5% error rate
            'user_id': f'user_{random.randint(1, 100)}'
        }

        processor.add_data_point(current_time, data_point)

        if i % 20 == 0:
            stats = processor.get_current_stats()
            print(f"Window stats at {i} events:")
            print(json.dumps(stats, indent=2, default=str))
            print("-" * 50)

        time.sleep(0.05)  # 50ms between events

simulate_real_time_data()
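
The processor above rescans the whole window on every event. For additive statistics such as count, sum, and mean, the same window can be maintained with running totals so each event costs amortized constant time. The following is a minimal sketch under two assumptions: events arrive in timestamp order, and `RunningWindow` is a hypothetical name rather than part of the class above.

```python
from collections import deque


class RunningWindow:
    """Sliding time window that keeps count, sum, and mean in O(1) amortized time per event."""

    def __init__(self, window_size_seconds: float):
        self.window_size = window_size_seconds
        self.events = deque()  # (timestamp, value) pairs, oldest first
        self.total = 0.0

    def add(self, timestamp: float, value: float) -> None:
        self.events.append((timestamp, value))
        self.total += value
        # Evict expired events and subtract them from the running sum
        cutoff = timestamp - self.window_size
        while self.events and self.events[0][0] < cutoff:
            _, old_value = self.events.popleft()
            self.total -= old_value

    def mean(self) -> float:
        return self.total / len(self.events) if self.events else 0.0


window = RunningWindow(window_size_seconds=10)
for ts, value in [(0, 4.0), (5, 8.0), (12, 6.0)]:
    window.add(ts, value)

# The event at t=0 has fallen out of the 10-second window by t=12
print(len(window.events), window.mean())
```

Median, min, and max are not additive, so they still need either a rescan or a dedicated structure. Over very long runs a running float sum can also accumulate rounding drift, which is one reason to recompute it from the window occasionally.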

Probabilistic Data Structures for Massive Scale

When dealing with millions of events per second, exact counting becomes impractical. I rely on probabilistic algorithms that provide approximate answers with predictable accuracy bounds and constant memory usage.
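
To make "predictable accuracy bounds" concrete, here is a small sizing sketch based on the standard results for the two structures used in this section. HyperLogLog with m registers has a relative standard error of roughly 1.04 / sqrt(m). A Count-Min Sketch with width ceil(e / epsilon) and depth ceil(ln(1 / delta)) overestimates a count by at most epsilon times the total stream count with probability at least 1 - delta. The function names are hypothetical helpers, not part of any library.

```python
import math


def hll_standard_error(precision: int) -> float:
    """Approximate relative standard error of HyperLogLog with 2**precision registers."""
    return 1.04 / math.sqrt(2 ** precision)


def cms_dimensions(epsilon: float, delta: float) -> tuple:
    """Width and depth so overestimates stay within epsilon * N with probability >= 1 - delta."""
    width = math.ceil(math.e / epsilon)
    depth = math.ceil(math.log(1 / delta))
    return width, depth


print(f"HLL precision 12: about {hll_standard_error(12):.4f} relative error")
print(f"CMS for epsilon=0.001, delta=0.01: {cms_dimensions(0.001, 0.01)}")
```

Each extra bit of HyperLogLog precision doubles the memory and cuts the error by a factor of about 1.41, so the trade-off can be picked before any data arrives.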

import hashlib
import math
import random
from typing import Set, List

class HyperLogLogCounter:
    def __init__(self, precision: int = 10):
        self.precision = precision
        self.num_buckets = 2 ** precision
        self.buckets = [0] * self.num_buckets
        self.alpha = self._calculate_alpha()

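    def _calculate_alpha(self) -> float:
        # Bias-correction constants from the HyperLogLog paper
        if self.num_buckets >= 128:
            return 0.7213 / (1 + 1.079 / self.num_buckets)
        elif self.num_buckets >= 64:
            return 0.709
        elif self.num_buckets >= 32:
            return 0.697
        else:
            return 0.673  # Value for 16 buckets; smaller sketches are not recommended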

    def add(self, item: str):
        # Hash the item to a 128-bit integer
        hash_value = int(hashlib.md5(item.encode()).hexdigest(), 16)

        # Use the lowest 'precision' bits as the bucket index
        bucket_index = hash_value & (self.num_buckets - 1)

        # Rank = position of the first set bit in the remaining bits.
        # Counting from the low end is statistically equivalent to counting leading zeros.
        remaining_bits = hash_value >> self.precision
        rank = self._trailing_zeros(remaining_bits) + 1

        # Keep the maximum rank seen for this bucket
        self.buckets[bucket_index] = max(self.buckets[bucket_index], rank)

    def _trailing_zeros(self, value: int) -> int:
        if value == 0:
            return 128 - self.precision  # All remaining MD5 bits are zero
        # value ^ (value - 1) sets every bit up to and including the lowest set bit
        return (value ^ (value - 1)).bit_length() - 1

    def estimate_cardinality(self) -> int:
        # Calculate raw estimate
        raw_estimate = self.alpha * (self.num_buckets ** 2) / sum(2 ** (-bucket) for bucket in self.buckets)

        # Apply small range correction
        if raw_estimate <= 2.5 * self.num_buckets:
            zero_buckets = self.buckets.count(0)
            if zero_buckets != 0:
                return int(self.num_buckets * math.log(self.num_buckets / zero_buckets))

        return int(raw_estimate)

class CountMinSketch:
    def __init__(self, width: int = 1000, depth: int = 5):
        self.width = width
        self.depth = depth
        self.table = [[0] * width for _ in range(depth)]
        self.hash_functions = self._generate_hash_functions()

    def _generate_hash_functions(self) -> List:
        return [
            lambda x, i=i: hash(f"{x}_{i}") % self.width 
            for i in range(self.depth)
        ]

    def add(self, item: str, count: int = 1):
        for i, hash_func in enumerate(self.hash_functions):
            index = hash_func(item)
            self.table[i][index] += count

    def estimate_count(self, item: str) -> int:
        estimates = []
        for i, hash_func in enumerate(self.hash_functions):
            index = hash_func(item)
            estimates.append(self.table[i][index])
        return min(estimates)  # Return minimum to reduce overestimation

    def get_heavy_hitters(self, threshold: int) -> List[tuple]:
        # This is a simplified version - production systems would need more sophisticated approaches
        candidates = {}

        # Sample items that might be heavy hitters
        for row in range(self.depth):
            for col in range(self.width):
                if self.table[row][col] >= threshold:
                    # In practice, you'd need to maintain a separate structure to map back to original items
                    candidates[f"item_{row}_{col}"] = self.table[row][col]

        return list(candidates.items())

# Usage in real-time analytics
cardinality_tracker = HyperLogLogCounter(precision=12)
frequency_tracker = CountMinSketch(width=10000, depth=7)

# Simulate processing a high-volume stream
def process_event_stream():
    for i in range(100000):
        # Generate user IDs with a skewed distribution: 80% of events come from a small pool
        if random.random() < 0.8:
            user_id = f"user_{random.randint(1, 1000)}"  # Frequent users
        else:
            user_id = f"user_{random.randint(1, 10000)}"  # Wider pool, mostly infrequent users

        event_type = random.choice(['click', 'view', 'purchase', 'logout'])

        # Track unique users
        cardinality_tracker.add(user_id)

        # Track event frequency
        frequency_tracker.add(f"{user_id}:{event_type}")

        if i % 10000 == 0:
            unique_users = cardinality_tracker.estimate_cardinality()
            print(f"Processed {i} events, estimated unique users: {unique_users}")

process_event_stream()
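
One caveat with the Count-Min Sketch above: Python's built-in `hash()` is salted per process for strings, so two workers (or a restarted process) will map the same item to different columns unless `PYTHONHASHSEED` is pinned. If sketches need to be persisted or merged across processes, a deterministic hash is safer. The sketch below is one possible variant using `hashlib.blake2b` with a per-row salt; `row_index` and `StableCountMinSketch` are hypothetical names introduced for illustration.

```python
import hashlib


def row_index(item: str, row: int, width: int) -> int:
    """Column for `item` in a given sketch row, stable across processes and machines."""
    digest = hashlib.blake2b(
        item.encode("utf-8"),
        digest_size=8,
        salt=row.to_bytes(2, "little"),  # A different salt per row acts as a different hash function
    ).digest()
    return int.from_bytes(digest, "little") % width


class StableCountMinSketch:
    def __init__(self, width: int = 1000, depth: int = 5):
        self.width, self.depth = width, depth
        self.table = [[0] * width for _ in range(depth)]

    def add(self, item: str, count: int = 1) -> None:
        for row in range(self.depth):
            self.table[row][row_index(item, row, self.width)] += count

    def estimate_count(self, item: str) -> int:
        # The minimum across rows is the least-contaminated counter
        return min(
            self.table[row][row_index(item, row, self.width)]
            for row in range(self.depth)
        )


sketch = StableCountMinSketch(width=2000, depth=5)
for _ in range(42):
    sketch.add("user_7:click")
sketch.add("user_9:view")

# Count-Min never underestimates, so this is at least 42
print(sketch.estimate_count("user_7:click"))
```

Because every process computes the same column for the same item, two sketches with identical dimensions can be merged by adding their tables cell by cell.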

Event-Driven Architecture with Message Queues

Real-time analytics systems benefit from decoupled architectures where data producers and consumers operate independently. I've implemented many systems using Redis Streams for low-latency message passing.

import redis
import json
import random
import threading
import time
from typing import Dict, Any, Callable
import uuid

class RealTimeEventProcessor:
    def __init__(self, redis_host: str = 'localhost', redis_port: int = 6379):
        self.redis_client = redis.Redis(host=redis_host, port=redis_port, decode_responses=True)
        self.processors = {}
        self.running = False
        self.consumer_threads = []

    def create_stream(self, stream_name: str):
        """Create the stream (if missing) and its consumer group"""
        try:
            self.redis_client.xgroup_create(stream_name, 'analytics_group', id='0', mkstream=True)
        except redis.ResponseError as e:
            # BUSYGROUP means the group already exists; anything else is a real failure
            if 'BUSYGROUP' not in str(e):
                raise

    def publish_event(self, stream_name: str, event_data: Dict[str, Any]):
        """Publish an event to a Redis stream"""
        event_id = self.redis_client.xadd(stream_name, event_data)
        return event_id

    def register_processor(self, stream_name: str, processor_func: Callable):
        """Register a processing function for a specific stream"""
        self.processors[stream_name] = processor_func

    def start_consuming(self):
        """Start consuming events from all registered streams"""
        self.running = True

        for stream_name in self.processors.keys():
            thread = threading.Thread(
                target=self._consume_stream,
                args=(stream_name,),
                daemon=True
            )
            thread.start()
            self.consumer_threads.append(thread)

    def _consume_stream(self, stream_name: str):
        consumer_name = f"consumer_{uuid.uuid4().hex[:8]}"
        processor_func = self.processors[stream_name]

        while self.running:
            try:
                messages = self.redis_client.xreadgroup(
                    'analytics_group',
                    consumer_name,
                    {stream_name: '>'},
                    count=10,
                    block=1000
                )

                for stream, msgs in messages:
                    for msg_id, fields in msgs:
                        try:
                            # Process the event
                            processor_func(fields)

                            # Acknowledge the message
                            self.redis_client.xack(stream_name, 'analytics_group', msg_id)

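                        except Exception as e:
                            # The message is not acknowledged, so it stays in the group's
                            # pending entries list until it is claimed and retried
                            print(f"Error processing message {msg_id}: {e}")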

            except redis.RedisError as e:
                print(f"Redis error in consumer: {e}")
                time.sleep(1)

    def stop_consuming(self):
        """Stop consuming events"""
        self.running = False
        for thread in self.consumer_threads:
            thread.join(timeout=5)

# Analytics processors
class AnalyticsAggregator:
    def __init__(self):
        self.metrics = {}
        self.lock = threading.Lock()

    def process_user_event(self, event_data: Dict[str, Any]):
        with self.lock:
            user_id = event_data.get('user_id')
            event_type = event_data.get('event_type')
            timestamp = float(event_data.get('timestamp', time.time()))

            if user_id not in self.metrics:
                self.metrics[user_id] = {
                    'total_events': 0,
                    'event_types': {},
                    'first_seen': timestamp,
                    'last_seen': timestamp
                }

            user_metrics = self.metrics[user_id]
            user_metrics['total_events'] += 1
            user_metrics['event_types'][event_type] = user_metrics['event_types'].get(event_type, 0) + 1
            user_metrics['last_seen'] = max(user_metrics['last_seen'], timestamp)

    def process_performance_event(self, event_data: Dict[str, Any]):
        # Process performance metrics
        response_time = float(event_data.get('response_time', 0))
        endpoint = event_data.get('endpoint', 'unknown')

        # Update performance statistics
        print(f"Performance event: {endpoint} took {response_time}ms")

    def get_user_summary(self, user_id: str) -> Dict[str, Any]:
        with self.lock:
            return self.metrics.get(user_id, {})

# Example usage
def setup_analytics_pipeline():
    # Initialize the event processor
    processor = RealTimeEventProcessor()
    aggregator = AnalyticsAggregator()

    # Create streams
    processor.create_stream('user_events')
    processor.create_stream('performance_events')

    # Register processors
    processor.register_processor('user_events', aggregator.process_user_event)
    processor.register_processor('performance_events', aggregator.process_performance_event)

    # Start consuming
    processor.start_consuming()

    # Simulate publishing events
    for i in range(50):
        # User events
        processor.publish_event('user_events', {
            'user_id': f'user_{i % 10}',
            'event_type': random.choice(['login', 'click', 'purchase']),
            'timestamp': str(time.time()),
            'session_id': f'session_{uuid.uuid4().hex[:8]}'
        })

        # Performance events
        processor.publish_event('performance_events', {
            'endpoint': f'/api/endpoint_{i % 5}',
            'response_time': str(random.uniform(50, 500)),
            'status_code': str(random.choice([200, 201, 400, 500])),
            'timestamp': str(time.time())
        })

        time.sleep(0.1)

    # Let it process for a while
    time.sleep(5)

    # Check some results
    for user_id in [f'user_{i}' for i in range(5)]:
        summary = aggregator.get_user_summary(user_id)
        if summary:
            print(f"User {user_id}: {json.dumps(summary, indent=2, default=str)}")

    processor.stop_consuming()

setup_analytics_pipeline()
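
The decoupling idea itself can be tried without a Redis server. The following is a minimal in-process sketch that uses the standard library's `queue.Queue` as a stand-in for the stream. It is an assumption-laden analogy: it has no persistence, consumer groups, or acknowledgements, and `SENTINEL`, `producer`, and `consumer` are hypothetical names.

```python
import queue
import threading

SENTINEL = None  # Hypothetical shutdown marker placed on the queue by the producer


def producer(events: queue.Queue, n: int) -> None:
    for i in range(n):
        # put() blocks while the queue is full, which slows the producer down
        events.put({"user_id": f"user_{i % 3}", "event_type": "click"})
    events.put(SENTINEL)


def consumer(events: queue.Queue, counts: dict) -> None:
    while True:
        event = events.get()
        if event is SENTINEL:
            break
        counts[event["user_id"]] = counts.get(event["user_id"], 0) + 1


events = queue.Queue(maxsize=10)  # Bounded queue applies backpressure
counts = {}
worker = threading.Thread(target=consumer, args=(events, counts))
worker.start()
producer(events, 30)
worker.join()

print(counts)
```

The producer never calls the consumer directly; it only knows about the queue. Swapping the queue for a Redis stream keeps that shape while adding durability and the ability to run consumers in separate processes.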

Incremental Aggregation for Continuous Updates

Rather than recalculating statistics from scratch, incremental aggregation maintains running totals that update in constant time as each value arrives, so the per-event cost stays flat no matter how much data has already been seen.

import math
import time
from typing import Dict, Any, Optional
import threading

class IncrementalAggregator:
    def __init__(self):
        self.count = 0
        self.sum_value = 0.0
        self.sum_squares = 0.0
        self.min_value = float('inf')
        self.max_value = float('-inf')
        self.lock = threading.Lock()

        # For exponential moving averages
        self.ema_alpha = 0.1
        self.ema_value = None

        # For percentile estimation (P-squared algorithm)
        self.percentile_markers = []
        self.percentile_positions = []
        self.initialized_percentiles = False

    def add_value(self, value: float, weight: float = 1.0):
        with self.lock:
            self.count += weight
            weighted_value = value * weight
            self.sum_value += weighted_value
            self.sum_squares += value * weighted_value

            self.min_value = min(self.min_value, value)
            self.max_value = max(self.max_value, value)

            # Update exponential moving average
            if self.ema_value is None:
                self.ema_value = value
            else:
                self.ema_value = self.ema_alpha * value + (1 - self.ema_alpha) * self.ema_value

            # Update percentile estimation
            self._update_percentiles(value)

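    def _update_percentiles(self, value: float):
        """Update quartile estimates with the P-squared algorithm (weights are ignored)"""
        if not self.initialized_percentiles:
            self.percentile_markers.append(value)
            if len(self.percentile_markers) >= 5:
                self.percentile_markers.sort()
                self.percentile_positions = [1, 2, 3, 4, 5]
                # Desired marker positions for min, p25, p50, p75, max and their per-sample increments
                self.desired_positions = [1.0, 2.0, 3.0, 4.0, 5.0]
                self.position_increments = [0.0, 0.25, 0.5, 0.75, 1.0]
                self.initialized_percentiles = True
            return

        q, n = self.percentile_markers, self.percentile_positions

        # Find the cell k with q[k] <= value < q[k + 1], widening the extremes if needed
        if value < q[0]:
            q[0], k = value, 0
        elif value >= q[4]:
            q[4], k = value, 3
        else:
            k = next(i for i in range(4) if q[i] <= value < q[i + 1])

        # Shift marker positions above the cell and advance the desired positions
        for i in range(k + 1, 5):
            n[i] += 1
        for i in range(5):
            self.desired_positions[i] += self.position_increments[i]

        # Move interior markers that drifted at least one position from their target
        for i in (1, 2, 3):
            d = self.desired_positions[i] - n[i]
            if (d >= 1 and n[i + 1] - n[i] > 1) or (d <= -1 and n[i - 1] - n[i] < -1):
                d = 1 if d > 0 else -1
                # Piecewise-parabolic estimate, with linear interpolation as fallback
                candidate = q[i] + d / (n[i + 1] - n[i - 1]) * (
                    (n[i] - n[i - 1] + d) * (q[i + 1] - q[i]) / (n[i + 1] - n[i])
                    + (n[i + 1] - n[i] - d) * (q[i] - q[i - 1]) / (n[i] - n[i - 1])
                )
                if not q[i - 1] < candidate < q[i + 1]:
                    candidate = q[i] + d * (q[i + d] - q[i]) / (n[i + d] - n[i])
                q[i] = candidate
                n[i] += d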

    def get_statistics(self) -> Dict[str, Any]:
        with self.lock:
            if self.count == 0:
                return {'count': 0}

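            mean = self.sum_value / self.count
            # Population variance via E[x^2] - E[x]^2; clamp at 0 because this form
            # can go slightly negative from floating-point cancellation
            variance = (self.sum_squares / self.count) - (mean ** 2)
            std_dev = math.sqrt(max(0, variance))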

            stats = {
                'count': self.count,
                'mean': mean,
                'std_dev': std_dev,
                'min': self.min_value if self.min_value != float('inf') else None,
                'max': self.max_value if self.max_value != float('-inf') else None,
                'sum': self.sum_value,
                'ema': self.ema_value
            }

            if self.initialized_percentiles:
                stats['estimated_percentiles'] = {
                    'p25': self.percentile_markers[1],
                    'p50': self.percentile_markers[2],
                    'p75': self.percentile_markers[3]
                }

            return stats

class MultiLevelAggregator:
    def __init__(self):
        self.aggregators = {
            'minute': IncrementalAggregator(),
            'hour': IncrementalAggregator(),
            'day': IncrementalAggregator()
        }
        self.last_reset = {
            'minute': time.time(),
            'hour': time.time(),
            'day': time.time()
        }
        self.lock = threading.Lock()

    def add_data_point(self, value: float, weight: float = 1.0):
        current_time = time.time()

        with self.lock:
            # Check if we need to reset any aggregators
            self._check_reset_periods(current_time)

            # Add to all active aggregators
            for aggregator in self.aggregators.values():
                aggregator.add_value(value, weight)

    def _check_reset_periods(self, current_time: float):
        # Reset minute aggregator every minute
        if current_time - self.last_reset['minute'] >= 60:
            self.aggregators['minute'] = IncrementalAggregator()
            self.last_reset['minute'] = current_time

        # Reset hour aggregator every hour
        if current_time - self.last_reset['hour'] >= 3600:
            self.aggregators['hour'] = IncrementalAggregator()
            self.last_reset['hour'] = current_time

        # Reset day aggregator every day
        if current_time - self.last_reset['day'] >= 86400:
            self.aggregators['day'] = IncrementalAggregator()
            self.last_reset['day'] = current_time

    def get_all_statistics(self) -> Dict[str, Dict[str, Any]]:
        current_time = time.time()

        with self.lock:
            self._check_reset_periods(current_time)

            return {
                period: aggregator.get_statistics()
                for period, aggregator in self.aggregators.items()
            }

# Example usage with simulated real-time data
multi_aggregator = MultiLevelAggregator()

def simulate_metrics_collection():
    import random
    import math

    start_time = time.time()

    for i in range(1000):
        # Simulate different types of metrics
        base_value = 100 + 50 * math.sin(i * 0.02)  # Cyclical pattern
        noise = random.gauss(0, 10)
        spike = 200 if random.random() < 0.01 else 0  # Occasional spikes

        metric_value = base_value + noise + spike

        multi_aggregator.add_data_point(metric_value)

        # Print statistics every 100 data points
        if i % 100 == 0:
            stats = multi_aggregator.get_all_statistics()
            print(f"\nIteration {i} - Multi-level Statistics:")
            for period, period_stats in stats.items():
                print(f"{period.upper()}: Count={period_stats.get('count', 0):.0f}, "
                      f"Mean={period_stats.get('mean', 0):.2f}, "
                      f"StdDev={period_stats.get('std_dev', 0):.2f}")

        time.sleep(0.01)  # 10ms between measurements

simulate_metrics_collection()
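
The sum-of-squares formula used above is compact, but it can lose precision when values are large relative to their spread. A common alternative is Welford's algorithm, which tracks the mean and the sum of squared deviations directly. The sketch below is a minimal unweighted version with a merge step for combining partial aggregates from separate workers. `WelfordAccumulator` is a hypothetical name, not something from the code above.

```python
import math


class WelfordAccumulator:
    def __init__(self):
        self.count = 0
        self.mean = 0.0
        self.m2 = 0.0  # Sum of squared deviations from the current mean

    def add(self, value: float) -> None:
        self.count += 1
        delta = value - self.mean
        self.mean += delta / self.count
        self.m2 += delta * (value - self.mean)

    def variance(self) -> float:
        """Population variance, matching the sum_squares / count formulation."""
        return self.m2 / self.count if self.count else 0.0

    def merge(self, other: "WelfordAccumulator") -> "WelfordAccumulator":
        """Combine two partial aggregates without revisiting the underlying data."""
        merged = WelfordAccumulator()
        merged.count = self.count + other.count
        if merged.count == 0:
            return merged
        delta = other.mean - self.mean
        merged.mean = self.mean + delta * other.count / merged.count
        merged.m2 = self.m2 + other.m2 + delta ** 2 * self.count * other.count / merged.count
        return merged


# Large offset, small spread: the case where sum-of-squares is most fragile
left, right = WelfordAccumulator(), WelfordAccumulator()
for v in [1e9 + 4, 1e9 + 7]:
    left.add(v)
for v in [1e9 + 13, 1e9 + 16]:
    right.add(v)

combined = left.merge(right)
print(combined.count, combined.mean, math.sqrt(combined.variance()))
```

The merge step is what makes this useful for multi-level or distributed aggregation: per-minute accumulators can be folded into an hourly one without storing the raw values.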

Real-Time Anomaly Detection

Identifying unusual patterns in streaming data requires algorithms that can adapt to changing baselines while maintaining low false-positive rates. I've implemented several approaches that work well in production environments.


import math
import random
import statistics
import time
from collections import deque
from typing import Any, Dict, List, Optional, Tuple

class StreamingAnomalyDetector:
    def __init__(self, window_size: int = 100, sensitivity: float = 2.5):
        self.window_size = window_size
        self.sensitivity = sensitivity  # Z-score threshold
        self.values = deque(maxlen=window_size)
        self.timestamps = deque(maxlen=window_size)

        # Adaptive parameters
        self.baseline_mean = 0.0
        self.baseline_std = 1.0
        self.adaptation_rate = 0.01

        # Anomaly tracking
        self.anomaly_count = 0
        self.total_count = 0

    def add_value(self, value: float, timestamp: Optional[float] = None) -> Dict[str, Any]:
        if timestamp is None:
            timestamp = time.time()

        self.values.append(value)
        self.timestamps.append(timestamp)
        self.total_count += 1

        # Calculate current statistics
        if len(self.values) < 10:  # Need minimum data for reliable statistics
            return {
                'value': value,
                'is_anomaly': False,
                'z_score': 0.0,
                'confidence': 0.0,
                'baseline_mean': self.baseline_mean,
                'baseline_std': self.baseline_std
            }

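        current_mean = statistics.mean(self.values)
        current_std = statistics.stdev(self.values)

        if self.total_count == 10:
            # Seed the baseline from the first usable window; starting from the
            # 0.0 / 1.0 defaults would take hundreds of samples to converge
            self.baseline_mean = current_mean
            self.baseline_std = current_std
        else:
            # Update adaptive baseline
            self.baseline_mean = (1 - self.adaptation_rate) * self.baseline_mean + self.adaptation_rate * current_mean
            self.baseline_std = (1 - self.adaptation_rate) * self.baseline_std + self.adaptation_rate * current_std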

        # Calculate Z-score
        z_score = (value - self.baseline_mean) / max(self.baseline_std, 0.001)
        is_anomaly = abs(z_score) > self.sensitivity

        if is_anomaly:
            self.anomaly_count += 1

        confidence = min(1.0, abs(z_score) / self.sensitivity) if self.sensitivity > 0 else 0.0

        return {
            'value': value,
            'timestamp': timestamp,
            'is_anomaly': is_anomaly,
            'z_score': z_score,
            'confidence': confidence,
            'baseline_mean': self.baseline_mean,
            'baseline_std': self.baseline_std,
            'anomaly_rate': self.anomaly_count / self.total_count
        }

class MultiVariateAnomalyDetector:
    def __init__(self, feature_names: List[str], window_size: int = 50):
        self.feature_names = feature_names
        self.window_size = window_size
        self.data_buffer = {name: deque(maxlen=window_size) for name in feature_names}
        self.correlation_matrix = None
        self.means = {name: 0.0 for name in feature_names}
        self.stds = {name: 1.0 for name in feature_names}

    def add_sample(self, sample: Dict[str, float]) -> Dict[str, Any]:
        # Add sample to buffers
        for feature in self.feature_names:
            if feature in sample:
                self.data_buffer[feature].append(sample[feature])

        # Need sufficient data for multivariate analysis
        min_samples = min(len(buffer) for buffer in self.data_buffer.values())
        if min_samples < 20:
            return {
                'is_anomaly': False,
                'anomaly_score': 0.0,
                'feature_contributions': {}
            }

        # Update statistics
        self._update_statistics()

        # Calculate a normalized distance (per-feature z-scores combined; correlations are ignored)
        anomaly_score, feature_contributions = self._calculate_anomaly_score(sample)

        # Determine if this is an anomaly
        threshold = 3.0  # Distance threshold in standard-deviation units
        is_anomaly = anomaly_score > threshold

        return {
            'is_anomaly': is_anomaly,
            'anomaly_score': anomaly_score,
            'threshold': threshold,
            'feature_contributions': feature_contributions,
            'sample': sample
        }

    def _update_statistics(self):
        for feature in self.feature_names:
            if len(self.data_buffer[feature]) > 1:
                self.means[feature] = statistics.mean(self.data_buffer[feature])
                self.stds[feature] = statistics.stdev(self.data_buffer[feature])

    def _calculate_anomaly_score(self, sample: Dict[str, float]) -> Tuple[float, Dict[str, float]]:
        # Simplified anomaly score using normalized distances
        total_score = 0.0
        feature_contributions = {}

        for feature in self.feature_names:
            if feature in sample and self.stds[feature] > 0:
                normalized_distance = abs(sample[feature] - self.means[feature]) / self.stds[feature]
                feature_contributions[feature] = normalized_distance
                total_score += normalized_distance ** 2

        return math.sqrt(total_score), feature_contributions

# Usage example with streaming data
def demonstrate_anomaly_detection():
    # Single-variate detector
    detector = StreamingAnomalyDetector(window_size=50, sensitivity=2.0)

    # Multi-variate detector
    multivariate_detector = MultiVariateAnomalyDetector(
        feature_names=['cpu_usage', 'memory_usage', 'response_time'],
        window_size=30
    )

    anomalies_detected = []
    multivariate_anomalies = []

    # Simulate streaming metrics
    for i in range(200):
        # Generate mostly normal data with occasional anomalies
        base_cpu = 50 + 20 * math.sin(i * 0.1)
        base_memory = 60 + 15 * math.cos(i * 0.05)
        base_response = 100 + 30 * math.sin(i * 0.08)

        # Add noise
        cpu_usage = base_cpu + random.gauss(0, 5)
        memory_usage = base_memory + random.gauss(0, 3)
        response_time = base_response + random.gauss(0, 10)

        # Inject anomalies occasionally
        if random.random() < 0.05:  # 5% chance of anomaly
            if random.random() < 0.5:
                cpu_usage += random.uniform(30, 50)  # CPU spike
            else:
                response_time += random.uniform(200, 400)  # Response time spike

        # Test single-variate detection on CPU
        cpu_result = detector.add_value(cpu_usage)
        if cpu_result['is_anomaly']:
            anomalies_detected.append({
                'iteration': i,
                'value': cpu_usage,
                'z_score': cpu_result['z_score'],
                'confidence': cpu_result['confidence']
            })

        # Test multi-variate detection
        sample = {
            'cpu_usage': cpu_usage,
            'memory_usage': memory_usage,
            'response_time': response_time
        }

        mv_result = multivariate_detector.add_sample(sample)
        if mv_result['is_anomaly']:
            multivariate_anomalies.append({
                'iteration': i,
                'sample': sample,
                'score': mv_result['anomaly_score']
            })
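
The multivariate detector above scores each feature independently, so it cannot see a point that is ordinary on every axis but breaks the usual relationship between features. A true Mahalanobis distance accounts for that by using the covariance matrix. The following is a minimal pure-Python sketch for the two-feature case with an explicit 2x2 inverse. `mahalanobis_2d` is a hypothetical helper and the readings are made-up illustration data.

```python
import math
import statistics


def mahalanobis_2d(xs, ys, point):
    """Mahalanobis distance of `point` from the (xs, ys) sample, using an explicit 2x2 inverse."""
    mean_x, mean_y = statistics.mean(xs), statistics.mean(ys)
    n = len(xs)
    # Sample covariance matrix [[sxx, sxy], [sxy, syy]]
    sxx = sum((x - mean_x) ** 2 for x in xs) / (n - 1)
    syy = sum((y - mean_y) ** 2 for y in ys) / (n - 1)
    sxy = sum((x - mean_x) * (y - mean_y) for x, y in zip(xs, ys)) / (n - 1)
    det = sxx * syy - sxy ** 2
    if det <= 0:
        raise ValueError("covariance matrix is singular; features are perfectly correlated")
    dx, dy = point[0] - mean_x, point[1] - mean_y
    # d^2 = [dx dy] * inverse(cov) * [dx dy]^T, written out for the 2x2 case
    d_squared = (syy * dx * dx - 2 * sxy * dx * dy + sxx * dy * dy) / det
    return math.sqrt(d_squared)


# Two strongly correlated features (hypothetical CPU and response-time readings)
cpu = [40, 45, 50, 55, 60, 65, 70]
latency = [82, 88, 101, 109, 122, 128, 140]

on_trend = mahalanobis_2d(cpu, latency, (60, 120))   # Both slightly above average
off_trend = mahalanobis_2d(cpu, latency, (60, 100))  # CPU up, latency down
print(f"on trend: {on_trend:.2f}, off trend: {off_trend:.2f}")
```

Both test points sit within half a standard deviation of the mean on each axis, yet the second one scores far higher because it contradicts the correlation in the sample. For more than two features, a linear algebra library is the practical way to invert the covariance matrix.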