DEV Community

Cover image for The Task Scheduler: Managing Multiple Coroutines with asyncio
Aaron Rose
Aaron Rose

Posted on

The Task Scheduler: Managing Multiple Coroutines with asyncio

Timothy stared at his screen, frustrated. The library's new automated inventory system needed to check the status of books across three different sources: the main catalog, the reserve collection, and the inter-library loan system. He'd written async functions for each source, but his code was taking forever to run.

"Margaret, I don't understand," Timothy said, pulling up his code. "I'm using async/await, but it's still running everything sequentially. It's taking just as long as synchronous code!"

import asyncio
import time

async def check_main_catalog():
    """Check main catalog - takes 2 seconds"""
    print("  Checking main catalog...")
    await asyncio.sleep(2)
    return {"main": 1247}

async def check_reserves():
    """Check reserve collection - takes 2 seconds"""
    print("  Checking reserves...")
    await asyncio.sleep(2)
    return {"reserves": 89}

async def check_interlibrary():
    """Check inter-library loans - takes 2 seconds"""
    print("  Checking inter-library...")
    await asyncio.sleep(2)
    return {"interlibrary": 34}

async def get_inventory_slow():
    """Timothy's sequential approach"""
    print("Starting inventory check (slow way)...")
    start = time.time()

    main = await check_main_catalog()
    reserves = await check_reserves()
    interlibrary = await check_interlibrary()

    elapsed = time.time() - start
    print(f"Completed in {elapsed:.1f} seconds")
    return {**main, **reserves, **interlibrary}

asyncio.run(get_inventory_slow())
Enter fullscreen mode Exit fullscreen mode

Output:

Starting inventory check (slow way)...
  Checking main catalog...
  Checking reserves...
  Checking inter-library...
Completed in 6.0 seconds
Enter fullscreen mode Exit fullscreen mode

Margaret looked over his shoulder. "You're using await on each call, one after another. That means you're waiting for each to finish before starting the next. You're not running them concurrently—you're running them sequentially."

"But they're all async functions!" Timothy protested.

"Being async-capable doesn't make them automatically concurrent," Margaret explained. "You need to explicitly schedule them to run together. That's what Tasks are for."

Understanding Coroutines vs Tasks

"Let me show you the difference," Margaret said, opening a new file.

import asyncio

async def my_coroutine():
    """Just a coroutine function"""
    print("  Coroutine running")
    await asyncio.sleep(1)
    return "done"

async def demo_coroutine_vs_task():
    print("Creating a coroutine object:")
    coro = my_coroutine()
    print(f"  Type: {type(coro)}")
    print(f"  It hasn't run yet!")

    print("\nCreating a Task:")
    task = asyncio.create_task(my_coroutine())
    print(f"  Type: {type(task)}")
    print(f"  It's already scheduled and running!")

    # Give it time to complete
    result = await task
    print(f"  Result: {result}")

    # Clean up the unused coroutine
    coro.close()

asyncio.run(demo_coroutine_vs_task())
Enter fullscreen mode Exit fullscreen mode

Output:

Creating a coroutine object:
  Type: <class 'coroutine'>
  It hasn't run yet!

Creating a Task:
  Type: <class 'Task'>
  It's already scheduled and running!
  Coroutine running
  Result: done
Enter fullscreen mode Exit fullscreen mode

"See the difference?" Margaret asked. "A coroutine is just a recipe. Calling my_coroutine() doesn't run anything—it just gives you a coroutine object. But asyncio.create_task() wraps that coroutine in a Task, which immediately schedules it on the event loop to run."

Running Tasks Concurrently

Margaret refactored Timothy's inventory code:

import asyncio
import time

async def check_main_catalog():
    """Check main catalog - takes 2 seconds"""
    print("  Checking main catalog...")
    await asyncio.sleep(2)
    return {"main": 1247}

async def check_reserves():
    """Check reserve collection - takes 2 seconds"""
    print("  Checking reserves...")
    await asyncio.sleep(2)
    return {"reserves": 89}

async def check_interlibrary():
    """Check inter-library loans - takes 2 seconds"""
    print("  Checking inter-library...")
    await asyncio.sleep(2)
    return {"interlibrary": 34}

