Moving beyond basic async and await in Python feels like graduating from riding a bike with training wheels. You know how to go forward, but the real world has hills, traffic, and the occasional need to carry groceries. You need more sophisticated techniques. In my own work building and maintaining systems that handle thousands of concurrent operations, I've found that the true power of asyncio isn't just in making things fast—it's in making them reliable, manageable, and resilient under pressure.
This shift requires a new toolbox. Basic patterns get you started, but when a database connection fails in the middle of a transaction, or an external API starts rejecting your requests, you need structured ways to respond. The patterns I’ll discuss are less about raw speed and more about building a system that doesn’t fall apart when things go wrong. They help you coordinate work, protect shared resources, and clean up gracefully, which is what separates a proof-of-concept from something you can run in production.
Let's start with one of the most impactful additions to Python's async story: task groups. Before task groups, managing a collection of concurrent tasks was a bit messy. You'd use asyncio.gather() or create tasks manually, but if one failed, you had to carefully catch exceptions and cancel the others yourself to avoid leaving orphaned tasks running in the background. It was easy to accidentally create resource leaks.
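To make that concrete, here is a rough sketch (not from the original article) of the kind of manual coordination you needed before task groups existed: wait for the first failure, cancel the survivors yourself, then re-raise.

import asyncio

async def run_all_or_cancel(coros):
    """Pre-TaskGroup style: cancel sibling tasks by hand when one fails."""
    tasks = [asyncio.create_task(c) for c in coros]
    done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_EXCEPTION)
    # Cancel whatever is still running...
    for task in pending:
        task.cancel()
    # ...and wait for the cancellations to actually finish.
    await asyncio.gather(*pending, return_exceptions=True)
    # Re-raise one of the failures, if there was one.
    for task in done:
        if task.exception() is not None:
            raise task.exception()
    # No exception means asyncio.wait only returned once every task completed.
    return [task.result() for task in tasks]

Even this sketch glosses over details like timeouts and partial results, which is exactly the bookkeeping a task group now does for you.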
The asyncio.TaskGroup changes this. It provides a clean, structured way to run multiple tasks together. Its key feature is that if any task inside the group raises an exception, all other tasks in the group are automatically cancelled. This idea is often called "structured concurrency." The group ensures that tasks are entered and exited in a predictable, controlled way, much like how a context manager controls resource access.
Here's a concrete example. Imagine you're fetching data from multiple API endpoints. If one call fails, you probably want to cancel the remaining calls immediately—why wait for them if you already know part of the overall operation has failed? A task group handles this for you.
import asyncio
import aiohttp

async def fetch_one(session, url):
    """Fetch a single URL."""
    async with session.get(url) as resp:
        resp.raise_for_status()
        return await resp.json()

async def fetch_all_urls(url_list):
    """Fetch multiple URLs with a coordinated lifecycle."""
    async with aiohttp.ClientSession() as session:
        async with asyncio.TaskGroup() as group:
            # Create a task for each URL within the group
            tasks = {url: group.create_task(fetch_one(session, url)) for url in url_list}
        # Reaching this point means every task finished successfully;
        # any failure is raised from the block as an ExceptionGroup.
        return {url: task.result() for url, task in tasks.items()}

# Use it
async def main():
    urls = ["https://api.example.com/data1", "https://api.example.com/data2"]
    try:
        data = await fetch_all_urls(urls)
        print(data)
    except* Exception as eg:
        # Handle the failures collected by the task group
        for exc in eg.exceptions:
            print(f"A fetch failed: {exc}")

asyncio.run(main())
In this code, the async with asyncio.TaskGroup() as group: block is the magic. All tasks created with group.create_task() are bound to the life of that block. If fetch_one for data1 fails with a 404 error, the group immediately cancels the task for data2. When the block exits, it waits for every task, completed or cancelled, to settle, and any failures are re-raised together as an ExceptionGroup that the caller can handle with except*, as main() does above. This pattern has dramatically simplified error handling in my concurrent code.
Now, let's talk about controlling the flow. Concurrency is great, but unlimited concurrency can be destructive. If you fire off 10,000 requests to a small service at once, you'll likely overwhelm it and get rate-limited or blocked. You need a way to limit how many things happen simultaneously. This is where semaphores and rate limiters come in.
A semaphore is like a bouncer at a club with a maximum capacity. It only lets a certain number of tasks proceed at the same time. A rate limiter is more like a metronome, ensuring actions happen at a steady, permitted pace. I often use them together.
import asyncio
import time

class SimpleRateLimiter:
    """Control how often operations can start."""
    def __init__(self, calls_per_second):
        self.delay = 1.0 / calls_per_second
        self.last_call = 0
        self._lock = asyncio.Lock()

    async def wait(self):
        """Wait until it's permissible to proceed."""
        async with self._lock:
            now = time.monotonic()
            time_since_last = now - self.last_call
            wait_needed = self.delay - time_since_last
            if wait_needed > 0:
                await asyncio.sleep(wait_needed)
            self.last_call = time.monotonic()

async def access_limited_api(item_id, limiter, semaphore):
    """An example task that respects both a rate limit and concurrency limit."""
    async with semaphore:  # Wait for a "spot" to open up
        await limiter.wait()  # Wait for the next allowed time slot
        # Now perform the actual API call
        print(f"Fetching item {item_id} at {time.monotonic():.2f}")
        await asyncio.sleep(0.1)  # Simulate network call
        return f"data_for_{item_id}"

async def main():
    # Allow max 3 concurrent tasks, and max 2 calls per second
    semaphore = asyncio.Semaphore(3)
    limiter = SimpleRateLimiter(2.0)

    # Create many tasks
    tasks = []
    for i in range(10):
        task = asyncio.create_task(access_limited_api(i, limiter, semaphore))
        tasks.append(task)

    # Wait for all to finish
    results = await asyncio.gather(*tasks)
    print(f"Done. Got {len(results)} results.")

asyncio.run(main())
In this example, the semaphore ensures no more than 3 of our access_limited_api coroutines are in the async with semaphore: block at once. Inside that block, the rate limiter makes each task wait enough time to keep the overall rate at 2 calls per second. This pattern is essential for being a good citizen when using external services and for preventing your own system from being flooded by its own work.
Asynchronous context managers are your best friends for resource management. You're probably familiar with regular context managers (with open('file') as f:). Their async cousins (async with) do the same thing but can await during setup and cleanup. This is perfect for database connections, network sessions, or any resource that needs async setup/teardown.
The beauty is that they guarantee cleanup happens, even if an exception occurs inside the block. I use them to wrap almost any operation that acquires and releases something.
import asyncio
import aiosqlite
from contextlib import asynccontextmanager

# A file path: separate connections to ":memory:" would each get their own empty database.
DB_PATH = "users.db"

class AsyncDatabaseSession:
    """Manage a database connection with async context."""
    def __init__(self, db_path):
        self.db_path = db_path
        self.conn = None

    async def __aenter__(self):
        print("Establishing database connection...")
        self.conn = await aiosqlite.connect(self.db_path)
        # Perform any other async setup here
        await self.conn.execute("PRAGMA foreign_keys = ON")
        return self.conn

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        print("Closing database connection...")
        if self.conn:
            await self.conn.close()
        # The exception (if any) is propagated unless we return True here

# Using it is very clean
async def insert_user(name, email):
    async with AsyncDatabaseSession(DB_PATH) as db:
        await db.execute("INSERT INTO users (name, email) VALUES (?, ?)", (name, email))
        await db.commit()
        print(f"Inserted {name}")

# You can also create lightweight context managers for specific operations
@asynccontextmanager
async def get_transaction(db_connection):
    """A helper to manage a transaction scope."""
    try:
        await db_connection.execute("BEGIN")
        yield  # Execution goes back to the code inside the 'async with' block
        await db_connection.execute("COMMIT")
        print("Transaction committed.")
    except Exception:
        await db_connection.execute("ROLLBACK")
        print("Transaction rolled back.")
        raise  # Re-raise the exception

async def main():
    # Create the table first
    async with aiosqlite.connect(DB_PATH) as db:
        await db.execute("CREATE TABLE IF NOT EXISTS users (name TEXT, email TEXT)")
        await db.commit()
    await insert_user("Alice", "alice@example.com")

asyncio.run(main())
The __aenter__ and __aexit__ methods define the async context manager. When the async with block starts, __aenter__ runs and its return value is assigned to the variable after as. When the block ends, __aexit__ runs to clean up. This pattern makes resource leaks much less likely. The @asynccontextmanager decorator is a convenient shortcut for simpler cases.
A common challenge in streaming data is backpressure. What if a producer generates data faster than a consumer can process it? Without control, the queue between them grows infinitely, consuming memory until the program crashes. Backpressure is the feedback mechanism that tells the producer to slow down.
Asynchronous generators are a natural fit for this. You can yield items from a producer, and an async for loop can consume them. The key is that the consumer's await on the next item is the signal. If the consumer is slow, it simply doesn't request the next item, and the producer waits at the yield.
import asyncio

async def fast_producer():
    """Simulates a fast data source."""
    for i in range(20):
        await asyncio.sleep(0.05)  # Produce very fast
        yield f"data_chunk_{i}"
        # The yield pauses here until the consumer asks for the next item.

async def slow_consumer():
    """Simulates a slow data processor."""
    producer = fast_producer()
    async for item in producer:
        print(f"Consumer received: {item}")
        await asyncio.sleep(0.2)  # Process slowly
        print(f"Consumer processed: {item}")

async def main():
    print("Starting. Notice the consumer controls the pace.")
    await slow_consumer()

asyncio.run(main())
In this simple example, the producer could create a new item every 0.05 seconds, but each round trip takes about 0.25 seconds (0.05 to produce plus 0.2 to process). The producer doesn't generate the second item until the consumer retrieves the first one from the generator. The flow is self-regulating. For more complex scenarios, you might use an asyncio.Queue with a maxsize parameter: a producer trying to put into a full queue will wait until space is free, automatically applying backpressure.
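If you'd rather decouple the producer and consumer with a buffer, a bounded queue gives you the same self-regulation. Here's a minimal sketch; the maxsize of 5 and the None sentinel are arbitrary choices for the example.

import asyncio

async def producer(queue: asyncio.Queue):
    """Pushes items into a bounded queue."""
    for i in range(20):
        # put() waits whenever the queue already holds `maxsize` items,
        # so a slow consumer automatically throttles this loop.
        await queue.put(f"data_chunk_{i}")
        print(f"Produced data_chunk_{i}")
    await queue.put(None)  # Sentinel to signal "no more data"

async def consumer(queue: asyncio.Queue):
    """Drains the queue slowly."""
    while True:
        item = await queue.get()
        if item is None:
            break
        await asyncio.sleep(0.2)  # Simulate slow processing
        print(f"Consumed {item}")

async def main():
    queue = asyncio.Queue(maxsize=5)  # At most 5 items buffered at once
    await asyncio.gather(producer(queue), consumer(queue))

asyncio.run(main())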
Handling cancellation properly is a mark of robust async code. In Python, tasks can be cancelled using task.cancel(), which raises a CancelledError inside the task. The task should catch this, perform any necessary cleanup, and then re-raise the exception. The complication is that cleanup operations themselves might need to be awaited.
The standard pattern is to use a try...finally block or, more cleanly, an async context manager within your task.
import asyncio

async def resilient_worker(worker_id, stop_event):
    """A worker that cleans up properly on cancellation."""
    print(f"Worker {worker_id} starting.")
    try:
        # Simulate acquiring a resource
        resource = f"Resource_lock_{worker_id}"
        print(f"Worker {worker_id} acquired {resource}.")
        while not stop_event.is_set():
            print(f"Worker {worker_id} is working...")
            # Use asyncio.wait_for to make long waits cancellable
            try:
                await asyncio.wait_for(stop_event.wait(), timeout=1.0)
            except asyncio.TimeoutError:
                pass  # This is our normal work loop
    except asyncio.CancelledError:
        print(f"\nWorker {worker_id} got cancellation signal. Cleaning up...")
        # Simulate an async cleanup process (closing files, network connections)
        await asyncio.sleep(0.5)
        print(f"Worker {worker_id} cleanup complete.")
        raise  # Must re-raise CancelledError
    finally:
        # This runs for CancelledError AND normal exit
        print(f"Worker {worker_id} releasing {resource}.")
        # Final synchronous cleanup

async def main():
    stop_event = asyncio.Event()
    worker_task = asyncio.create_task(resilient_worker(1, stop_event))

    # Let it work for a bit
    await asyncio.sleep(2.5)
    print("\nMain: Requesting worker cancellation.")
    worker_task.cancel()
    try:
        await worker_task
    except asyncio.CancelledError:
        print("Main: Worker was cancelled successfully.")
    print("Main: All done.")

asyncio.run(main())
The crucial part is catching asyncio.CancelledError, doing your await-able cleanup, and then re-raising the exception. The finally block is also executed, making it good for synchronous cleanup. Never suppress the CancelledError unless you have a very good reason and know the consequences—it can make your program hang.
Most codebases aren't purely async. You have synchronous libraries for file I/O, CPU-heavy calculations, or legacy code. Running them directly in your async event loop will block everything. The solution is to run them in a separate thread or process, and asyncio provides tools for this.
Use loop.run_in_executor() to offload blocking functions. For I/O-bound blocking code (like reading a file with the standard open()), use a ThreadPoolExecutor. For CPU-bound code (like number crunching with NumPy), use a ProcessPoolExecutor to avoid the GIL.
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor

def blocking_io_task(filename):
    """A synchronous, blocking file operation."""
    time.sleep(1)  # Simulate slow I/O
    with open(filename, 'w') as f:
        f.write("Some data")
    return f"Wrote to {filename}"

def cpu_intensive_task(number):
    """A synchronous, CPU-heavy calculation."""
    return sum(i * i for i in range(number))

async def main():
    loop = asyncio.get_running_loop()

    # Option 1: Run blocking I/O in a thread
    print("Starting blocking I/O in thread...")
    # Passing None uses the event loop's default thread pool, which is fine for many cases
    io_result = await loop.run_in_executor(None, blocking_io_task, "test.txt")
    print(f"IO result: {io_result}")

    # Option 2: Run CPU work in a separate process
    print("\nStarting CPU work in process...")
    # It's better to create and reuse a ProcessPoolExecutor
    with ProcessPoolExecutor() as process_pool:
        cpu_result = await loop.run_in_executor(process_pool, cpu_intensive_task, 1000000)
        print(f"CPU result sum: {cpu_result}")

    # You can run many blocking calls concurrently
    print("\nRunning multiple blocking calls concurrently...")
    with ThreadPoolExecutor(max_workers=3) as thread_pool:
        tasks = [
            loop.run_in_executor(thread_pool, blocking_io_task, f"file_{i}.txt")
            for i in range(3)
        ]
        results = await asyncio.gather(*tasks)
    for r in results:
        print(r)

# The __main__ guard matters here: ProcessPoolExecutor may spawn fresh
# interpreter processes that re-import this module.
if __name__ == "__main__":
    asyncio.run(main())
The None argument to run_in_executor uses the default thread pool. The key point is that await loop.run_in_executor(...) yields control back to the event loop, allowing other async tasks to run while the blocking code executes in the background. This lets you integrate almost any synchronous library into your async application.
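As a small aside, on Python 3.9 and newer, asyncio.to_thread is a shorthand for the thread-pool case that saves you from fetching the loop yourself. A minimal sketch, reusing the same kind of blocking function as above:

import asyncio
import time

def blocking_io_task(filename):
    """A synchronous, blocking file operation, as in the previous example."""
    time.sleep(1)
    with open(filename, "w") as f:
        f.write("Some data")
    return f"Wrote to {filename}"

async def main():
    # asyncio.to_thread schedules each call on the default thread pool.
    results = await asyncio.gather(
        asyncio.to_thread(blocking_io_task, "file_a.txt"),
        asyncio.to_thread(blocking_io_task, "file_b.txt"),
    )
    print(results)

asyncio.run(main())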
Finally, let's look at building a decoupled system with an async event bus. This is a more architectural pattern. Components (like a sensor collector, a data processor, and a logger) don't call each other directly. Instead, they publish events (like "data_received" or "processing_complete") to a central bus. Other components subscribe to events they care about.
This makes the system very flexible. You can add a new monitoring component without touching the collector or processor code. Everything communicates through a well-defined message channel.
import asyncio
import time
from asyncio import Queue
from dataclasses import dataclass
from typing import Any

@dataclass
class Event:
    """A simple event structure."""
    type: str
    data: Any
    timestamp: float | None = None

    def __post_init__(self):
        if self.timestamp is None:
            self.timestamp = time.time()

class AsyncEventBus:
    """A simple in-memory event bus."""
    def __init__(self):
        self._subscribers = {}

    def subscribe(self, event_type: str) -> Queue:
        """Subscribe to an event type. Returns a queue to listen on."""
        if event_type not in self._subscribers:
            self._subscribers[event_type] = []
        queue = Queue()
        self._subscribers[event_type].append(queue)
        return queue

    async def publish(self, event: Event):
        """Publish an event to all subscribers of its type."""
        queues = self._subscribers.get(event.type, [])
        for queue in queues:
            await queue.put(event)  # This will wait if a queue is full

async def data_generator(bus, name):
    """A component that generates data events."""
    for i in range(5):
        await asyncio.sleep(0.3)
        event = Event(type="new_data", data={"source": name, "value": i})
        await bus.publish(event)
        print(f"[Generator {name}] Published: {event.data}")

async def data_processor(bus, name):
    """A component that processes 'new_data' events."""
    queue = bus.subscribe("new_data")
    print(f"[Processor {name}] Started, waiting for data.")
    while True:
        event = await queue.get()
        # Simulate processing time
        await asyncio.sleep(0.1)
        print(f"[Processor {name}] Processed {event.data} from {event.timestamp:.2f}")
        # Could publish a "data_processed" event here
        queue.task_done()

async def main():
    bus = AsyncEventBus()

    # Start the components
    generator_tasks = [asyncio.create_task(data_generator(bus, f"Gen_{i}")) for i in range(2)]
    processor_tasks = [asyncio.create_task(data_processor(bus, f"Proc_{i}")) for i in range(3)]

    # Let the generators finish their work
    await asyncio.gather(*generator_tasks)
    print("\nAll data generated. Waiting a moment for processing to finish...")
    await asyncio.sleep(1)

    # Cancel the processors (they run forever in this simple example)
    for task in processor_tasks:
        task.cancel()
    await asyncio.gather(*processor_tasks, return_exceptions=True)
    print("Done.")

asyncio.run(main())
In this system, the data_generator and data_processor have no direct knowledge of each other. The bus manages the connections. The processors get their events from their dedicated Queue object. This pattern scales well and keeps components testable in isolation. You can imagine extending it to use a distributed message queue like Redis or RabbitMQ for a multi-service architecture.
These patterns—task groups for structured concurrency, semaphores for limiting, context managers for resources, generators for backpressure, careful cancellation, executor bridging, and event-driven design—form a toolkit for advanced async programming. They move you from writing code that just works to writing systems that are predictable, maintainable, and resilient under the complex conditions of real-world use. I've found that applying them thoughtfully transforms the challenge of concurrency from a source of bugs into a structured and manageable part of system design.