DEV Community

BAOFUFAN
BAOFUFAN

Posted on

Python asyncio Pitfalls: The Bug That Cost Me 3 Hours

Last week I got a requirement: fetch data from 30 third-party APIs, clean it, and store it in the database. I thought, easy — just requests and a for loop. But it took over 40 seconds, and the product manager stared daggers at me. “Can you make it faster?” — I gritted my teeth and said yes, then turned to asyncio. The 10x speed boost was awesome, but the pitfalls along the way… each one could keep you up at night questioning your life choices. Today I’m sharing these painfully real lessons so you can avoid the same detours.

Why asyncio? Let’s look at some numbers

I tested a simplified scenario: concurrently requesting 20 URLs, each with a ~200ms response time. The synchronous version took 4 seconds; the async version only 0.3 seconds — a 13x difference. The key reason: asyncio leverages an event loop + coroutines to make use of IO wait time. Instead of a thread waiting idly for a network response, it immediately switches to send the next request. Python’s GIL isn’t a bottleneck here because the bottleneck is IO, not CPU.

But those shiny numbers come with pitfalls. Below I’ll use runnable code examples to connect the theory with practice, and dig up the hidden traps the docs don’t mention.

Core principle: What the event loop actually does

The event loop is single-threaded. It maintains a task queue: when a coroutine awaits an IO operation, it yields control back to the loop, which immediately schedules another ready coroutine. Think of it like this: you’re the only waiter in a restaurant (single thread). When a customer places an order (makes a request), you don’t stand there waiting for the kitchen to cook (waiting for a response); you go serve the next customer. When the food is ready, the kitchen calls you (IO readiness callback), and you deliver it. That’s how one person can handle 20 tables.

Let’s feel it with code:

import asyncio
import time

async def fetch_one(url: str) -> str:
    """
    模拟一个IO操作:请求某个URL并返回数据
    用asyncio.sleep模拟网络延迟,不会阻塞事件循环
    """
    print(f"开始请求 {url}")
    await asyncio.sleep(0.2)   # 模拟200ms的网络IO
    print(f"完成请求 {url}")
    return f"{url} 的数据"

async def main_sync_like():
    """
    错误示范:把async当同步写,一个接一个await
    耗时 = N * 单任务耗时,协程的优势全没了
    """
    t0 = time.time()
    result1 = await fetch_one("url_1")
    result2 = await fetch_one("url_2")
    result3 = await fetch_one("url_3")
    print(f"耗时: {time.time() - t0:.2f}s")  # 约0.6s

async def main_async():
    """
    正确打开方式:用gather并发执行
    耗时 ≈ 最慢那一个任务的耗时
    """
    t0 = time.time()
    results = await asyncio.gather(
        fetch_one("url_1"),
        fetch_one("url_2"),
        fetch_one("url_3")
    )
    print(f"耗时: {time.time() - t0:.2f}s")  # 约0.2s
    print(results)

# Python 3.7+ 运行方式
# asyncio.run(main_async())
Enter fullscreen mode Exit fullscreen mode

asyncio.gather() is the key to concurrency: it registers multiple coroutines at once, returning only when all of them complete (or fail). Notice that all requests are fired “simultaneously”, so the total time is bounded by the slowest URL, not the sum. That’s how async can boost IO-bound performance by tens of times.

Production-ready concurrency: with timeouts and error handling

The previous code is too idealistic. In reality, third-party APIs can time out, crash, or return garbage data. Using a plain gather could crash the whole batch — absolutely unacceptable in production. We must add “airbags” to each task:

import asyncio
import aiohttp
import time
from typing import List, Dict, Any

API_URLS = [
    "https://httpbin.org/delay/0.1",
    "https://httpbin.org/delay/0.2",
    "https://httpbin.org/status/500",   # 故意放一个会炸的
    "https://httpbin.org/delay/0.3",
]

async def fetch_with_timeout(session: aiohttp.ClientSession,
                             url: str,
                             timeout: int = 5) -> Dict[str, Any]:
    """
    单个请求的协程:加超时、加异常捕获
    返回统一格式的字典,方便上游处理
    """
    try:
        async with session.get(url, timeout=aiohttp.ClientTimeout(total=timeout)) as resp:
            data = await resp.json()
            return {"url": url, "status": resp.status, "data": data, "error": None}
    except asyncio.TimeoutError:
        return {"url": url, "status": None, "data": None, "error": "timeout"}
    except Exception as e:
        return {"url": url, "status": None, "data": None, "error": str(e)}

async def batch_fetch(urls: List[str], concurrency: int = 10) -> List[Dict[str, Any]]:
    """
    并发拉取一组URL,支持限制并发数(用Semaphore)
    返回所有结果,包含成功和失败的记录
    """
    semaphore = asyncio.Semaphore(concurrency)  # 防止把对方打爆
    async with aiohttp.ClientSession() as session:
        async def bo
Enter fullscreen mode Exit fullscreen mode

Notice how fetch_with_timeout uses aiohttp with a timeout and wraps everything in a try-except, returning a consistent dict. batch_fetch introduces a Semaphore to cap concurrency so you don’t accidentally hammer the third-party service. The inner async def bo (intentionally left incomplete) is a trap I’ll explain right after you’ve seen the real pitfall in action…

(The article continues with the exact bug that cost me 3 hours — a subtle mistake with Semaphore and closure scoping that I’ll dissect in the follow‑up post.)

Top comments (0)