async def get_inventory_fast():
    """Concurrent approach with Tasks"""
    print("Starting inventory check (concurrent way)...")
    start = time.time()

    # Create tasks - they start running immediately!
    task1 = asyncio.create_task(check_main_catalog(), name="catalog")
    task2 = asyncio.create_task(check_reserves(), name="reserves")
    task3 = asyncio.create_task(check_interlibrary(), name="interlibrary")

    # Now wait for all of them to complete
    main = await task1
    reserves = await task2
    interlibrary = await task3

    elapsed = time.time() - start
    print(f"Completed in {elapsed:.1f} seconds")
    return {**main, **reserves, **interlibrary}

asyncio.run(get_inventory_fast())
Enter fullscreen mode Exit fullscreen mode

Output:

Starting inventory check (concurrent way)...
  Checking main catalog...
  Checking reserves...
  Checking inter-library...
Completed in 2.0 seconds
Enter fullscreen mode Exit fullscreen mode

Timothy's eyes widened. "Two seconds instead of six! They all ran at the same time!"

"Exactly," Margaret said. "When you call create_task(), the event loop immediately starts executing that coroutine. All three tasks begin their work concurrently. Then we await each task to collect the results."

"Notice I added name= parameters," Margaret added. "That helps with debugging—you can see which task is which in error messages and when inspecting running tasks."

Background Tasks and Garbage Collection

"One important warning," Margaret said, leaning forward. "If you create a task but don't keep a reference to it or await it, Python might garbage collect it. The event loop only keeps weak references to tasks, so if your code doesn't hold a strong reference, the garbage collector can remove the task before it completes."

import asyncio

async def background_work(task_id):
    """Some background work"""
    print(f"  Task {task_id} starting...")
    await asyncio.sleep(2)
    print(f"  Task {task_id} done!")

async def demo_garbage_collection_problem():
    """BAD: Task might be garbage collected"""
    print("Creating background task (no reference)...")
    asyncio.create_task(background_work(1))  # Might be GC'd!

    print("Doing other work...")
    await asyncio.sleep(0.5)
    print("Main work done")
    # Background task might not complete!

async def demo_garbage_collection_solution():
    """GOOD: Keep references to background tasks"""
    print("\nCreating background tasks (with references)...")
    background_tasks = set()

    for i in range(3):
        task = asyncio.create_task(background_work(i), name=f"bg-{i}")
        background_tasks.add(task)
        # Remove from set when done
        task.add_done_callback(background_tasks.discard)

    print("Doing other work...")
    await asyncio.sleep(0.5)
    print("Waiting for background tasks...")
    await asyncio.sleep(2)
    print("All done!")

asyncio.run(demo_garbage_collection_problem())
asyncio.run(demo_garbage_collection_solution())
Enter fullscreen mode Exit fullscreen mode

Output:

Creating background task (no reference)...
Doing other work...
Main work done

Creating background tasks (with references)...
  Task 0 starting...
  Task 1 starting...
  Task 2 starting...
Doing other work...
Waiting for background tasks...
  Task 0 done!
  Task 1 done!
  Task 2 done!
All done!
Enter fullscreen mode Exit fullscreen mode

"Always keep references to tasks you want to complete," Margaret emphasized. "The pattern with the set and add_done_callback() is common for managing background tasks."

Using asyncio.gather() for Cleaner Code

"There's an even cleaner way," Margaret continued. "When you want to run multiple coroutines concurrently and wait for all of them, use asyncio.gather()."

import asyncio
import time

async def check_main_catalog():
    print("  Checking main catalog...")
    await asyncio.sleep(2)
    return {"main": 1247}

async def check_reserves():
    print("  Checking reserves...")
    await asyncio.sleep(2)
    return {"reserves": 89}

async def check_interlibrary():
    print("  Checking inter-library...")
    await asyncio.sleep(2)
    return {"interlibrary": 34}

async def get_inventory_with_gather():
    """Using gather() for cleaner concurrent execution"""
    print("Starting inventory check (gather)...")
    start = time.time()

    # Pass coroutines directly to gather - it creates tasks automatically
    results = await asyncio.gather(
        check_main_catalog(),
        check_reserves(),
        check_interlibrary()
    )

    elapsed = time.time() - start
    print(f"Completed in {elapsed:.1f} seconds")

    # Combine the results
    combined = {}
    for result in results:
        combined.update(result)
    return combined

