Here’s the story. Last month, my team lead tossed me a “small task”: revamp the company’s sentiment monitoring crawler. We needed to scrape 80,000 items daily, but the old synchronous script couldn’t finish before dawn, and the server kept running out of memory. I thought, “Just add some concurrency — ThreadPoolExecutor should do the trick.” The day after deployment, an alert call woke me up. The machine with 200 threads had barely touched its CPU, but the memory blew up, swap was maxed out, and the database connection pool was exhausted. A Go developer colleague walked by and said, “With all this I/O waiting, why aren’t you using asyncio?” That comment was worth three days of overtime pay.
If you’ve written Python backends, you’ve likely seen this pattern: the program spends most of its time not computing, but waiting — for HTTP responses, database results, Redis GETs. In those cases, threads or processes are like using a sledgehammer to swat a fly; the context switching overhead often outweighs the actual work. asyncio turns that around: it uses a single thread and an event loop to schedule all waiting tasks, switching to whichever becomes ready first. No time is wasted on pointless blocking.
Below, I’ll lay out the core logic and real pitfalls I encountered during this refactor. The code is ready to run.
How the Event Loop Pulls Off Its Magic
At its core, asyncio has three key players: the Event Loop, Coroutines, and Futures/Tasks. The event loop is a tireless scheduler, watching all registered tasks within a single thread. When a coroutine hits await, it effectively says, “I need to wait; you take care of the others.” The event loop immediately suspends it and moves on to the next ready task. The thread never blocks, so whether you have 100 tasks or 10,000, as long as the I/O doesn’t saturate the bandwidth, performance scales nearly linearly.
Here’s a minimal example:
import asyncio
import time
async def fetch_user(user_id: int) -> dict:
"""
模拟从远程 API 获取用户数据。
假设每次请求需要 1 秒网络延迟。
"""
await asyncio.sleep(1) # 模拟 IO 等待,不阻塞事件循环
return {"id": user_id, "name": f"User_{user_id}"}
async def main():
start = time.time()
# 同时发起 10 个请求
tasks = [fetch_user(i) for i in range(10)]
results = await asyncio.gather(*tasks)
elapsed = time.time() - start
print(f"完成 {len(results)} 个请求,总耗时 {elapsed:.2f} 秒")
# 如果同步执行需要 10 秒,这里只需要约 1 秒
asyncio.run(main())
The key is await asyncio.sleep(1). If you wrote time.sleep(1), the entire thread would freeze, dragging the event loop with it. With asyncio.sleep, control is immediately handed back, so many coroutines can “wait” concurrently. That’s the foundation of asyncio concurrency: overlapping wait times.
In practice, you won’t limit yourself to 10 concurrent requests, but firing 10,000 can easily take down the target server. A production-ready implementation needs concurrency control — enter asyncio.Semaphore.
Production-Grade Implementation: Semaphore Limiting + Exception Isolation + Timeout Safety Net
The code below comes from the refactored sentiment collection module, with three key improvements:
- Semaphore caps concurrency to prevent exhausting file descriptors or triggering rate limits.
- Each task has its own try/except, so one failing URL won’t affect the rest.
- asyncio.timeout provides a safety net to prevent a slow response from stalling the entire batch.
import asyncio
import aiohttp
from typing import List, Dict, Any
# 最大同时请求数,根据对方服务 QPS 和本机资源设定
MAX_CONCURRENT = 50
async def fetch_url(
session: aiohttp.ClientSession,
sem: asyncio.Semaphore,
url: str
) -> Dict[str, Any]:
"""
带限流和超时保护的单条采集逻辑。
"""
async with sem: # 超过 MAX_CONCURRENT 的协程会在这里阻塞等待
try:
# 设置单次请求超时 30 秒
async with asyncio.timeout(30):
async with session.get(url) as resp:
text = await resp.text()
return {"url": url, "status": resp.status, "length": len(text)}
except asyncio.TimeoutError:
return {"url": url, "error": "timeout", "length": 0}
except Exception as e:
return {"url": url, "error": str(e), "length": 0}
async def batch_fetch(urls: List[str]) -> List[Dict[str, Any]]:
"""
并发抓取所有 URL,返回结果列表。
"""
sem = asyncio.Semaphore(MAX_CONCURRENT)
# 复用同一个 session(内含连接池),极大减少 TCP 握手开销
async with aiohttp.ClientSession() as session:
tasks = [fetch_url(session, sem, url) for url in urls]
results = await asyncio.gather(*tasks)
return results
# 使用示例
if __name__ == "__main__":
test_urls = [f"https://httpbin.org/delay/{i % 5}" for i in range(200)]
The Semaphore ensures only MAX_CONCURRENT requests run at any moment. When a request finishes or times out, the slot is released so another can start. By reusing the same ClientSession, we also significantly reduce TCP handshake overhead.
The result? Our crawler went from a sluggish overnight grind to finishing in minutes, with zero memory spikes. The CTO silently opened my performance review — but that’s a story for another day.
Top comments (0)