Timothy stared at his screen, frustrated. The library's new automated inventory system needed to check the status of books across three different sources: the main catalog, the reserve collection, and the inter-library loan system. He'd written async functions for each source, but his code was taking forever to run.
"Margaret, I don't understand," Timothy said, pulling up his code. "I'm using async/await, but it's still running everything sequentially. It's taking just as long as synchronous code!"
import asyncio
import time

async def check_main_catalog():
    """Check main catalog - takes 2 seconds"""
    print(" Checking main catalog...")
    await asyncio.sleep(2)
    return {"main": 1247}

async def check_reserves():
    """Check reserve collection - takes 2 seconds"""
    print(" Checking reserves...")
    await asyncio.sleep(2)
    return {"reserves": 89}

async def check_interlibrary():
    """Check inter-library loans - takes 2 seconds"""
    print(" Checking inter-library...")
    await asyncio.sleep(2)
    return {"interlibrary": 34}

async def get_inventory_slow():
    """Timothy's sequential approach"""
    print("Starting inventory check (slow way)...")
    start = time.time()

    main = await check_main_catalog()          # waits 2 seconds
    reserves = await check_reserves()          # then 2 more
    interlibrary = await check_interlibrary()  # then 2 more

    elapsed = time.time() - start
    print(f"Completed in {elapsed:.1f} seconds")
    return {**main, **reserves, **interlibrary}

asyncio.run(get_inventory_slow())
Output:
Starting inventory check (slow way)...
Checking main catalog...
Checking reserves...
Checking inter-library...
Completed in 6.0 seconds
Margaret looked over his shoulder. "You're using await on each call, one after another. That means you're waiting for each to finish before starting the next. You're not running them concurrently—you're running them sequentially."
"But they're all async functions!" Timothy protested.
"Being async-capable doesn't make them automatically concurrent," Margaret explained. "You need to explicitly schedule them to run together. That's what Tasks are for."
Understanding Coroutines vs Tasks
"Let me show you the difference," Margaret said, opening a new file.
import asyncio

async def my_coroutine():
    """Just a coroutine function"""
    print(" Coroutine running")
    await asyncio.sleep(1)
    return "done"

async def demo_coroutine_vs_task():
    print("Creating a coroutine object:")
    coro = my_coroutine()
    print(f" Type: {type(coro)}")
    print(" It hasn't run yet!")

    print("\nCreating a Task:")
    task = asyncio.create_task(my_coroutine())
    print(f" Type: {type(task)}")
    print(" It's already scheduled to run!")

    # Give it time to complete
    result = await task
    print(f" Result: {result}")

    # Clean up the unused coroutine to avoid a "never awaited" warning
    coro.close()

asyncio.run(demo_coroutine_vs_task())
Output:
Creating a coroutine object:
Type: <class 'coroutine'>
It hasn't run yet!
Creating a Task:
Type: <class '_asyncio.Task'>
It's already scheduled to run!
Coroutine running
Result: done
"See the difference?" Margaret asked. "A coroutine is just a recipe. Calling my_coroutine() doesn't run anything—it just gives you a coroutine object. But asyncio.create_task() wraps that coroutine in a Task, which immediately schedules it on the event loop to run."
Running Tasks Concurrently
Margaret refactored Timothy's inventory code:
import asyncio
import time

async def check_main_catalog():
    """Check main catalog - takes 2 seconds"""
    print(" Checking main catalog...")
    await asyncio.sleep(2)
    return {"main": 1247}

async def check_reserves():
    """Check reserve collection - takes 2 seconds"""
    print(" Checking reserves...")
    await asyncio.sleep(2)
    return {"reserves": 89}

async def check_interlibrary():
    """Check inter-library loans - takes 2 seconds"""
    print(" Checking inter-library...")
    await asyncio.sleep(2)
    return {"interlibrary": 34}

async def get_inventory_fast():
    """Concurrent approach with Tasks"""
    print("Starting inventory check (concurrent way)...")
    start = time.time()

    # Create tasks - all three are scheduled to run concurrently
    task1 = asyncio.create_task(check_main_catalog(), name="catalog")
    task2 = asyncio.create_task(check_reserves(), name="reserves")
    task3 = asyncio.create_task(check_interlibrary(), name="interlibrary")

    # Now wait for all of them to complete
    main = await task1
    reserves = await task2
    interlibrary = await task3

    elapsed = time.time() - start
    print(f"Completed in {elapsed:.1f} seconds")
    return {**main, **reserves, **interlibrary}

asyncio.run(get_inventory_fast())
Output:
Starting inventory check (concurrent way)...
Checking main catalog...
Checking reserves...
Checking inter-library...
Completed in 2.0 seconds
Timothy's eyes widened. "Two seconds instead of six! They all ran at the same time!"
"Exactly," Margaret said. "When you call create_task(), the event loop immediately starts executing that coroutine. All three tasks begin their work concurrently. Then we await each task to collect the results."
"Notice I added name= parameters," Margaret added. "That helps with debugging—you can see which task is which in error messages and when inspecting running tasks."
Background Tasks and Garbage Collection
"One important warning," Margaret said, leaning forward. "If you create a task but don't keep a reference to it or await it, Python might garbage collect it. The event loop only keeps weak references to tasks, so if your code doesn't hold a strong reference, the garbage collector can remove the task before it completes."
import asyncio

async def background_work(task_id):
    """Some background work"""
    print(f" Task {task_id} starting...")
    await asyncio.sleep(2)
    print(f" Task {task_id} done!")

async def demo_garbage_collection_problem():
    """BAD: Task might be garbage collected"""
    print("Creating background task (no reference)...")
    asyncio.create_task(background_work(1))  # Might be GC'd!
    print("Doing other work...")
    await asyncio.sleep(0.5)
    print("Main work done")
    # Background task might not complete!

async def demo_garbage_collection_solution():
    """GOOD: Keep references to background tasks"""
    print("\nCreating background tasks (with references)...")
    background_tasks = set()
    for i in range(3):
        task = asyncio.create_task(background_work(i), name=f"bg-{i}")
        background_tasks.add(task)
        # Remove from set when done
        task.add_done_callback(background_tasks.discard)
    print("Doing other work...")
    await asyncio.sleep(0.5)
    print("Waiting for background tasks...")
    await asyncio.sleep(2)
    print("All done!")

asyncio.run(demo_garbage_collection_problem())
asyncio.run(demo_garbage_collection_solution())
Output:
Creating background task (no reference)...
Doing other work...
 Task 1 starting...
Main work done

Creating background tasks (with references)...
Doing other work...
 Task 0 starting...
 Task 1 starting...
 Task 2 starting...
Waiting for background tasks...
 Task 0 done!
 Task 1 done!
 Task 2 done!
All done!
"Always keep references to tasks you want to complete," Margaret emphasized. "The pattern with the set and add_done_callback() is common for managing background tasks."
Using asyncio.gather() for Cleaner Code
"There's an even cleaner way," Margaret continued. "When you want to run multiple coroutines concurrently and wait for all of them, use asyncio.gather()."
import asyncio
import time

async def check_main_catalog():
    print(" Checking main catalog...")
    await asyncio.sleep(2)
    return {"main": 1247}

async def check_reserves():
    print(" Checking reserves...")
    await asyncio.sleep(2)
    return {"reserves": 89}

async def check_interlibrary():
    print(" Checking inter-library...")
    await asyncio.sleep(2)
    return {"interlibrary": 34}

async def get_inventory_with_gather():
    """Using gather() for cleaner concurrent execution"""
    print("Starting inventory check (gather)...")
    start = time.time()

    # Pass coroutines directly to gather - it creates tasks automatically
    results = await asyncio.gather(
        check_main_catalog(),
        check_reserves(),
        check_interlibrary()
    )

    elapsed = time.time() - start
    print(f"Completed in {elapsed:.1f} seconds")

    # Combine the results
    combined = {}
    for result in results:
        combined.update(result)
    return combined

result = asyncio.run(get_inventory_with_gather())
print(f"Final result: {result}")
Output:
Starting inventory check (gather)...
Checking main catalog...
Checking reserves...
Checking inter-library...
Completed in 2.0 seconds
Final result: {'main': 1247, 'reserves': 89, 'interlibrary': 34}
"Much cleaner!" Timothy said. "No manual task creation, and the results come back as a list in order."
"Right," Margaret said. "gather() handles the task creation for you and returns results in the same order you passed the coroutines. That's a key convenience—you can pass coroutines directly, and gather() wraps them in tasks internally."
Exception Handling with gather()
"But what happens if one of the tasks fails?" Timothy asked.
Margaret demonstrated:
import asyncio

async def task_succeeds():
    await asyncio.sleep(1)
    return "success"

async def task_fails():
    await asyncio.sleep(0.5)
    raise ValueError("Something went wrong!")

async def task_also_succeeds():
    await asyncio.sleep(1)
    return "also success"

async def demo_gather_exception():
    """What happens when a task raises an exception?"""
    print("Running tasks with gather...")
    try:
        results = await asyncio.gather(
            task_succeeds(),
            task_fails(),
            task_also_succeeds()
        )
        print(f"Results: {results}")
    except ValueError as e:
        print(f"Caught exception: {e}")
        print("Note: The other tasks were not cancelled!")

asyncio.run(demo_gather_exception())
Output:
Running tasks with gather...
Caught exception: Something went wrong!
Note: The other tasks were not cancelled!
"By default, gather() propagates the first exception it encounters," Margaret explained. "But here's the important part: the other tasks still run to completion. The exception just stops gather() from returning their results."
She showed an alternative:
import asyncio

async def task_succeeds():
    await asyncio.sleep(1)
    return "success"

async def task_fails():
    await asyncio.sleep(0.5)
    raise ValueError("Something went wrong!")

async def task_also_succeeds():
    await asyncio.sleep(1)
    return "also success"

async def demo_gather_return_exceptions():
    """Collect exceptions as values instead of raising"""
    print("Running tasks with return_exceptions=True...")
    results = await asyncio.gather(
        task_succeeds(),
        task_fails(),
        task_also_succeeds(),
        return_exceptions=True
    )

    print("Results:")
    for i, result in enumerate(results):
        if isinstance(result, Exception):
            print(f" Task {i}: FAILED with {result}")
        else:
            print(f" Task {i}: {result}")

asyncio.run(demo_gather_return_exceptions())
Output:
Running tasks with return_exceptions=True...
Results:
Task 0: success
Task 1: FAILED with Something went wrong!
Task 2: also success
"With return_exceptions=True, exceptions become part of the results list instead of being raised," Margaret said. "This is useful when you want to handle each failure individually."
Using asyncio.wait() for Fine-Grained Control
"Sometimes you need more control than gather() provides," Margaret said. "That's where asyncio.wait() comes in. Unlike gather(), which accepts coroutines directly, wait() requires Task objects—you must create them manually first."
import asyncio

async def quick_task():
    await asyncio.sleep(1)
    return "quick"

async def slow_task():
    await asyncio.sleep(3)
    return "slow"

async def medium_task():
    await asyncio.sleep(2)
    return "medium"

async def demo_wait_first_completed():
    """Process results as they complete"""
    print("Starting tasks...")

    # wait() needs Tasks, not coroutines - must create them manually
    tasks = {
        asyncio.create_task(quick_task(), name="quick"),
        asyncio.create_task(slow_task(), name="slow"),
        asyncio.create_task(medium_task(), name="medium")
    }

    # Wait for first completion
    done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)

    print("\nFirst task completed:")
    for task in done:
        print(f" {task.get_name()}: {task.result()}")

    print(f"\nStill pending: {len(pending)} tasks")

    # Wait for the rest
    done, pending = await asyncio.wait(pending)

    print("\nAll remaining tasks completed:")
    for task in done:
        print(f" {task.get_name()}: {task.result()}")

asyncio.run(demo_wait_first_completed())
Output:
Starting tasks...
First task completed:
quick: quick
Still pending: 2 tasks
All remaining tasks completed:
medium: medium
slow: slow
"Unlike gather(), which returns results in order, wait() returns two sets: completed tasks and pending tasks," Margaret explained. "You can specify when to return:"
import asyncio

async def task_that_fails():
    await asyncio.sleep(1)
    raise ValueError("Failed!")

async def task_that_succeeds():
    await asyncio.sleep(2)
    return "success"

async def demo_wait_first_exception():
    """Return as soon as any task raises an exception"""
    print("Starting tasks...")
    tasks = {
        asyncio.create_task(task_that_fails(), name="failing"),
        asyncio.create_task(task_that_succeeds(), name="succeeding")
    }

    done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_EXCEPTION)

    print(f"\nCompleted: {len(done)}, Pending: {len(pending)}")
    for task in done:
        if task.exception():
            print(f" {task.get_name()} raised: {task.exception()}")
        else:
            print(f" {task.get_name()}: {task.result()}")

    # Cancel pending tasks and wait for the cancellations to settle
    for task in pending:
        task.cancel()
    await asyncio.gather(*pending, return_exceptions=True)

asyncio.run(demo_wait_first_exception())
Output:
Starting tasks...
Completed: 1, Pending: 1
failing raised: Failed!
"The return_when options are:"
-
asyncio.FIRST_COMPLETED- Return when any task finishes -
asyncio.FIRST_EXCEPTION- Return when any task raises an exception (or all complete) -
asyncio.ALL_COMPLETED- Wait for all tasks (default)
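That FIRST_COMPLETED idiom loops on the pending set, handling each batch of finished tasks while the rest keep running. A minimal sketch (task names and delays are illustrative):

import asyncio

async def fetch(name, delay):
    await asyncio.sleep(delay)
    return name

async def main():
    pending = {
        asyncio.create_task(fetch("quick", 1), name="quick"),
        asyncio.create_task(fetch("slow", 3), name="slow"),
        asyncio.create_task(fetch("medium", 2), name="medium"),
    }
    # Loop until nothing is pending, handling each batch as it finishes
    while pending:
        done, pending = await asyncio.wait(pending, return_when=asyncio.FIRST_COMPLETED)
        for task in done:
            print(f"{task.get_name()} finished: {task.result()}")

asyncio.run(main())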
Processing Results as They Complete
"What if I want to process each result immediately when it's ready, not wait for all of them?" Timothy asked.
import asyncio

async def fetch_book_data(book_id, delay):
    """Fetch book data with variable delays"""
    await asyncio.sleep(delay)
    return {"id": book_id, "title": f"Book {book_id}", "delay": delay}

async def demo_as_completed():
    """Process results as they complete"""
    print("Starting book fetches...")

    # Different delays - will complete in different order
    coroutines = [
        fetch_book_data(1, 3),
        fetch_book_data(2, 1),
        fetch_book_data(3, 2),
        fetch_book_data(4, 0.5),
    ]

    # Process as each completes
    for coro in asyncio.as_completed(coroutines):
        result = await coro
        print(f" Received: {result['title']} (took {result['delay']}s)")

asyncio.run(demo_as_completed())
Output:
Starting book fetches...
Received: Book 4 (took 0.5s)
Received: Book 2 (took 1s)
Received: Book 3 (took 2s)
Received: Book 1 (took 3s)
"Perfect!" Timothy said. "Results come back in completion order, not submission order."
"Right," Margaret said. "as_completed() returns an iterator of awaitables. Each await gives you the next result that completes."
Task Groups: Structured Concurrency (Python 3.11+)
"There's a newer pattern that's even better for managing multiple tasks," Margaret said. "Task Groups provide structured concurrency."
import asyncio

async def check_source(name, delay):
    """Simulate checking a data source"""
    print(f" Starting {name}...")
    await asyncio.sleep(delay)
    print(f" Finished {name}")
    return {name: delay * 100}

async def inventory_with_taskgroup():
    """Using TaskGroup for structured concurrency"""
    print("Starting inventory check with TaskGroup...")

    async with asyncio.TaskGroup() as group:
        task1 = group.create_task(check_source("catalog", 2))
        task2 = group.create_task(check_source("reserves", 1))
        task3 = group.create_task(check_source("interlibrary", 1.5))

    # After the 'async with' block, all tasks are guaranteed complete
    print("All tasks completed!")

    # Collect results
    results = {}
    results.update(task1.result())
    results.update(task2.result())
    results.update(task3.result())
    return results

result = asyncio.run(inventory_with_taskgroup())
print(f"Final results: {result}")
Output:
Starting inventory check with TaskGroup...
Starting catalog...
Starting reserves...
Starting interlibrary...
Finished reserves
Finished interlibrary
Finished catalog
All tasks completed!
Final results: {'catalog': 200, 'reserves': 100, 'interlibrary': 150.0}
"TaskGroup provides three key guarantees," Margaret explained:
- All tasks complete before exiting the
async withblock - If any task fails, all other tasks are cancelled
- You can't accidentally forget to await a task
She demonstrated the cancellation behavior:
import asyncio

async def slow_task(name, delay):
    try:
        print(f" {name} starting...")
        await asyncio.sleep(delay)
        print(f" {name} completed")
        return name
    except asyncio.CancelledError:
        print(f" {name} was cancelled!")
        raise

async def failing_task():
    await asyncio.sleep(1)
    raise ValueError("Task failed!")

async def demo_taskgroup_cancellation():
    """If one task fails, others are cancelled"""
    print("Starting tasks...")
    try:
        async with asyncio.TaskGroup() as group:
            group.create_task(slow_task("Task A", 5))
            group.create_task(slow_task("Task B", 5))
            group.create_task(failing_task())
    except ExceptionGroup as e:
        print(f"\nCaught ExceptionGroup with {len(e.exceptions)} exception(s):")
        for exc in e.exceptions:
            print(f" - {type(exc).__name__}: {exc}")

asyncio.run(demo_taskgroup_cancellation())
Output:
Starting tasks...
Task A starting...
Task B starting...
Task A was cancelled!
Task B was cancelled!
Caught ExceptionGroup with 1 exception(s):
- ValueError: Task failed!
"Notice that when the failing task raised an exception, TaskGroup automatically cancelled the other tasks," Margaret said. "This is structured concurrency—when something goes wrong, everything in that scope gets cleaned up."
Task Cancellation
"Speaking of cancellation," Timothy said, "how do I cancel a task myself?"
"Good question," Margaret replied. "Sometimes you need to manually cancel tasks—for example, if a user cancels an operation or a timeout expires."
import asyncio

async def long_running_task():
    """A task that takes a long time"""
    try:
        print(" Task starting...")
        for i in range(10):
            print(f" Working... step {i}")
            await asyncio.sleep(1)
        print(" Task completed!")
        return "finished"
    except asyncio.CancelledError:
        print(" Task was cancelled!")
        # Do cleanup here if needed
        raise  # Re-raise to properly propagate cancellation

async def demo_cancellation():
    """Demonstrate manual task cancellation"""
    print("Starting task...")
    task = asyncio.create_task(long_running_task())

    # Let it run for 2 seconds
    await asyncio.sleep(2)

    # Cancel it
    print("\nCancelling task...")
    task.cancel()

    try:
        await task
    except asyncio.CancelledError:
        print("Confirmed: Task was cancelled\n")

asyncio.run(demo_cancellation())
Output:
Starting task...
Task starting...
Working... step 0
Working... step 1
Cancelling task...
Task was cancelled!
Confirmed: Task was cancelled
"The task receives a CancelledError exception when cancelled," Margaret explained. "You can catch it to do cleanup, but you should re-raise it to properly propagate the cancellation."
Protecting Tasks from Cancellation
"What if I have a critical operation that shouldn't be cancelled?" Timothy asked.
import asyncio

async def critical_operation():
    """This operation must complete"""
    print(" Starting critical operation...")
    await asyncio.sleep(2)
    print(" Critical operation completed")
    return "important result"

async def demo_shield():
    """Use shield to protect from cancellation"""
    print("Starting task with shield...")
    inner = asyncio.create_task(critical_operation())
    outer = asyncio.shield(inner)  # shield() returns a protective outer future

    await asyncio.sleep(1)
    print("Attempting to cancel...")
    outer.cancel()  # cancels the shield, not the inner task

    try:
        result = await outer
    except asyncio.CancelledError:
        print("Outer await was cancelled, but the inner task keeps running")
        result = await inner  # the shielded operation still completes
    print(f"Got result despite cancellation: {result}")

asyncio.run(demo_shield())
Output:
Starting task with shield...
 Starting critical operation...
Attempting to cancel...
Outer await was cancelled, but the inner task keeps running
 Critical operation completed
Got result despite cancellation: important result
"Use asyncio.shield() sparingly," Margaret warned. "It protects the inner task from cancellation, but the outer task can still be cancelled. It's mainly for cleanup operations that must finish."
Timeouts with asyncio.wait_for()
"What if I want to automatically cancel a task if it takes too long?" Timothy asked.
import asyncio

async def slow_operation():
    """Operation that takes 5 seconds"""
    print(" Starting slow operation...")
    await asyncio.sleep(5)
    print(" Slow operation completed")
    return "success"

async def demo_timeout():
    """Use wait_for to enforce a timeout"""
    print("Starting operation with 2-second timeout...")
    try:
        result = await asyncio.wait_for(slow_operation(), timeout=2.0)
        print(f"Result: {result}")
    except asyncio.TimeoutError:
        print("Operation timed out after 2 seconds!")

asyncio.run(demo_timeout())
Output:
Starting operation with 2-second timeout...
Starting slow operation...
Operation timed out after 2 seconds!
"Perfect for when you don't want to wait forever," Margaret said. "The task is automatically cancelled if the timeout expires."
Checking Task Status
"How do I check if a task is done without awaiting it?" Timothy asked.
import asyncio

async def some_work(duration):
    await asyncio.sleep(duration)
    return f"Completed after {duration}s"

async def demo_task_status():
    """Check task status without blocking"""
    task = asyncio.create_task(some_work(2))
    print(f"Task created. Done? {task.done()}")

    await asyncio.sleep(1)
    print(f"After 1 second. Done? {task.done()}")

    await asyncio.sleep(1.5)
    print(f"After 2.5 seconds. Done? {task.done()}")

    if task.done():
        result = task.result()
        print(f"Result: {result}")

asyncio.run(demo_task_status())
Output:
Task created. Done? False
After 1 second. Done? False
After 2.5 seconds. Done? True
Result: Completed after 2s
"Use task.done() to check completion status," Margaret said. "Once it's done, call task.result() to get the return value or task.exception() to get any exception that was raised."
import asyncio

async def failing_work():
    await asyncio.sleep(1)
    raise ValueError("Something broke!")

async def demo_task_exception():
    """Get exception from a completed task"""
    task = asyncio.create_task(failing_work())
    try:
        await task
    except ValueError:
        pass  # Exception already happened

    if task.done():
        exc = task.exception()
        if exc:
            print(f"Task failed with: {type(exc).__name__}: {exc}")

asyncio.run(demo_task_exception())
Output:
Task failed with: ValueError: Something broke!
Real-World Pattern: Progress Tracking
"Let me show you a practical pattern," Margaret said. "Tracking progress across multiple concurrent tasks."
import asyncio
import random

async def process_book(book_id, progress_tracker):
    """Process a book and report progress"""
    steps = 5
    for step in range(steps):
        await asyncio.sleep(random.uniform(0.1, 0.3))
        progress = (step + 1) / steps * 100
        progress_tracker[book_id] = progress
    return f"Book {book_id} processed"

async def monitor_progress(progress_tracker, total_books):
    """Monitor overall progress"""
    while True:
        await asyncio.sleep(0.5)
        completed = sum(1 for p in progress_tracker.values() if p >= 100)
        avg_progress = sum(progress_tracker.values()) / len(progress_tracker) if progress_tracker else 0
        print(f" Progress: {completed}/{total_books} complete, {avg_progress:.1f}% average")
        if completed == total_books:
            break

async def process_library():
    """Process multiple books concurrently with progress tracking"""
    book_ids = range(1, 6)
    progress_tracker = {book_id: 0.0 for book_id in book_ids}

    print("Starting concurrent book processing...")
    async with asyncio.TaskGroup() as group:
        # Start progress monitor
        group.create_task(monitor_progress(progress_tracker, len(book_ids)))
        # Start book processing tasks
        for book_id in book_ids:
            group.create_task(process_book(book_id, progress_tracker))

    print("All books processed!")

asyncio.run(process_library())
Output:
Starting concurrent book processing...
Progress: 0/5 complete, 20.0% average
Progress: 2/5 complete, 56.0% average
Progress: 4/5 complete, 88.0% average
Progress: 5/5 complete, 100.0% average
All books processed!
When to Use Each Pattern
Margaret summarized on a whiteboard:
Task Management Patterns:

- asyncio.create_task() - For explicit control
  - When you need to reference the task later
  - When tasks have different lifetimes
  - When you need fine-grained control
  - Always use the name= parameter for debugging

- asyncio.gather() - For simple concurrent execution
  - When you want all results in order
  - When tasks are independent
  - When you might use return_exceptions=True
  - Simplest API for the common case
  - Accepts coroutines directly (creates tasks internally)

- asyncio.wait() - For fine-grained control
  - When you need FIRST_COMPLETED or FIRST_EXCEPTION behavior
  - When you want done/pending sets instead of ordered results
  - When you need more control than gather() provides
  - Requires Task objects (must create manually)

- asyncio.as_completed() - For streaming results
  - When you want to process results as they arrive
  - When order of completion matters more than submission order
  - Good for showing progress

- asyncio.TaskGroup() - For structured concurrency (Python 3.11+)
  - When you want automatic cleanup on failure
  - When tasks should be treated as a unit
  - When you want guarantees about completion
  - Preferred for new code when available

- asyncio.wait_for() - For timeout enforcement
  - When operations might hang
  - When you have time budgets
  - Wrap any awaitable with a timeout

- asyncio.shield() - For critical operations
  - Rare: only for operations that must complete
  - Typically for cleanup or commit operations
  - Use sparingly
The Takeaway
Timothy closed his laptop, his inventory system now running three times faster.
Key insights:

- Coroutines are recipes; Tasks schedule them to run on the event loop
- asyncio.create_task() immediately schedules a coroutine for execution
- Always name tasks with the name= parameter for easier debugging
- Keep references to background tasks or they may be garbage collected; the event loop only holds weak references
- asyncio.gather() runs multiple coroutines concurrently and collects results in order
- gather() accepts coroutines directly and creates tasks internally; wait() requires Task objects
- Tasks keep running even if gather() raises an exception; use return_exceptions=True to handle failures individually
- asyncio.wait() provides fine-grained control with FIRST_COMPLETED, FIRST_EXCEPTION, and ALL_COMPLETED options
- asyncio.wait() returns done/pending sets, not ordered results like gather()
- asyncio.as_completed() lets you process results as they arrive, not in submission order
- TaskGroup provides structured concurrency with automatic cancellation on failure (Python 3.11+)
- Use task.cancel() for manual cancellation; tasks receive CancelledError
- Always re-raise CancelledError after cleanup to properly propagate cancellation
- asyncio.shield() protects critical operations from cancellation (use sparingly)
- asyncio.wait_for() automatically cancels tasks that exceed a timeout
- Check task status with task.done(), get results with task.result(), get exceptions with task.exception()
- TaskGroups raise ExceptionGroup containing all exceptions from failed tasks
- Choose the pattern based on your needs: create_task() for control, gather() for simplicity, wait() for flexibility, TaskGroup for safety
Managing Multiple async Operations
Margaret and Timothy had transformed his sequential code into truly concurrent execution. The library's inventory system now checked all sources simultaneously, and Timothy understood how to manage multiple async operations, handle their failures, enforce timeouts, and choose the right pattern for each situation.
As Timothy reviewed the code, he realized that async/await wasn't just about making code non-blocking—it was about orchestrating multiple concurrent operations, managing their lifecycles, handling failures gracefully, and ensuring critical operations complete even when things go wrong. Tasks were the bridge between writing async code and truly running it concurrently.
Aaron Rose is a software engineer and technology writer at tech-reader.blog and the author of Think Like a Genius.