Processing CPU-intensive tasks efficiently remains one of Python's most challenging problems. The Global Interpreter Lock (GIL) allows only one thread to execute Python bytecode at a time, so threads alone cannot speed up computational workloads. Several concurrency patterns, however, work around this limitation and put every core of a modern processor to use.
I've spent years optimizing Python applications for heavy computational work, and these eight patterns consistently deliver significant performance improvements. Each pattern addresses specific scenarios where CPU-bound operations can benefit from parallel execution.
## Process Pool Pattern for Maximum Parallelism
The process pool pattern distributes work across multiple Python processes, completely bypassing GIL restrictions. This approach works exceptionally well for embarrassingly parallel problems where tasks operate independently.
```python
import multiprocessing as mp
from concurrent.futures import ProcessPoolExecutor
import time

def prime_factorization(n):
    """CPU-intensive prime factorization"""
    factors = []
    d = 2
    while d * d <= n:
        while n % d == 0:
            factors.append(d)
            n //= d
        d += 1
    if n > 1:
        factors.append(n)
    return factors

def batch_factorize_sequential(numbers):
    """Sequential processing baseline"""
    start_time = time.time()
    results = []
    for num in numbers:
        results.append(prime_factorization(num))
    return results, time.time() - start_time

def batch_factorize_parallel(numbers, workers=None):
    """Parallel processing with automatic worker scaling"""
    if workers is None:
        workers = mp.cpu_count()
    start_time = time.time()
    with ProcessPoolExecutor(max_workers=workers) as executor:
        results = list(executor.map(prime_factorization, numbers))
    return results, time.time() - start_time

# Performance comparison
if __name__ == "__main__":
    test_numbers = [982451653, 982451654, 982451655, 982451656,
                    982451657, 982451658, 982451659, 982451660]
    seq_results, seq_time = batch_factorize_sequential(test_numbers)
    par_results, par_time = batch_factorize_parallel(test_numbers)
    print(f"Sequential: {seq_time:.3f}s")
    print(f"Parallel: {par_time:.3f}s")
    print(f"Speedup: {seq_time/par_time:.2f}x")
```
Process pools automatically manage worker lifecycles and distribute tasks evenly. The pattern scales naturally with available CPU cores while handling process creation overhead transparently.
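When the input iterable contains thousands of small items rather than a handful of heavy ones, per-task dispatch overhead starts to matter. As a minimal sketch building on the prime_factorization function above, executor.map accepts a chunksize argument that hands each worker a group of inputs per round trip:

```python
from concurrent.futures import ProcessPoolExecutor

def batch_factorize_chunked(numbers, workers=None, chunksize=64):
    """Variant of batch_factorize_parallel that groups submissions.
    The chunksize value here is illustrative; tune it to your workload."""
    with ProcessPoolExecutor(max_workers=workers) as executor:
        # Each worker receives `chunksize` numbers per dispatch, reducing
        # the number of pickling/IPC round trips for long input lists.
        return list(executor.map(prime_factorization, numbers, chunksize=chunksize))
```

With only eight large numbers, as in the benchmark above, chunking makes little difference; it pays off once the task list grows into the thousands.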
## Shared Memory Arrays for Large Datasets
When processing large arrays or matrices, copying data between processes creates significant overhead. Shared memory arrays eliminate this bottleneck by allowing multiple processes to access the same memory region directly.
```python
import multiprocessing as mp
import numpy as np
from multiprocessing import shared_memory
import time

def create_shared_array(shape, dtype=np.float64):
    """Create a numpy array backed by shared memory"""
    size = int(np.prod(shape)) * np.dtype(dtype).itemsize
    shm = shared_memory.SharedMemory(create=True, size=size)
    array = np.ndarray(shape, dtype=dtype, buffer=shm.buf)
    return shm, array

def worker_process_chunk(shm_name, shape, dtype, start_row, end_row):
    """Worker function that operates on shared memory"""
    # Attach to existing shared memory
    shm = shared_memory.SharedMemory(name=shm_name)
    array = np.ndarray(shape, dtype=dtype, buffer=shm.buf)
    # Process assigned rows with expensive computation
    for i in range(start_row, end_row):
        # Simulate complex mathematical operation
        array[i] = np.sin(array[i]) * np.cos(array[i]) + np.sqrt(np.abs(array[i]))
    shm.close()

def parallel_matrix_processing(matrix_size=(1000, 1000), num_processes=None):
    """Process large matrix using shared memory"""
    if num_processes is None:
        num_processes = mp.cpu_count()
    # Create shared memory array
    shm, shared_array = create_shared_array(matrix_size)
    # Initialize with random data
    shared_array[:] = np.random.randn(*matrix_size)
    # Calculate work distribution
    rows_per_process = matrix_size[0] // num_processes
    processes = []
    start_time = time.time()
    # Launch worker processes
    for i in range(num_processes):
        start_row = i * rows_per_process
        end_row = start_row + rows_per_process
        if i == num_processes - 1:  # Last process handles remainder
            end_row = matrix_size[0]
        p = mp.Process(
            target=worker_process_chunk,
            args=(shm.name, matrix_size, np.float64, start_row, end_row)
        )
        processes.append(p)
        p.start()
    # Wait for completion
    for p in processes:
        p.join()
    processing_time = time.time() - start_time
    # Copy results out, then release the shared block
    result_copy = shared_array.copy()
    shm.close()
    shm.unlink()
    return result_copy, processing_time

# Example usage
if __name__ == "__main__":
    result, duration = parallel_matrix_processing((2000, 2000))
    print(f"Processed {result.shape} matrix in {duration:.3f}s")
    print(f"Result sample: {result[0, :5]}")
```
Shared memory arrays provide near-zero-copy data access across processes. The pattern excels when the dataset is large enough that serializing and copying it between processes would otherwise dominate execution time.
## Producer-Consumer Queue Pattern
Continuous processing workloads benefit from producer-consumer patterns using multiprocessing queues. This approach maintains steady throughput while handling variable input rates and processing times.
```python
import multiprocessing as mp
import queue
import time
import random

def data_producer(input_queue, num_items=100):
    """Generate work items continuously"""
    for i in range(num_items):
        # Simulate variable data generation rate
        time.sleep(random.uniform(0.01, 0.05))
        work_item = {
            'id': i,
            'data': [random.randint(1, 1000) for _ in range(100)],
            'timestamp': time.time()
        }
        input_queue.put(work_item)
        if i % 20 == 0:
            print(f"Produced {i} items")
    # Signal completion
    input_queue.put(None)

def cpu_worker(input_queue, output_queue, worker_id):
    """Process work items from queue"""
    processed_count = 0
    while True:
        try:
            work_item = input_queue.get(timeout=1.0)
            if work_item is None:  # Shutdown signal
                input_queue.put(None)  # Propagate to other workers
                break
            # Simulate CPU-intensive processing
            start_time = time.time()
            data = work_item['data']
            # Complex computation
            result = {
                'id': work_item['id'],
                'sum': sum(data),
                'mean': sum(data) / len(data),
                'sorted_sample': sorted(data[:10]),
                'processing_time': 0,
                'worker_id': worker_id
            }
            # Simulate variable processing time
            time.sleep(random.uniform(0.02, 0.08))
            result['processing_time'] = time.time() - start_time
            output_queue.put(result)
            processed_count += 1
        except queue.Empty:
            continue
    print(f"Worker {worker_id} processed {processed_count} items")

def result_collector(output_queue, expected_items):
    """Collect and aggregate results"""
    results = []
    collected = 0
    while collected < expected_items:
        try:
            result = output_queue.get(timeout=2.0)
            results.append(result)
            collected += 1
            if collected % 25 == 0:
                print(f"Collected {collected} results")
        except queue.Empty:
            print("Timeout waiting for results")
            break
    return results

def run_producer_consumer_pipeline(num_workers=4, num_items=100):
    """Orchestrate the entire pipeline"""
    # Create queues
    input_queue = mp.Queue(maxsize=20)  # Limited size for backpressure
    output_queue = mp.Queue()
    # Start processes
    producer = mp.Process(target=data_producer, args=(input_queue, num_items))
    workers = []
    for i in range(num_workers):
        worker = mp.Process(target=cpu_worker, args=(input_queue, output_queue, i))
        workers.append(worker)
    collector = mp.Process(target=result_collector, args=(output_queue, num_items))
    # Launch all processes
    start_time = time.time()
    producer.start()
    collector.start()
    for worker in workers:
        worker.start()
    # Wait for completion
    producer.join()
    for worker in workers:
        worker.join()
    collector.join()
    total_time = time.time() - start_time
    print(f"Pipeline completed in {total_time:.3f}s")
    return total_time

# Execute pipeline
if __name__ == "__main__":
    execution_time = run_producer_consumer_pipeline(num_workers=4, num_items=100)
    print(f"Throughput: {100/execution_time:.2f} items/second")
```
Queue-based patterns handle backpressure naturally and adapt to varying processing speeds. The bounded queue size prevents memory exhaustion when producers outpace consumers.
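If propagating a None sentinel through the queue feels fragile, multiprocessing.JoinableQueue offers an alternative completion signal: each worker acknowledges items with task_done(), and the parent blocks on join() until everything has been consumed. A minimal sketch of that variant (the worker body is a stand-in, not the pipeline code above):

```python
import multiprocessing as mp

def joinable_worker(jobs):
    """Consume items until a sentinel arrives, acknowledging each one."""
    while True:
        item = jobs.get()
        try:
            if item is None:
                break
            _ = sum(x * x for x in item)  # placeholder for real CPU work
        finally:
            jobs.task_done()  # lets jobs.join() account for this item

if __name__ == "__main__":
    jobs = mp.JoinableQueue(maxsize=20)  # bounded, like the pipeline above
    workers = [mp.Process(target=joinable_worker, args=(jobs,)) for _ in range(4)]
    for w in workers:
        w.start()
    for i in range(100):
        jobs.put(list(range(i, i + 50)))
    for _ in workers:
        jobs.put(None)  # one sentinel per worker instead of re-propagating
    jobs.join()         # returns once every put() has been task_done()
    for w in workers:
        w.join()
```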
## Hybrid Asyncio with Process Delegation
Combining asyncio with process pools enables applications to handle I/O operations asynchronously while delegating CPU-intensive work to separate processes. This pattern maximizes utilization of both I/O and CPU resources.
```python
import asyncio
import aiohttp
import multiprocessing as mp
from concurrent.futures import ProcessPoolExecutor
import time
import hashlib

def cpu_intensive_analysis(data_chunk):
    """CPU-bound data analysis function"""
    # Simulate complex analysis: cryptographic hashing and statistics
    hasher = hashlib.sha256()
    analysis_result = {
        'chunk_size': len(data_chunk),
        'hash': '',
        'word_count': 0,
        'char_frequency': {},
        'processing_time': time.time()
    }
    # Hash computation
    hasher.update(data_chunk.encode('utf-8'))
    analysis_result['hash'] = hasher.hexdigest()
    # Word analysis
    words = data_chunk.lower().split()
    analysis_result['word_count'] = len(words)
    # Character frequency analysis
    for char in data_chunk.lower():
        if char.isalpha():
            analysis_result['char_frequency'][char] = \
                analysis_result['char_frequency'].get(char, 0) + 1
    analysis_result['processing_time'] = time.time() - analysis_result['processing_time']
    return analysis_result

class HybridProcessor:
    def __init__(self, max_workers=None):
        self.process_executor = ProcessPoolExecutor(max_workers=max_workers)
        self.session = None

    async def __aenter__(self):
        self.session = aiohttp.ClientSession()
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        if self.session:
            await self.session.close()
        self.process_executor.shutdown(wait=True)

    async def fetch_data(self, url):
        """Asynchronous data fetching"""
        try:
            async with self.session.get(url) as response:
                if response.status == 200:
                    return await response.text()
                else:
                    return f"Error {response.status}"
        except Exception as e:
            return f"Fetch error: {str(e)}"

    async def process_url(self, url, chunk_size=1000):
        """Fetch data and process it in a CPU-intensive manner"""
        print(f"Starting processing for {url}")
        # Asynchronous I/O operation
        data = await self.fetch_data(url)
        if data.startswith("Error") or data.startswith("Fetch error"):
            return {'url': url, 'error': data}
        # Split data into chunks for parallel processing
        chunks = [data[i:i+chunk_size] for i in range(0, len(data), chunk_size)]
        # Delegate CPU-intensive work to the process pool
        loop = asyncio.get_running_loop()
        tasks = []
        for chunk in chunks:
            task = loop.run_in_executor(
                self.process_executor,
                cpu_intensive_analysis,
                chunk
            )
            tasks.append(task)
        # Wait for all CPU-intensive operations to complete
        chunk_results = await asyncio.gather(*tasks)
        # Aggregate results
        total_result = {
            'url': url,
            'total_chunks': len(chunks),
            'total_words': sum(r['word_count'] for r in chunk_results),
            'combined_hash': '',
            'total_processing_time': sum(r['processing_time'] for r in chunk_results),
            'char_frequency_summary': {}
        }
        # Combine character frequencies
        for result in chunk_results:
            for char, count in result['char_frequency'].items():
                total_result['char_frequency_summary'][char] = \
                    total_result['char_frequency_summary'].get(char, 0) + count
        # Combined hash of all chunk hashes
        combined_hashes = ''.join(r['hash'] for r in chunk_results)
        total_result['combined_hash'] = hashlib.sha256(combined_hashes.encode()).hexdigest()
        print(f"Completed processing for {url}")
        return total_result

async def process_multiple_sources(urls):
    """Process multiple data sources concurrently"""
    async with HybridProcessor(max_workers=mp.cpu_count()) as processor:
        start_time = time.time()
        # Process all URLs concurrently
        tasks = [processor.process_url(url) for url in urls]
        results = await asyncio.gather(*tasks, return_exceptions=True)
        total_time = time.time() - start_time
        # Filter successful results
        successful_results = [r for r in results if isinstance(r, dict) and 'error' not in r]
        return successful_results, total_time

# Example usage with mock data
async def main():
    # Using httpbin for testing (replace with real URLs)
    test_urls = [
        'https://httpbin.org/base64/SGVsbG8gV29ybGQh',  # Returns "Hello World!"
        'https://jsonplaceholder.typicode.com/posts/1',
        'https://jsonplaceholder.typicode.com/posts/2',
    ]
    results, duration = await process_multiple_sources(test_urls)
    print(f"\nProcessed {len(results)} sources in {duration:.3f}s")
    for result in results:
        if 'error' not in result:
            print(f"URL: {result['url']}")
            print(f"  Words: {result['total_words']}")
            print(f"  Processing time: {result['total_processing_time']:.3f}s")
            print(f"  Top chars: {sorted(result['char_frequency_summary'].items(), key=lambda x: x[1], reverse=True)[:5]}")

if __name__ == "__main__":
    asyncio.run(main())
```
This hybrid approach prevents I/O operations from blocking CPU-intensive computations and vice versa. The pattern scales well for applications that need to process data from multiple sources simultaneously.
## Memory Mapping for Large File Processing
Memory mapping enables efficient processing of large files without loading entire datasets into memory. Multiple processes can access mapped regions concurrently with minimal overhead.
```python
import mmap
import multiprocessing as mp
import os
import time
import re
from pathlib import Path

def create_test_file(filename, size_mb=100):
    """Create a large test file for demonstration"""
    with open(filename, 'w') as f:
        # Generate repetitive content with patterns
        base_content = "The quick brown fox jumps over the lazy dog. " * 100 + "\n"
        lines_needed = (size_mb * 1024 * 1024) // len(base_content)
        for i in range(lines_needed):
            f.write(f"Line {i}: {base_content}")
    return os.path.getsize(filename)

def process_file_chunk(filename, start_pos, end_pos, pattern):
    """Process a specific chunk of the file using memory mapping"""
    matches = []
    with open(filename, 'rb') as f:
        with mmap.mmap(f.fileno(), 0, access=mmap.ACCESS_READ) as mmapped_file:
            # Extract the chunk
            chunk = mmapped_file[start_pos:end_pos]
            text = chunk.decode('utf-8', errors='ignore')
            # Find pattern matches
            for match in re.finditer(pattern, text, re.IGNORECASE):
                matches.append({
                    'position': start_pos + match.start(),
                    'text': match.group(),
                    'line_context': text[max(0, match.start()-50):match.end()+50]
                })
    return {
        'start_pos': start_pos,
        'end_pos': end_pos,
        'matches': matches,
        'chunk_size': end_pos - start_pos
    }

def parallel_file_search(filename, pattern, num_processes=None):
    """Search for pattern in a large file using parallel memory mapping"""
    if num_processes is None:
        num_processes = mp.cpu_count()
    file_size = os.path.getsize(filename)
    chunk_size = file_size // num_processes
    # Calculate chunk boundaries
    chunks = []
    for i in range(num_processes):
        start_pos = i * chunk_size
        end_pos = start_pos + chunk_size
        if i == num_processes - 1:  # Last chunk gets remainder
            end_pos = file_size
        chunks.append((start_pos, end_pos))
    print(f"Processing {file_size:,} bytes in {len(chunks)} chunks")
    # Process chunks in parallel
    start_time = time.time()
    with mp.Pool(processes=num_processes) as pool:
        tasks = []
        for start_pos, end_pos in chunks:
            task = pool.apply_async(
                process_file_chunk,
                (filename, start_pos, end_pos, pattern)
            )
            tasks.append(task)
        # Collect results
        chunk_results = [task.get() for task in tasks]
    processing_time = time.time() - start_time
    # Aggregate results
    all_matches = []
    total_bytes_processed = 0
    for result in chunk_results:
        all_matches.extend(result['matches'])
        total_bytes_processed += result['chunk_size']
    return {
        'matches': all_matches,
        'processing_time': processing_time,
        'bytes_processed': total_bytes_processed,
        'throughput_mb_per_sec': (total_bytes_processed / (1024 * 1024)) / processing_time
    }

def demonstrate_mmap_performance():
    """Demonstrate parallel memory-mapped search on a generated file"""
    filename = "large_test_file.txt"
    pattern = r'\b[A-Z][a-z]+ \d+:'  # Pattern like "Line 123:"
    # Create test file if it doesn't exist
    if not Path(filename).exists():
        print("Creating test file...")
        file_size = create_test_file(filename, size_mb=50)
        print(f"Created {file_size:,} byte test file")
    # Parallel memory-mapped search
    print("\nRunning parallel memory-mapped search...")
    mmap_result = parallel_file_search(filename, pattern)
    print(f"Found {len(mmap_result['matches'])} matches")
    print(f"Processing time: {mmap_result['processing_time']:.3f}s")
    print(f"Throughput: {mmap_result['throughput_mb_per_sec']:.2f} MB/s")
    # Show sample matches
    if mmap_result['matches']:
        print("\nSample matches:")
        for match in mmap_result['matches'][:5]:
            print(f"  Position {match['position']:,}: {match['text']}")
    # Cleanup
    try:
        os.remove(filename)
        print(f"\nCleaned up {filename}")
    except OSError:
        pass

if __name__ == "__main__":
    demonstrate_mmap_performance()
```
Memory mapping scales efficiently with file size since it doesn't require loading data into Python objects. This pattern excels for log analysis, data mining, and any scenario involving large text or binary files.
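One caveat worth noting: with fixed chunk boundaries, a match that straddles two chunks will be missed by both workers. A small overlap between adjacent chunks avoids this; the helper below is a sketch of that adjustment (the 256-byte overlap is an assumption — it only needs to exceed the longest match you expect), and it can replace the boundary loop in parallel_file_search:

```python
def chunk_boundaries_with_overlap(file_size, num_chunks, overlap=256):
    """Yield (start, end) byte ranges where each chunk extends `overlap`
    bytes into the next, so patterns crossing a boundary are still found."""
    base = file_size // num_chunks
    for i in range(num_chunks):
        start = i * base
        if i == num_chunks - 1:
            end = file_size  # last chunk absorbs the remainder
        else:
            end = min(file_size, start + base + overlap)
        yield start, end
```

Matches that fall inside an overlap region are reported by both neighbouring chunks, so deduplicate on the absolute position before aggregating.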
## Batch Processing for Small Task Optimization
When individual tasks are small, the overhead of process creation can outweigh parallelization benefits. Batching groups multiple small tasks into larger work units to improve efficiency.
```python
import multiprocessing as mp
import time
import math
import statistics

def simple_computation(x):
    """Small CPU task - too small for individual parallelization"""
    return math.sqrt(x) * math.sin(x) + math.cos(x ** 2)

def batch_computation(batch_data):
    """Process a batch of small computations"""
    results = []
    start_time = time.time()
    for item in batch_data:
        result = simple_computation(item)
        results.append(result)
    processing_time = time.time() - start_time
    return {
        'results': results,
        'batch_size': len(batch_data),
        'processing_time': processing_time
    }

class BatchProcessor:
    def __init__(self, batch_size=1000, num_processes=None):
        self.batch_size = batch_size
        self.num_processes = num_processes or mp.cpu_count()

    def create_batches(self, data_list):
        """Split data into optimally sized batches"""
        batches = []
        for i in range(0, len(data_list), self.batch_size):
            batch = data_list[i:i + self.batch_size]
            batches.append(batch)
        return batches

    def process_sequential(self, data_list):
        """Sequential processing baseline"""
        start_time = time.time()
        results = [simple_computation(x) for x in data_list]
        return results, time.time() - start_time

    def process_parallel_naive(self, data_list):
        """Naive parallelization - one task dispatched per item"""
        start_time = time.time()
        with mp.Pool(processes=self.num_processes) as pool:
            results = pool.map(simple_computation, data_list, chunksize=1)
        return results, time.time() - start_time

    def process_parallel_batched(self, data_list):
        """Optimized batched parallelization"""
        batches = self.create_batches(data_list)
        start_time = time.time()
        with mp.Pool(processes=self.num_processes) as pool:
            batch_results = pool.map(batch_computation, batches)
        # Flatten results
        all_results = []
        total_processing_time = 0
        for batch_result in batch_results:
            all_results.extend(batch_result['results'])
            total_processing_time += batch_result['processing_time']
        total_time = time.time() - start_time
        return all_results, total_time, {
            'batches_processed': len(batch_results),
            'avg_batch_size': statistics.mean(len(b['results']) for b in batch_results),
            'total_cpu_time': total_processing_time
        }

def adaptive_batch_sizing(data_size, target_batch_time=0.1):
    """Determine optimal batch size based on target processing time"""
    # Sample processing to estimate task duration
    sample_data = list(range(100))
    start_time = time.time()
    for x in sample_data:
        simple_computation(x)
    sample_time = time.time() - start_time
    # Calculate optimal batch size
    time_per_item = sample_time / len(sample_data)
    optimal_batch_size = max(1, int(target_batch_time / time_per_item))
    print(f"Estimated time per item: {time_per_item*1000:.3f}ms")
    print(f"Optimal batch size for {target_batch_time}s batches: {optimal_batch_size}")
    return optimal_batch_size

def benchmark_batch_strategies():
    """Compare different batching strategies"""
    # Generate test data
    test_data = list(range(1, 10001))  # 10,000 small tasks
    print(f"Benchmarking with {len(test_data)} tasks")
    print("-" * 50)
    # Determine optimal batch size
    optimal_batch_size = adaptive_batch_sizing(len(test_data))
    # Test different batch sizes
    batch_sizes = [100, 500, optimal_batch_size, 2000, 5000]
    results = {}
    for batch_size in batch_sizes:
        processor = BatchProcessor(batch_size=batch_size)
        # Run batched processing
        _, batch_time, batch_stats = processor.process_parallel_batched(test_data)
        results[batch_size] = {
            'time': batch_time,
            'stats': batch_stats,
            'throughput': len(test_data) / batch_time
        }
        print(f"Batch size {batch_size:,}: {batch_time:.3f}s "
              f"({results[batch_size]['throughput']:.0f} items/s)")
    # Find optimal performance
    best_batch_size = min(results.keys(), key=lambda k: results[k]['time'])
    print(f"\nBest performance: batch size {best_batch_size:,}")
    # Compare with sequential and naive parallel
    processor = BatchProcessor(batch_size=best_batch_size)
    print("\nComparison with other approaches:")
    # Sequential
    _, seq_time = processor.process_sequential(test_data)
    print(f"Sequential: {seq_time:.3f}s")
    # Naive parallel (if dataset is small enough)
    if len(test_data) <= 1000:  # Avoid overwhelming system
        _, naive_time = processor.process_parallel_naive(test_data[:1000])
        print(f"Naive parallel (1000 items): {naive_time:.3f}s")
    # Best batched
    best_time = results[best_batch_size]['time']
    print(f"Optimized batched: {best_time:.3f}s")
    print(f"Speedup vs sequential: {seq_time/best_time:.2f}x")

if __name__ == "__main__":
    benchmark_batch_strategies()
```
Batch processing reduces the overhead-to-work ratio and enables efficient parallelization of small tasks. The optimal batch size depends on task complexity and system characteristics.
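For simple cases you can get much of the same benefit without a wrapper function: multiprocessing.Pool.map takes a chunksize argument and batches submissions internally. A minimal sketch reusing simple_computation from above:

```python
import multiprocessing as mp

if __name__ == "__main__":
    data = list(range(1, 10001))
    with mp.Pool() as pool:
        # chunksize controls how many items each worker receives per dispatch;
        # larger chunks mean fewer pickling/IPC round trips for tiny tasks.
        results = pool.map(simple_computation, data, chunksize=500)
    print(f"Processed {len(results)} items")
```

The explicit batch_computation wrapper remains useful when you also want per-batch statistics, as collected in process_parallel_batched.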
## Inter-Process Communication Patterns
Complex workflows often require coordination between parallel processes. Pipes, shared state, and synchronization primitives enable sophisticated inter-process communication patterns.
```python
import multiprocessing as mp
import time
import random

class SharedCounter:
    """Process-safe counter for inter-process coordination"""
    def __init__(self, initial_value=0):
        self.value = mp.Value('i', initial_value)

    def increment(self):
        with self.value.get_lock():
            self.value.value += 1
            return self.value.value

    def get_value(self):
        with self.value.get_lock():
            return self.value.value

class WorkflowCoordinator:
    """Coordinates complex multi-stage processing workflow"""
    def __init__(self, num_workers=4):
        self.num_workers = num_workers
        # Inter-process communication primitives
        self.raw_queue = mp.Queue()
        self.processed_queue = mp.Queue()
        self.result_queue = mp.Queue()
        # Shared state
        self.items_processed = SharedCounter()
        self.items_analyzed = SharedCounter()
        self.workflow_complete = mp.Event()
        # Coordination pipes
        self.coordinator_pipes = []
        self.worker_pipes = []
        for _ in range(num_workers):
            coordinator_end, worker_end = mp.Pipe()
            self.coordinator_pipes.append(coordinator_end)
            self.worker_pipes.append(worker_end)

    def data_generator(self, num_items=50):
        """Generate raw data items"""
        for i in range(num_items):
            item = {
                'id': i,
                'data': [random.randint(1, 100) for _ in range(50)],
                'timestamp': time.time(),
                'stage': 'raw'
            }
            self.raw_queue.put(item)
            time.sleep(0.1)  # Simulate variable data arrival
        # Signal completion
        for _ in range(self.num_workers):
            self.raw_queue.put(None)
        print("Data generation completed")

    def processing_worker(self, worker_id, pipe_connection):
        """First stage: data processing worker"""
        while True:
            item = self.raw_queue.get()
            if item is None:
                self.raw_queue.put(None)  # Propagate shutdown signal
                break
            # Simulate CPU-intensive processing
            processed_data = {
                'id': item['id'],
                'sum': sum(item['data']),
                'mean': sum(item['data']) / len(item['data']),
                'max': max(item['data']),
                'min': min(item['data']),
                'processed_by': worker_id,
                'stage': 'processed',
                'processing_time': time.time()
            }
            # Simulate processing time
            time.sleep(random.uniform(0.05, 0.15))
            processed_data['processing_time'] = time.time() - processed_data['processing_time']
            self.processed_queue.put(processed_data)
            self.items_processed.increment()
```

The downstream analysis and collection stages build on the same primitives: they drain processed_queue and result_queue while the shared counters and the workflow_complete event track overall progress, and the pipes give the coordinator a direct channel to each worker.