Timothy found Margaret reviewing performance metrics, looking troubled. "I've built this beautiful async system," he said, "but I need to integrate with an old library that does synchronous file processing. When I call it, my entire event loop freezes. Everything stops until the blocking operation completes."
Margaret nodded knowingly. "You need an Executor. Think of it as hiring temporary workers who handle blocking tasks in separate threads or processes, so your main async workflow keeps flowing smoothly."
The Problem: Blocking Code Blocks Everything
Timothy showed Margaret his broken async system:
import asyncio
import time

def blocking_file_operation(filename):
    """Legacy function that blocks for 3 seconds"""
    print(f"Starting blocking operation on {filename}")
    time.sleep(3)  # Simulates slow I/O
    print(f"Finished blocking operation on {filename}")
    return f"Processed: {filename}"

async def async_worker(worker_id):
    """Fast async worker"""
    print(f"Async worker {worker_id} starting")
    await asyncio.sleep(0.5)
    print(f"Async worker {worker_id} finished")

async def broken_integration():
    print("Starting mixed async/sync workflow")

    # Start async workers
    workers = [
        asyncio.create_task(async_worker(i))
        for i in range(3)
    ]

    # Call blocking function - this FREEZES everything!
    result = blocking_file_operation("data.csv")  # ❌ BLOCKS EVENT LOOP
    print(f"Got result: {result}")

    await asyncio.gather(*workers)

# asyncio.run(broken_integration())
Output (broken):
Starting mixed async/sync workflow
Starting blocking operation on data.csv
# Everything frozen for 3 seconds...
Finished blocking operation on data.csv
Got result: Processed: data.csv
Async worker 0 starting # Finally runs after blocking completes
Async worker 1 starting
Async worker 2 starting
Async worker 0 finished
Async worker 1 finished
Async worker 2 finished
"See the problem?" Timothy pointed. "The async workers should run immediately, but they're blocked waiting for the synchronous operation. My event loop is completely frozen."
Enter the ThreadPoolExecutor
"For I/O-bound blocking operations like file I/O or legacy libraries," Margaret explained, "use ThreadPoolExecutor with loop.run_in_executor(). This runs the blocking code in a separate thread, keeping your event loop responsive."
She rewrote Timothy's code:
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor

def blocking_file_operation(filename):
    """Legacy function that blocks for 3 seconds"""
    print(f"Starting blocking operation on {filename}")
    time.sleep(3)  # Simulates slow I/O
    print(f"Finished blocking operation on {filename}")
    return f"Processed: {filename}"

async def async_worker(worker_id):
    """Fast async worker"""
    print(f"Async worker {worker_id} starting")
    await asyncio.sleep(0.5)
    print(f"Async worker {worker_id} finished")

async def fixed_integration():
    print("Starting mixed async/sync workflow")

    # Create thread pool executor
    executor = ThreadPoolExecutor(max_workers=2)

    # Start async workers
    workers = [
        asyncio.create_task(async_worker(i))
        for i in range(3)
    ]

    # Run blocking function in separate thread
    loop = asyncio.get_running_loop()
    result = await loop.run_in_executor(
        executor,
        blocking_file_operation,
        "data.csv"
    )
    print(f"Got result: {result}")

    await asyncio.gather(*workers)
    executor.shutdown(wait=True)

asyncio.run(fixed_integration())
Output (correct):
Starting mixed async/sync workflow
Async worker 0 starting # ✅ Starts immediately
Async worker 1 starting
Async worker 2 starting
Starting blocking operation on data.csv
Async worker 0 finished # ✅ Completes while blocking op runs
Async worker 1 finished
Async worker 2 finished
Finished blocking operation on data.csv
Got result: Processed: data.csv
"Perfect!" Timothy exclaimed. "The async workers run immediately and complete while the blocking operation happens in the background."
Understanding run_in_executor()
Margaret explained the key concepts:
import asyncio
from concurrent.futures import ThreadPoolExecutor

async def demonstrate_executor_pattern():
    """
    loop.run_in_executor(executor, func, *args) signature:
    - executor: ThreadPoolExecutor or ProcessPoolExecutor (or None for default)
    - func: The synchronous function to run
    - *args: Arguments to pass to func

    Returns: an awaitable asyncio.Future that resolves to func's return value
    """
    def synchronous_operation(x, y):
        """Regular sync function"""
        import time
        time.sleep(1)
        return x + y

    loop = asyncio.get_running_loop()
    executor = ThreadPoolExecutor(max_workers=3)

    # Run in executor - returns immediately, resolves when complete
    result = await loop.run_in_executor(
        executor,
        synchronous_operation,
        10,
        20
    )
    print(f"Result: {result}")  # 30
    executor.shutdown(wait=True)

asyncio.run(demonstrate_executor_pattern())
"The key insight," Margaret said, "is that run_in_executor() takes a synchronous function and makes it awaitable. You get back a coroutine that you can await, and the event loop continues running while the blocking work happens elsewhere."
"One detail," Margaret added. "If your function needs keyword arguments, use functools.partial since run_in_executor() only accepts positional arguments:"
import asyncio
from concurrent.futures import ThreadPoolExecutor
from functools import partial

def process_data(data, multiplier=2, offset=0):
    """Function with keyword arguments"""
    import time
    time.sleep(1)
    return (data * multiplier) + offset

async def use_partial_for_kwargs():
    executor = ThreadPoolExecutor(max_workers=2)
    loop = asyncio.get_running_loop()

    # Use partial to bind keyword arguments
    func_with_kwargs = partial(process_data, multiplier=3, offset=10)

    result = await loop.run_in_executor(
        executor,
        func_with_kwargs,
        5  # The 'data' positional argument
    )
    print(f"Result: {result}")  # (5 * 3) + 10 = 25
    executor.shutdown(wait=True)

asyncio.run(use_partial_for_kwargs())
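A related convenience worth knowing: on Python 3.9 and later, asyncio.to_thread() wraps this same run-in-a-thread pattern, uses the loop's default thread pool, and forwards keyword arguments directly, so partial isn't needed when the default pool is acceptable. A minimal sketch reusing the same function:

import asyncio
import time

def process_data(data, multiplier=2, offset=0):
    """Function with keyword arguments"""
    time.sleep(1)
    return (data * multiplier) + offset

async def use_to_thread():
    # to_thread() runs the function in asyncio's default ThreadPoolExecutor
    # and passes *args and **kwargs straight through
    result = await asyncio.to_thread(process_data, 5, multiplier=3, offset=10)
    print(f"Result: {result}")  # 25

asyncio.run(use_to_thread())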
Multiple Concurrent Blocking Operations
Timothy asked, "What if I need to run multiple blocking operations concurrently?"
Margaret showed him the pattern:
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor

def process_file(filename, delay):
    """Simulates slow file processing"""
    print(f"Processing {filename} (will take {delay}s)")
    time.sleep(delay)
    print(f"Finished {filename}")
    return f"Result from {filename}"

async def process_multiple_files():
    executor = ThreadPoolExecutor(max_workers=4)
    loop = asyncio.get_running_loop()

    files = [
        ("file1.txt", 2),
        ("file2.txt", 3),
        ("file3.txt", 1),
        ("file4.txt", 2)
    ]

    # Create tasks for all blocking operations
    tasks = [
        loop.run_in_executor(
            executor,
            process_file,
            filename,
            delay
        )
        for filename, delay in files
    ]

    # Wait for all to complete
    results = await asyncio.gather(*tasks)
    print(f"\nAll results: {results}")
    executor.shutdown(wait=True)

asyncio.run(process_multiple_files())
Output:
Processing file1.txt (will take 2s)
Processing file2.txt (will take 3s)
Processing file3.txt (will take 1s)
Processing file4.txt (will take 2s)
Finished file3.txt
Finished file1.txt
Finished file4.txt
Finished file2.txt
All results: ['Result from file1.txt', 'Result from file2.txt',
'Result from file3.txt', 'Result from file4.txt']
"With max_workers=4," Margaret explained, "all four files process simultaneously in separate threads. The gather() waits for all of them, just like with regular async operations."
Using the Default Executor
"You can also pass None as the executor," Margaret mentioned, "to use asyncio's default thread pool:"
import asyncio
import time

def blocking_operation(value):
    time.sleep(1)
    return value * 2

async def use_default_executor():
    loop = asyncio.get_running_loop()

    # None means use the default ThreadPoolExecutor
    result = await loop.run_in_executor(
        None,  # Use default executor
        blocking_operation,
        21
    )
    print(f"Result: {result}")  # 42

asyncio.run(use_default_executor())
"The default executor," she explained, "is convenient for simple cases. It's a ThreadPoolExecutor with a worker count based on your machine: min(32, (os.cpu_count() or 1) + 4). For production code with specific threading needs or when you need more control over the thread pool size, create your own executor with explicit max_workers."
Timothy nodded. "So the default is fine for light usage, but I should create my own for anything serious."
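When you do want a specific pool but still like the convenience of passing None, one option (a sketch, not part of Margaret's original example) is to install your own pool as the loop's default with loop.set_default_executor():

import asyncio
import time
from concurrent.futures import ThreadPoolExecutor

def blocking_operation(value):
    time.sleep(1)
    return value * 2

async def configure_default_executor():
    loop = asyncio.get_running_loop()

    # Replace the default pool with one sized for this application
    executor = ThreadPoolExecutor(max_workers=8, thread_name_prefix="app-worker")
    loop.set_default_executor(executor)

    # Passing None now uses the custom pool
    result = await loop.run_in_executor(None, blocking_operation, 21)
    print(f"Result: {result}")  # 42

asyncio.run(configure_default_executor())

On Python 3.9+, asyncio.run() shuts the default executor down for you when the loop exits.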
ProcessPoolExecutor for CPU-Bound Work
"What about CPU-intensive operations?" Timothy asked. "Like heavy computations that actually need CPU time, not I/O?"
Margaret switched to a different pattern:
import asyncio
import os
import time
from concurrent.futures import ProcessPoolExecutor

def cpu_intensive_task(n):
    """Compute-heavy function that uses actual CPU"""
    print(f"Computing factorial of {n} in process {os.getpid()}")
    # Simulate heavy computation
    result = 1
    for i in range(1, n + 1):
        result *= i
    # Add artificial delay to show parallelism
    time.sleep(2)
    print(f"Finished computing {n}!")
    return result

async def process_cpu_intensive_tasks():
    # Use ProcessPoolExecutor for CPU-bound work
    executor = ProcessPoolExecutor(max_workers=4)
    loop = asyncio.get_running_loop()

    numbers = [5, 6, 7, 8]
    print("Starting CPU-intensive tasks across multiple processes")

    tasks = [
        loop.run_in_executor(executor, cpu_intensive_task, n)
        for n in numbers
    ]
    results = await asyncio.gather(*tasks)
    print(f"\nResults: {results}")
    executor.shutdown(wait=True)

if __name__ == "__main__":  # Guard so spawned worker processes don't re-run this on import
    asyncio.run(process_cpu_intensive_tasks())
Output:
Starting CPU-intensive tasks across multiple processes
Computing factorial of 5 in process ...
Computing factorial of 6 in process ...
Computing factorial of 7 in process ...
Computing factorial of 8 in process ...
Finished computing 5!
Finished computing 6!
Finished computing 7!
Finished computing 8!
Results: [120, 720, 5040, 40320]
"The key difference," Margaret emphasized, "is that ProcessPoolExecutor creates separate Python processes, each with its own GIL. This gives you true parallel CPU execution, unlike threads which are limited by the GIL."
"One critical constraint," Margaret warned. "ProcessPoolExecutor requires that functions and their arguments be pickleable - Python must be able to serialize them to send across process boundaries."
import asyncio
from concurrent.futures import ProcessPoolExecutor

# ✓ This works - module-level function is pickleable
def pickleable_function(x):
    return x * 2

# ❌ These don't work with ProcessPoolExecutor:
# - lambdas (e.g. lambda x: x * 2) aren't pickleable
# - functions defined inside other functions aren't pickleable

async def demonstrate_pickling():
    executor = ProcessPoolExecutor(max_workers=2)
    loop = asyncio.get_running_loop()

    # This works
    result = await loop.run_in_executor(
        executor,
        pickleable_function,
        10
    )
    print(f"Success: {result}")

    # This fails with a pickle error
    try:
        result = await loop.run_in_executor(
            executor,
            lambda x: x * 2,  # ❌ Can't pickle lambda
            10
        )
    except Exception as e:
        print(f"Lambda failed: {type(e).__name__}")

    executor.shutdown(wait=True)

# asyncio.run(demonstrate_pickling())
"For ProcessPoolExecutor," Margaret explained, "always use module-level functions. If you need flexibility, pass configuration as arguments rather than using closures or lambdas."
Wrapping Blocking Libraries
Timothy showed Margaret a common real-world problem:
import asyncio
from concurrent.futures import ThreadPoolExecutor
import time

class LegacyDatabaseClient:
    """A synchronous database client we can't modify"""

    def connect(self):
        print("Connecting to database...")
        time.sleep(1)
        print("Connected!")

    def query(self, sql):
        print(f"Executing: {sql}")
        time.sleep(2)  # Simulates network I/O
        return [{"id": 1, "name": "Alice"}, {"id": 2, "name": "Bob"}]

    def close(self):
        print("Closing connection...")
        time.sleep(0.5)

class AsyncDatabaseWrapper:
    """Async wrapper around legacy sync client"""

    def __init__(self):
        self.client = LegacyDatabaseClient()
        self.executor = ThreadPoolExecutor(max_workers=4)
        self.loop = None

    async def connect(self):
        self.loop = asyncio.get_running_loop()
        await self.loop.run_in_executor(
            self.executor,
            self.client.connect
        )

    async def query(self, sql):
        return await self.loop.run_in_executor(
            self.executor,
            self.client.query,
            sql
        )

    async def close(self):
        await self.loop.run_in_executor(
            self.executor,
            self.client.close
        )
        self.executor.shutdown(wait=True)

async def demonstrate_wrapper():
    db = AsyncDatabaseWrapper()
    await db.connect()

    # Multiple concurrent queries - possible because of the executor!
    results = await asyncio.gather(
        db.query("SELECT * FROM users"),
        db.query("SELECT * FROM orders"),
        db.query("SELECT * FROM products")
    )

    for i, result in enumerate(results, 1):
        print(f"Query {i} results: {len(result)} rows")

    await db.close()

asyncio.run(demonstrate_wrapper())
Output:
Connecting to database...
Connected!
Executing: SELECT * FROM users
Executing: SELECT * FROM orders
Executing: SELECT * FROM products
Query 1 results: 2 rows
Query 2 results: 2 rows
Query 3 results: 2 rows
Closing connection...
"This pattern," Margaret explained, "is incredibly useful. You wrap a blocking library with async methods that use executors internally. Users of your wrapper get clean async code, and the blocking operations run safely in threads."
Handling Exceptions
"What about errors?" Timothy asked. "What if the blocking function raises an exception?"
import asyncio
from concurrent.futures import ThreadPoolExecutor

def failing_operation(value):
    if value < 0:
        raise ValueError("Value must be positive!")
    return value * 2

async def handle_executor_exceptions():
    executor = ThreadPoolExecutor(max_workers=2)
    loop = asyncio.get_running_loop()

    async def safe_execute(value):
        try:
            result = await loop.run_in_executor(
                executor,
                failing_operation,
                value
            )
            print(f"Success: {value} -> {result}")
            return result
        except ValueError as e:
            print(f"Error with {value}: {e}")
            return None

    results = await asyncio.gather(
        safe_execute(10),
        safe_execute(-5),
        safe_execute(20)
    )
    print(f"Results: {results}")
    executor.shutdown(wait=True)

asyncio.run(handle_executor_exceptions())
Output:
Success: 10 -> 20
Error with -5: Value must be positive!
Success: 20 -> 40
Results: [20, None, 40]
"Exceptions propagate naturally," Margaret said. "When you await the executor result, any exception raised in the thread gets re-raised in your async code. Handle them the same way you would for any async operation."
Context Manager Pattern
Margaret showed Timothy a clean pattern for executor lifecycle management:
import asyncio
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
from contextlib import asynccontextmanager

@asynccontextmanager
async def executor_context(executor_class=ThreadPoolExecutor, max_workers=4):
    """Context manager for executor lifecycle

    Works with both ThreadPoolExecutor and ProcessPoolExecutor
    """
    executor = executor_class(max_workers=max_workers)
    try:
        yield executor
    finally:
        executor.shutdown(wait=True)

def blocking_task(task_id):
    import time
    time.sleep(1)
    return f"Task {task_id} complete"

async def use_executor_context():
    # Use with ThreadPoolExecutor
    async with executor_context(ThreadPoolExecutor, max_workers=3) as executor:
        loop = asyncio.get_running_loop()
        tasks = [
            loop.run_in_executor(executor, blocking_task, i)
            for i in range(5)
        ]
        results = await asyncio.gather(*tasks)
        print(f"Thread results: {results}")
    print("Thread executor automatically shut down")

    # Also works with ProcessPoolExecutor
    async with executor_context(ProcessPoolExecutor, max_workers=2) as executor:
        loop = asyncio.get_running_loop()
        tasks = [
            loop.run_in_executor(executor, blocking_task, i)
            for i in range(3)
        ]
        results = await asyncio.gather(*tasks)
        print(f"Process results: {results}")
    print("Process executor automatically shut down")

if __name__ == "__main__":  # Guard needed because the process pool re-imports this module in its workers
    asyncio.run(use_executor_context())
"The context manager," Margaret explained, "ensures proper cleanup even if exceptions occur. The executor shuts down automatically when you exit the async with block, and this pattern works identically for both ThreadPoolExecutor and ProcessPoolExecutor."
Executor Shutdown Behavior
"Let's talk about proper cleanup," Margaret said. "The shutdown() method has an important parameter:"
import asyncio
from concurrent.futures import ThreadPoolExecutor
import time

def slow_task(task_id):
    print(f"Task {task_id} starting")
    time.sleep(2)
    print(f"Task {task_id} finished")
    return task_id

async def demonstrate_shutdown_wait_true():
    """shutdown(wait=True) waits for all tasks to complete"""
    executor = ThreadPoolExecutor(max_workers=2)
    loop = asyncio.get_running_loop()

    # Start 3 tasks (only 2 run immediately due to max_workers=2)
    tasks = [
        loop.run_in_executor(executor, slow_task, i)
        for i in range(3)
    ]

    # Don't await - just start them
    print("Tasks started, shutting down with wait=True")
    executor.shutdown(wait=True)  # Blocks until all complete (and blocks the event loop while it waits)
    print("Shutdown complete - all tasks finished")

async def demonstrate_shutdown_wait_false():
    """shutdown(wait=False) returns immediately, may abandon tasks"""
    executor = ThreadPoolExecutor(max_workers=2)
    loop = asyncio.get_running_loop()

    # Start tasks but don't await
    for i in range(3):
        loop.run_in_executor(executor, slow_task, i)

    print("Tasks started, shutting down with wait=False")
    executor.shutdown(wait=False)  # Returns immediately
    print("Shutdown returned - but tasks may still be running!")

    # Give tasks a chance to finish for demo
    await asyncio.sleep(3)

# asyncio.run(demonstrate_shutdown_wait_true())
# asyncio.run(demonstrate_shutdown_wait_false())
"Always use wait=True in production," Margaret emphasized. "With wait=False, pending tasks might be abandoned, and resources might not be properly cleaned up. The only time to use wait=False is in emergency shutdown scenarios where you can't wait."
Performance Considerations
Margaret created a comparison chart:
"""
ThreadPoolExecutor vs ProcessPoolExecutor:
ThreadPoolExecutor:
✓ Use for I/O-bound operations (network, file I/O, database)
✓ Lower overhead - threads share memory
✓ Good for operations that wait on external resources
✓ Subject to Python's GIL - no true parallelism for CPU work
✗ Not good for CPU-intensive computation
ProcessPoolExecutor:
✓ Use for CPU-bound operations (computation, data processing)
✓ True parallelism - each process has its own GIL
✓ Can fully utilize multiple CPU cores
✗ Higher overhead - inter-process communication costs
✗ Arguments must be pickleable
✗ Not good for I/O-bound work (unnecessary overhead)
Rule of thumb:
- File I/O, network, legacy sync libraries → ThreadPoolExecutor
- Heavy computation, data transformation → ProcessPoolExecutor
- Quick operations (<10ms) → Just run them, overhead isn't worth it
"""
Real-World Pattern: Database Connection Pool
"Here's a production pattern," Margaret said, showing Timothy a complete example:
import asyncio
from concurrent.futures import ThreadPoolExecutor
import time
from queue import Queue

class DatabaseConnectionPool:
    """Thread-safe connection pool for a blocking database"""

    def __init__(self, size=5):
        self.executor = ThreadPoolExecutor(max_workers=size)
        self.connections = Queue(maxsize=size)
        # Pre-populate pool
        for i in range(size):
            self.connections.put(self._create_connection(i))

    def _create_connection(self, conn_id):
        """Simulate connection creation"""
        return f"Connection-{conn_id}"

    def _execute_query_sync(self, query):
        """Synchronous query execution"""
        conn = self.connections.get()
        try:
            print(f"Executing '{query}' on {conn}")
            time.sleep(1)  # Simulate query
            return f"Results for: {query}"
        finally:
            self.connections.put(conn)

    async def execute_query(self, query):
        """Async wrapper for query execution"""
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(
            self.executor,
            self._execute_query_sync,
            query
        )

    def shutdown(self):
        self.executor.shutdown(wait=True)

async def demonstrate_connection_pool():
    pool = DatabaseConnectionPool(size=3)

    queries = [
        "SELECT * FROM users",
        "SELECT * FROM orders",
        "SELECT * FROM products",
        "SELECT * FROM invoices",
        "SELECT * FROM customers"
    ]

    print("Executing 5 queries with a pool of 3 connections\n")
    tasks = [pool.execute_query(q) for q in queries]
    results = await asyncio.gather(*tasks)

    for query, result in zip(queries, results):
        print(f"{query[:20]}... -> {result}")

    pool.shutdown()

asyncio.run(demonstrate_connection_pool())
Output:
Executing 5 queries with a pool of 3 connections
Executing 'SELECT * FROM users' on Connection-0
Executing 'SELECT * FROM orders' on Connection-1
Executing 'SELECT * FROM products' on Connection-2
Executing 'SELECT * FROM invoices' on Connection-0
Executing 'SELECT * FROM customers' on Connection-1
SELECT * FROM users... -> Results for: SELECT * FROM users
SELECT * FROM orders... -> Results for: SELECT * FROM orders
...
Testing Executor-Based Code
Margaret showed Timothy testing patterns:
import asyncio
import pytest
from concurrent.futures import ThreadPoolExecutor
from unittest.mock import Mock

def blocking_operation(value):
    import time
    time.sleep(0.1)
    return value * 2

@pytest.mark.asyncio
async def test_executor_basic():
    """Test basic executor usage"""
    executor = ThreadPoolExecutor(max_workers=2)
    loop = asyncio.get_running_loop()

    result = await loop.run_in_executor(
        executor,
        blocking_operation,
        21
    )

    assert result == 42
    executor.shutdown(wait=True)

@pytest.mark.asyncio
async def test_executor_concurrent():
    """Test multiple concurrent operations"""
    executor = ThreadPoolExecutor(max_workers=3)
    loop = asyncio.get_running_loop()

    tasks = [
        loop.run_in_executor(executor, blocking_operation, i)
        for i in range(5)
    ]
    results = await asyncio.gather(*tasks)

    assert results == [0, 2, 4, 6, 8]
    executor.shutdown(wait=True)

@pytest.mark.asyncio
async def test_executor_exception_handling():
    """Test that exceptions propagate correctly"""
    def failing_func():
        raise ValueError("Test error")

    executor = ThreadPoolExecutor(max_workers=1)
    loop = asyncio.get_running_loop()

    with pytest.raises(ValueError, match="Test error"):
        await loop.run_in_executor(executor, failing_func)

    executor.shutdown(wait=True)

@pytest.mark.asyncio
async def test_mock_executor():
    """Test by mocking the blocking operation"""
    mock_operation = Mock(return_value="mocked result")

    executor = ThreadPoolExecutor(max_workers=1)
    loop = asyncio.get_running_loop()

    result = await loop.run_in_executor(
        executor,
        mock_operation,
        "test input"
    )

    assert result == "mocked result"
    mock_operation.assert_called_once_with("test input")
    executor.shutdown(wait=True)

# Run tests with: pytest test_executors.py -v
Common Pitfalls
Margaret created a warning list:
"""
❌ COMMON MISTAKES:
1. Using ProcessPoolExecutor for I/O operations
executor = ProcessPoolExecutor() # Wrong for I/O!
# Use ThreadPoolExecutor instead
2. Forgetting to shutdown executor
executor = ThreadPoolExecutor()
# Use it...
# ❌ Memory leak - executor never cleaned up
executor.shutdown(wait=True) # Always call this!
# shutdown(wait=True) - Waits for all tasks to complete (recommended)
# shutdown(wait=False) - Returns immediately, abandons pending tasks (dangerous!)
3. Passing non-pickleable objects to ProcessPoolExecutor
executor = ProcessPoolExecutor()
await loop.run_in_executor(executor, lambda x: x*2, 10) # ❌ Can't pickle lambda
4. Too many workers
executor = ThreadPoolExecutor(max_workers=1000) # ❌ Wasteful overhead
# Use reasonable worker count (typically 2-10 for I/O)
5. Using executor for fast operations
def quick_op(x):
return x * 2 # Takes microseconds
await loop.run_in_executor(executor, quick_op, 5) # ❌ Overhead > benefit
# Just run it directly: result = quick_op(5)
6. Not handling executor exceptions
result = await loop.run_in_executor(executor, risky_func)
# ❌ No try/except - exception crashes program
"""
When to Use Executors
Timothy pulled out his decision tree:
"""
Should I use an Executor?
QUESTIONS:
1. Is the operation already async? → No executor needed
2. Is it a quick operation (<10ms)? → No executor needed
3. Is it CPU-bound computation? → ProcessPoolExecutor
4. Is it I/O-bound blocking code? → ThreadPoolExecutor
5. Can you make it async natively? → Prefer that over executor
EXECUTOR DECISION TREE:
My operation is...
├─ Already async (asyncio/aiohttp/etc)
│ └─> No executor needed! ✓
│
├─ Very fast (<10ms)
│ └─> Run directly, overhead not worth it
│
├─ CPU-intensive (math, data processing)
│ └─> ProcessPoolExecutor ✓
│
├─ I/O-bound blocking (legacy libs, sync file I/O)
│ └─> ThreadPoolExecutor ✓
│
└─ Could be rewritten as async
└─> Prefer async version (aiofiles, aiohttp, etc)
If not possible → ThreadPoolExecutor
"""
Margaret smiled. "The key insight: executors are a bridge between sync and async worlds. They let you integrate blocking code without sacrificing the responsiveness of your event loop."
Timothy studied the patterns. "So executors are like temporary workers - threads for I/O work that waits, processes for CPU work that computes. They keep my main async flow responsive while handling the blocking operations safely on the side."
"Exactly," Margaret said. "Master executors, and you can build async systems that seamlessly integrate with any legacy code or CPU-intensive operations."
Key Takeaways
- Use loop.run_in_executor() to run blocking code without blocking the event loop
- ThreadPoolExecutor for I/O-bound blocking operations (file I/O, network, legacy libraries)
- ProcessPoolExecutor for CPU-bound operations (heavy computation, data processing)
- Always shut down executors - use context managers or explicit shutdown(wait=True)
- Exceptions propagate naturally from executor to async code
- Wrap blocking libraries with async interfaces using executors
- Don't overuse - for quick operations, overhead isn't worth it
- Test thoroughly - executors add concurrency complexity
With Executors in his toolkit, Timothy could now build truly responsive async systems that integrate smoothly with any blocking code - threading the needle between Python's async and sync worlds with confidence.
Aaron Rose is a software engineer and technology writer at tech-reader.blog and the author of Think Like a Genius.