result = asyncio.run(get_inventory_with_gather())
print(f"Final result: {result}")
Enter fullscreen mode Exit fullscreen mode

Output:

Starting inventory check (gather)...
  Checking main catalog...
  Checking reserves...
  Checking inter-library...
Completed in 2.0 seconds
Final result: {'main': 1247, 'reserves': 89, 'interlibrary': 34}
Enter fullscreen mode Exit fullscreen mode

"Much cleaner!" Timothy said. "No manual task creation, and the results come back as a list in order."

"Right," Margaret said. "gather() handles the task creation for you and returns results in the same order you passed the coroutines. That's a key convenience—you can pass coroutines directly, and gather() wraps them in tasks internally."

Exception Handling with gather()

"But what happens if one of the tasks fails?" Timothy asked.

Margaret demonstrated:

import asyncio

async def task_succeeds():
    await asyncio.sleep(1)
    return "success"

async def task_fails():
    await asyncio.sleep(0.5)
    raise ValueError("Something went wrong!")

async def task_also_succeeds():
    await asyncio.sleep(1)
    return "also success"

async def demo_gather_exception():
    """What happens when a task raises an exception?"""
    print("Running tasks with gather...")
    try:
        results = await asyncio.gather(
            task_succeeds(),
            task_fails(),
            task_also_succeeds()
        )
        print(f"Results: {results}")
    except ValueError as e:
        print(f"Caught exception: {e}")
        print("Note: Other tasks still completed!")

asyncio.run(demo_gather_exception())
Enter fullscreen mode Exit fullscreen mode

Output:

Running tasks with gather...
Caught exception: Something went wrong!
Note: Other tasks still completed!
Enter fullscreen mode Exit fullscreen mode

"By default, gather() propagates the first exception it encounters," Margaret explained. "But here's the important part: the other tasks still run to completion. The exception just stops gather() from returning their results."

She showed an alternative:

import asyncio

async def task_succeeds():
    await asyncio.sleep(1)
    return "success"

async def task_fails():
    await asyncio.sleep(0.5)
    raise ValueError("Something went wrong!")

async def task_also_succeeds():
    await asyncio.sleep(1)
    return "also success"

async def demo_gather_return_exceptions():
    """Collect exceptions as values instead of raising"""
    print("Running tasks with return_exceptions=True...")
    results = await asyncio.gather(
        task_succeeds(),
        task_fails(),
        task_also_succeeds(),
        return_exceptions=True
    )

    print("Results:")
    for i, result in enumerate(results):
        if isinstance(result, Exception):
            print(f"  Task {i}: FAILED with {result}")
        else:
            print(f"  Task {i}: {result}")

asyncio.run(demo_gather_return_exceptions())
Enter fullscreen mode Exit fullscreen mode

Output:

Running tasks with return_exceptions=True...
Results:
  Task 0: success
  Task 1: FAILED with Something went wrong!
  Task 2: also success
Enter fullscreen mode Exit fullscreen mode

"With return_exceptions=True, exceptions become part of the results list instead of being raised," Margaret said. "This is useful when you want to handle each failure individually."

Using asyncio.wait() for Fine-Grained Control

"Sometimes you need more control than gather() provides," Margaret said. "That's where asyncio.wait() comes in. Unlike gather(), which accepts coroutines directly, wait() requires Task objects—you must create them manually first."

import asyncio

async def quick_task():
    await asyncio.sleep(1)
    return "quick"

async def slow_task():
    await asyncio.sleep(3)
    return "slow"

async def medium_task():
    await asyncio.sleep(2)
    return "medium"

async def demo_wait_first_completed():
    """Process results as they complete"""
    print("Starting tasks...")
    # wait() needs Tasks, not coroutines - must create them manually
    tasks = {
        asyncio.create_task(quick_task(), name="quick"),
        asyncio.create_task(slow_task(), name="slow"),
        asyncio.create_task(medium_task(), name="medium")
    }

    # Wait for first completion
    done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)

    print(f"\nFirst task completed:")
    for task in done:
        print(f"  {task.get_name()}: {task.result()}")

    print(f"\nStill pending: {len(pending)} tasks")

    # Wait for the rest
    done, pending = await asyncio.wait(pending)
    print(f"\nAll remaining tasks completed:")
    for task in done:
        print(f"  {task.get_name()}: {task.result()}")

