DEV Community

Cover image for 8 Python Concurrency Patterns That Bypass GIL Limitations for CPU-Intensive Tasks
Nithin Bharadwaj
Nithin Bharadwaj

Posted on

8 Python Concurrency Patterns That Bypass GIL Limitations for CPU-Intensive Tasks

As a best-selling author, I invite you to explore my books on Amazon. Don't forget to follow me on Medium and show your support. Thank you! Your support means the world!

Processing CPU-intensive tasks efficiently remains one of Python's most challenging aspects. The Global Interpreter Lock creates bottlenecks that prevent true multithreading for computational workloads. However, several concurrency patterns can overcome these limitations and harness the full power of modern multi-core processors.

I've spent years optimizing Python applications for heavy computational work, and these eight patterns consistently deliver significant performance improvements. Each pattern addresses specific scenarios where CPU-bound operations can benefit from parallel execution.

Process Pool Pattern for Maximum Parallelism

The process pool pattern distributes work across multiple Python processes, completely bypassing GIL restrictions. This approach works exceptionally well for embarrassingly parallel problems where tasks operate independently.

import multiprocessing as mp
from concurrent.futures import ProcessPoolExecutor
import time
import math

def prime_factorization(n):
    """CPU-intensive prime factorization"""
    factors = []
    d = 2
    while d * d <= n:
        while n % d == 0:
            factors.append(d)
            n //= d
        d += 1
    if n > 1:
        factors.append(n)
    return factors

def batch_factorize_sequential(numbers):
    """Sequential processing baseline"""
    start_time = time.time()
    results = []
    for num in numbers:
        results.append(prime_factorization(num))
    return results, time.time() - start_time

def batch_factorize_parallel(numbers, workers=None):
    """Parallel processing with automatic worker scaling"""
    if workers is None:
        workers = mp.cpu_count()

    start_time = time.time()
    with ProcessPoolExecutor(max_workers=workers) as executor:
        results = list(executor.map(prime_factorization, numbers))
    return results, time.time() - start_time

# Performance comparison
if __name__ == "__main__":
    test_numbers = [982451653, 982451654, 982451655, 982451656, 
                   982451657, 982451658, 982451659, 982451660]

    seq_results, seq_time = batch_factorize_sequential(test_numbers)
    par_results, par_time = batch_factorize_parallel(test_numbers)

    print(f"Sequential: {seq_time:.3f}s")
    print(f"Parallel: {par_time:.3f}s")
    print(f"Speedup: {seq_time/par_time:.2f}x")
Enter fullscreen mode Exit fullscreen mode

Process pools automatically manage worker lifecycles and distribute tasks evenly. The pattern scales naturally with available CPU cores while handling process creation overhead transparently.

Shared Memory Arrays for Large Datasets

When processing large arrays or matrices, copying data between processes creates significant overhead. Shared memory arrays eliminate this bottleneck by allowing multiple processes to access the same memory region directly.

import multiprocessing as mp
import numpy as np
from multiprocessing import shared_memory
import time

def create_shared_array(shape, dtype=np.float64):
    """Create a numpy array backed by shared memory"""
    size = np.prod(shape) * np.dtype(dtype).itemsize
    shm = shared_memory.SharedMemory(create=True, size=size)
    array = np.ndarray(shape, dtype=dtype, buffer=shm.buf)
    return shm, array

def worker_process_chunk(shm_name, shape, dtype, start_row, end_row):
    """Worker function that operates on shared memory"""
    # Attach to existing shared memory
    shm = shared_memory.SharedMemory(name=shm_name)
    array = np.ndarray(shape, dtype=dtype, buffer=shm.buf)

    # Process assigned rows with expensive computation
    for i in range(start_row, end_row):
        # Simulate complex mathematical operation
        array[i] = np.sin(array[i]) * np.cos(array[i]) + np.sqrt(np.abs(array[i]))

    shm.close()

