This is the second post in the series Coroutine, IO bound and Asyncio for AI.
Introduction
I explained coroutines and asyncio in the previous post: https://dev.to/jun07/coroutine-series-1-what-are-coroutine-asyncio-io-bound-2gh5
In this post, I’m going to show some functions of the asyncio library that are very useful for production-level development.
Coroutine functions
I briefly introduced the basic building blocks: the `async`/`await` syntax and functions such as `gather` and `create_task`. The asyncio library contains many more functions, and you can do a great deal with these powerful tools.
gather and TaskGroup
I demonstrated how to use gather but I didn’t explain much about it. After you make some coroutine functions, you have to make them coordinate well. Often, several coroutine functions should be executed asynchronously. In that case, you can utilize the functions called gather or TaskGroup in asyncio. But are they exactly the same?
Actually, they have a very crucial difference. When one of the tasks in gather fails, the other tasks keep running. On the other hand, TaskGroup cancels the remaining tasks immediately. Look at this example.
```python
import asyncio
import time

async def example_task(name, delay, should_fail=False):
    print(f"Task {name} starting (delay={delay}s)")
    await asyncio.sleep(delay)
    if should_fail:
        print(f"Task {name} failing")
        raise ValueError(f"Task {name} failed")
    print(f"Task {name} finished")
    return f"Result {name}"

async def demonstrate_gather():
    print("\n--- Demonstrating asyncio.gather ---")
    start_time = time.time()
    # Create tasks: B will fail
    tasks = [
        example_task("A", 1),
        example_task("B", 2, should_fail=True),
        example_task("C", 3)
    ]
    # asyncio.gather without return_exceptions=True raises the first exception immediately
    # BUT it does NOT cancel the other tasks. They keep running in the background.
    print("Starting gather with a failing task...")
    try:
        results = await asyncio.gather(*tasks)
    except Exception as e:
        print(f"Gather caught exception: {e}")
    await asyncio.sleep(2)
    print("Gather finished (after waiting). Note if other tasks continued printing above.")
    print(f"Gather took: {time.time() - start_time:.2f}s")

async def demonstrate_task_group():
    print("\n--- Demonstrating asyncio.TaskGroup ---")
    start_time = time.time()
    # TaskGroup cancels all other tasks immediately when one fails
    print("Starting TaskGroup with a failing task...")
    try:
        async with asyncio.TaskGroup() as tg:
            tg.create_task(example_task("X", 1))
            tg.create_task(example_task("Y", 2, should_fail=True))
            tg.create_task(example_task("Z", 3))
    except Exception as e:
        print(f"TaskGroup caught exception: {type(e).__name__}: {e}")
    print("TaskGroup finished. Other tasks should have been cancelled (no 'finished' print for Z).")
    print(f"TaskGroup took: {time.time() - start_time:.2f}s")

async def main():
    await demonstrate_gather()
    await demonstrate_task_group()

if __name__ == "__main__":
    asyncio.run(main())
```
Output

```
--- Demonstrating asyncio.gather ---
Starting gather with a failing task...
Task A starting (delay=1s)
Task B starting (delay=2s)
Task C starting (delay=3s)
Task A finished
Task B failing
Gather caught exception: Task B failed
Task C finished
Gather finished (after waiting). Note if other tasks continued printing above.
Gather took: 4.00s

--- Demonstrating asyncio.TaskGroup ---
Starting TaskGroup with a failing task...
Task X starting (delay=1s)
Task Y starting (delay=2s)
Task Z starting (delay=3s)
Task X finished
Task Y failing
TaskGroup caught exception: ExceptionGroup: unhandled errors in a TaskGroup (1 sub-exception)
TaskGroup finished. Other tasks should have been cancelled (no 'finished' print for Z).
TaskGroup took: 1.99s
```
As you can see, gather lets the remaining tasks keep running even if one of them fails, while TaskGroup raises an exception and cancels the remaining tasks immediately.
Differences: TaskGroup vs. gather
- **Cancellation:** In `asyncio.gather()`, if one task fails, the others continue running (unless explicitly cancelled). `TaskGroup` cancels siblings immediately upon the first failure, conserving resources.
- **Error Reporting:** `gather()` returns a list of results or raises the first exception. `TaskGroup` raises an `ExceptionGroup` containing all exceptions that occurred, providing a complete picture of the failure state.
Why is TaskGroup favored over gather?
TaskGroup enforces a "parent-child" relationship where the parent cannot exit until all children are finished. This eliminates "orphaned tasks": background processes that silently keep running (and consuming resources) after your main function has finished or crashed. That is why modern high-reliability Python architecture favors Structured Concurrency (`asyncio.TaskGroup`) over unstructured `create_task` + `gather`.
By design, TaskGroup operates on a "fail-fast" principle. If one task fails, the group immediately sends a cancellation signal to all other running tasks in that group.
But what if I don't want to cancel the whole task group even if one task fails? Should I just use gather then? Not necessarily: you can still utilize TaskGroup by handling exceptions with try and except blocks inside the individual tasks.
Note: asyncio.TaskGroup is available in Python 3.11+. For older versions, you may need third-party libraries or stick to gather.
Synchronization Primitives: Queue and Event
What if I want tasks to coordinate with each other at certain points? For this, asyncio provides synchronization primitives.
- `asyncio.Queue` (The Conveyor Belt): Used for **Data Transfer**. It allows you to pass data safely between tasks. It handles "backpressure" (slowing down the producer if the consumer is full).
- `asyncio.Event` (The Traffic Light): Used for **Signaling**. It holds a simple boolean state (True/False). Tasks "wait" for the light to turn green before proceeding.
asyncio.Queue
As you know, a Queue is a First-In-First-Out (FIFO) data structure. But what is asyncio.Queue then? It is designed to decouple tasks that produce work from tasks that perform work.
What is Queue used for?
- **Load Balancing:** If the producer is faster than the consumer, the queue buffers the excess.
- **Flow Control:** If you set a `maxsize`, the producer will pause (`await put()`) until the consumer clears space. This prevents memory explosions.
Before getting into the example code, here are some key methods:
Key Methods
- `await q.put(item)`: Add an item. Blocks if the queue is full.
- `await q.get()`: Remove and return an item. Blocks if the queue is empty.
- `q.task_done()`: Tells the queue "I finished processing the item I just got."
- `await q.join()`: Blocks until all items in the queue have been processed (marked via `task_done`).
Example
Input
```python
import asyncio
import logging
import random

logging.basicConfig(
    level=logging.INFO,
    format='%(relativeCreated)d ms - [%(levelname)s] - %(message)s',
)

async def producer(name: str, queue: asyncio.Queue, items_to_produce: int):
    """Generates work items and puts them into the queue."""
    for i in range(items_to_produce):
        item = f"{name}-item-{i}"
        # Simulate varying production time
        await asyncio.sleep(random.uniform(0.2, 0.8))
        await queue.put(item)
        logging.info(f"[{name}] Added: {item} (Q size: {queue.qsize()})")

async def consumer(name: str, queue: asyncio.Queue):
    """Processes items from the queue continuously."""
    while True:
        # Waits here if the queue is empty
        item = await queue.get()
        # Simulate processing work
        logging.info(f"[{name}] Processing {item}...")
        await asyncio.sleep(random.uniform(1.0, 2.0))
        # Signal that this specific item is fully processed
        queue.task_done()
        logging.info(f"[{name}] Finished {item}")

async def main():
    # maxsize=5 to easily demonstrate backpressure (producers waiting for consumers)
    q = asyncio.Queue(maxsize=5)
    logging.info("--- Starting Pipeline ---")

    # 1. Start the consumer as a background task.
    # It runs forever until cancelled.
    consumer_task = asyncio.create_task(consumer("Consumer-0", q))

    # 2. Run producers to completion.
    # We use TaskGroup to wait for all producers to finish.
    async with asyncio.TaskGroup() as tg:
        tg.create_task(producer("Producer-A", q, 5))
        tg.create_task(producer("Producer-B", q, 5))
        # A total of 10 items will be produced

    logging.info("--- All producers finished. Waiting for processing... ---")

    # 3. Wait for the queue to be fully processed.
    # Execution blocks here until q.task_done() is called for every item.
    await q.join()
    logging.info("--- All work completed. Helper tasks cancelled ---")

    # 4. Cancel the consumer task since it is a permanent loop.
    consumer_task.cancel()

if __name__ == "__main__":
    asyncio.run(main())
```
Output

```
48 ms - [INFO] - --- Starting Pipeline ---
608 ms - [INFO] - [Producer-A] Added: Producer-A-item-0 (Q size: 1)
609 ms - [INFO] - [Consumer-0] Processing Producer-A-item-0...
788 ms - [INFO] - [Producer-B] Added: Producer-B-item-0 (Q size: 1)
1079 ms - [INFO] - [Producer-A] Added: Producer-A-item-1 (Q size: 2)
1325 ms - [INFO] - [Producer-A] Added: Producer-A-item-2 (Q size: 3)
1325 ms - [INFO] - [Producer-B] Added: Producer-B-item-1 (Q size: 4)
1587 ms - [INFO] - [Producer-B] Added: Producer-B-item-2 (Q size: 5)
1850 ms - [INFO] - [Consumer-0] Finished Producer-A-item-0
1851 ms - [INFO] - [Consumer-0] Processing Producer-B-item-0...
1851 ms - [INFO] - [Producer-A] Added: Producer-A-item-3 (Q size: 5)
3511 ms - [INFO] - [Consumer-0] Finished Producer-B-item-0
3511 ms - [INFO] - [Consumer-0] Processing Producer-A-item-1...
3512 ms - [INFO] - [Producer-B] Added: Producer-B-item-3 (Q size: 5)
4822 ms - [INFO] - [Consumer-0] Finished Producer-A-item-1
4822 ms - [INFO] - [Consumer-0] Processing Producer-A-item-2...
4822 ms - [INFO] - [Producer-A] Added: Producer-A-item-4 (Q size: 5)
6038 ms - [INFO] - [Consumer-0] Finished Producer-A-item-2
6038 ms - [INFO] - [Consumer-0] Processing Producer-B-item-1...
6038 ms - [INFO] - [Producer-B] Added: Producer-B-item-4 (Q size: 5)
6038 ms - [INFO] - --- All producers finished. Waiting for processing... ---
7538 ms - [INFO] - [Consumer-0] Finished Producer-B-item-1
7538 ms - [INFO] - [Consumer-0] Processing Producer-B-item-2...
9468 ms - [INFO] - [Consumer-0] Finished Producer-B-item-2
9469 ms - [INFO] - [Consumer-0] Processing Producer-A-item-3...
10698 ms - [INFO] - [Consumer-0] Finished Producer-A-item-3
10699 ms - [INFO] - [Consumer-0] Processing Producer-B-item-3...
11836 ms - [INFO] - [Consumer-0] Finished Producer-B-item-3
11836 ms - [INFO] - [Consumer-0] Processing Producer-A-item-4...
12919 ms - [INFO] - [Consumer-0] Finished Producer-A-item-4
12919 ms - [INFO] - [Consumer-0] Processing Producer-B-item-4...
13927 ms - [INFO] - [Consumer-0] Finished Producer-B-item-4
13927 ms - [INFO] - --- All work completed. Helper tasks cancelled ---
```
In the example above, the producers generate items and put them into the queue, and the consumer processes them. Since the maximum queue size is 5, some blocking occurs along the way: puts that would overflow the queue wait until the consumer frees up space. You can thus use this pattern as a lightweight message queue.
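A common variant of this pattern avoids cancelling the consumers entirely: the producer pushes one sentinel value per consumer, and each consumer exits its loop cleanly when it sees one. A minimal sketch, assuming `None` never appears as a real work item:

```python
import asyncio

SENTINEL = None  # assumption: None is never a real work item

async def consumer(queue: asyncio.Queue, results: list) -> None:
    while True:
        item = await queue.get()
        if item is SENTINEL:
            queue.task_done()
            break  # clean exit instead of cancellation
        results.append(item * 2)  # stand-in for real processing
        queue.task_done()

async def main() -> list:
    q: asyncio.Queue = asyncio.Queue()
    results: list = []
    workers = [asyncio.create_task(consumer(q, results)) for _ in range(2)]

    for i in range(5):
        await q.put(i)
    for _ in workers:       # one sentinel per consumer
        await q.put(SENTINEL)

    await q.join()          # every item (and sentinel) marked done
    await asyncio.gather(*workers)  # consumers have exited normally
    return results

out = asyncio.run(main())
print(sorted(out))  # [0, 2, 4, 6, 8]
```

Since the consumers return instead of being cancelled, no `CancelledError` handling is needed, which keeps shutdown logic simple.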
asyncio.Event
An Event is much simpler. It manages an internal flag that can be set to True or False. It is a broadcast mechanism: one event can wake up many waiting tasks. That’s why I called it a ‘Traffic light’.
What is Event used for?
- **Startup Sequences:** "Don't start accepting HTTP requests until the Database connection is ready."
- **Graceful Shutdown:** "Tell all background workers to stop loop processing."
And here are some key methods:
Key Methods
- `await event.wait()`: Blocks the task until the flag becomes `True`. If it's already `True`, it proceeds immediately.
- `event.set()`: Sets the flag to `True`. All tasks waiting on `wait()` are immediately woken up.
- `event.clear()`: Resets the flag to `False`. Subsequent `wait()` calls will block again.
Example
Input
```python
import asyncio
import random
import logging

logging.basicConfig(
    level=logging.INFO,
    format='%(relativeCreated)d ms - [%(levelname)s] - %(message)s',
)

async def database_connection_manager(event: asyncio.Event):
    """
    Simulates a database connection manager that handles connection states.
    It manages the event flag to signal when the DB is ready or down.
    """
    logging.info("Manager: Database connection initializing...")
    await asyncio.sleep(2)  # Simulate startup time

    # 1. Event set: signal that the resource is ready
    logging.info("Manager: Database CONNECTED. Setting event (Green Light).")
    event.set()

    # Let workers process for a while
    await asyncio.sleep(3)

    # 2. Event clear: signal that the resource is unavailable (e.g., maintenance or crash)
    logging.info("Manager: Connection LOST! Clearing event (Red Light). Blocking workers...")
    event.clear()

    # Simulate downtime
    await asyncio.sleep(3)
    logging.info("Manager: Reconnecting...")
    await asyncio.sleep(2)

    # 3. Event set again: recovery
    logging.info("Manager: Database RECONNECTED. Setting event again.")
    event.set()

async def query_worker(worker_id: int, event: asyncio.Event):
    """
    Simulates a worker that needs the database to be ready to process queries.
    """
    logging.info(f"Worker {worker_id}: Ready to start processing.")
    for i in range(5):
        # 4. Event wait: block until the event is set.
        # If event.is_set() is True, it returns immediately.
        # If False, it waits until some other coroutine calls event.set().
        logging.info(f"Worker {worker_id}: Waiting for DB connection to process request {i+1}...")
        await event.wait()

        # If we reach here, the event is set!
        logging.info(f"Worker {worker_id}: Processing request {i+1} (DB is Online)")

        # Simulate processing time
        await asyncio.sleep(random.uniform(0.5, 1.5))

async def main():
    # Create the Event object; the internal flag is initially False
    db_ready_event = asyncio.Event()

    async with asyncio.TaskGroup() as tg:
        tg.create_task(database_connection_manager(db_ready_event))
        for i in range(1, 4):
            tg.create_task(query_worker(i, db_ready_event))

if __name__ == "__main__":
    asyncio.run(main())
```
Output

```
48 ms - [INFO] - Manager: Database connection initializing...
49 ms - [INFO] - Worker 1: Ready to start processing.
49 ms - [INFO] - Worker 1: Waiting for DB connection to process request 1...
49 ms - [INFO] - Worker 2: Ready to start processing.
49 ms - [INFO] - Worker 2: Waiting for DB connection to process request 1...
49 ms - [INFO] - Worker 3: Ready to start processing.
49 ms - [INFO] - Worker 3: Waiting for DB connection to process request 1...
2057 ms - [INFO] - Manager: Database CONNECTED. Setting event (Green Light).
2057 ms - [INFO] - Worker 1: Processing request 1 (DB is Online)
2057 ms - [INFO] - Worker 2: Processing request 1 (DB is Online)
2058 ms - [INFO] - Worker 3: Processing request 1 (DB is Online)
2702 ms - [INFO] - Worker 1: Waiting for DB connection to process request 2...
2702 ms - [INFO] - Worker 1: Processing request 2 (DB is Online)
2959 ms - [INFO] - Worker 3: Waiting for DB connection to process request 2...
2959 ms - [INFO] - Worker 3: Processing request 2 (DB is Online)
3501 ms - [INFO] - Worker 2: Waiting for DB connection to process request 2...
3501 ms - [INFO] - Worker 2: Processing request 2 (DB is Online)
3945 ms - [INFO] - Worker 1: Waiting for DB connection to process request 3...
3945 ms - [INFO] - Worker 1: Processing request 3 (DB is Online)
4315 ms - [INFO] - Worker 3: Waiting for DB connection to process request 3...
4315 ms - [INFO] - Worker 3: Processing request 3 (DB is Online)
4632 ms - [INFO] - Worker 1: Waiting for DB connection to process request 4...
4632 ms - [INFO] - Worker 1: Processing request 4 (DB is Online)
4785 ms - [INFO] - Worker 2: Waiting for DB connection to process request 3...
4785 ms - [INFO] - Worker 2: Processing request 3 (DB is Online)
4836 ms - [INFO] - Worker 3: Waiting for DB connection to process request 4...
4836 ms - [INFO] - Worker 3: Processing request 4 (DB is Online)
5056 ms - [INFO] - Manager: Connection LOST! Clearing event (Red Light). Blocking workers...
5834 ms - [INFO] - Worker 1: Waiting for DB connection to process request 5...
5890 ms - [INFO] - Worker 2: Waiting for DB connection to process request 4...
6070 ms - [INFO] - Worker 3: Waiting for DB connection to process request 5...
8056 ms - [INFO] - Manager: Reconnecting...
10063 ms - [INFO] - Manager: Database RECONNECTED. Setting event again.
10063 ms - [INFO] - Worker 1: Processing request 5 (DB is Online)
10064 ms - [INFO] - Worker 2: Processing request 4 (DB is Online)
10064 ms - [INFO] - Worker 3: Processing request 5 (DB is Online)
10848 ms - [INFO] - Worker 2: Waiting for DB connection to process request 5...
10848 ms - [INFO] - Worker 2: Processing request 5 (DB is Online)
```
asyncio.to_thread
Sometimes, you might encounter a problem where the functions you want to use do not have asynchronous capabilities. These days, many libraries and frameworks support async operations, but some legacy libraries still do not. In this situation, you can use to_thread.
asyncio.to_thread is a bridge. It allows you to take Blocking I/O (like reading a huge file or using a synchronous library like requests) or Light CPU work and run it without "stopping the world" for your other async tasks.
The Mechanics of asyncio.to_thread
When you call to_thread, the following sequence occurs:
1. **The Wrapper:** `asyncio` wraps your synchronous function in a coroutine object.
2. **The Context:** It captures the current `contextvars` (so your database sessions or tokens stay available).
3. **The Executor:** It pushes the function into a separate `ThreadPoolExecutor`.
4. **The Yield:** The calling coroutine `await`s a `Future` object. This allows the event loop to go do other work while the thread is busy.
```python
import asyncio
import time

def brew_coffee(n: int) -> None:
    print(f"Start brewing coffee #{n}...")
    time.sleep(5)  # Blocking call: would freeze the event loop if run directly
    print(f"Coffee #{n} is ready!")

async def main() -> None:
    start = time.time()
    tasks = [asyncio.to_thread(brew_coffee, i + 1) for i in range(3)]
    await asyncio.gather(*tasks)
    end = time.time()
    print("Coffee ready!")
    print(f"Total time: {end - start:.2f} seconds")

if __name__ == "__main__":
    asyncio.run(main())
```
This is similar to an example from the previous post. When we ran that version with asyncio.create_task, it executed sequentially because of the blocking function time.sleep.
But in this example, running the code returns

```
Start brewing coffee #1...
Start brewing coffee #2...
Start brewing coffee #3...
Coffee #2 is ready!
Coffee #1 is ready!
Coffee #3 is ready!
Coffee ready!
Total time: 5.00 seconds
```
As you can see, the tasks ran asynchronously. You can run legacy blocking I/O functions asynchronously by leveraging to_thread.
Is this the same as Multi-threading? What is the difference?
| Feature | `with ThreadPoolExecutor()` | `asyncio.to_thread()` |
|---|---|---|
| Context Switching | OS-level (Preemptive) | OS-level (Preemptive) |
| Context Variables | Lost (e.g., Trace IDs, Auth tokens) | Preserved (propagates `contextvars`) |
| Error Handling | Needs manual `future.exception()` check | Integrated with `try`/`except` and TaskGroups |
| Lifecycle | Manual (must use `with` or `.shutdown()`) | Automatic (managed by the event loop) |
| Configuration | Explicit (`max_workers=...`) | Uses the loop's default executor |
Actually, to_thread uses multi-threading under the hood, but wraps it in a coroutine interface. Unlike a raw ThreadPoolExecutor, it doesn't lose context, because it automatically propagates context variables to the worker thread.
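You can verify this propagation yourself. In the sketch below (the `request_id` variable is a made-up trace id), the blocking function runs in a worker thread yet still reads the value set inside the coroutine:

```python
import asyncio
import contextvars

# Hypothetical trace id, as you might attach per request in a web service
request_id: contextvars.ContextVar[str] = contextvars.ContextVar("request_id")

def blocking_handler() -> str:
    # Runs in a worker thread, but still sees the caller's context
    return f"handling request {request_id.get()}"

async def main() -> str:
    request_id.set("req-42")
    # to_thread copies the current context into the thread
    return await asyncio.to_thread(blocking_handler)

result = asyncio.run(main())
print(result)  # handling request req-42
```

A function submitted to a raw `ThreadPoolExecutor` without this copying step would raise a `LookupError` on `request_id.get()` instead.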
Someone might ask “Then, I don’t need to use async coroutine functions, right? Because I can run both sync and async functions asynchronously with to_thread.”
Limitations of to_thread
That is absolutely not true. to_thread is a hybrid feature: it hands blocking work to threads, whereas native async tasks run cooperatively on a single thread.
It uses the loop's default thread pool to run legacy blocking I/O functions. However, it cannot run more tasks concurrently than the pool has threads; the rest wait in line. In contrast, native coroutines can handle a much larger number of I/O bound tasks with just one thread.
Note: The default number of threads in the pool is `min(32, os.cpu_count() + 4)` (the `ThreadPoolExecutor` default since Python 3.8).
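If the default pool is too small for your workload, you can install a larger (or explicitly sized) one with `loop.set_default_executor`, and `to_thread` will pick it up automatically. A sketch, where the pool size and thread-name prefix are arbitrary choices:

```python
import asyncio
import concurrent.futures
import threading
import time

def which_thread() -> str:
    # Blocking work; returns the name of the thread it ran on
    time.sleep(0.05)
    return threading.current_thread().name

async def main() -> list[str]:
    loop = asyncio.get_running_loop()
    # Replace the lazily-created default pool with an explicitly sized one
    pool = concurrent.futures.ThreadPoolExecutor(
        max_workers=2, thread_name_prefix="db"
    )
    loop.set_default_executor(pool)
    # asyncio.to_thread now routes through this pool
    return await asyncio.gather(*[asyncio.to_thread(which_thread) for _ in range(4)])

names = asyncio.run(main())
print(names)  # four names, all starting with "db", at most 2 distinct
```

With `max_workers=2`, the four calls share two threads, which you can confirm from the `db_*` names in the output.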
Conclusion
I have introduced some useful functions of asyncio. In the next post, I will demonstrate how to apply them to LLM inference.