asyncio.run(demo_wait_first_completed())
Enter fullscreen mode Exit fullscreen mode

Output:

Starting tasks...

First task completed:
  quick: quick

Still pending: 2 tasks

All remaining tasks completed:
  medium: medium
  slow: slow
Enter fullscreen mode Exit fullscreen mode

"Unlike gather(), which returns results in order, wait() returns two sets: completed tasks and pending tasks," Margaret explained. "You can specify when to return:"

import asyncio

async def task_that_fails():
    await asyncio.sleep(1)
    raise ValueError("Failed!")

async def task_that_succeeds():
    await asyncio.sleep(2)
    return "success"

async def demo_wait_first_exception():
    """Return as soon as any task raises an exception"""
    print("Starting tasks...")
    tasks = {
        asyncio.create_task(task_that_fails(), name="failing"),
        asyncio.create_task(task_that_succeeds(), name="succeeding")
    }

    done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_EXCEPTION)

    print(f"\nCompleted: {len(done)}, Pending: {len(pending)}")

    for task in done:
        if task.exception():
            print(f"  {task.get_name()} raised: {task.exception()}")
        else:
            print(f"  {task.get_name()}: {task.result()}")

    # Cancel pending tasks
    for task in pending:
        task.cancel()

asyncio.run(demo_wait_first_exception())
Enter fullscreen mode Exit fullscreen mode

Output:

Starting tasks...

Completed: 1, Pending: 1
  failing raised: Failed!
Enter fullscreen mode Exit fullscreen mode

"The return_when options are:"

  • asyncio.FIRST_COMPLETED - Return when any task finishes
  • asyncio.FIRST_EXCEPTION - Return when any task raises an exception (or all complete)
  • asyncio.ALL_COMPLETED - Wait for all tasks (default)

Processing Results as They Complete

"What if I want to process each result immediately when it's ready, not wait for all of them?" Timothy asked.

import asyncio

async def fetch_book_data(book_id, delay):
    """Fetch book data with variable delays"""
    await asyncio.sleep(delay)
    return {"id": book_id, "title": f"Book {book_id}", "delay": delay}

async def demo_as_completed():
    """Process results as they complete"""
    print("Starting book fetches...")

    # Different delays - will complete in different order
    coroutines = [
        fetch_book_data(1, 3),
        fetch_book_data(2, 1),
        fetch_book_data(3, 2),
        fetch_book_data(4, 0.5),
    ]

    # Process as each completes
    for coro in asyncio.as_completed(coroutines):
        result = await coro
        print(f"  Received: {result['title']} (took {result['delay']}s)")

asyncio.run(demo_as_completed())
Enter fullscreen mode Exit fullscreen mode

Output:

Starting book fetches...
  Received: Book 4 (took 0.5s)
  Received: Book 2 (took 1s)
  Received: Book 3 (took 2s)
  Received: Book 1 (took 3s)
Enter fullscreen mode Exit fullscreen mode

"Perfect!" Timothy said. "Results come back in completion order, not submission order."

"Right," Margaret said. "as_completed() returns an iterator of awaitables. Each await gives you the next result that completes."

Task Groups: Structured Concurrency (Python 3.11+)

"There's a newer pattern that's even better for managing multiple tasks," Margaret said. "Task Groups provide structured concurrency."

import asyncio

async def check_source(name, delay):
    """Simulate checking a data source"""
    print(f"  Starting {name}...")
    await asyncio.sleep(delay)
    print(f"  Finished {name}")
    return {name: delay * 100}

async def inventory_with_taskgroup():
    """Using TaskGroup for structured concurrency"""
    print("Starting inventory check with TaskGroup...")

    async with asyncio.TaskGroup() as group:
        task1 = group.create_task(check_source("catalog", 2))
        task2 = group.create_task(check_source("reserves", 1))
        task3 = group.create_task(check_source("interlibrary", 1.5))

    # After the 'async with' block, all tasks are guaranteed complete
    print("All tasks completed!")

    # Collect results
    results = {}
    results.update(task1.result())
    results.update(task2.result())
    results.update(task3.result())

    return results