def parallel_matrix_processing(matrix_size=(1000, 1000), num_processes=None):
    """Process large matrix using shared memory"""
    if num_processes is None:
        num_processes = mp.cpu_count()

    # Create shared memory array
    shm, shared_array = create_shared_array(matrix_size)

    # Initialize with random data
    shared_array[:] = np.random.randn(*matrix_size)

    # Calculate work distribution
    rows_per_process = matrix_size[0] // num_processes
    processes = []

    start_time = time.time()

    # Launch worker processes
    for i in range(num_processes):
        start_row = i * rows_per_process
        end_row = start_row + rows_per_process
        if i == num_processes - 1:  # Last process handles remainder
            end_row = matrix_size[0]

        p = mp.Process(
            target=worker_process_chunk,
            args=(shm.name, matrix_size, np.float64, start_row, end_row)
        )
        processes.append(p)
        p.start()

    # Wait for completion
    for p in processes:
        p.join()

    processing_time = time.time() - start_time

    # Cleanup
    result_copy = shared_array.copy()
    shm.close()
    shm.unlink()

    return result_copy, processing_time

# Example usage
if __name__ == "__main__":
    result, duration = parallel_matrix_processing((2000, 2000))
    print(f"Processed {result.shape} matrix in {duration:.3f}s")
    print(f"Result sample: {result[0, :5]}")
Enter fullscreen mode Exit fullscreen mode

Shared memory arrays provide near-zero overhead data access across processes. This pattern excels when the computation-to-data ratio is high and copying costs would otherwise dominate execution time.

Producer-Consumer Queue Pattern

Continuous processing workloads benefit from producer-consumer patterns using multiprocessing queues. This approach maintains steady throughput while handling variable input rates and processing times.

import multiprocessing as mp
import queue
import time
import random
import json

def data_producer(input_queue, num_items=100):
    """Generate work items continuously"""
    for i in range(num_items):
        # Simulate variable data generation rate
        time.sleep(random.uniform(0.01, 0.05))

        work_item = {
            'id': i,
            'data': [random.randint(1, 1000) for _ in range(100)],
            'timestamp': time.time()
        }
        input_queue.put(work_item)

        if i % 20 == 0:
            print(f"Produced {i} items")

    # Signal completion
    input_queue.put(None)

def cpu_worker(input_queue, output_queue, worker_id):
    """Process work items from queue"""
    processed_count = 0

    while True:
        try:
            work_item = input_queue.get(timeout=1.0)
            if work_item is None:  # Shutdown signal
                input_queue.put(None)  # Propagate to other workers
                break

            # Simulate CPU-intensive processing
            start_time = time.time()
            data = work_item['data']

            # Complex computation
            result = {
                'id': work_item['id'],
                'sum': sum(data),
                'mean': sum(data) / len(data),
                'sorted_sample': sorted(data[:10]),
                'processing_time': 0,
                'worker_id': worker_id
            }

            # Simulate variable processing time
            time.sleep(random.uniform(0.02, 0.08))
            result['processing_time'] = time.time() - start_time

            output_queue.put(result)
            processed_count += 1

        except queue.Empty:
            continue

    print(f"Worker {worker_id} processed {processed_count} items")

def result_collector(output_queue, expected_items):
    """Collect and aggregate results"""
    results = []
    collected = 0

    while collected < expected_items:
        try:
            result = output_queue.get(timeout=2.0)
            results.append(result)
            collected += 1

            if collected % 25 == 0:
                print(f"Collected {collected} results")

        except queue.Empty:
            print("Timeout waiting for results")
            break

    return results

def run_producer_consumer_pipeline(num_workers=4, num_items=100):
    """Orchestrate the entire pipeline"""
    # Create queues
    input_queue = mp.Queue(maxsize=20)  # Limited size for backpressure
    output_queue = mp.Queue()

    # Start processes
    producer = mp.Process(target=data_producer, args=(input_queue, num_items))

    workers = []
    for i in range(num_workers):
        worker = mp.Process(target=cpu_worker, args=(input_queue, output_queue, i))
        workers.append(worker)

    collector = mp.Process(target=result_collector, args=(output_queue, num_items))

    # Launch all processes
    start_time = time.time()

    producer.start()
    collector.start()
    for worker in workers:
        worker.start()

    # Wait for completion
    producer.join()
    for worker in workers:
        worker.join()
    collector.join()

    total_time = time.time() - start_time
    print(f"Pipeline completed in {total_time:.3f}s")

    return total_time

