Last week, I finally snapped. Our “legacy” news aggregator was crawling 200 sites in 8 minutes, with two database timeouts along the way. Ops complained it was “slower than a tortoise,” and the product manager asked, “Can we get it under 1 minute?” I said: give me half a day, and I’ll rewrite it with asyncio.
The result? Total time dropped from 487 seconds to 32 seconds — a 15x speedup. My boss walked past my desk, glanced at the screen, and literally said, “Whoa, now that’s the speed it should be.” Today I’ll walk you through that refactor — no textbook fluff, just real, battle‑tested takeaways.
Why asyncio, not threading?
When faced with I/O‑bound tasks, many folks reach for concurrent.futures and thread pools. But threads come with GIL overhead, context‑switching costs, and let’s be honest — a crawler spends 99% of its time waiting for network responses. Using OS threads to “wait for I/O” is like hiring a fleet of drivers just to have them sit in their cars.
asyncio takes a different approach: single thread + event loop. When a coroutine is waiting for a network response, it voluntarily yields control (await), and the event loop immediately switches to another coroutine that’s ready to run. No thread‑switching overhead, no lock contention, minimal memory footprint.
Three core ingredients:
- Event loop – the scheduler; it runs whatever is ready.
- Coroutines – async def functions that suspend with await.
- Futures/Tasks – wrappers around coroutines that let you wait for results.
It’s a completely different mindset from synchronous code — you have to get comfortable thinking concurrently.
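Here is a minimal sketch of those three pieces working together, using only the standard library (say_after is just a made-up coroutine for illustration):

import asyncio

async def say_after(delay: float, message: str) -> str:
    # a coroutine: awaiting the sleep hands control back to the event loop
    await asyncio.sleep(delay)
    return message

async def demo():
    # wrapping coroutines in Tasks lets the event loop schedule them concurrently
    t1 = asyncio.create_task(say_after(1, "first"))
    t2 = asyncio.create_task(say_after(1, "second"))
    # the two 1-second waits overlap, so this finishes in about 1 second, not 2
    print(await t1, await t2)

asyncio.run(demo())  # asyncio.run() creates the event loop and drives it to completion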
The refactor: from blocking sync to async concurrency
Let’s start with the synchronous crawler I inherited (simplified core logic):
import time
import requests

URLS = [f"https://httpbin.org/delay/1?id={i}" for i in range(10)]

def fetch_sync(url: str) -> str:
    # each request blocks for 1 second (simulating network I/O)
    resp = requests.get(url, timeout=5)
    return resp.json()["url"]

start = time.perf_counter()
results = [fetch_sync(url) for url in URLS]
elapsed = time.perf_counter() - start
print(f"Sync elapsed: {elapsed:.2f}s, results: {len(results)}")
# Output: Sync elapsed: 10.12s, results: 10
Ten requests, each taking 1 second, executed one after another — naturally that’s 10 seconds. Who can put up with that?
Converting to asyncio boils down to two steps: swap the I/O function for its async counterpart, then schedule everything concurrently.
import asyncio
import aiohttp
import time

URLS = [f"https://httpbin.org/delay/1?id={i}" for i in range(10)]

async def fetch_async(session: aiohttp.ClientSession, url: str) -> str:
    # aiohttp async request: await yields control
    async with session.get(url, timeout=5) as resp:
        data = await resp.json()
        return data["url"]

async def main():
    async with aiohttp.ClientSession() as session:
        tasks = [fetch_async(session, url) for url in URLS]
        results = await asyncio.gather(*tasks)  # run all coroutines concurrently
        return results

start = time.perf_counter()
results = asyncio.run(main())
elapsed = time.perf_counter() - start
print(f"Async elapsed: {elapsed:.2f}s, results: {len(results)}")
# Output: Async elapsed: 1.05s, results: 10
asyncio.gather() fires off all 10 coroutines at once, so the total time is roughly that of the slowest request (1 second) instead of the sum. That’s the magic of the event loop: while coroutine 1 is waiting on I/O, the loop is already running coroutine 2, 3, … until a response arrives and control is handed back.
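One detail worth knowing about gather() before we get to error handling: by default it propagates the first exception it encounters to the caller, while the remaining coroutines keep running in the background. If you would rather collect failures alongside successes, gather() accepts a return_exceptions flag. A quick sketch, reusing the fetch_async and URLS names from the block above:

async def main_tolerant():
    async with aiohttp.ClientSession() as session:
        tasks = [fetch_async(session, url) for url in URLS]
        # exceptions come back as ordinary result objects instead of being raised
        results = await asyncio.gather(*tasks, return_exceptions=True)
        return [r for r in results if not isinstance(r, Exception)]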
Going deeper: semaphores and error handling — don’t let async become chaos
If you think the snippet above is production‑ready, you’re probably in for a rude awakening. The first pitfall I hit was unlimited concurrency. When the URL list grew from 10 to 2,000, the target server instantly banned my IP — because I had opened 2,000 TCP connections at once.
The fix: asyncio.Semaphore, to cap the number of simultaneous coroutines.
async def fetch_with_limit(session, url, sem, retries=3):
    async with sem:  # semaphore controls how many coroutines run at once
        for attempt in range(retries):
            try:
                async with session.get(url, timeout=5) as resp:
                    if resp.status == 200:
                        return await resp.json()
                    else:
                        raise Exception(f"HTTP {resp.status}")
            except Exception as e:
                if attempt == retries - 1:
                    print(f"Request failed: {url}, error: {e}")
                    return None
                await asyncio.sleep(2 ** attempt)  # exponential backoff

async def main_with_limit():
    sem = asyncio.Semaphore(50)  # max 50 concurrent requests
    async with aiohttp.ClientSession() as session:
        tasks = [fetch_with_limit(session, url, sem) for url in URLS]
        return await asyncio.gather(*tasks)
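To wire it up, the same kind of driver as before works. This is a sketch assuming the same URLS list; the timing you see will depend entirely on the target server:

start = time.perf_counter()
results = asyncio.run(main_with_limit())
successes = [r for r in results if r is not None]
elapsed = time.perf_counter() - start
print(f"Limited async elapsed: {elapsed:.2f}s, successes: {len(successes)}")

As a design note, aiohttp can also cap open connections on its own via aiohttp.TCPConnector(limit=50) passed to the session, but I prefer the explicit semaphore here because it keeps the throttle and the retry logic in one place.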