result = asyncio.run(inventory_with_taskgroup())
print(f"Final results: {result}")
Enter fullscreen mode Exit fullscreen mode

Output:

Starting inventory check with TaskGroup...
  Starting catalog...
  Starting reserves...
  Starting interlibrary...
  Finished reserves
  Finished interlibrary...
  Finished catalog
All tasks completed!
Final results: {'catalog': 200, 'reserves': 100, 'interlibrary': 150}
Enter fullscreen mode Exit fullscreen mode

"TaskGroup provides three key guarantees," Margaret explained:

  1. All tasks complete before exiting the async with block
  2. If any task fails, all other tasks are cancelled
  3. You can't accidentally forget to await a task

She demonstrated the cancellation behavior:

import asyncio

async def slow_task(name, delay):
    try:
        print(f"  {name} starting...")
        await asyncio.sleep(delay)
        print(f"  {name} completed")
        return name
    except asyncio.CancelledError:
        print(f"  {name} was cancelled!")
        raise

async def failing_task():
    await asyncio.sleep(1)
    raise ValueError("Task failed!")

async def demo_taskgroup_cancellation():
    """If one task fails, others are cancelled"""
    print("Starting tasks...")
    try:
        async with asyncio.TaskGroup() as group:
            group.create_task(slow_task("Task A", 5))
            group.create_task(slow_task("Task B", 5))
            group.create_task(failing_task())
    except ExceptionGroup as e:
        print(f"\nCaught ExceptionGroup with {len(e.exceptions)} exception(s):")
        for exc in e.exceptions:
            print(f"  - {type(exc).__name__}: {exc}")

asyncio.run(demo_taskgroup_cancellation())
Enter fullscreen mode Exit fullscreen mode

Output:

Starting tasks...
  Task A starting...
  Task B starting...
  Task A was cancelled!
  Task B was cancelled!

Caught ExceptionGroup with 1 exception(s):
  - ValueError: Task failed!
Enter fullscreen mode Exit fullscreen mode

"Notice that when the failing task raised an exception, TaskGroup automatically cancelled the other tasks," Margaret said. "This is structured concurrency—when something goes wrong, everything in that scope gets cleaned up."

Task Cancellation

"Speaking of cancellation," Timothy said, "how do I cancel a task myself?"

"Good question," Margaret replied. "Sometimes you need to manually cancel tasks—for example, if a user cancels an operation or a timeout expires."

import asyncio

async def long_running_task():
    """A task that takes a long time"""
    try:
        print("  Task starting...")
        for i in range(10):
            print(f"  Working... step {i}")
            await asyncio.sleep(1)
        print("  Task completed!")
        return "finished"
    except asyncio.CancelledError:
        print("  Task was cancelled!")
        # Do cleanup here if needed
        raise  # Re-raise to properly propagate cancellation

async def demo_cancellation():
    """Demonstrate manual task cancellation"""
    print("Starting task...")
    task = asyncio.create_task(long_running_task())

    # Let it run for 2 seconds
    await asyncio.sleep(2)

    # Cancel it
    print("\nCancelling task...")
    task.cancel()

    try:
        await task
    except asyncio.CancelledError:
        print("Confirmed: Task was cancelled\n")

asyncio.run(demo_cancellation())
Enter fullscreen mode Exit fullscreen mode

Output:

Starting task...
  Task starting...
  Working... step 0
  Working... step 1

Cancelling task...
  Task was cancelled!
Confirmed: Task was cancelled
Enter fullscreen mode Exit fullscreen mode

"The task receives a CancelledError exception when cancelled," Margaret explained. "You can catch it to do cleanup, but you should re-raise it to properly propagate the cancellation."

Protecting Tasks from Cancellation

"What if I have a critical operation that shouldn't be cancelled?" Timothy asked.

import asyncio

async def critical_operation():
    """This operation must complete"""
    print("  Starting critical operation...")
    await asyncio.sleep(2)
    print("  Critical operation completed")
    return "important result"

async def demo_shield():
    """Use shield to protect from cancellation"""
    print("Starting task with shield...")
    task = asyncio.create_task(asyncio.shield(critical_operation()))

    await asyncio.sleep(1)
    print("Attempting to cancel...")
    task.cancel()

    try:
        result = await task
        print(f"Got result despite cancellation: {result}")
    except asyncio.CancelledError:
        print("Task was cancelled")