# Execute pipeline
if __name__ == "__main__":
    execution_time = run_producer_consumer_pipeline(num_workers=4, num_items=100)
    print(f"Throughput: {100/execution_time:.2f} items/second")
Enter fullscreen mode Exit fullscreen mode

Queue-based patterns handle backpressure naturally and adapt to varying processing speeds. The bounded queue size prevents memory exhaustion when producers outpace consumers.

Hybrid Asyncio with Process Delegation

Combining asyncio with process pools enables applications to handle I/O operations asynchronously while delegating CPU-intensive work to separate processes. This pattern maximizes utilization of both I/O and CPU resources.

import asyncio
import aiohttp
import aiofiles
import multiprocessing as mp
from concurrent.futures import ProcessPoolExecutor
import json
import time
import hashlib

def cpu_intensive_analysis(data_chunk):
    """CPU-bound data analysis function"""
    # Simulate complex analysis: cryptographic hashing and statistics
    hasher = hashlib.sha256()

    analysis_result = {
        'chunk_size': len(data_chunk),
        'hash': '',
        'word_count': 0,
        'char_frequency': {},
        'processing_time': time.time()
    }

    # Hash computation
    hasher.update(data_chunk.encode('utf-8'))
    analysis_result['hash'] = hasher.hexdigest()

    # Word analysis
    words = data_chunk.lower().split()
    analysis_result['word_count'] = len(words)

    # Character frequency analysis
    for char in data_chunk.lower():
        if char.isalpha():
            analysis_result['char_frequency'][char] = \
                analysis_result['char_frequency'].get(char, 0) + 1

    analysis_result['processing_time'] = time.time() - analysis_result['processing_time']
    return analysis_result

class HybridProcessor:
    def __init__(self, max_workers=None):
        self.process_executor = ProcessPoolExecutor(max_workers=max_workers)
        self.session = None

    async def __aenter__(self):
        self.session = aiohttp.ClientSession()
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        if self.session:
            await self.session.close()
        self.process_executor.shutdown(wait=True)

    async def fetch_data(self, url):
        """Asynchronous data fetching"""
        try:
            async with self.session.get(url) as response:
                if response.status == 200:
                    return await response.text()
                else:
                    return f"Error {response.status}"
        except Exception as e:
            return f"Fetch error: {str(e)}"

    async def process_url(self, url, chunk_size=1000):
        """Fetch data and process in CPU-intensive manner"""
        print(f"Starting processing for {url}")

        # Asynchronous I/O operation
        data = await self.fetch_data(url)

        if data.startswith("Error") or data.startswith("Fetch error"):
            return {'url': url, 'error': data}

        # Split data into chunks for parallel processing
        chunks = [data[i:i+chunk_size] for i in range(0, len(data), chunk_size)]

        # Delegate CPU-intensive work to process pool
        loop = asyncio.get_event_loop()
        tasks = []

        for i, chunk in enumerate(chunks):
            task = loop.run_in_executor(
                self.process_executor,
                cpu_intensive_analysis,
                chunk
            )
            tasks.append(task)

        # Wait for all CPU-intensive operations to complete
        chunk_results = await asyncio.gather(*tasks)

        # Aggregate results
        total_result = {
            'url': url,
            'total_chunks': len(chunks),
            'total_words': sum(r['word_count'] for r in chunk_results),
            'combined_hash': '',
            'total_processing_time': sum(r['processing_time'] for r in chunk_results),
            'char_frequency_summary': {}
        }

        # Combine character frequencies
        for result in chunk_results:
            for char, count in result['char_frequency'].items():
                total_result['char_frequency_summary'][char] = \
                    total_result['char_frequency_summary'].get(char, 0) + count

        # Combined hash of all chunk hashes
        combined_hashes = ''.join(r['hash'] for r in chunk_results)
        total_result['combined_hash'] = hashlib.sha256(combined_hashes.encode()).hexdigest()

        print(f"Completed processing for {url}")
        return total_result

