Lucas Braun

AsyncIO in production: event loop, tasks, and the traps no one warns you about

You've shipped async code. It works in dev. Then production hits — and it hangs, leaks memory, or silently swallows exceptions. Here's what actually matters.


The mental model most tutorials skip

When you write async def and await, you're not getting parallelism — you're getting cooperative multitasking. The event loop runs one coroutine at a time. Every await is a voluntary yield: "I'm waiting for I/O, go run something else."

This means:

  • CPU-bound work blocks the entire loop — no other coroutine runs while you're crunching numbers
  • A single time.sleep() inside an async function freezes everything
  • "Async" doesn't mean "fast" — it means "efficient while waiting"
import asyncio
import time

async def bad():
    time.sleep(2)  # blocks the entire event loop for 2 seconds
    return "done"

async def good():
    await asyncio.sleep(2)  # yields control — other tasks run
    return "done"

Internalize this and half the async bugs you'll ever face become obvious.
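The difference is easy to observe. The sketch below (all names are illustrative, not from any library) starts a heartbeat task and counts how often it gets to run while another coroutine either blocks with time.sleep() or yields with asyncio.sleep().

```python
import asyncio
import time


async def blocking_work() -> None:
    time.sleep(0.2)  # never yields: the loop cannot run anything else


async def yielding_work() -> None:
    await asyncio.sleep(0.2)  # suspends: the loop is free to run other tasks


async def count_heartbeats(work) -> int:
    """Run work() and count how many times a background heartbeat got to run."""
    beats = 0

    async def heartbeat() -> None:
        nonlocal beats
        while True:
            beats += 1
            await asyncio.sleep(0.01)

    hb = asyncio.create_task(heartbeat())
    await work()
    hb.cancel()
    try:
        await hb
    except asyncio.CancelledError:
        pass  # expected: we cancelled the heartbeat ourselves
    return beats


if __name__ == "__main__":
    print("blocking:", asyncio.run(count_heartbeats(blocking_work)))
    print("yielding:", asyncio.run(count_heartbeats(yielding_work)))
```

With the blocking version the heartbeat never runs at all, because the loop only regains control after time.sleep() returns, and by then the task has already been cancelled.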


asyncio.create_task() vs asyncio.gather() vs TaskGroup

Three ways to run coroutines concurrently. They are not interchangeable.

asyncio.gather()

Runs coroutines concurrently and collects results. By default, if one raises, it cancels nothing — the others keep running. With return_exceptions=True, exceptions come back as values.

results = await asyncio.gather(
    fetch_user(user_id),
    fetch_orders(user_id),
    fetch_preferences(user_id),
    return_exceptions=True,
)

for r in results:
    if isinstance(r, Exception):
        logger.error("subtask failed", exc_info=r)

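The problem: with return_exceptions=False (the default), the first failure propagates to the caller right away, but the remaining awaitables are not cancelled. They keep running in the background with nobody awaiting them, so their results and any later exceptions are lost, and they can keep holding connections or other resources.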
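A small experiment makes the default behavior visible. This is a sketch with made-up coroutines (fails_fast and slow_sibling are placeholders for real work): gather() raises as soon as one awaitable fails, yet the sibling still runs to completion afterwards.

```python
import asyncio


async def fails_fast() -> None:
    await asyncio.sleep(0.01)
    raise ValueError("boom")


async def slow_sibling(log: list) -> None:
    await asyncio.sleep(0.05)
    log.append("sibling finished")  # still reached after gather() has raised


async def gather_default_demo() -> list:
    log: list = []
    try:
        await asyncio.gather(fails_fast(), slow_sibling(log))
    except ValueError:
        log.append("gather raised")
    # Keep the loop alive long enough for the orphaned sibling to finish.
    await asyncio.sleep(0.1)
    return log


if __name__ == "__main__":
    print(asyncio.run(gather_default_demo()))
```

The returned log has "gather raised" first and "sibling finished" second: the failure surfaced immediately, and the other coroutine kept going unsupervised.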

asyncio.create_task()

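Schedules a coroutine as a background task. It does not run inside the create_task() call itself: it starts the next time the current coroutine yields to the event loop, and from then on it runs independently of what you await next.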

task = asyncio.create_task(send_notification(user_id))
# do other work here
await task  # or don't — and risk losing the exception

The trap: if you never await a task and it raises, the exception is never raised anywhere in your code. The only trace is a "Task exception was never retrieved" message, logged when the task object is garbage collected, which is easy to miss.

Always keep a reference to tasks you create. The event loop only holds a weak reference.

# wrong — task may be garbage collected mid-flight
asyncio.create_task(background_job())

# right — keep a strong reference
_background_tasks: set[asyncio.Task] = set()

def fire_and_forget(coro):
    task = asyncio.create_task(coro)
    _background_tasks.add(task)
    task.add_done_callback(_background_tasks.discard)

asyncio.TaskGroup (Python 3.11+)

The modern, correct way to run structured concurrent tasks. If any task fails, all others are cancelled. No exceptions get swallowed.

async def fetch_all(user_id: int):
    async with asyncio.TaskGroup() as tg:
        user_task = tg.create_task(fetch_user(user_id))
        orders_task = tg.create_task(fetch_orders(user_id))

    # both tasks are done here — or an ExceptionGroup was raised
    return user_task.result(), orders_task.result()

Use TaskGroup for new code. Use gather only when you need return_exceptions=True semantics or are on Python < 3.11.


Timeouts: don't trust your dependencies

External services lie. They accept your connection and then stop responding. Without timeouts, you get coroutines waiting forever, exhausting your connection pool.

asyncio.timeout() (Python 3.11+)

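import asyncio

import httpx

async def fetch_with_timeout(url: str) -> dict:
    # The 5-second budget covers connecting, the request, and reading the body
    async with asyncio.timeout(5.0):
        async with httpx.AsyncClient() as client:
            response = await client.get(url)
            return response.json()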

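Raises TimeoutError if the block takes more than 5 seconds (since Python 3.11, asyncio.TimeoutError is an alias of the built-in). The code inside the block is cancelled at whatever await it is suspended on. Clean, readable, composable.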

asyncio.wait_for() for older Python

try:
    result = await asyncio.wait_for(fetch_data(), timeout=5.0)
except asyncio.TimeoutError:
    logger.warning("fetch_data timed out")
    raise

The trap: cancellation and cleanup

When a timeout fires, the inner coroutine receives a CancelledError. Since Python 3.8, CancelledError derives from BaseException, so a plain except Exception no longer catches it. A bare except:, an except BaseException, or an except asyncio.CancelledError that doesn't re-raise still does, and then the cancellation is suppressed and your timeout does nothing.

# wrong: suppresses CancelledError
async def bad_fetch():
    try:
        return await httpx_client.get(url)
    except BaseException:  # a bare "except:" behaves the same way
        return None  # this swallows CancelledError too

# right: let CancelledError propagate
async def good_fetch():
    try:
        return await httpx_client.get(url)
    except httpx.HTTPError as e:
        logger.error("HTTP error", exc_info=e)
        return None
    # CancelledError is NOT caught here, so it propagates correctly

asyncio.shield(): protecting critical work from cancellation

Sometimes a task gets cancelled (timeout, parent TaskGroup failure) but you need a specific operation to finish — like writing to a database or sending a notification.

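async def save_and_notify(data: dict):
    # If this coroutine is cancelled here, save_to_db keeps running in the
    # background (as long as the event loop itself keeps running)
    await asyncio.shield(save_to_db(data))
    # Not shielded: this CAN be cancelled, and it is skipped entirely
    # if the cancellation arrived during the line above
    await send_notification(data["user_id"])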

Important: shield() does not prevent the outer coroutine from being cancelled. It only protects the inner task from receiving the cancellation signal. The outer coroutine still gets CancelledError — you need to handle it.

Use shield() sparingly. If you're shielding a lot, your cancellation design is probably wrong.
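Here is a sketch of what handling it can look like. fake_save stands in for the real database write, and the pending set is an assumption of this example: it keeps a strong reference to the shielded task so that it can be awaited later, for instance during shutdown.

```python
import asyncio


async def fake_save(log: list) -> None:
    await asyncio.sleep(0.05)  # stands in for a database write
    log.append("saved")


async def handler(log: list, pending: set) -> None:
    inner = asyncio.ensure_future(fake_save(log))
    pending.add(inner)  # keep a reference to the shielded task
    inner.add_done_callback(pending.discard)
    try:
        await asyncio.shield(inner)
    except asyncio.CancelledError:
        log.append("handler cancelled")  # the outer coroutine is still cancelled
        raise


async def shield_demo() -> list:
    log: list = []
    pending: set = set()
    task = asyncio.create_task(handler(log, pending))
    await asyncio.sleep(0.01)
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass
    await asyncio.gather(*pending)  # wait for the shielded work to finish
    return log


if __name__ == "__main__":
    print(asyncio.run(shield_demo()))
```

The handler sees the cancellation first, and the save completes afterwards on its own.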


Debugging stuck coroutines in production

Enable slow callback logging

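In debug mode, the event loop logs a warning whenever a single callback or task step runs longer than loop.slow_callback_duration (0.1 seconds by default). Without debug mode nothing is logged, so enable it and set the threshold:

import asyncio
import logging

logging.basicConfig(level=logging.DEBUG)

async def main():
    loop = asyncio.get_running_loop()
    loop.slow_callback_duration = 0.1  # warn if any callback blocks > 100ms
    ...  # start your application here

asyncio.run(main(), debug=True)  # or set PYTHONASYNCIODEBUG=1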

This catches the most common issue: a sync library call or CPU-bound operation blocking the loop.
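To check that the warning actually fires in your setup, you can provoke it on purpose. This sketch blocks the loop with time.sleep() and collects what the "asyncio" logger emits; ListHandler and the function names are illustrative only.

```python
import asyncio
import logging
import time


class ListHandler(logging.Handler):
    """Collects formatted log messages in a list for inspection."""

    def __init__(self) -> None:
        super().__init__(level=logging.WARNING)
        self.messages: list = []

    def emit(self, record: logging.LogRecord) -> None:
        self.messages.append(record.getMessage())


async def blocks_the_loop() -> None:
    asyncio.get_running_loop().slow_callback_duration = 0.05
    time.sleep(0.2)  # deliberately block the loop past the threshold


def capture_slow_callback_warnings() -> list:
    handler = ListHandler()
    asyncio_logger = logging.getLogger("asyncio")
    asyncio_logger.addHandler(handler)
    try:
        asyncio.run(blocks_the_loop(), debug=True)  # debug mode is required
    finally:
        asyncio_logger.removeHandler(handler)
    return handler.messages


if __name__ == "__main__":
    for message in capture_slow_callback_warnings():
        print(message)
```

The captured message names the task that was executing and how long it took, which is usually enough to find the offending call.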

Dump all running tasks

When your service hangs, dump what's running:

import asyncio

def dump_tasks():
    for task in asyncio.all_tasks():
        print(task.get_name())
        task.print_stack()

In FastAPI, wire this to a debug endpoint or a signal handler:

import signal

def setup_debug_signal():
    def handler(sig, frame):
        dump_tasks()
    signal.signal(signal.SIGUSR1, handler)

# kill -USR1 <pid> to trigger
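For a debug endpoint it is handier to get the dump back as a string than to print it. A possible variant, assuming it is called from code running on the event loop (format_tasks and the demo task name are made up for this sketch):

```python
import asyncio
import io


def format_tasks() -> str:
    """Return the name and current stack of every task on the running loop."""
    buffer = io.StringIO()
    for task in asyncio.all_tasks():
        buffer.write(f"--- {task.get_name()} (done={task.done()})\n")
        task.print_stack(file=buffer)  # print_stack accepts a file-like target
    return buffer.getvalue()


async def report_demo() -> str:
    worker = asyncio.create_task(asyncio.sleep(10), name="stuck-worker")
    await asyncio.sleep(0)  # let the worker start and suspend
    report = format_tasks()
    worker.cancel()
    try:
        await worker
    except asyncio.CancelledError:
        pass
    return report


if __name__ == "__main__":
    print(asyncio.run(report_demo()))
```

Naming tasks at creation (create_task(..., name=...)) pays off here: the default names like Task-12 tell you very little in a dump.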

asyncio.Runner for scripts (Python 3.11+)

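For a single entry point, asyncio.run(main(), debug=True) is enough. Runner gives you more control: several run() calls share one loop and context, and runner.get_loop() exposes the loop itself.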

with asyncio.Runner(debug=True) as runner:
    runner.run(main())

Debug mode enables:

  • Slow callback warnings
  • Coroutine origin tracking (shows where each task was created)
  • Unawaited coroutine detection

The FastAPI-specific traps

Sync dependencies in async routes

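FastAPI runs sync (plain def) routes and dependencies in a thread pool. But a blocking call made directly inside an async def route runs on the event loop itself and blocks it. If no async driver is available, declaring the route with plain def moves it to the thread pool instead.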

# wrong — sync DB call inside async route blocks the loop
@app.get("/users/{id}")
async def get_user(id: int, db: Session = Depends(get_db)):
    return db.query(User).filter(User.id == id).first()

# right — use async session with SQLAlchemy async
@app.get("/users/{id}")
async def get_user(id: int, db: AsyncSession = Depends(get_async_db)):
    result = await db.execute(select(User).where(User.id == id))
    return result.scalar_one_or_none()
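When a library only offers a blocking API and the calling code has to stay async, another option is to push the call onto a worker thread with asyncio.to_thread() (Python 3.9+). The sketch below uses a fake query function rather than a real database, so the names and timings are assumptions for illustration.

```python
import asyncio
import time


def query_user_sync(user_id: int) -> dict:
    time.sleep(0.1)  # stands in for a blocking database driver call
    return {"id": user_id}


async def get_user(user_id: int) -> dict:
    # The blocking call runs in the default thread pool; the loop stays free.
    return await asyncio.to_thread(query_user_sync, user_id)


async def fetch_many(count: int):
    start = time.perf_counter()
    users = await asyncio.gather(*(get_user(i) for i in range(count)))
    return users, time.perf_counter() - start


if __name__ == "__main__":
    users, elapsed = asyncio.run(fetch_many(5))
    print(len(users), f"{elapsed:.2f}s")
```

Five 0.1-second calls overlap instead of running back to back. Threads are a workaround, though: the pool is finite, and a cancelled coroutine does not stop the thread it was waiting on.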

Async generator leaks

FastAPI uses async generators for dependencies (the yield pattern). If the dependency is never properly closed — due to a timeout or cancellation — cleanup code after yield doesn't run.

async def get_db():
    async with AsyncSession(engine) as session:
        try:
            yield session
            await session.commit()
        except Exception:
            # CancelledError is a BaseException, so cancellation skips this branch
            await session.rollback()
            raise
        finally:
            # this finally block ALWAYS runs, even on cancellation
            await session.close()

Always use try/finally in async generator dependencies. Never assume the happy path.
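The same commit/rollback/close shape can be exercised outside FastAPI. This sketch uses contextlib.asynccontextmanager and a fake session that only records events; it is not how FastAPI drives dependencies internally, just a way to watch which branches run on success and on cancellation.

```python
import asyncio
from contextlib import asynccontextmanager


@asynccontextmanager
async def fake_session(log: list):
    log.append("open")
    try:
        yield "session"
        log.append("commit")
    except Exception:
        log.append("rollback")
        raise
    finally:
        log.append("close")  # runs on success, error, and cancellation


async def use_session(log: list, delay: float) -> None:
    async with fake_session(log):
        await asyncio.sleep(delay)


async def cancelled_request() -> list:
    log: list = []
    task = asyncio.create_task(use_session(log, 10))
    await asyncio.sleep(0.01)
    task.cancel()  # e.g. the client disconnected or a timeout fired
    try:
        await task
    except asyncio.CancelledError:
        pass
    return log


if __name__ == "__main__":
    print(asyncio.run(cancelled_request()))
```

On cancellation only "open" and "close" are recorded: neither the commit nor the except Exception branch runs, so the finally block is the only cleanup you can rely on.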


Summary

| Scenario | Use |
| --- | --- |
| Run N coroutines, collect all results | asyncio.gather() with return_exceptions=True |
| Run N coroutines, fail fast on first error | asyncio.TaskGroup |
| Fire-and-forget background task | create_task() + strong reference |
| Timeout an operation | asyncio.timeout() or wait_for() |
| Protect critical work from cancellation | asyncio.shield() |
| Debug a hung event loop | all_tasks() + signal handler |

AsyncIO's model is simple once you internalize it. The bugs don't come from complexity — they come from assuming it works like threading, or from libraries that weren't built for async. Know your primitives, handle cancellation explicitly, and always keep a reference to your tasks.


Have a war story about async bugs in production? Drop it in the comments — the more specific, the better.


Tags: python asyncio fastapi backend productivity