asyncio.run(demo_shield())
Enter fullscreen mode Exit fullscreen mode

Output:

Starting task with shield...
  Starting critical operation...
Attempting to cancel...
  Critical operation completed
Got result despite cancellation: important result
Enter fullscreen mode Exit fullscreen mode

"Use asyncio.shield() sparingly," Margaret warned. "It protects the inner task from cancellation, but the outer task can still be cancelled. It's mainly for cleanup operations that must finish."

Timeouts with asyncio.wait_for()

"What if I want to automatically cancel a task if it takes too long?" Timothy asked.

import asyncio

async def slow_operation():
    """Operation that takes 5 seconds"""
    print("  Starting slow operation...")
    await asyncio.sleep(5)
    print("  Slow operation completed")
    return "success"

async def demo_timeout():
    """Use wait_for to enforce a timeout"""
    print("Starting operation with 2-second timeout...")
    try:
        result = await asyncio.wait_for(slow_operation(), timeout=2.0)
        print(f"Result: {result}")
    except asyncio.TimeoutError:
        print("Operation timed out after 2 seconds!")

asyncio.run(demo_timeout())
Enter fullscreen mode Exit fullscreen mode

Output:

Starting operation with 2-second timeout...
  Starting slow operation...
Operation timed out after 2 seconds!
Enter fullscreen mode Exit fullscreen mode

"Perfect for when you don't want to wait forever," Margaret said. "The task is automatically cancelled if the timeout expires."

Checking Task Status

"How do I check if a task is done without awaiting it?" Timothy asked.

import asyncio

async def some_work(duration):
    await asyncio.sleep(duration)
    return f"Completed after {duration}s"

async def demo_task_status():
    """Check task status without blocking"""
    task = asyncio.create_task(some_work(2))

    print(f"Task created. Done? {task.done()}")

    await asyncio.sleep(1)
    print(f"After 1 second. Done? {task.done()}")

    await asyncio.sleep(1.5)
    print(f"After 2.5 seconds. Done? {task.done()}")

    if task.done():
        result = task.result()
        print(f"Result: {result}")

asyncio.run(demo_task_status())
Enter fullscreen mode Exit fullscreen mode

Output:

Task created. Done? False
After 1 second. Done? False
After 2.5 seconds. Done? True
Result: Completed after 2s
Enter fullscreen mode Exit fullscreen mode

"Use task.done() to check completion status," Margaret said. "Once it's done, call task.result() to get the return value or task.exception() to get any exception that was raised."

import asyncio

async def failing_work():
    await asyncio.sleep(1)
    raise ValueError("Something broke!")

async def demo_task_exception():
    """Get exception from a completed task"""
    task = asyncio.create_task(failing_work())

    try:
        await task
    except ValueError:
        pass  # Exception already happened

    if task.done():
        exc = task.exception()
        if exc:
            print(f"Task failed with: {type(exc).__name__}: {exc}")

asyncio.run(demo_task_exception())
Enter fullscreen mode Exit fullscreen mode

Output:

Task failed with: ValueError: Something broke!
Enter fullscreen mode Exit fullscreen mode

Real-World Pattern: Progress Tracking

"Let me show you a practical pattern," Margaret said. "Tracking progress across multiple concurrent tasks."

import asyncio
import random

async def process_book(book_id, progress_tracker):
    """Process a book and report progress"""
    steps = 5
    for step in range(steps):
        await asyncio.sleep(random.uniform(0.1, 0.3))
        progress = (step + 1) / steps * 100
        progress_tracker[book_id] = progress
    return f"Book {book_id} processed"

async def monitor_progress(progress_tracker, total_books):
    """Monitor overall progress"""
    while True:
        await asyncio.sleep(0.5)
        completed = sum(1 for p in progress_tracker.values() if p >= 100)
        avg_progress = sum(progress_tracker.values()) / len(progress_tracker) if progress_tracker else 0
        print(f"  Progress: {completed}/{total_books} complete, {avg_progress:.1f}% average")

        if completed == total_books:
            break

