If you've been avoiding async/await in Python because it seems confusing, this guide will change that. We'll build real things, not toy examples.
Why Async?
Synchronous code waits. When you call an API, your program sits idle until the response comes back. Async lets you do other work during that wait.
# Synchronous: 10 API calls take ~10 seconds
for url in urls:
    response = requests.get(url)  # blocks here
# Async: 10 API calls take ~1 second
# (await is only legal inside an async def, so the snippet needs a coroutine)
async def fetch_all(urls):
    async with aiohttp.ClientSession() as session:
        tasks = [session.get(url) for url in urls]
        responses = await asyncio.gather(*tasks)  # all at once
        return responses
The Basics
import asyncio


async def fetch_data(name, delay):
    """Simulate an I/O-bound fetch: wait *delay* seconds, then return data."""
    print(f"Starting {name}")
    await asyncio.sleep(delay)  # non-blocking sleep
    print(f"Finished {name}")
    return f"{name}: data"


async def main():
    # Run sequentially
    result1 = await fetch_data("A", 2)
    result2 = await fetch_data("B", 1)
    # Total: ~3 seconds

    # Run concurrently
    results = await asyncio.gather(
        fetch_data("A", 2),
        fetch_data("B", 1),
    )
    # Total: ~2 seconds


if __name__ == "__main__":
    # Guarded entry point: importing this module no longer runs ~5 s of sleeps.
    asyncio.run(main())
Real Example: Parallel API Calls
import asyncio
import aiohttp


async def fetch_json(session, url):
    """GET *url* with *session* and decode the JSON response body."""
    async with session.get(url) as response:
        return await response.json()


async def get_github_repos(usernames):
    """Fetch every user's repo list concurrently and print a summary."""
    async with aiohttp.ClientSession() as session:
        tasks = [
            fetch_json(session, f"https://api.github.com/users/{u}/repos")
            for u in usernames
        ]
        # return_exceptions=True: one failed request doesn't cancel the rest
        results = await asyncio.gather(*tasks, return_exceptions=True)
        for username, result in zip(usernames, results):
            if isinstance(result, Exception):
                print(f"{username}: Error - {result}")
            else:
                print(f"{username}: {len(result)} repos")


if __name__ == "__main__":
    asyncio.run(get_github_repos(["torvalds", "gvanrossum", "antirez"]))
Error Handling
asyncio.gather with return_exceptions=True prevents one failure from killing everything:
async def might_fail(n):
    """Return n * 2 after a short wait; raises ValueError for n == 3."""
    if n == 3:
        raise ValueError("I don't like 3")
    await asyncio.sleep(0.1)
    return n * 2


async def main():
    # return_exceptions=True collects exceptions as results instead of
    # propagating the first failure and cancelling the remaining tasks.
    results = await asyncio.gather(
        *[might_fail(i) for i in range(5)],
        return_exceptions=True
    )
    for i, r in enumerate(results):
        if isinstance(r, Exception):
            print(f"Task {i} failed: {r}")
        else:
            print(f"Task {i} result: {r}")


if __name__ == "__main__":
    asyncio.run(main())
Timeouts
Never let async tasks run forever:
async def slow_operation():
    """Stand-in for a task that takes far too long (100 s of sleep)."""
    await asyncio.sleep(100)
    return "finally"


async def main():
    try:
        # wait_for cancels the task and raises TimeoutError after 5 s
        result = await asyncio.wait_for(slow_operation(), timeout=5.0)
    except asyncio.TimeoutError:
        print("Operation timed out!")


if __name__ == "__main__":
    asyncio.run(main())
Semaphores: Limiting Concurrency
Don't hammer APIs with 1000 concurrent requests:
async def fetch_with_limit(session, url, semaphore):
    """Fetch *url*, but only while holding a slot in *semaphore*."""
    async with semaphore:  # waits here if all slots are already in use
        async with session.get(url) as response:
            return await response.text()


async def main():
    semaphore = asyncio.Semaphore(10)  # max 10 concurrent
    urls = [f"https://example.com/page/{i}" for i in range(100)]
    async with aiohttp.ClientSession() as session:
        tasks = [fetch_with_limit(session, url, semaphore) for url in urls]
        results = await asyncio.gather(*tasks)


if __name__ == "__main__":
    asyncio.run(main())
Async Context Managers and Iterators
class AsyncDB:
    """Async context manager wrapping a database connection.

    NOTE(review): relies on an external create_connection() coroutine
    that is not defined in this snippet — confirm where it comes from.
    """

    async def __aenter__(self):
        self.conn = await create_connection()
        return self

    async def __aexit__(self, *args):
        await self.conn.close()

    async def fetch_rows(self, query):
        """Async generator yielding the rows of *query* one at a time."""
        cursor = await self.conn.execute(query)
        async for row in cursor:
            yield row


async def main():
    async with AsyncDB() as db:
        async for row in db.fetch_rows("SELECT * FROM users"):
            print(row)
When NOT to Use Async
- CPU-bound work (use `multiprocessing` instead)
- Simple scripts with few I/O operations
- When your libraries don't support async
Key Rules
- `await` only works inside `async def`
- Use `asyncio.run()` as the entry point
- Use `asyncio.gather()` for concurrent tasks
- Always set timeouts on network operations
- Use semaphores to limit concurrency
- Don't mix `requests` with `asyncio` — use `aiohttp`
Async Python isn't hard — it's just different. Start with gather() for parallel API calls and expand from there.
🚀 Level up your AI workflow! Check out my AI Developer Mega Prompt Pack — 80 battle-tested prompts for developers. $9.99
Top comments (0)