async def process_multiple_sources(urls):
    """Process multiple data sources concurrently"""
    async with HybridProcessor(max_workers=mp.cpu_count()) as processor:
        start_time = time.time()

        # Process all URLs concurrently
        tasks = [processor.process_url(url) for url in urls]
        results = await asyncio.gather(*tasks, return_exceptions=True)

        total_time = time.time() - start_time

        # Filter successful results
        successful_results = [r for r in results if isinstance(r, dict) and 'error' not in r]

        return successful_results, total_time

# Example usage with mock data
async def main():
    # Using httpbin for testing (replace with real URLs)
    test_urls = [
        'https://httpbin.org/base64/SGVsbG8gV29ybGQh',  # Returns "Hello World!"
        'https://jsonplaceholder.typicode.com/posts/1',
        'https://jsonplaceholder.typicode.com/posts/2',
    ]

    results, duration = await process_multiple_sources(test_urls)

    print(f"\nProcessed {len(results)} sources in {duration:.3f}s")
    for result in results:
        if 'error' not in result:
            print(f"URL: {result['url']}")
            print(f"  Words: {result['total_words']}")
            print(f"  Processing time: {result['total_processing_time']:.3f}s")
            print(f"  Top chars: {sorted(result['char_frequency_summary'].items(), key=lambda x: x[1], reverse=True)[:5]}")

if __name__ == "__main__":
    asyncio.run(main())
Enter fullscreen mode Exit fullscreen mode

This hybrid approach prevents I/O operations from blocking CPU-intensive computations and vice versa. The pattern scales well for applications that need to process data from multiple sources simultaneously.

Memory Mapping for Large File Processing

Memory mapping enables efficient processing of large files without loading entire datasets into memory. Multiple processes can access mapped regions concurrently with minimal overhead.

import mmap
import multiprocessing as mp
import os
import time
import re
from pathlib import Path

def create_test_file(filename, size_mb=100):
    """Create a large test file for demonstration"""
    with open(filename, 'w') as f:
        # Generate repetitive content with patterns
        base_content = "The quick brown fox jumps over the lazy dog. " * 100 + "\n"
        lines_needed = (size_mb * 1024 * 1024) // len(base_content)

        for i in range(lines_needed):
            f.write(f"Line {i}: {base_content}")

    return os.path.getsize(filename)

def process_file_chunk(filename, start_pos, end_pos, pattern):
    """Process a specific chunk of the file using memory mapping"""
    matches = []

    with open(filename, 'rb') as f:
        with mmap.mmap(f.fileno(), 0, access=mmap.ACCESS_READ) as mmapped_file:
            # Extract the chunk
            chunk = mmapped_file[start_pos:end_pos]
            text = chunk.decode('utf-8', errors='ignore')

            # Find pattern matches
            for match in re.finditer(pattern, text, re.IGNORECASE):
                matches.append({
                    'position': start_pos + match.start(),
                    'text': match.group(),
                    'line_context': text[max(0, match.start()-50):match.end()+50]
                })

    return {
        'start_pos': start_pos,
        'end_pos': end_pos,
        'matches': matches,
        'chunk_size': end_pos - start_pos
    }

