Here’s what happened: last week my manager asked me to optimize a data aggregation service that calls 20 downstream APIs. Running them sequentially took 18 seconds, and users were ready to smash their keyboards. I took one look and knew it was an IO-bound task — clearly a job for asyncio. I figured I’d wrap it up in half a day. Instead, I fell into one trap after another from 2 p.m. to 5 p.m., and the production service nearly collapsed. This post is a postmortem of the three big pitfalls I hit and how to write async code that actually works in the real world.
Core concepts, straight up
The heart of asyncio is a single-threaded event loop. Think of it as a master scheduler: it lines up all coroutines, and whenever one is waiting for IO it tells it to step aside and runs something that’s ready. The syntax boils down to two things: async def to define a coroutine, and await to yield control, essentially telling the event loop “I’ll be waiting here for a bit — go do something else.”
But most tutorials only show you this ideal scenario:
import asyncio
async def fetch(url):
await asyncio.sleep(1) # simulate network IO
return f"data from {url}"
async def main():
tasks = [fetch(f"api/{i}") for i in range(5)]
results = await asyncio.gather(*tasks) # run concurrently
print(results)
asyncio.run(main())
Clean and elegant — five requests in parallel, only 1 second total. But the moment you try to use this in a real project, the problems start.
Pitfall 1: await inside a synchronous function — instant error
I initially added await fetch() directly inside an existing Flask route function. I was rewarded with SyntaxError: 'await' outside async function. Fine, I changed the route to async def, thinking that would do it. Then the first request hit and I got RuntimeError: There is no current event loop in thread 'Thread-1'.
The reason: Flask uses a thread pool to handle requests by default, and those threads don’t come with an event loop. On top of that, you can’t just call asyncio.run(main()) inside a view that’s already running in a thread with an event loop — I triggered a cascade of “event loop is already running” errors.
The right approach: Either switch to an async-first framework like Quart or FastAPI. If you must stick with Flask, create a global event loop at startup and schedule work with loop.run_until_complete(), or — simpler yet — spin up a dedicated asyncio background thread and communicate with the web threads via a queue.
Pitfall 2: Synchronous blocking call inside a coroutine — performance gets worse
Feeling clever, I wrote asyncio.gather(*[call_api_blocking(i) for i in range(20)]), only to find the total time still hovered around 18 seconds. Logs showed each task finishing one after another before the next even started. It took me a while to spot: call_api_blocking was using requests.get(), which is synchronous and blocking. await was useless because the moment the event loop hit that first requests.get, the thread froze completely — no other coroutine could be scheduled.
asyncio only works with its own async IO primitives. When you must call a synchronous blocking function, you need to offload it to a thread pool with loop.run_in_executor():
async def call_api_async(url):
loop = asyncio.get_running_loop()
return await loop.run_in_executor(None, requests.get, url)
That way the network block happens in a separate thread while the event loop immediately switches to another coroutine. Later I replaced requests entirely with aiohttp, and performance really took off. One rule to remember: async is all-or-nothing — don’t mix in rogue blocking calls.
Pitfall 3: Tasks not being cleaned up — memory creeps up
After the performance boost I was confident and deployed. Two days later the pod got OOMKilled. Monitoring showed memory slowly climbing with no GC collection. I eventually traced it to some hand-rolled “flexible concurrency control” code:
tasks = []
for url in urls:
task = asyncio.create_task(process(url))
tasks.append(task)
for t in tasks:
await t
Looks fine at first glance, but inside process(url) some code paths had early return statements, and some exceptions weren’t handled properly. That left tasks in a PENDING or CANCELLED state while still referenced by the tasks list. Those tasks held onto large chunks of response data, keeping the reference chain alive and unfreezable by the GC.
The fix: Use asyncio.TaskGroup (Python 3.11+) to manage the lifecycle automatically. If any task fails, it signals the others to cancel, and the structure is clean with no leaks:
async def main():
async with asyncio.TaskGroup() as tg:
for url in urls:
tg.create_task(process(url))
If you’re on an older Python version, be disciplined: cancel any incomplete tasks in a finally block and clear all references.
The complete, production‑ready skeleton
Here’s the core skeleton I rebuilt, ready to use. It includes a semaphore for concurrency control, aiohttp session reuse, exception isolation, and timeout handling:
import asyncio
import aiohttp
import time
from typing import List
class AsyncFetcher:
def __init__(self, concurrency: int = 10, timeout: int = 10):
self.sem = asyncio.Semaphore(concurrency) # limit concurrency to avoid overloading downstream
self.timeout = aiohttp.ClientTimeout(total=timeout)
async def fetch_one(self, session: aiohttp.ClientSession, url: str) -> dict:
async with self.sem:
try:
async with session.get(url, timeout=self.timeout) as resp:
data = await resp.json()
Top comments (0)