Jun Bae

Coroutine series 2) Useful Asyncio Functions

This is the second post in the series Coroutine, IO bound and Asyncio for AI.

Introduction

I explained coroutines and asyncio in the previous post: https://dev.to/jun07/coroutine-series-1-what-are-coroutine-asyncio-io-bound-2gh5

In this post, I’m going to show some functions of the asyncio library that are very useful for production-level development.


Coroutine functions

I briefly introduced the basics, such as the async/await keywords and the gather and create_task functions. The asyncio library offers many more tools, and you can do a great deal with them.

gather and TaskGroup

I demonstrated how to use gather but didn't explain much about it. After you write several coroutine functions, you have to make them coordinate well. Often, several coroutines should run concurrently. In that case, you can use gather or TaskGroup from asyncio. But are they exactly the same?

Actually, they have a very crucial difference. When one of the tasks in gather fails, the other tasks keep running. On the other hand, TaskGroup cancels the remaining tasks immediately. Look at this example.

import asyncio
import time

async def example_task(name, delay, should_fail=False):
    print(f"Task {name} starting (delay={delay}s)")
    await asyncio.sleep(delay)
    if should_fail:
        print(f"Task {name} failing")
        raise ValueError(f"Task {name} failed")
    print(f"Task {name} finished")
    return f"Result {name}"

async def demonstrate_gather():
    print("\n--- Demonstrating asyncio.gather ---")
    start_time = time.time()

    # Create tasks: B will fail
    tasks = [
        example_task("A", 1),
        example_task("B", 2, should_fail=True),
        example_task("C", 3)
    ]

    # asyncio.gather without return_exceptions=True raises the first exception immediately
    # BUT it does NOT cancel the other tasks. They keep running in the background.
    print("Starting gather with a failing task...")
    try:
        results = await asyncio.gather(*tasks)
    except Exception as e:
        print(f"Gather caught exception: {e}")

    await asyncio.sleep(2)
    print("Gather finished (after waiting). Note if other tasks continued printing above.")
    print(f"Gather took: {time.time() - start_time:.2f}s")

async def demonstrate_task_group():
    print("\n--- Demonstrating asyncio.TaskGroup ---")
    start_time = time.time()

    # TaskGroup cancels all other tasks immediately when one fails
    print("Starting TaskGroup with a failing task...")
    try:
        async with asyncio.TaskGroup() as tg:
            tg.create_task(example_task("X", 1))
            tg.create_task(example_task("Y", 2, should_fail=True))
            tg.create_task(example_task("Z", 3))

    except Exception as e:
        print(f"TaskGroup caught exception: {type(e).__name__}: {e}")

    print("TaskGroup finished. Other tasks should have been cancelled (no 'finished' print for Z).")
    print(f"TaskGroup took: {time.time() - start_time:.2f}s")

async def main():
    await demonstrate_gather()
    await demonstrate_task_group()

if __name__ == "__main__":
    asyncio.run(main())

Output

--- Demonstrating asyncio.gather ---
Starting gather with a failing task...
Task A starting (delay=1s)
Task B starting (delay=2s)
Task C starting (delay=3s)
Task A finished
Task B failing
Gather caught exception: Task B failed
Task C finished
Gather finished (after waiting). Note if other tasks continued printing above.
Gather took: 4.00s

--- Demonstrating asyncio.TaskGroup ---
Starting TaskGroup with a failing task...
Task X starting (delay=1s)
Task Y starting (delay=2s)
Task Z starting (delay=3s)
Task X finished
Task Y failing
TaskGroup caught exception: ExceptionGroup: unhandled errors in a TaskGroup (1 sub-exception)
TaskGroup finished. Other tasks should have been cancelled (no 'finished' print for Z).
TaskGroup took: 1.99s

As you can see, gather keeps running even if one of the tasks fails while TaskGroup raises an exception and cancels the remaining tasks immediately.

Differences: TaskGroup vs. gather

  • Cancellation: In asyncio.gather(), if one task fails, the others continue running (unless explicitly cancelled). TaskGroup cancels siblings immediately upon the first failure, conserving resources.

  • Error Reporting: gather() returns a list of results or raises the first exception. TaskGroup raises an ExceptionGroup containing all exceptions that occurred, providing a complete picture of the failure state.
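
If you want gather to collect every outcome instead of raising on the first failure, pass return_exceptions=True. A minimal sketch (task names are just illustrative):

```python
import asyncio

async def work(name: str, should_fail: bool = False):
    await asyncio.sleep(0.1)
    if should_fail:
        raise ValueError(f"{name} failed")
    return f"{name} ok"

async def main():
    # With return_exceptions=True, gather never raises:
    # exceptions are returned in the results list, in task order.
    return await asyncio.gather(
        work("A"),
        work("B", should_fail=True),
        work("C"),
        return_exceptions=True,
    )

results = asyncio.run(main())
print(results)  # ['A ok', ValueError('B failed'), 'C ok']
```

This is handy when each result is independent and one failure should not hide the others, but it also means you must remember to inspect the list for exceptions yourself.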

Why is TaskGroup favored over gather?

TaskGroup enforces a "parent-child" relationship in which the parent cannot exit until all children have finished. This eliminates "orphaned tasks": background processes that silently keep running (and consuming resources) after your main function has finished or crashed. That is why modern high-reliability Python architecture favors structured concurrency (asyncio.TaskGroup) over unstructured create_task + gather.

By design, TaskGroup operates on a "fail-fast" principle. If one task fails, the group immediately sends a cancellation signal to all other running tasks in that group.

But what if I don't want the whole group cancelled when one task fails? Should I just use gather? Not necessarily: you can still use TaskGroup by handling exceptions with try/except blocks inside the individual tasks.

Note: asyncio.TaskGroup is available in Python 3.11+. For older versions, you may need third-party libraries or stick to gather.

Synchronization Primitives: Queue and Event

What if tasks need to coordinate with each other at certain points, passing data or waiting for a signal before proceeding? For this, asyncio provides synchronization primitives.

  1. asyncio.Queue (The Conveyor Belt): Used for Data Transfer. It allows you to pass data safely between tasks. It handles "backpressure" (slowing down the producer if the consumer is full).

  2. asyncio.Event (The Traffic Light): Used for Signaling. It holds a simple boolean state (True/False). Tasks "wait" for the light to turn green before proceeding.

asyncio.Queue

As you know, a queue is a First-In-First-Out (FIFO) data structure. So what is asyncio.Queue? It is designed to decouple tasks that produce work from tasks that perform work.

What is Queue used for?

  • Load Balancing: If the producer is faster than the consumer, the queue buffers the excess.

  • Flow Control: If you set a maxsize, the producer will pause (await put()) until the consumer clears space. This prevents memory explosions.

Before getting into the example code, here are some key methods:

Key Methods
  • await q.put(item): Add an item. Blocks if the queue is full.

  • await q.get(): Remove and return an item. Blocks if the queue is empty.

  • q.task_done(): Tells the queue "I finished processing the item I just got."

  • await q.join(): Blocks until all items in the queue have been processed (marked via task_done).

Example

Input

import asyncio
import logging
import random
import time

logging.basicConfig(
    level=logging.INFO,
    format='%(relativeCreated)d ms - [%(levelname)s] - %(message)s',
)


async def producer(name: str, queue: asyncio.Queue, items_to_produce: int):
    """Generates work items and puts them into the queue."""
    for i in range(items_to_produce):
        item = f"{name}-item-{i}"

        # Simulate varying production time
        await asyncio.sleep(random.uniform(0.2, 0.8))

        await queue.put(item)
        logging.info(f"[{name}] Added: {item} (Q size: {queue.qsize()})")

async def consumer(name: str, queue: asyncio.Queue):
    """Processes items from the queue continuously."""
    while True:
        # Waits here if queue is empty
        item = await queue.get()

        # Simulate processing work
        logging.info(f"[{name}] Processing {item}...")
        await asyncio.sleep(random.uniform(1.0, 2.0))

        # Signal that this specific item is fully processed
        queue.task_done()
        logging.info(f"[{name}] Finished {item}")

async def main():
    # maxsize=5 to easily demonstrate backpressure (producers waiting for consumers)
    q = asyncio.Queue(maxsize=5)

    logging.info("--- Starting Pipeline ---")

    # 1. Start the consumer as a long-running task
    # It runs forever until cancelled.
    consumers = asyncio.create_task(consumer("Consumer-0", q))

    # 2. Run Producers to completion
    # We use TaskGroup to wait for all producers to finish.
    async with asyncio.TaskGroup() as tg:
        tg.create_task(producer("Producer-A", q, 5))
        tg.create_task(producer("Producer-B", q, 5))
        # Total 10 items will be produced

    logging.info("--- All producers finished. Waiting for processing... ---")

    # 3. Wait for the queue to be fully processed
    # execution blocks here until q.task_done() is called for every item
    await q.join()

    # 4. Cancel the consumer task since it is a permanent loop
    consumers.cancel()

    logging.info("--- All work completed. helper tasks cancelled ---")

if __name__ == "__main__":
    asyncio.run(main())

Output

48 ms - [INFO] - --- Starting Pipeline ---
608 ms - [INFO] - [Producer-A] Added: Producer-A-item-0 (Q size: 1)
609 ms - [INFO] - [Consumer-0] Processing Producer-A-item-0...
788 ms - [INFO] - [Producer-B] Added: Producer-B-item-0 (Q size: 1)
1079 ms - [INFO] - [Producer-A] Added: Producer-A-item-1 (Q size: 2)
1325 ms - [INFO] - [Producer-A] Added: Producer-A-item-2 (Q size: 3)
1325 ms - [INFO] - [Producer-B] Added: Producer-B-item-1 (Q size: 4)
1587 ms - [INFO] - [Producer-B] Added: Producer-B-item-2 (Q size: 5)
1850 ms - [INFO] - [Consumer-0] Finished Producer-A-item-0
1851 ms - [INFO] - [Consumer-0] Processing Producer-B-item-0...
1851 ms - [INFO] - [Producer-A] Added: Producer-A-item-3 (Q size: 5)
3511 ms - [INFO] - [Consumer-0] Finished Producer-B-item-0
3511 ms - [INFO] - [Consumer-0] Processing Producer-A-item-1...
3512 ms - [INFO] - [Producer-B] Added: Producer-B-item-3 (Q size: 5)
4822 ms - [INFO] - [Consumer-0] Finished Producer-A-item-1
4822 ms - [INFO] - [Consumer-0] Processing Producer-A-item-2...
4822 ms - [INFO] - [Producer-A] Added: Producer-A-item-4 (Q size: 5)
6038 ms - [INFO] - [Consumer-0] Finished Producer-A-item-2
6038 ms - [INFO] - [Consumer-0] Processing Producer-B-item-1...
6038 ms - [INFO] - [Producer-B] Added: Producer-B-item-4 (Q size: 5)
6038 ms - [INFO] - --- All producers finished. Waiting for processing... ---
7538 ms - [INFO] - [Consumer-0] Finished Producer-B-item-1
7538 ms - [INFO] - [Consumer-0] Processing Producer-B-item-2...
9468 ms - [INFO] - [Consumer-0] Finished Producer-B-item-2
9469 ms - [INFO] - [Consumer-0] Processing Producer-A-item-3...
10698 ms - [INFO] - [Consumer-0] Finished Producer-A-item-3
10699 ms - [INFO] - [Consumer-0] Processing Producer-B-item-3...
11836 ms - [INFO] - [Consumer-0] Finished Producer-B-item-3
11836 ms - [INFO] - [Consumer-0] Processing Producer-A-item-4...
12919 ms - [INFO] - [Consumer-0] Finished Producer-A-item-4
12919 ms - [INFO] - [Consumer-0] Processing Producer-B-item-4...
13927 ms - [INFO] - [Consumer-0] Finished Producer-B-item-4
13927 ms - [INFO] - --- All work completed. helper tasks cancelled ---

In the example above, the producers generate items and put them into the queue, and the consumer processes them. Because the maximum queue size is 5, some blocking occurs along the way: producers whose puts would overflow the limit wait until the queue frees up space. You can use this pattern as an in-process message queue.
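
The same pattern scales to multiple consumers sharing one queue. Here is a minimal sketch (worker names, item counts, and timings are just illustrative):

```python
import asyncio

async def worker(name: str, q: asyncio.Queue, processed: list):
    # A permanent loop: pull an item, process it, mark it done.
    while True:
        item = await q.get()
        await asyncio.sleep(0.05)  # simulate work
        processed.append((name, item))
        q.task_done()

async def main():
    q = asyncio.Queue(maxsize=5)
    processed = []

    # Start a small pool of consumers that share one queue
    workers = [
        asyncio.create_task(worker(f"w{i}", q, processed))
        for i in range(3)
    ]

    for i in range(9):
        await q.put(i)

    await q.join()      # wait until every item has been task_done()'d
    for w in workers:
        w.cancel()      # shut the permanent loops down
    return processed

processed = asyncio.run(main())
print(f"{len(processed)} items handled by {len({name for name, _ in processed})} workers")
```

Adding consumers is just a matter of growing the list; the queue distributes items to whichever worker is free.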

asyncio.Event

An Event is much simpler. It manages an internal flag that can be set to True or False. It is a broadcast mechanism: one event can wake up many waiting tasks. That’s why I called it a ‘Traffic light’.

What is Event used for?

  • Startup Sequences: "Don't start accepting HTTP requests until the Database connection is ready."

  • Graceful Shutdown: "Tell all background workers to stop their processing loops."

And here are some key methods:

Key Methods

  • await event.wait(): Blocks the task until the flag becomes True. If it's already True, it proceeds immediately.

  • event.set(): Sets the flag to True. All tasks waiting on wait() are immediately woken up.

  • event.clear(): Resets the flag to False. Subsequent wait() calls will block again.

Example

Input

import asyncio
import random
import logging
import time

logging.basicConfig(
    level=logging.INFO,
    format='%(relativeCreated)d ms - [%(levelname)s] - %(message)s',
)

async def database_connection_manager(event: asyncio.Event):
    """
    Simulates a database connection manager that handles connection states.
    It manages the event flag to signal when the DB is ready or down.
    """
    logging.info("Manager: Database connection initializing...")
    await asyncio.sleep(2)  # Simulate startup time

    # 1. Event Set: Signal that the resource is ready
    logging.info("Manager: Database CONNECTED. Setting event (Green Light).")
    event.set()

    # Let workers process for a while
    await asyncio.sleep(3)

    # 2. Event Clear: Signal that the resource is unavailable (e.g., maintenance or crash)
    logging.info("Manager: Connection LOST! Clearing event (Red Light). blocking workers...")
    event.clear()

    # Simulate downtime loop
    await asyncio.sleep(3)

    logging.info("Manager: Reconnecting...")
    await asyncio.sleep(2)

    # 3. Event Set again: Recovery
    logging.info("Manager: Database RECONNECTED. Setting event again.")
    event.set()

async def query_worker(worker_id: int, event: asyncio.Event):
    """
    Simulates a worker that needs the database to be ready to process queries.
    """
    logging.info(f"Worker {worker_id}: Ready to start processing.")

    for i in range(5):
        # 4. Event Wait: Block until the event is set
        # If event.is_set() is True, it returns immediately.
        # If False, it waits until some other coroutine calls event.set().
        logging.info(f"Worker {worker_id}: Waiting for DB connection to process request {i+1}...")

        await event.wait()

        # If we reach here, the event is set!
        logging.info(f"Worker {worker_id}: Processing request {i+1} (DB is Online)")

        # Simulate processing time
        process_time = random.uniform(0.5, 1.5)
        await asyncio.sleep(process_time)

async def main():
    # Create the Event object
    # Internal flag is initially False
    db_ready_event = asyncio.Event()

    async with asyncio.TaskGroup() as tg:
        # Create tasks
        tg.create_task(database_connection_manager(db_ready_event))

        for i in range(1, 4):
            tg.create_task(query_worker(i, db_ready_event))

if __name__ == "__main__":
    asyncio.run(main())

Output

48 ms - [INFO] - Manager: Database connection initializing...
49 ms - [INFO] - Worker 1: Ready to start processing.
49 ms - [INFO] - Worker 1: Waiting for DB connection to process request 1...
49 ms - [INFO] - Worker 2: Ready to start processing.
49 ms - [INFO] - Worker 2: Waiting for DB connection to process request 1...
49 ms - [INFO] - Worker 3: Ready to start processing.
49 ms - [INFO] - Worker 3: Waiting for DB connection to process request 1...
2057 ms - [INFO] - Manager: Database CONNECTED. Setting event (Green Light).
2057 ms - [INFO] - Worker 1: Processing request 1 (DB is Online)
2057 ms - [INFO] - Worker 2: Processing request 1 (DB is Online)
2058 ms - [INFO] - Worker 3: Processing request 1 (DB is Online)
2702 ms - [INFO] - Worker 1: Waiting for DB connection to process request 2...
2702 ms - [INFO] - Worker 1: Processing request 2 (DB is Online)
2959 ms - [INFO] - Worker 3: Waiting for DB connection to process request 2...
2959 ms - [INFO] - Worker 3: Processing request 2 (DB is Online)
3501 ms - [INFO] - Worker 2: Waiting for DB connection to process request 2...
3501 ms - [INFO] - Worker 2: Processing request 2 (DB is Online)
3945 ms - [INFO] - Worker 1: Waiting for DB connection to process request 3...
3945 ms - [INFO] - Worker 1: Processing request 3 (DB is Online)
4315 ms - [INFO] - Worker 3: Waiting for DB connection to process request 3...
4315 ms - [INFO] - Worker 3: Processing request 3 (DB is Online)
4632 ms - [INFO] - Worker 1: Waiting for DB connection to process request 4...
4632 ms - [INFO] - Worker 1: Processing request 4 (DB is Online)
4785 ms - [INFO] - Worker 2: Waiting for DB connection to process request 3...
4785 ms - [INFO] - Worker 2: Processing request 3 (DB is Online)
4836 ms - [INFO] - Worker 3: Waiting for DB connection to process request 4...
4836 ms - [INFO] - Worker 3: Processing request 4 (DB is Online)
5056 ms - [INFO] - Manager: Connection LOST! Clearing event (Red Light). blocking workers...
5834 ms - [INFO] - Worker 1: Waiting for DB connection to process request 5...
5890 ms - [INFO] - Worker 2: Waiting for DB connection to process request 4...
6070 ms - [INFO] - Worker 3: Waiting for DB connection to process request 5...
8056 ms - [INFO] - Manager: Reconnecting...
10063 ms - [INFO] - Manager: Database RECONNECTED. Setting event again.
10063 ms - [INFO] - Worker 1: Processing request 5 (DB is Online)
10064 ms - [INFO] - Worker 2: Processing request 4 (DB is Online)
10064 ms - [INFO] - Worker 3: Processing request 5 (DB is Online)
10848 ms - [INFO] - Worker 2: Waiting for DB connection to process request 5...
10848 ms - [INFO] - Worker 2: Processing request 5 (DB is Online)

asyncio.to_thread

Sometimes, you might encounter a problem where the functions you want to use do not have asynchronous capabilities. These days, many libraries and frameworks support async operations, but some legacy libraries still do not. In this situation, you can use to_thread.

asyncio.to_thread is a bridge. It allows you to take blocking I/O (like reading a huge file or using a synchronous library like requests) or light CPU work and run it without "stopping the world" for your other async tasks. (Because of the GIL, though, it won't parallelize heavy CPU-bound Python code; use processes for that.)

The Mechanics of asyncio.to_thread

When you call to_thread, the following sequence occurs:

  1. The Wrapper: asyncio wraps your synchronous function in a coroutine object.

  2. The Context: It captures the current contextvars (so your database sessions or tokens stay available).

  3. The Executor: It pushes the function into a separate ThreadPoolExecutor.

  4. The Yield: The calling coroutine awaits a Future object. This allows the Event Loop to go do other work while the thread is busy.

import asyncio
import time


def brew_coffee(n: int) -> None:
    print(f"Start brewing coffee #{n}...")

    time.sleep(5)

    print(f"Coffee #{n} is ready!")


async def main() -> None:
    start = time.time()

    tasks = [asyncio.to_thread(brew_coffee, i + 1) for i in range(3)]

    await asyncio.gather(*tasks)

    end = time.time()
    print("Coffee ready!")
    print(f"Total time: {end - start:.2f} seconds")


if __name__ == "__main__":
    asyncio.run(main())

This code is similar to the example in the previous post. When we ran that version with asyncio.create_task, it executed sequentially because of the blocking function time.sleep.

But in this example, if we run this code, it returns

Start brewing coffee #1...
Start brewing coffee #2...
Start brewing coffee #3...
Coffee #2 is ready!
Coffee #1 is ready!
Coffee #3 is ready!
Coffee ready!
Total time: 5.00 seconds

As you can see, the tasks ran asynchronously. You can run legacy blocking I/O functions asynchronously by leveraging to_thread.

Is this the same as Multi-threading? What is the difference?

Feature            | ThreadPoolExecutor()                    | asyncio.to_thread()
-------------------|-----------------------------------------|-------------------------------------------
Context Switching  | OS-level (preemptive)                   | OS-level (preemptive)
Context Variables  | Lost (e.g., trace IDs, auth tokens)     | Preserved (propagates contextvars)
Error Handling     | Needs manual future.exception() check   | Integrated with try/except and TaskGroups
Lifecycle          | Manual (must use with or .shutdown())   | Automatic (managed by the event loop)
Configuration      | Explicit (max_workers=...)              | Uses the loop's default executor

In fact, to_thread uses multi-threading under the hood, but wraps it in a coroutine interface. Unlike a raw ThreadPoolExecutor, it automatically propagates context variables, so request-scoped data (trace IDs, sessions, tokens) is not lost.
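
You can verify the contextvars propagation yourself. A minimal sketch (the request_id variable is just an illustrative stand-in for a trace ID):

```python
import asyncio
import contextvars

# A context variable, e.g. a per-request trace ID
request_id = contextvars.ContextVar("request_id", default="unset")

def blocking_work() -> str:
    # Runs in a worker thread, yet still sees the caller's context
    return request_id.get()

async def main() -> str:
    request_id.set("req-42")
    return await asyncio.to_thread(blocking_work)

seen = asyncio.run(main())
print(seen)  # req-42
```

With a raw ThreadPoolExecutor you would get "unset" here unless you copied the context by hand; to_thread does that copy for you.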

Someone might ask “Then, I don’t need to use async coroutine functions, right? Because I can run both sync and async functions asynchronously with to_thread.”

Limitation of to_thread

That is absolutely not true. to_thread is a hybrid feature: it uses threads for I/O-bound work, whereas native async tasks are single-threaded. It runs legacy blocking I/O functions on a global thread pool, so it cannot run more blocking calls at once than there are threads in that pool. Native coroutines, in contrast, can handle a much larger number of I/O-bound tasks on a single thread.
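
You can see this limit by timing the two approaches side by side. In this sketch I assume the default pool has at most 32 threads, so 50 blocking sleeps need at least two batches, while 50 native sleeps all overlap:

```python
import asyncio
import time

N = 50        # more tasks than the default thread pool can run at once
DELAY = 0.2

def blocking_sleep():
    time.sleep(DELAY)   # blocking call: occupies a whole thread

async def main():
    # Native coroutines: one thread, all 50 sleeps overlap (~0.2s total)
    start = time.monotonic()
    await asyncio.gather(*(asyncio.sleep(DELAY) for _ in range(N)))
    native = time.monotonic() - start

    # to_thread: limited by the pool size, so the 50 calls
    # run in batches and take noticeably longer
    start = time.monotonic()
    await asyncio.gather(*(asyncio.to_thread(blocking_sleep) for _ in range(N)))
    threaded = time.monotonic() - start
    return native, threaded

native, threaded = asyncio.run(main())
print(f"native: {native:.2f}s, to_thread: {threaded:.2f}s")
```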

Note: the default number of threads in the pool is

N_threads = min(32, os.cpu_count() + 4)
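
If you need more concurrent blocking calls, one option (my suggestion, not something from the asyncio docs quoted above) is to install a larger default executor, which to_thread will then use. A minimal sketch (64 is an arbitrary example size):

```python
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor

def blocking() -> str:
    time.sleep(0.1)
    return "done"

async def main():
    loop = asyncio.get_running_loop()
    # Replace the loop's default executor with a bigger one;
    # asyncio.to_thread uses it from this point on.
    loop.set_default_executor(ThreadPoolExecutor(max_workers=64))
    return await asyncio.gather(*(asyncio.to_thread(blocking) for _ in range(64)))

results = asyncio.run(main())
print(len(results))  # 64
```

Size the pool with care: each thread holds OS resources, and very large pools usually signal that the work should be native async instead.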

Conclusion

I have introduced some useful functions of asyncio. In the next post, I will demonstrate how to apply them to LLM inference.