def parallel_file_search(filename, pattern, num_processes=None):
    """Search for pattern in large file using parallel memory mapping"""
    if num_processes is None:
        num_processes = mp.cpu_count()

    file_size = os.path.getsize(filename)
    chunk_size = file_size // num_processes

    # Calculate chunk boundaries
    chunks = []
    for i in range(num_processes):
        start_pos = i * chunk_size
        end_pos = start_pos + chunk_size

        if i == num_processes - 1:  # Last chunk gets remainder
            end_pos = file_size

        chunks.append((start_pos, end_pos))

    print(f"Processing {file_size:,} bytes in {len(chunks)} chunks")

    # Process chunks in parallel
    start_time = time.time()
    with mp.Pool(processes=num_processes) as pool:
        tasks = []
        for start_pos, end_pos in chunks:
            task = pool.apply_async(
                process_file_chunk,
                (filename, start_pos, end_pos, pattern)
            )
            tasks.append(task)

        # Collect results
        chunk_results = [task.get() for task in tasks]

    processing_time = time.time() - start_time

    # Aggregate results
    all_matches = []
    total_bytes_processed = 0

    for result in chunk_results:
        all_matches.extend(result['matches'])
        total_bytes_processed += result['chunk_size']

    return {
        'matches': all_matches,
        'processing_time': processing_time,
        'bytes_processed': total_bytes_processed,
        'throughput_mb_per_sec': (total_bytes_processed / (1024 * 1024)) / processing_time
    }

def demonstrate_mmap_performance():
    """Compare memory-mapped processing with traditional file reading"""
    filename = "large_test_file.txt"
    pattern = r'\b[A-Z][a-z]+ \d+:'  # Pattern like "Line 123:"

    # Create test file if it doesn't exist
    if not Path(filename).exists():
        print("Creating test file...")
        file_size = create_test_file(filename, size_mb=50)
        print(f"Created {file_size:,} byte test file")

    # Parallel memory-mapped search
    print("\nRunning parallel memory-mapped search...")
    mmap_result = parallel_file_search(filename, pattern)

    print(f"Found {len(mmap_result['matches'])} matches")
    print(f"Processing time: {mmap_result['processing_time']:.3f}s")
    print(f"Throughput: {mmap_result['throughput_mb_per_sec']:.2f} MB/s")

    # Show sample matches
    if mmap_result['matches']:
        print("\nSample matches:")
        for match in mmap_result['matches'][:5]:
            print(f"  Position {match['position']:,}: {match['text']}")

    # Cleanup
    try:
        os.remove(filename)
        print(f"\nCleaned up {filename}")
    except OSError:
        pass

if __name__ == "__main__":
    demonstrate_mmap_performance()
Enter fullscreen mode Exit fullscreen mode

Memory mapping scales efficiently with file size since it doesn't require loading data into Python objects. This pattern excels for log analysis, data mining, and any scenario involving large text or binary files.

Batch Processing for Small Task Optimization

When individual tasks are small, the overhead of process creation can outweigh parallelization benefits. Batching groups multiple small tasks into larger work units to improve efficiency.

import multiprocessing as mp
import time
import math
import statistics
from typing import List, Callable, Any

def simple_computation(x):
    """Small CPU task - too small for individual parallelization"""
    return math.sqrt(x) * math.sin(x) + math.cos(x ** 2)

def batch_computation(batch_data):
    """Process a batch of small computations"""
    results = []
    start_time = time.time()

    for item in batch_data:
        result = simple_computation(item)
        results.append(result)

    processing_time = time.time() - start_time
    return {
        'results': results,
        'batch_size': len(batch_data),
        'processing_time': processing_time
    }

class BatchProcessor:
    def __init__(self, batch_size=1000, num_processes=None):
        self.batch_size = batch_size
        self.num_processes = num_processes or mp.cpu_count()

    def create_batches(self, data_list):
        """Split data into optimally sized batches"""
        batches = []
        for i in range(0, len(data_list), self.batch_size):
            batch = data_list[i:i + self.batch_size]
            batches.append(batch)
        return batches

    def process_sequential(self, data_list):
        """Sequential processing baseline"""
        start_time = time.time()
        results = [simple_computation(x) for x in data_list]
        return results, time.time() - start_time

    def process_parallel_naive(self, data_list):
        """Naive parallelization - one process per item"""
        start_time = time.time()
        with mp.Pool(processes=self.num_processes) as pool:
            results = pool.map(simple_computation, data_list)
        return results, time.time() - start_time

    def process_parallel_batched(self, data_list):
        """Optimized batched parallelization"""
        batches = self.create_batches(data_list)

        start_time = time.time()
        with mp.Pool(processes=self.num_processes) as pool:
            batch_results = pool.map(batch_computation, batches)

        # Flatten results
        all_results = []
        total_processing_time = 0

        for batch_result in batch_results:
            all_results.extend(batch_result['results'])
            total_processing_time += batch_result['processing_time']

        total_time = time.time() - start_time

        return all_results, total_time, {
            'batches_processed': len(batch_results),
            'avg_batch_size': statistics.mean(len(b['results']) for b in batch_results),
            'total_cpu_time': total_processing_time
        }