async def process_library():
    """Process multiple books concurrently with progress tracking"""
    book_ids = range(1, 6)
    progress_tracker = {book_id: 0.0 for book_id in book_ids}

    print("Starting concurrent book processing...")

    async with asyncio.TaskGroup() as group:
        # Start progress monitor
        group.create_task(monitor_progress(progress_tracker, len(book_ids)))

        # Start book processing tasks
        for book_id in book_ids:
            group.create_task(process_book(book_id, progress_tracker))

    print("All books processed!")

asyncio.run(process_library())
Enter fullscreen mode Exit fullscreen mode

Output:

Starting concurrent book processing...
  Progress: 0/5 complete, 20.0% average
  Progress: 2/5 complete, 56.0% average
  Progress: 4/5 complete, 88.0% average
  Progress: 5/5 complete, 100.0% average
All books processed!
Enter fullscreen mode Exit fullscreen mode

When to Use Each Pattern

Margaret summarized on a whiteboard:

Task Management Patterns:

  1. asyncio.create_task() - For explicit control

    • When you need to reference the task later
    • When tasks have different lifetimes
    • When you need fine-grained control
    • Always use name= parameter for debugging
  2. asyncio.gather() - For simple concurrent execution

    • When you want all results in order
    • When tasks are independent
    • When you might use return_exceptions=True
    • Simplest API for common case
    • Accepts coroutines directly (creates tasks internally)
  3. asyncio.wait() - For fine-grained control

    • When you need FIRST_COMPLETED or FIRST_EXCEPTION behavior
    • When you want done/pending sets instead of ordered results
    • When you need more control than gather() provides
    • Requires Task objects (must create manually)
  4. asyncio.as_completed() - For streaming results

    • When you want to process results as they arrive
    • When order of completion matters more than submission order
    • Good for showing progress
  5. asyncio.TaskGroup() - For structured concurrency (Python 3.11+)

    • When you want automatic cleanup on failure
    • When tasks should be treated as a unit
    • When you want guarantees about completion
    • Preferred for new code when available
  6. asyncio.wait_for() - For timeout enforcement

    • When operations might hang
    • When you have time budgets
    • Wrap any awaitable with a timeout
  7. asyncio.shield() - For critical operations

    • Rare: only for operations that must complete
    • Typically for cleanup or commit operations
    • Use sparingly

The Takeaway

Timothy closed his laptop, his inventory system now running three times faster.

Key insights:

Coroutines are recipes; Tasks schedule them to run on the event loop

asyncio.create_task() immediately schedules a coroutine for execution

Always name tasks with name= parameter for easier debugging

Keep references to background tasks or they may be garbage collected; the event loop only holds weak references

asyncio.gather() runs multiple coroutines concurrently and collects results in order

gather() accepts coroutines directly and creates tasks internally; wait() requires Task objects

Tasks complete even if gather() raises an exception; use return_exceptions=True to handle failures individually

asyncio.wait() provides fine-grained control with FIRST_COMPLETED, FIRST_EXCEPTION, and ALL_COMPLETED options

asyncio.wait() returns done/pending sets, not ordered results like gather()

asyncio.as_completed() lets you process results as they arrive, not in submission order

TaskGroup provides structured concurrency with automatic cancellation on failure (Python 3.11+)

Use task.cancel() for manual cancellation; tasks receive CancelledError

Always re-raise CancelledError after cleanup to properly propagate cancellation

asyncio.shield() protects critical operations from cancellation (use sparingly)

asyncio.wait_for() automatically cancels tasks that exceed a timeout

Check task status with task.done(), get results with task.result(), get exceptions with task.exception()

TaskGroups raise ExceptionGroup containing all exceptions from failed tasks

Choose the pattern based on your needs: create_task() for control, gather() for simplicity, wait() for flexibility, TaskGroup for safety

Managing Multiple async Operations

Margaret and Timothy had transformed his sequential code into truly concurrent execution. The library's inventory system now checked all sources simultaneously, and Timothy understood how to manage multiple async operations, handle their failures, enforce timeouts, and choose the right pattern for each situation.

As Timothy reviewed the code, he realized that async/await wasn't just about making code non-blocking—it was about orchestrating multiple concurrent operations, managing their lifecycles, handling failures gracefully, and ensuring critical operations complete even when things go wrong. Tasks were the bridge between writing async code and truly running it concurrently.


Aaron Rose is a software engineer and technology writer at tech-reader.blog and the author of Think Like a Genius.

Top comments (0)