You've shipped async code. It works in dev. Then production hits — and it hangs, leaks memory, or silently swallows exceptions. Here's what actually matters.
The mental model most tutorials skip
When you write async def and await, you're not getting parallelism — you're getting cooperative multitasking. The event loop runs one coroutine at a time. Every await is a voluntary yield: "I'm waiting for I/O, go run something else."
This means:
- CPU-bound work blocks the entire loop — no other coroutine runs while you're crunching numbers
- A single time.sleep() inside an async function freezes everything
- "Async" doesn't mean "fast"; it means "efficient while waiting"
```python
import asyncio
import time

async def bad():
    time.sleep(2)  # blocks the entire event loop for 2 seconds
    return "done"

async def good():
    await asyncio.sleep(2)  # yields control, so other tasks run
    return "done"
```
Internalize this and half the async bugs you'll ever face become obvious.
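To see the difference instead of taking it on faith, here is a minimal sketch that times three concurrent "waits" run through asyncio.gather(): once with time.sleep() and once with asyncio.sleep(). The helper names blocking, cooperative and timed are made up for this illustration, and exact timings will vary by machine.

```python
import asyncio
import time


async def blocking(delay: float) -> None:
    time.sleep(delay)  # sync sleep: the loop can't run anything else meanwhile


async def cooperative(delay: float) -> None:
    await asyncio.sleep(delay)  # yields to the loop while waiting


async def timed(worker, n: int = 3, delay: float = 0.1) -> float:
    """Run n copies of worker concurrently and return wall-clock seconds."""
    start = time.perf_counter()
    await asyncio.gather(*(worker(delay) for _ in range(n)))
    return time.perf_counter() - start


blocking_elapsed = asyncio.run(timed(blocking))
cooperative_elapsed = asyncio.run(timed(cooperative))
print(f"time.sleep:    {blocking_elapsed:.2f}s")  # the waits run one after another
print(f"asyncio.sleep: {cooperative_elapsed:.2f}s")  # the waits overlap
```

Same coroutines, same gather() call: only the kind of sleep changes, and the blocking version takes about three times as long.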
asyncio.create_task() vs asyncio.gather() vs TaskGroup
Three ways to run coroutines concurrently. They are not interchangeable.
asyncio.gather()
Runs coroutines concurrently and collects results. By default, if one raises, it cancels nothing — the others keep running. With return_exceptions=True, exceptions come back as values.
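```python
import asyncio
import logging

logger = logging.getLogger(__name__)

# inside an async function; fetch_user, fetch_orders, fetch_preferences are your coroutines
results = await asyncio.gather(
    fetch_user(user_id),
    fetch_orders(user_id),
    fetch_preferences(user_id),
    return_exceptions=True,
)
for r in results:
    # BaseException, not Exception: a cancelled subtask comes back as CancelledError
    if isinstance(r, BaseException):
        logger.error("subtask failed", exc_info=r)
```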
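The problem: with return_exceptions=False (the default), the first failure propagates immediately to whoever awaited gather(). The remaining coroutines are not cancelled; they keep running with nobody awaiting them, so their results and any later exceptions are lost, and whatever they hold open (connections, files) stays open until they finish. That's how resource leaks creep in.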
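Both behaviors are easy to observe with two toy coroutines. In this minimal sketch, fails_fast and slow_worker are made-up stand-ins for real subtasks, and the finished list exists only so we can see what ran.

```python
import asyncio

finished: list[str] = []


async def fails_fast() -> str:
    await asyncio.sleep(0.01)
    raise ValueError("boom")


async def slow_worker() -> str:
    await asyncio.sleep(0.1)
    finished.append("slow_worker")  # still runs after gather() has already raised
    return "ok"


async def main() -> tuple[list, str]:
    # return_exceptions=True: exceptions come back as values, in argument order
    collected = await asyncio.gather(slow_worker(), fails_fast(), return_exceptions=True)

    # default: the first exception propagates, but the sibling is NOT cancelled
    caught = ""
    try:
        await asyncio.gather(fails_fast(), slow_worker())
    except ValueError as exc:
        caught = str(exc)
    await asyncio.sleep(0.2)  # nobody awaits slow_worker any more; give it time to finish
    return collected, caught


collected, caught = asyncio.run(main())
print(collected)  # ['ok', ValueError('boom')]
print(caught, finished)  # boom ['slow_worker', 'slow_worker']
```

The second "slow_worker" entry comes from the run whose gather() had already raised: the orphaned coroutine kept going in the background.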
asyncio.create_task()
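Schedules a coroutine to run as a background task. It doesn't execute on the spot: it starts the next time the current coroutine yields to the event loop, and from then on it runs independently of what you await next.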
```python
task = asyncio.create_task(send_notification(user_id))
# do other work here
await task  # or don't, and risk losing the exception
```
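The scheduling point is easy to check. In this minimal sketch (child and the order list are illustrative names), the child task only runs once main() reaches an await that actually hands control back to the loop.

```python
import asyncio

order: list[str] = []


async def child() -> None:
    order.append("child started")


async def main() -> None:
    task = asyncio.create_task(child())
    order.append("after create_task")  # child has not run yet
    await asyncio.sleep(0)  # first real yield to the loop: child runs now
    order.append("after first await")
    await task


asyncio.run(main())
print(order)  # ['after create_task', 'child started', 'after first await']
```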
The trap: if you never await a task and it raises, the exception isn't propagated anywhere. The only trace is a "Task exception was never retrieved" error that asyncio logs when the task object is garbage-collected, and that's easy to miss.
Always keep a reference to tasks you create. The event loop only holds a weak reference.
```python
import asyncio

# wrong: task may be garbage collected mid-flight
asyncio.create_task(background_job())

# right: keep a strong reference
_background_tasks: set[asyncio.Task] = set()

def fire_and_forget(coro):
    task = asyncio.create_task(coro)
    _background_tasks.add(task)
    # drop the reference once the task finishes
    task.add_done_callback(_background_tasks.discard)
```
asyncio.TaskGroup (Python 3.11+)
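The modern, correct way to run structured concurrent tasks. If any task fails with an exception (other than CancelledError), the remaining tasks are cancelled, and all failures are re-raised together as an ExceptionGroup when the async with block exits. No exceptions get swallowed.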
```python
import asyncio

async def fetch_all(user_id: int):
    async with asyncio.TaskGroup() as tg:
        user_task = tg.create_task(fetch_user(user_id))
        orders_task = tg.create_task(fetch_orders(user_id))
    # both tasks are done here, or an ExceptionGroup was raised
    return user_task.result(), orders_task.result()
```
Use TaskGroup for new code. Use gather only when you need return_exceptions=True semantics or are on Python < 3.11.
Timeouts: don't trust your dependencies
External services lie. They accept your connection and then stop responding. Without timeouts, you get coroutines waiting forever, exhausting your connection pool.
asyncio.timeout() (Python 3.11+)
```python
import asyncio

import httpx

async def fetch_with_timeout(url: str) -> dict:
    async with asyncio.timeout(5.0):
        async with httpx.AsyncClient() as client:
            response = await client.get(url)
            return response.json()
```
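If the block takes longer than 5 seconds, the code inside it is cancelled and TimeoutError is raised (since Python 3.11, asyncio.TimeoutError is an alias of the built-in TimeoutError). The deadline covers the whole block, not each await separately. Clean, readable, composable.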
asyncio.wait_for() for older Python
```python
try:
    result = await asyncio.wait_for(fetch_data(), timeout=5.0)
except asyncio.TimeoutError:
    logger.warning("fetch_data timed out")
    raise
```
The trap: cancellation and cleanup
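When a timeout fires, the inner coroutine receives a CancelledError at the await it's currently suspended on. If it catches that error and doesn't re-raise it, the cancellation is suppressed: the coroutine carries on or returns a fallback as if nothing happened, and the caller may never see a timeout at all. Since Python 3.8, CancelledError derives from BaseException, so except Exception no longer catches it; the usual culprits are a bare except:, except BaseException:, or an except asyncio.CancelledError: block that forgets to raise. (On Python 3.7 and earlier, CancelledError was an Exception subclass, so except Exception swallowed it too.)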
```python
import logging

import httpx

logger = logging.getLogger(__name__)
# httpx_client (an httpx.AsyncClient) and url are defined elsewhere

# wrong: a bare except catches CancelledError too and suppresses it
async def bad_fetch():
    try:
        return await httpx_client.get(url)
    except:  # noqa: E722
        return None

# right: catch only the errors you can handle
async def good_fetch():
    try:
        return await httpx_client.get(url)
    except httpx.HTTPError as e:
        logger.error("HTTP error", exc_info=e)
        return None
    # CancelledError is NOT caught here, so it propagates correctly
```
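If you do need cleanup on cancellation, catch CancelledError, clean up, and re-raise. This minimal sketch contrasts that with a coroutine that swallows the cancellation; well_behaved and swallows_cancel are illustrative names, and asyncio.sleep() stands in for a slow network call.

```python
import asyncio

cleanup_log: list[str] = []


async def well_behaved() -> str:
    try:
        await asyncio.sleep(10)  # stands in for a slow network call
        return "done"
    except asyncio.CancelledError:
        cleanup_log.append("cleaned up")  # release resources here...
        raise  # ...then let the cancellation continue


async def swallows_cancel() -> str:
    try:
        await asyncio.sleep(10)
        return "done"
    except asyncio.CancelledError:
        return "fallback"  # wrong: the cancellation stops here


async def main() -> tuple[str, str, bool]:
    try:
        await asyncio.wait_for(well_behaved(), timeout=0.05)
        timeout_result = "finished"
    except asyncio.TimeoutError:
        timeout_result = "timed out"

    task = asyncio.create_task(swallows_cancel())
    await asyncio.sleep(0)  # let it reach its await
    task.cancel()
    swallowed_result = await task  # no CancelledError raised: it was swallowed
    return timeout_result, swallowed_result, task.cancelled()


timeout_result, swallowed_result, was_cancelled = asyncio.run(main())
print(timeout_result, cleanup_log)  # timed out ['cleaned up']
print(swallowed_result, was_cancelled)  # fallback False
```

The second task was asked to cancel, yet it reports a normal result and cancelled() is False: exactly the "timeout does nothing" symptom.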
asyncio.shield(): protecting critical work from cancellation
Sometimes a task gets cancelled (timeout, parent TaskGroup failure) but you need a specific operation to finish — like writing to a database or sending a notification.
```python
import asyncio

async def save_and_notify(data: dict):
    # save_to_db keeps running in its own task even if this coroutine is cancelled
    await asyncio.shield(save_to_db(data))
    await send_notification(data["user_id"])  # this CAN be cancelled
```
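Important: shield() does not prevent the outer coroutine from being cancelled. It only keeps the cancellation from reaching the shielded operation, which keeps running in its own task. The await asyncio.shield(...) line still raises CancelledError in the outer coroutine, so everything after it (here, send_notification) is skipped, and you need to handle that. Also keep a strong reference to the shielded task so it can't disappear mid-execution, and remember it won't survive the event loop shutting down: asyncio.run() cancels every remaining task on exit.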
Use shield() sparingly. If you're shielding a lot, your cancellation design is probably wrong.
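A minimal sketch of that behavior: save_to_db here is a hypothetical stand-in for a real database write, the inflight set is our own way of holding a strong reference, and the print() marks where a notification would be sent.

```python
import asyncio

saved: list[dict] = []


async def save_to_db(data: dict) -> None:
    """Hypothetical stand-in for a real database write."""
    await asyncio.sleep(0.05)
    saved.append(data)


async def save_and_notify(data: dict, inflight: set) -> None:
    save_task = asyncio.create_task(save_to_db(data))
    inflight.add(save_task)  # strong reference so the shielded task can't be collected
    save_task.add_done_callback(inflight.discard)
    await asyncio.shield(save_task)  # raises CancelledError here if we are cancelled
    print("notify", data["user_id"])  # skipped when cancelled


async def main() -> bool:
    inflight: set[asyncio.Task] = set()
    outer = asyncio.create_task(save_and_notify({"user_id": 1}, inflight))
    await asyncio.sleep(0.01)  # the save is now in progress
    outer.cancel()
    try:
        await outer
    except asyncio.CancelledError:
        pass
    await asyncio.gather(*inflight)  # the shielded save is still running; wait for it
    return outer.cancelled()


outer_cancelled = asyncio.run(main())
print(outer_cancelled, saved)  # True [{'user_id': 1}]
```

The outer task ends up cancelled, but the save still lands, because the inner task never saw the cancellation.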
Debugging stuck coroutines in production
Enable slow callback logging
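In debug mode, the event loop logs a warning whenever a single callback or task step runs longer than loop.slow_callback_duration (0.1 seconds by default). Enable debug mode and set the threshold: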
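```python
import asyncio
import logging

logging.basicConfig(level=logging.DEBUG)

async def main():
    loop = asyncio.get_running_loop()
    loop.slow_callback_duration = 0.1  # warn if any callback blocks > 100ms
    ...  # the rest of your application

asyncio.run(main(), debug=True)  # slow-callback warnings require debug mode
```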
This catches the most common issue: a sync library call or CPU-bound operation blocking the loop.
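To confirm the warning actually fires, here is a minimal sketch that blocks the loop on purpose and captures the "asyncio" logger's output. ListHandler and blocking_step are illustrative names; the message wording comes from CPython and may change between versions, so the check only looks for the word "took".

```python
import asyncio
import logging
import time


class ListHandler(logging.Handler):
    """Keeps every log message in a list so we can inspect it afterwards."""

    def __init__(self) -> None:
        super().__init__()
        self.messages: list[str] = []

    def emit(self, record: logging.LogRecord) -> None:
        self.messages.append(record.getMessage())


handler = ListHandler()
asyncio_logger = logging.getLogger("asyncio")
asyncio_logger.addHandler(handler)
asyncio_logger.setLevel(logging.WARNING)


async def blocking_step() -> None:
    time.sleep(0.2)  # a sync call: nothing else on the loop can run meanwhile


async def main() -> None:
    asyncio.get_running_loop().slow_callback_duration = 0.1  # seconds
    await blocking_step()


asyncio.run(main(), debug=True)
slow_warnings = [m for m in handler.messages if "took" in m]
print(slow_warnings)
```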
Dump all running tasks
When your service hangs, dump what's running:
```python
import asyncio

def dump_tasks():
    # all_tasks() needs a running loop: call this on the event loop's thread
    for task in asyncio.all_tasks():
        print(task.get_name())
        task.print_stack()
```
In FastAPI, wire this to a debug endpoint or a signal handler:
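```python
import signal

def setup_debug_signal():
    def handler(sig, frame):
        dump_tasks()

    # Unix only (there's no SIGUSR1 on Windows). Python runs signal handlers in
    # the main thread, so this assumes the event loop runs there too.
    signal.signal(signal.SIGUSR1, handler)

# kill -USR1 <pid> to trigger
```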
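For the debug-endpoint option, it's handier if the dump lands in a string you can return or log rather than going straight to stderr. Here is a minimal sketch of that variant using print_stack(file=...) and io.StringIO; dump_to_string and stuck_worker are illustrative names, and wiring it into an actual FastAPI route is left out.

```python
import asyncio
import io


def dump_to_string() -> str:
    """Return the name and current stack of every task on the running loop."""
    buf = io.StringIO()
    for task in asyncio.all_tasks():
        buf.write(f"--- {task.get_name()}\n")
        task.print_stack(file=buf)
    return buf.getvalue()


async def stuck_worker(event: asyncio.Event) -> None:
    await event.wait()  # nobody ever sets this event: a typical hang


async def main() -> str:
    event = asyncio.Event()
    worker = asyncio.create_task(stuck_worker(event), name="stuck-worker")
    await asyncio.sleep(0)  # let the worker start and block
    report = dump_to_string()
    worker.cancel()  # clean up so asyncio.run() exits quietly
    try:
        await worker
    except asyncio.CancelledError:
        pass
    return report


report = asyncio.run(main())
print(report)
```

Naming your tasks (the name= argument to create_task) pays off here: "stuck-worker" is far easier to spot in a dump than "Task-7".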
asyncio.Runner for scripts (Python 3.11+)
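asyncio.run(main(), debug=True) also turns on debug mode, but Runner gives you more control: a single Runner keeps the same event loop and context across several run() calls.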
```python
import asyncio

with asyncio.Runner(debug=True) as runner:
    runner.run(main())  # main() is your top-level coroutine
```
Debug mode enables:
- Slow callback warnings
- Coroutine origin tracking (shows where each task was created)
- Unawaited coroutine detection
The FastAPI-specific traps
Sync dependencies in async routes
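FastAPI runs sync dependencies (and routes declared with plain def) in a thread pool. The body of an async def route, however, runs directly on the event loop, so a blocking call there stalls every other request the worker is serving.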
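```python
from fastapi import Depends
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import Session
# app, User, get_db and get_async_db come from your own application modules

# wrong: sync DB call inside async route blocks the loop
@app.get("/users/{id}")
async def get_user(id: int, db: Session = Depends(get_db)):
    return db.query(User).filter(User.id == id).first()

# right: use an async session with SQLAlchemy async (register only one of these routes)
@app.get("/users/{id}")
async def get_user(id: int, db: AsyncSession = Depends(get_async_db)):
    result = await db.execute(select(User).where(User.id == id))
    return result.scalar_one_or_none()
```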
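You can see the cost of the "wrong" version without FastAPI or a database. This stdlib-only sketch measures how often a 10 ms heartbeat task gets to run while a "route" executes. blocking_query is a hypothetical stand-in for a sync ORM call, and asyncio.to_thread plays the role of the thread pool; FastAPI itself uses its own thread-pool machinery (via Starlette/AnyIO), not asyncio.to_thread.

```python
import asyncio
import time


def blocking_query() -> str:
    """Hypothetical stand-in for a sync ORM call such as db.query(...).first()."""
    time.sleep(0.2)
    return "row"


async def route_blocking() -> str:
    return blocking_query()  # runs on the event loop thread and freezes it


async def route_offloaded() -> str:
    # similar in spirit to what FastAPI does for sync code: run it in a worker thread
    return await asyncio.to_thread(blocking_query)


async def heartbeat_ticks_during(route) -> int:
    """Count how often a 10 ms heartbeat task gets to run while route executes."""
    ticks = 0

    async def heartbeat() -> None:
        nonlocal ticks
        while True:
            await asyncio.sleep(0.01)
            ticks += 1

    hb = asyncio.create_task(heartbeat())
    await asyncio.sleep(0)  # let the heartbeat start
    await route()
    hb.cancel()
    try:
        await hb
    except asyncio.CancelledError:
        pass
    return ticks


blocking_ticks = asyncio.run(heartbeat_ticks_during(route_blocking))
offloaded_ticks = asyncio.run(heartbeat_ticks_during(route_offloaded))
print(blocking_ticks, offloaded_ticks)  # blocking_ticks is 0; offloaded_ticks is well above 0
```

The heartbeat stands in for every other request on the same worker: with the blocking route it never gets a turn.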
Async generator leaks
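FastAPI uses async generators for dependencies (the yield pattern). When the request raises, times out, or is cancelled, that exception is thrown into the generator at the yield, so cleanup written as plain code after yield is skipped; only except and finally blocks wrapped around the yield get to run.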
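```python
from sqlalchemy.ext.asyncio import AsyncSession
# engine is your AsyncEngine, created elsewhere with create_async_engine()

async def get_db():
    async with AsyncSession(engine) as session:
        try:
            yield session
            await session.commit()
        except Exception:
            await session.rollback()
            raise
        # this finally block runs on success, on error, and on cancellation
        finally:
            await session.close()
```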
Always use try/finally in async generator dependencies. Never assume the happy path.
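A minimal stdlib-only sketch of the difference. It uses contextlib.asynccontextmanager to drive the generators as a stand-in for FastAPI's dependency machinery (the framework's internals differ and change between versions), and fragile_dep, robust_dep and handle_request are illustrative names.

```python
import asyncio
from contextlib import asynccontextmanager

events: list[str] = []


async def fragile_dep():
    events.append("fragile: open")
    yield "resource"
    events.append("fragile: close")  # skipped when an exception is thrown in at the yield


async def robust_dep():
    events.append("robust: open")
    try:
        yield "resource"
    finally:
        events.append("robust: close")  # runs on success, error, or cancellation


async def handle_request(dep) -> None:
    # stand-in for the framework: drive the generator as an async context manager
    async with asynccontextmanager(dep)() as _resource:
        await asyncio.sleep(10)  # the "request" hangs until the timeout cancels it


async def main() -> None:
    for dep in (fragile_dep, robust_dep):
        try:
            await asyncio.wait_for(handle_request(dep), timeout=0.05)
        except asyncio.TimeoutError:
            pass


asyncio.run(main())
print(events)  # ['fragile: open', 'robust: open', 'robust: close']
```

"fragile: close" never appears: the CancelledError thrown in at the yield ends the generator before it reaches its cleanup line.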
Summary
| Scenario | Use |
|---|---|
| Run N coroutines, collect all results | asyncio.gather() with return_exceptions=True |
| Run N coroutines, fail fast on first error | asyncio.TaskGroup |
| Fire-and-forget background task | create_task() + strong reference |
| Timeout an operation | asyncio.timeout() or wait_for() |
| Protect critical work from cancellation | asyncio.shield() |
| Debug a hung event loop | all_tasks() + signal handler |
AsyncIO's model is simple once you internalize it. The bugs don't come from complexity — they come from assuming it works like threading, or from libraries that weren't built for async. Know your primitives, handle cancellation explicitly, and always keep a reference to your tasks.
Have a war story about async bugs in production? Drop it in the comments — the more specific, the better.