def adaptive_batch_sizing(data_size, target_batch_time=0.1):
    """Determine optimal batch size based on target processing time"""
    # Sample processing to estimate task duration
    sample_data = list(range(100))
    start_time = time.time()
    for x in sample_data:
        simple_computation(x)
    sample_time = time.time() - start_time

    # Calculate optimal batch size
    time_per_item = sample_time / len(sample_data)
    optimal_batch_size = max(1, int(target_batch_time / time_per_item))

    print(f"Estimated time per item: {time_per_item*1000:.3f}ms")
    print(f"Optimal batch size for {target_batch_time}s batches: {optimal_batch_size}")

    return optimal_batch_size

def benchmark_batch_strategies():
    """Compare different batching strategies"""
    # Generate test data
    test_data = list(range(1, 10001))  # 10,000 small tasks

    print(f"Benchmarking with {len(test_data)} tasks")
    print("-" * 50)

    # Determine optimal batch size
    optimal_batch_size = adaptive_batch_sizing(len(test_data))

    # Test different batch sizes
    batch_sizes = [100, 500, optimal_batch_size, 2000, 5000]
    results = {}

    for batch_size in batch_sizes:
        processor = BatchProcessor(batch_size=batch_size)

        # Run batched processing
        _, batch_time, batch_stats = processor.process_parallel_batched(test_data)

        results[batch_size] = {
            'time': batch_time,
            'stats': batch_stats,
            'throughput': len(test_data) / batch_time
        }

        print(f"Batch size {batch_size:,}: {batch_time:.3f}s "
              f"({results[batch_size]['throughput']:.0f} items/s)")

    # Find optimal performance
    best_batch_size = min(results.keys(), key=lambda k: results[k]['time'])
    print(f"\nBest performance: batch size {best_batch_size:,}")

    # Compare with sequential and naive parallel
    processor = BatchProcessor(batch_size=best_batch_size)

    print("\nComparison with other approaches:")

    # Sequential
    _, seq_time = processor.process_sequential(test_data)
    print(f"Sequential: {seq_time:.3f}s")

    # Naive parallel (if dataset is small enough)
    if len(test_data) <= 1000:  # Avoid overwhelming system
        _, naive_time = processor.process_parallel_naive(test_data[:1000])
        print(f"Naive parallel (1000 items): {naive_time:.3f}s")

    # Best batched
    best_time = results[best_batch_size]['time']
    print(f"Optimized batched: {best_time:.3f}s")
    print(f"Speedup vs sequential: {seq_time/best_time:.2f}x")

if __name__ == "__main__":
    benchmark_batch_strategies()
Enter fullscreen mode Exit fullscreen mode

Batch processing reduces the overhead-to-work ratio and enables efficient parallelization of small tasks. The optimal batch size depends on task complexity and system characteristics.

Inter-Process Communication Patterns

Complex workflows often require coordination between parallel processes. Pipes, shared state, and synchronization primitives enable sophisticated inter-process communication patterns.


python
import multiprocessing as mp
import time
import random
import threading
from queue import Queue
import json

class SharedCounter:
    """Thread-safe counter for inter-process coordination"""
    def __init__(self, initial_value=0):
        self.value = mp.Value('i', initial_value)

    def increment(self):
        with self.value.get_lock():
            self.value.value += 1
            return self.value.value

    def get_value(self):
        with self.value.get_lock():
            return self.value.value

