Last Thursday afternoon, someone from operations pinged me in the group chat: "Dude, the activity page data is taking over 30 seconds to load, and users are complaining." I opened the monitoring dashboard and saw that the data aggregation endpoint was making 12 downstream API calls serially, each taking 2-3 seconds, which added up to more than 30 seconds. I wrote this endpoint two years ago with a "features first" mindset, and it had now become a bottleneck. I decided to go async. I spent an afternoon refactoring with asyncio, and after deployment the stress test results made the backend group go wild: average response time dropped from 32 seconds to 2.1 seconds. This article recaps the whole process, sharing the code, the thinking, and the pitfalls I ran into, in the hope of helping others who are also tortured by synchronous blocking.
Why asyncio and not multithreading?
Many people's first reaction is: "Why not use a thread pool?" I tried concurrent.futures.ThreadPoolExecutor with all 12 downstream calls in flight, but in our tests thread context switching plus GIL contention ate CPU without increasing throughput, and timeout control was extremely painful. Our scenario is typical IO-bound work: the endpoint spends 99% of its time waiting for network responses, with the CPU almost idle. This is exactly asyncio's sweet spot: a single-threaded event loop where tasks can be suspended without occupying a thread, and switching between them costs almost nothing.
Simply put, the synchronous model is like "lining up one by one to buy tickets," while asyncio is "everyone ordering on their phones at the same time, whoever responds first gets processed first."
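To make the analogy concrete, here is a minimal sketch that times the two models against a simulated downstream. The names `fake_downstream`, `run_serial`, and `run_concurrent` are made up for illustration, and `asyncio.sleep` stands in for the real network wait, so the numbers only show the shape of the difference, not our production timings.

```python
import asyncio
import time
from typing import List, Tuple


async def fake_downstream(delay: float) -> float:
    # Hypothetical stand-in for one downstream call; asyncio.sleep yields
    # control to the event loop the same way a pending network read would.
    await asyncio.sleep(delay)
    return delay


async def run_serial(delays: List[float]) -> float:
    # "Lining up one by one": each call starts only after the previous ends.
    start = time.perf_counter()
    for delay in delays:
        await fake_downstream(delay)
    return time.perf_counter() - start


async def run_concurrent(delays: List[float]) -> float:
    # "Everyone orders at once": all calls are in flight together.
    start = time.perf_counter()
    await asyncio.gather(*(fake_downstream(d) for d in delays))
    return time.perf_counter() - start


async def compare() -> Tuple[float, float]:
    delays = [0.05] * 12  # 12 simulated calls, scaled down from seconds
    return await run_serial(delays), await run_concurrent(delays)


serial_s, concurrent_s = asyncio.run(compare())
print(f"serial: {serial_s:.2f}s, concurrent: {concurrent_s:.2f}s")
```

The serial run takes roughly the sum of the delays, while the concurrent run takes roughly the longest single delay.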
Core solution: Concurrently hitting downstream APIs with aiohttp + gather
The first step in refactoring was choosing the right tool. I didn't build an HTTP client on bare asyncio, but went straight to aiohttp, which has native async support and excellent connection pool management. Here's the core code:
import asyncio
import aiohttp
from typing import List, Dict, Any


async def fetch_one(session: aiohttp.ClientSession, url: str) -> Dict[str, Any]:
    """Single API call, with a timeout and an exception fallback."""
    try:
        # ClientTimeout(total=5) caps the whole request: connection setup plus reading
        async with session.get(url, timeout=aiohttp.ClientTimeout(total=5)) as resp:
            resp.raise_for_status()
            return await resp.json()
    except Exception as e:
        # In production, always catch this; don't let one downstream take down the whole aggregation
        return {"error": str(e), "url": url}


async def aggregate_data(api_urls: List[str]) -> List[Dict[str, Any]]:
    """Call all downstream services concurrently and collect the results with gather."""
    # Connector: cap total and per-host connections so we don't overwhelm downstream services
    connector = aiohttp.TCPConnector(limit=20, limit_per_host=5)
    async with aiohttp.ClientSession(connector=connector) as session:
        tasks = [fetch_one(session, url) for url in api_urls]
        # return_exceptions=True: a single exception won't abort gather
        results = await asyncio.gather(*tasks, return_exceptions=True)
        return results
Here, asyncio.gather is the key: the 12 coroutines issue their requests concurrently, so the total time is determined by the slowest call rather than by the sum of all of them. If each downstream takes 2-3 seconds, the total should land at roughly 2 to 3 seconds plus a little overhead for connection setup and JSON parsing, which is consistent with the measured average of 2.1 seconds.
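One thing the caller still has to do is tell good results from bad ones. With return_exceptions=True, gather can hand back exception objects in the result list, and fetch_one returns {"error": ..., "url": ...} dicts for failures it caught itself. A minimal sketch of splitting the two, assuming a hypothetical helper named `split_results` and three made-up coroutines in place of real downstream calls (it also assumes no healthy payload has a top-level "error" key):

```python
import asyncio
from typing import Any, Dict, List, Tuple


def split_results(results: List[Any]) -> Tuple[List[Dict[str, Any]], List[Any]]:
    """Separate usable payloads from failures in a gather() result list."""
    ok: List[Dict[str, Any]] = []
    failed: List[Any] = []
    for item in results:
        if isinstance(item, BaseException):
            # Raised inside a task and captured by return_exceptions=True
            failed.append(item)
        elif isinstance(item, dict) and "error" in item:
            # Failure already converted to an error dict, fetch_one style
            failed.append(item)
        else:
            ok.append(item)
    return ok, failed


# Hypothetical coroutines standing in for downstream calls.
async def _good() -> Dict[str, Any]:
    return {"value": 1}


async def _soft_fail() -> Dict[str, Any]:
    return {"error": "timeout", "url": "http://example.invalid/a"}


async def _hard_fail() -> Dict[str, Any]:
    raise RuntimeError("boom")


async def demo() -> Tuple[List[Dict[str, Any]], List[Any]]:
    results = await asyncio.gather(
        _good(), _soft_fail(), _hard_fail(), return_exceptions=True
    )
    return split_results(results)


ok, failed = asyncio.run(demo())
print(len(ok), "succeeded,", len(failed), "failed")
```

gather preserves input order, so if the page needs to know which downstream failed, the results can also be zipped with the original URL list before splitting.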
Advanced optimization: Semaphore rate limiting + timeout circuit breaking
The code above is still not stable enough for production. If some downstream endpoints become extremely slow (e.g., suddenly 30 seconds), or a burst of traffic overwhelms the connection pool, we need finer control. I added two safeguards, a semaphore and a tighter timeout:
import asyncio
from asyncio import Semaphore
from typing import List, Dict, Any

import aiohttp

# Global semaphore: at most 8 requests run at once so downstream services aren't overwhelmed.
# Note: on Python 3.9 and earlier, create it inside the running event loop instead.
SEM = Semaphore(8)


async def fetch_one_limited(session: aiohttp.ClientSession, url: str) -> Dict[str, Any]:
    async with SEM:  # Acquire a slot; coroutines beyond the limit queue here
        try:
            async with session.get(url, timeout=aiohttp.ClientTimeout(total=3)) as resp:
                resp.raise_for_status()  # same as fetch_one: treat 4xx/5xx as failures
                return await resp.json()
        except asyncio.TimeoutError:
            return {"error": "timeout", "url": url}
        except Exception as e:
            return {"error": str(e), "url": url}


async def aggregate_with_limit(api_urls: List[str]) -> List[Dict[str, Any]]:
    connector = aiohttp.TCPConnector(limit=20)
    async with aiohttp.ClientSession(connector=connector) as session:
        tasks = [fetch_one_limited(session, url) for url in api_urls]
        results = await asyncio.gather(*tasks, return_exceptions=True)
        return results
Semaphore(8) limits the number of requests "in flight" at the same time. Even if you pass in 100 URLs, at most 8 will run concurrently. This protects downstream services and avoids exhausting local ports.
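The cap is easy to check without touching the network. The sketch below is an illustration only: `worker` and `measure_peak` are made-up names, and `asyncio.sleep` replaces the HTTP call. It launches 100 coroutines behind a Semaphore(8) and records the highest number that were inside the guarded section at the same moment.

```python
import asyncio
from typing import Dict


async def worker(sem: asyncio.Semaphore, state: Dict[str, int]) -> None:
    async with sem:  # wait here until one of the slots is free
        state["in_flight"] += 1
        state["peak"] = max(state["peak"], state["in_flight"])
        await asyncio.sleep(0.01)  # stand-in for the downstream request
        state["in_flight"] -= 1


async def measure_peak(total: int, limit: int) -> int:
    # Creating the semaphore inside the running loop works on every
    # Python version that ships asyncio.
    sem = asyncio.Semaphore(limit)
    state = {"in_flight": 0, "peak": 0}
    await asyncio.gather(*(worker(sem, state) for _ in range(total)))
    return state["peak"]


peak = asyncio.run(measure_peak(total=100, limit=8))
print("peak concurrency:", peak)
```

All 100 coroutines are created up front, but the counter never exceeds the semaphore's limit; the rest sit parked at `async with sem` and cost almost nothing while they wait.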
Lessons learned: Hard-earned experience
Pitfall 1: Accidentally putting synchronous blocking code inside a coroutine
I initially used time.sleep(1) to simulate latency in tests, and the entire event loop stalled: every request ran serially. Synchronous blocking calls must never appear inside a coroutine; use await asyncio.sleep() instead. A similar pitfall: calling requests inside a coroutine...
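When a blocking call cannot be rewritten as async, one common workaround is to push it onto a worker thread so the event loop stays free. The sketch below is an illustration, not the fix used in this refactor: `blocking_call`, `bad`, and `good` are made-up names, `time.sleep` stands in for any synchronous library call, and `asyncio.to_thread` assumes Python 3.9 or newer.

```python
import asyncio
import time
from typing import Awaitable, Callable, Tuple


def blocking_call(seconds: float) -> str:
    # Hypothetical stand-in for synchronous code (e.g. a blocking HTTP client).
    time.sleep(seconds)
    return "done"


async def bad(seconds: float) -> str:
    # Runs on the event loop thread: nothing else can progress meanwhile.
    return blocking_call(seconds)


async def good(seconds: float) -> str:
    # Runs in the default thread pool; the event loop keeps scheduling.
    return await asyncio.to_thread(blocking_call, seconds)


async def timed(fn: Callable[[float], Awaitable[str]], n: int, seconds: float) -> float:
    start = time.perf_counter()
    await asyncio.gather(*(fn(seconds) for _ in range(n)))
    return time.perf_counter() - start


async def main() -> Tuple[float, float]:
    blocked = await timed(bad, 4, 0.1)      # the four calls run back to back
    offloaded = await timed(good, 4, 0.1)   # the four calls overlap in threads
    return blocked, offloaded


blocked_s, offloaded_s = asyncio.run(main())
print(f"blocking in coroutine: {blocked_s:.2f}s, offloaded: {offloaded_s:.2f}s")
```

Threads bring back some of the overhead asyncio was meant to avoid, so this is best kept for the few calls that have no async equivalent.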