Nithin Bharadwaj

Advanced Python Async Patterns: Task Groups, Backpressure Control, and Production-Ready Concurrency Techniques


Moving beyond basic async and await in Python feels like graduating from riding a bike with training wheels. You know how to go forward, but the real world has hills, traffic, and the occasional need to carry groceries. You need more sophisticated techniques. In my own work building and maintaining systems that handle thousands of concurrent operations, I've found that the true power of asyncio isn't just in making things fast—it's in making them reliable, manageable, and resilient under pressure.

This shift requires a new toolbox. Basic patterns get you started, but when a database connection fails in the middle of a transaction, or an external API starts rejecting your requests, you need structured ways to respond. The patterns I’ll discuss are less about raw speed and more about building a system that doesn’t fall apart when things go wrong. They help you coordinate work, protect shared resources, and clean up gracefully, which is what separates a proof-of-concept from something you can run in production.

Let's start with one of the most impactful additions to Python's async story: task groups. Before task groups (added in Python 3.11), managing a collection of concurrent tasks was a bit messy. You'd use asyncio.gather() or create tasks manually, but if one failed, you had to carefully catch exceptions and cancel the others yourself to avoid leaving orphaned tasks running in the background. It was easy to accidentally create resource leaks.
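For contrast, here's a minimal sketch of that older bookkeeping; fetch is a hypothetical stand-in for real work. Notice how much of the code is devoted to cleaning up after a failure.

import asyncio

async def fetch(i):
    """Illustrative stand-in for real work; item 2 fails."""
    await asyncio.sleep(i * 0.1)
    if i == 2:
        raise RuntimeError("boom")
    return i

async def fetch_all_old_style():
    tasks = [asyncio.create_task(fetch(i)) for i in range(5)]
    try:
        return await asyncio.gather(*tasks)
    except Exception:
        # gather() re-raises the first failure, but by default the
        # surviving tasks keep running in the background. We must
        # cancel them ourselves and wait for the cancellations to land.
        for t in tasks:
            t.cancel()
        await asyncio.gather(*tasks, return_exceptions=True)
        raise

try:
    asyncio.run(fetch_all_old_style())
except RuntimeError as e:
    print(f"Caught: {e}")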

The asyncio.TaskGroup changes this. It provides a clean, structured way to run multiple tasks together. Its key feature is that if any task inside the group raises an exception, all other tasks in the group are automatically cancelled. This idea is often called "structured concurrency." The group ensures that tasks are entered and exited in a predictable, controlled way, much like how a context manager controls resource access.

Here's a concrete example. Imagine you're fetching data from multiple API endpoints. If one call fails, you probably want to cancel the remaining calls immediately—why wait for them if you already know part of the overall operation has failed? A task group handles this for you.

import asyncio
import aiohttp

async def fetch_all_urls(url_list):
    """Fetch multiple URLs with a coordinated lifecycle."""
    async with aiohttp.ClientSession() as session:
        async with asyncio.TaskGroup() as group:
            # Create a task for each URL within the group
            tasks = {url: group.create_task(fetch_one(session, url)) for url in url_list}
        # The block only exits once every task has settled. If any task
        # failed, its siblings were cancelled and the group raises an
        # ExceptionGroup, so we never reach the return below.
    return {url: task.result() for url, task in tasks.items()}

async def fetch_one(session, url):
    """Fetch a single URL."""
    async with session.get(url) as resp:
        resp.raise_for_status()
        return await resp.json()

# Use it
async def main():
    urls = ["https://api.example.com/data1", "https://api.example.com/data2"]
    try:
        data = await fetch_all_urls(urls)
        print(data)
    except* Exception as eg:
        # TaskGroup bundles failures into an ExceptionGroup (Python 3.11+)
        for exc in eg.exceptions:
            print(f"A fetch failed: {exc}")

asyncio.run(main())

In this code, the async with asyncio.TaskGroup() as group: block is the magic. All tasks created with group.create_task() are bound to the life of that block. If fetch_one for data1 fails with a 404 error, the group immediately cancels the task for data2. When the block exits, it waits for all tasks (whether completed or cancelled) to settle, ensuring no loose ends, then re-raises any failures bundled into an ExceptionGroup, which Python 3.11's except* syntax handles cleanly. This pattern has dramatically simplified error handling in my concurrent code.

Now, let's talk about controlling the flow. Concurrency is great, but unlimited concurrency can be destructive. If you fire off 10,000 requests to a small service at once, you'll likely overwhelm it and get rate-limited or blocked. You need a way to limit how many things happen simultaneously. This is where semaphores and rate limiters come in.

A semaphore is like a bouncer at a club with a maximum capacity. It only lets a certain number of tasks proceed at the same time. A rate limiter is more like a metronome, ensuring actions happen at a steady, permitted pace. I often use them together.

import asyncio
import time

class SimpleRateLimiter:
    """Control how often operations can start."""
    def __init__(self, calls_per_second):
        self.delay = 1.0 / calls_per_second
        self.last_call = 0
        self._lock = asyncio.Lock()

    async def wait(self):
        """Wait until it's permissible to proceed."""
        async with self._lock:
            now = time.monotonic()
            time_since_last = now - self.last_call
            wait_needed = self.delay - time_since_last

            if wait_needed > 0:
                await asyncio.sleep(wait_needed)

            self.last_call = time.monotonic()

async def access_limited_api(item_id, limiter, semaphore):
    """An example task that respects both a rate limit and concurrency limit."""
    async with semaphore:  # Wait for a "spot" to open up
        await limiter.wait()  # Wait for the next allowed time slot
        # Now perform the actual API call
        print(f"Fetching item {item_id} at {time.monotonic():.2f}")
        await asyncio.sleep(0.1)  # Simulate network call
        return f"data_for_{item_id}"

async def main():
    # Allow max 3 concurrent tasks, and max 2 calls per second
    semaphore = asyncio.Semaphore(3)
    limiter = SimpleRateLimiter(2.0)

    # Create many tasks
    tasks = []
    for i in range(10):
        task = asyncio.create_task(access_limited_api(i, limiter, semaphore))
        tasks.append(task)

    # Wait for all to finish
    results = await asyncio.gather(*tasks)
    print(f"Done. Got {len(results)} results.")

asyncio.run(main())

In this example, the semaphore ensures no more than 3 of our access_limited_api coroutines are in the async with semaphore: block at once. Inside that block, the rate limiter makes each task wait enough time to keep the overall rate at 2 calls per second. This pattern is essential for being a good citizen when using external services and for preventing your own system from being flooded by its own work.

Asynchronous context managers are your best friends for resource management. You're probably familiar with regular context managers (with open('file') as f:). Their async cousins (async with) do the same thing but can await during setup and cleanup. This is perfect for database connections, network sessions, or any resource that needs async setup/teardown.

The beauty is that they guarantee cleanup happens, even if an exception occurs inside the block. I use them to wrap almost any operation that acquires and releases something.

import asyncio
import aiosqlite
from contextlib import asynccontextmanager

class AsyncDatabaseSession:
    """Manage a database connection with async context."""
    def __init__(self, db_path):
        self.db_path = db_path
        self.conn = None

    async def __aenter__(self):
        print("Establishing database connection...")
        self.conn = await aiosqlite.connect(self.db_path)
        # Perform any other async setup here
        await self.conn.execute("PRAGMA foreign_keys = ON")
        return self.conn

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        print("Closing database connection...")
        if self.conn:
            await self.conn.close()
        # The exception (if any) is propagated unless we return True here

# Using it is very clean
async def insert_user(db_path, name, email):
    async with AsyncDatabaseSession(db_path) as db:
        await db.execute("INSERT INTO users (name, email) VALUES (?, ?)", (name, email))
        await db.commit()
        print(f"Inserted {name}")

# You can also create lightweight context managers for specific operations
@asynccontextmanager
async def get_transaction(db_connection):
    """A helper to manage a transaction scope."""
    try:
        await db_connection.execute("BEGIN")
        yield  # Execution goes back to the code inside the 'async with' block
        await db_connection.execute("COMMIT")
        print("Transaction committed.")
    except Exception:
        await db_connection.execute("ROLLBACK")
        print("Transaction rolled back.")
        raise  # Re-raise the exception

async def main():
    # Note: each connection to ":memory:" gets its own private database,
    # so we use a file-backed database that both connections can share.
    db_path = "users.db"
    async with aiosqlite.connect(db_path) as db:
        await db.execute("CREATE TABLE IF NOT EXISTS users (name TEXT, email TEXT)")
        await db.commit()

    await insert_user(db_path, "Alice", "alice@example.com")

asyncio.run(main())

The __aenter__ and __aexit__ methods define the async context manager. When the async with block starts, __aenter__ runs and its return value is assigned to the variable after as. When the block ends, __aexit__ runs to clean up. This pattern makes resource leaks much less likely. The @asynccontextmanager decorator is a convenient shortcut for simpler cases.

A common challenge in streaming data is backpressure. What if a producer generates data faster than a consumer can process it? Without control, the queue between them grows infinitely, consuming memory until the program crashes. Backpressure is the feedback mechanism that tells the producer to slow down.

Asynchronous generators are a natural fit for this. You can yield items from a producer, and an async for loop can consume them. The key is that the consumer's await on the next item is the signal. If the consumer is slow, it simply doesn't request the next item, and the producer waits at the yield.

import asyncio
import random

async def fast_producer():
    """Simulates a fast data source."""
    for i in range(20):
        await asyncio.sleep(0.05)  # Produce very fast
        yield f"data_chunk_{i}"
        # The yield pauses here until the consumer asks for the next item.

async def slow_consumer():
    """Simulates a slow data processor."""
    producer = fast_producer()
    async for item in producer:
        print(f"Consumer received: {item}")
        await asyncio.sleep(0.2)  # Process slowly
        print(f"Consumer processed: {item}")

async def main():
    print("Starting. Notice the consumer controls the pace.")
    await slow_consumer()

asyncio.run(main())

In this simple example, the producer could create a new item every 0.05 seconds, but each loop iteration takes about 0.25 seconds (0.05 s to produce the item plus 0.2 s to process it). The producer doesn't generate the second item until the consumer requests it from the generator. The flow is self-regulating. For more complex scenarios, you might use an asyncio.Queue with a maxsize parameter, as sketched below. A producer trying to put into a full queue will await until space is free, automatically applying backpressure.
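Here is a minimal sketch of that queue-based variant (the maxsize and sleep values are illustrative). The producer blocks on put() whenever the queue already holds two unprocessed items:

import asyncio

async def producer(queue: asyncio.Queue):
    for i in range(10):
        # put() waits here whenever the queue is full (maxsize=2),
        # so the producer can never run far ahead of the consumer.
        await queue.put(f"chunk_{i}")
        print(f"Produced chunk_{i}")
    await queue.put(None)  # Sentinel: signals the end of the stream

async def consumer(queue: asyncio.Queue):
    while True:
        item = await queue.get()
        if item is None:
            break
        await asyncio.sleep(0.2)  # Slow processing
        print(f"Consumed {item}")

async def main():
    queue = asyncio.Queue(maxsize=2)
    await asyncio.gather(producer(queue), consumer(queue))

asyncio.run(main())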

Handling cancellation properly is a mark of robust async code. In Python, tasks can be cancelled using task.cancel(), which raises a CancelledError inside the task. The task should catch this, perform any necessary cleanup, and then re-raise the exception. The catch is that cleanup operations themselves might need to be awaited.

The standard pattern is to use a try...finally block or, more cleanly, an async context manager within your task.

import asyncio

async def resilient_worker(worker_id, stop_event):
    """A worker that cleans up properly on cancellation."""
    print(f"Worker {worker_id} starting.")
    try:
        # Simulate acquiring a resource
        resource = f"Resource_lock_{worker_id}"
        print(f"Worker {worker_id} acquired {resource}.")

        while not stop_event.is_set():
            print(f"Worker {worker_id} is working...")
            # Wait up to a second for the stop signal; a timeout just
            # means it's time for another round of work.
            try:
                await asyncio.wait_for(stop_event.wait(), timeout=1.0)
            except asyncio.TimeoutError:
                pass  # No stop signal yet; continue working

    except asyncio.CancelledError:
        print(f"\nWorker {worker_id} got cancellation signal. Cleaning up...")
        # Simulate an async cleanup process (closing files, network connections)
        await asyncio.sleep(0.5)
        print(f"Worker {worker_id} cleanup complete.")
        raise  # Must re-raise CancelledError
    finally:
        # This runs for CancelledError AND normal exit
        print(f"Worker {worker_id} releasing {resource}.")
        # Final synchronous cleanup

async def main():
    stop_event = asyncio.Event()
    worker_task = asyncio.create_task(resilient_worker(1, stop_event))

    # Let it work for a bit
    await asyncio.sleep(2.5)

    print("\nMain: Requesting worker cancellation.")
    worker_task.cancel()

    try:
        await worker_task
    except asyncio.CancelledError:
        print("Main: Worker was cancelled successfully.")

    print("Main: All done.")

asyncio.run(main())

The crucial part is catching asyncio.CancelledError, doing your awaitable cleanup, and then re-raising the exception. The finally block is also executed, making it a good place for synchronous cleanup. Never suppress the CancelledError unless you have a very good reason and understand the consequences; swallowing it can make your program hang.
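One related tool worth knowing about: if a short critical step, like committing a transaction, must run to completion even while the surrounding task is being cancelled, asyncio.shield() can protect it. A minimal sketch, where commit_work is a hypothetical stand-in for that critical step:

import asyncio

async def commit_work():
    # Hypothetical critical operation that should not be interrupted.
    await asyncio.sleep(0.3)
    print("Commit finished.")

async def worker():
    try:
        await asyncio.sleep(10)  # Long-running work
    except asyncio.CancelledError:
        # shield() keeps commit_work running even if another cancel()
        # arrives while we wait for it; we re-raise afterwards so the
        # task still reports itself as cancelled.
        await asyncio.shield(commit_work())
        raise

async def main():
    task = asyncio.create_task(worker())
    await asyncio.sleep(0.1)
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        print("Worker cancelled, but the commit still completed.")

asyncio.run(main())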

Most codebases aren't purely async. You have synchronous libraries for file I/O, CPU-heavy calculations, or legacy code. Running them directly in your async event loop will block everything. The solution is to run them in a separate thread or process, and asyncio provides tools for this.

Use loop.run_in_executor() to offload blocking functions. For I/O-bound blocking code (like reading a file with the standard open()), use a ThreadPoolExecutor. For CPU-bound code (like number crunching with NumPy), use a ProcessPoolExecutor to avoid the GIL.

import asyncio
import time
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor

def blocking_io_task(filename):
    """A synchronous, blocking file operation."""
    time.sleep(1)  # Simulate slow I/O
    with open(filename, 'w') as f:
        f.write("Some data")
    return f"Wrote to {filename}"

def cpu_intensive_task(number):
    """A synchronous, CPU-heavy calculation."""
    return sum(i * i for i in range(number))

async def main():
    loop = asyncio.get_running_loop()

    # Option 1: Run blocking I/O in a thread
    print("Starting blocking I/O in thread...")
    # Passing None below uses the loop's default thread pool executor
    io_result = await loop.run_in_executor(None, blocking_io_task, "test.txt")
    print(f"IO result: {io_result}")

    # Option 2: Run CPU work in a separate process
    print("\nStarting CPU work in process...")
    # It's better to create and reuse a ProcessPoolExecutor
    with ProcessPoolExecutor() as process_pool:
        cpu_result = await loop.run_in_executor(process_pool, cpu_intensive_task, 1000000)
        print(f"CPU result sum: {cpu_result}")

    # You can run many blocking calls concurrently
    print("\nRunning multiple blocking calls concurrently...")
    with ThreadPoolExecutor(max_workers=3) as thread_pool:
        tasks = [
            loop.run_in_executor(thread_pool, blocking_io_task, f"file_{i}.txt")
            for i in range(3)
        ]
        results = await asyncio.gather(*tasks)
        for r in results:
            print(r)

# The __main__ guard matters here: ProcessPoolExecutor re-imports this
# module in worker processes on platforms using the "spawn" start method.
if __name__ == "__main__":
    asyncio.run(main())

The None argument to run_in_executor uses the default thread pool. The key point is that await loop.run_in_executor(...) yields control back to the event loop, allowing other async tasks to run while the blocking code executes in the background. This lets you integrate almost any synchronous library into your async application.
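For the thread case, Python 3.9 and later also provide asyncio.to_thread() as a convenient shorthand, so you don't have to touch the loop or an executor at all. A quick sketch of the same idea:

import asyncio
import time

def blocking_io_task(filename):
    """The same kind of synchronous, blocking operation as before."""
    time.sleep(1)
    with open(filename, "w") as f:
        f.write("Some data")
    return f"Wrote to {filename}"

async def main():
    # to_thread() runs the function in the default thread pool and
    # returns an awaitable, keeping the event loop free in the meantime.
    result = await asyncio.to_thread(blocking_io_task, "test.txt")
    print(result)

asyncio.run(main())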

Finally, let's look at building a decoupled system with an async event bus. This is a more architectural pattern. Components (like a sensor collector, a data processor, and a logger) don't call each other directly. Instead, they publish events (like "data_received" or "processing_complete") to a central bus. Other components subscribe to events they care about.

This makes the system very flexible. You can add a new monitoring component without touching the collector or processor code. Everything communicates through a well-defined message channel.

import asyncio
from asyncio import Queue
from dataclasses import dataclass
from typing import Any
import time

@dataclass
class Event:
    """A simple event structure."""
    type: str
    data: Any
    timestamp: float | None = None

    def __post_init__(self):
        if self.timestamp is None:
            self.timestamp = time.time()

class AsyncEventBus:
    """A simple in-memory event bus."""
    def __init__(self):
        self._subscribers = {}

    def subscribe(self, event_type: str) -> Queue:
        """Subscribe to an event type. Returns a queue to listen on."""
        if event_type not in self._subscribers:
            self._subscribers[event_type] = []
        # Bounded queue: slow subscribers apply backpressure to publishers
        queue = Queue(maxsize=100)
        self._subscribers[event_type].append(queue)
        return queue

    async def publish(self, event: Event):
        """Publish an event to all subscribers of its type."""
        queues = self._subscribers.get(event.type, [])
        for queue in queues:
            await queue.put(event)  # Waits if a subscriber's queue is full

async def data_generator(bus, name):
    """A component that generates data events."""
    for i in range(5):
        await asyncio.sleep(0.3)
        event = Event(type="new_data", data={"source": name, "value": i})
        await bus.publish(event)
        print(f"[Generator {name}] Published: {event.data}")

async def data_processor(bus, name):
    """A component that processes 'new_data' events."""
    queue = bus.subscribe("new_data")
    print(f"[Processor {name}] Started, waiting for data.")
    while True:
        event = await queue.get()
        # Simulate processing time
        await asyncio.sleep(0.1)
        print(f"[Processor {name}] Processed {event.data} from {event.timestamp:.2f}")
        # Could publish a "data_processed" event here
        queue.task_done()

async def main():
    bus = AsyncEventBus()

    # Start the components
    generator_tasks = [asyncio.create_task(data_generator(bus, f"Gen_{i}")) for i in range(2)]
    processor_tasks = [asyncio.create_task(data_processor(bus, f"Proc_{i}")) for i in range(3)]

    # Let the generators finish their work
    await asyncio.gather(*generator_tasks)
    print("\nAll data generated. Waiting a moment for processing to finish...")
    await asyncio.sleep(1)

    # Cancel the processors (they run forever in this simple example)
    for task in processor_tasks:
        task.cancel()

    await asyncio.gather(*processor_tasks, return_exceptions=True)
    print("Done.")

asyncio.run(main())

In this system, the data_generator and data_processor have no direct knowledge of each other. The bus manages the connections. The processors get their events from their dedicated Queue object. This pattern scales well and keeps components testable in isolation. You can imagine extending it to use a distributed message queue like Redis or RabbitMQ for a multi-service architecture.
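As a rough sketch of that distributed direction, here is what a publisher and subscriber could look like over Redis pub/sub. This assumes the redis package (redis-py 5.x with asyncio support) and a Redis server on localhost; treat it as a starting point, not a drop-in replacement for the in-memory bus above.

import asyncio
import json

import redis.asyncio as redis

async def publisher():
    r = redis.Redis()
    for i in range(5):
        await asyncio.sleep(0.3)
        payload = json.dumps({"source": "Gen_0", "value": i})
        await r.publish("new_data", payload)
    await r.aclose()

async def subscriber():
    r = redis.Redis()
    pubsub = r.pubsub()
    await pubsub.subscribe("new_data")
    async for message in pubsub.listen():
        if message["type"] != "message":
            continue  # Skip subscription confirmations
        data = json.loads(message["data"])
        print(f"Processed {data}")
        if data["value"] == 4:
            break  # End the demo after the last event
    await pubsub.unsubscribe("new_data")
    await r.aclose()

async def main():
    await asyncio.gather(publisher(), subscriber())

asyncio.run(main())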

These patterns—task groups for structured concurrency, semaphores for limiting, context managers for resources, generators for backpressure, careful cancellation, executor bridging, and event-driven design—form a toolkit for advanced async programming. They move you from writing code that just works to writing systems that are predictable, maintainable, and resilient under the complex conditions of real-world use. I've found that applying them thoughtfully transforms the challenge of concurrency from a source of bugs into a structured and manageable part of system design.