class WorkflowCoordinator:
    """Coordinates complex multi-stage processing workflow"""

    def __init__(self, num_workers=4):
        self.num_workers = num_workers

        # Inter-process communication primitives
        self.raw_queue = mp.Queue()
        self.processed_queue = mp.Queue()
        self.result_queue = mp.Queue()

        # Shared state
        self.items_processed = SharedCounter()
        self.items_analyzed = SharedCounter()
        self.workflow_complete = mp.Event()

        # Coordination pipes
        self.coordinator_pipes = []
        self.worker_pipes = []

        for _ in range(num_workers):
            coordinator_end, worker_end = mp.Pipe()
            self.coordinator_pipes.append(coordinator_end)
            self.worker_pipes.append(worker_end)

    def data_generator(self, num_items=50):
        """Generate raw data items"""
        for i in range(num_items):
            item = {
                'id': i,
                'data': [random.randint(1, 100) for _ in range(50)],
                'timestamp': time.time(),
                'stage': 'raw'
            }
            self.raw_queue.put(item)
            time.sleep(0.1)  # Simulate variable data arrival

        # Signal completion
        for _ in range(self.num_workers):
            self.raw_queue.put(None)

        print("Data generation completed")

    def processing_worker(self, worker_id, pipe_connection):
        """First stage: data processing worker"""
        while True:
            item = self.raw_queue.get()
            if item is None:
                self.raw_queue.put(None)  # Propagate shutdown signal
                break

            # Simulate CPU-intensive processing
            processed_data = {
                'id': item['id'],
                'sum': sum(item['data']),
                'mean': sum(item['data']) / len(item['data']),
                'max': max(item['data']),
                'min': min(item['data']),
                'processed_by': worker_id,
                'stage': 'processed',
                'processing_time': time.time()
            }

            # Simulate processing time
            time.sleep(random.uniform(0.05, 0.15))
            processed_data['processing_
---
📘 **Checkout my [latest ebook](https://youtu.be/WpR6F4ky4uM) for free on my channel!**  
Be sure to **like**, **share**, **comment**, and **subscribe** to the channel!

---
## 101 Books

**101 Books** is an AI-driven publishing company co-founded by author **Aarav Joshi**. By leveraging advanced AI technology, we keep our publishing costs incredibly low—some books are priced as low as **$4**—making quality knowledge accessible to everyone.

Check out our book **[Golang Clean Code](https://www.amazon.com/dp/B0DQQF9K3Z)** available on Amazon. 

Stay tuned for updates and exciting news. When shopping for books, search for **Aarav Joshi** to find more of our titles. Use the provided link to enjoy **special discounts**!

## Our Creations

Be sure to check out our creations:

**[Investor Central](https://www.investorcentral.co.uk/)** | **[Investor Central Spanish](https://spanish.investorcentral.co.uk/)** | **[Investor Central German](https://german.investorcentral.co.uk/)** | **[Smart Living](https://smartliving.investorcentral.co.uk/)** | **[Epochs & Echoes](https://epochsandechoes.com/)** | **[Puzzling Mysteries](https://www.puzzlingmysteries.com/)** | **[Hindutva](http://hindutva.epochsandechoes.com/)** | **[Elite Dev](https://elitedev.in/)** | **[JS Schools](https://jsschools.com/)**

---

### We are on Medium

**[Tech Koala Insights](https://techkoalainsights.com/)** | **[Epochs & Echoes World](https://world.epochsandechoes.com/)** | **[Investor Central Medium](https://medium.investorcentral.co.uk/)** | **[Puzzling Mysteries Medium](https://medium.com/puzzling-mysteries)** | **[Science & Epochs Medium](https://science.epochsandechoes.com/)** | **[Modern Hindutva](https://modernhindutva.substack.com/)**

Enter fullscreen mode Exit fullscreen mode

Top comments (0)