Python Async Concurrency Patterns for High-Performance Apps
Tags: python, async, concurrency, performance
Why Async?
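Python's GIL prevents threads from running Python bytecode in parallel, which limits what threading can do for CPU-bound work. For I/O-bound tasks, async/await offers cooperative multitasking on a single thread: a coroutine yields control at each await, so other coroutines can run while it waits on the network or disk. This suits API clients and servers, web scrapers, and real-time apps.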
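To make the cooperative-multitasking idea concrete, here is a minimal sketch that uses asyncio.sleep as a stand-in for network I/O. The fake_io helper and the 0.1 s delays are illustrative assumptions, not measurements from a real workload. Awaited one after another, the three waits add up; scheduled together, they overlap.

```python
import asyncio
import time


async def fake_io(name: str, delay: float) -> str:
    """Hypothetical stand-in for an I/O call; it only sleeps."""
    await asyncio.sleep(delay)  # yields control to the event loop while "waiting"
    return name


async def run_sequential(delays: list[float]) -> float:
    """Await each call one after another and return elapsed seconds."""
    start = time.perf_counter()
    for i, delay in enumerate(delays):
        await fake_io(f"task-{i}", delay)
    return time.perf_counter() - start


async def run_concurrent(delays: list[float]) -> float:
    """Schedule all calls at once and return elapsed seconds."""
    start = time.perf_counter()
    await asyncio.gather(*(fake_io(f"task-{i}", d) for i, d in enumerate(delays)))
    return time.perf_counter() - start


async def main() -> None:
    delays = [0.1, 0.1, 0.1]
    seq = await run_sequential(delays)
    con = await run_concurrent(delays)
    print(f"sequential: {seq:.2f}s, concurrent: {con:.2f}s")


asyncio.run(main())
```

The concurrent run should take roughly as long as the longest single wait, because every coroutine is suspended at the same time instead of in turn.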
Pattern 1: Basic Async HTTP
import asyncio
import httpx
from typing import List

async def fetch_url(client: httpx.AsyncClient, url: str) -> dict:
    """Fetch a single URL."""
    response = await client.get(url, timeout=10.0)
    return {"url": url, "status": response.status_code, "size": len(response.content)}

async def fetch_all(urls: List[str]) -> List[dict]:
    """Fetch multiple URLs concurrently."""
    async with httpx.AsyncClient() as client:
        tasks = [fetch_url(client, url) for url in urls]
        # With return_exceptions=True, a failed request appears in the
        # result list as an exception object instead of aborting the batch.
        return await asyncio.gather(*tasks, return_exceptions=True)

# Run it
urls = ["https://api.github.com", "https://httpbin.org/get"]
results = asyncio.run(fetch_all(urls))
print(results)
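Because fetch_all passes return_exceptions=True, the list it returns can mix result dicts with exception objects. The following is one possible way to separate them, sketched without network access: split_results and maybe_fail are hypothetical helpers, and the example.com URLs are placeholders.

```python
import asyncio


def split_results(urls, results):
    """Pair each URL with its outcome and separate successes from failures."""
    ok, failed = [], []
    for url, result in zip(urls, results):
        if isinstance(result, BaseException):
            failed.append((url, repr(result)))
        else:
            ok.append(result)
    return ok, failed


async def maybe_fail(url: str) -> dict:
    """Hypothetical stand-in for fetch_url: fails for URLs containing 'bad'."""
    await asyncio.sleep(0.01)
    if "bad" in url:
        raise ValueError(f"cannot fetch {url}")
    return {"url": url, "status": 200}


async def main():
    urls = ["https://example.com/a", "https://example.com/bad"]
    # gather returns results in the same order as the awaitables passed in
    results = await asyncio.gather(*(maybe_fail(u) for u in urls), return_exceptions=True)
    return split_results(urls, results)


ok, failed = asyncio.run(main())
print("ok:", ok)
print("failed:", failed)
```

Since gather preserves input order, zipping the URL list with the results is enough to tell which request failed.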
Pattern 2: Rate-Limited Concurrency
import asyncio
from asyncio import Semaphore
import httpx

class RateLimitedFetcher:
    """Control concurrency to avoid overwhelming APIs."""

    def __init__(self, max_concurrent: int = 10):
        # The semaphore caps the number of in-flight requests
        self.semaphore = Semaphore(max_concurrent)
        self.client = httpx.AsyncClient()

    async def fetch(self, url: str) -> dict:
        async with self.semaphore:
            try:
                response = await self.client.get(url, timeout=10.0)
                return {"url": url, "status": response.status_code, "data": response.json()}
            except Exception as e:
                # Covers network errors, timeouts, and non-JSON bodies
                return {"url": url, "error": str(e)}

    async def fetch_all(self, urls: list) -> list:
        tasks = [self.fetch(url) for url in urls]
        return await asyncio.gather(*tasks)

    async def close(self):
        await self.client.aclose()

# Usage
async def main():
    fetcher = RateLimitedFetcher(max_concurrent=5)
    urls = ["https://api.github.com/repos/python/cpython" for _ in range(20)]
    try:
        return await fetcher.fetch_all(urls)
    finally:
        # Close the client even if fetching raises
        await fetcher.close()

asyncio.run(main())
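Note that a semaphore bounds how many requests are in flight at once, not how many are sent per second. To check that the cap holds, here is a small sketch that replaces the HTTP call with asyncio.sleep and records peak concurrency. ConcurrencyProbe and measure_peak are hypothetical names introduced only for this illustration.

```python
import asyncio


class ConcurrencyProbe:
    """Hypothetical helper that records how many jobs run at the same time."""

    def __init__(self) -> None:
        self.active = 0
        self.peak = 0

    async def job(self, semaphore: asyncio.Semaphore) -> None:
        async with semaphore:
            self.active += 1
            self.peak = max(self.peak, self.active)
            await asyncio.sleep(0.01)  # simulated request
            self.active -= 1


async def measure_peak(total_jobs: int, max_concurrent: int) -> int:
    """Run total_jobs simulated requests and return the peak concurrency seen."""
    semaphore = asyncio.Semaphore(max_concurrent)
    probe = ConcurrencyProbe()
    await asyncio.gather(*(probe.job(semaphore) for _ in range(total_jobs)))
    return probe.peak


peak = asyncio.run(measure_peak(total_jobs=20, max_concurrent=5))
print(f"peak concurrency: {peak}")
```

If an API enforces a requests-per-second quota, a concurrency cap alone may not be enough; that case would need a time-based limiter on top.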
Pattern 3: Async Producer-Consumer Queue
import asyncio
from asyncio import Queue
import random

async def producer(queue: Queue, name: str):
    """Generate items for processing."""
    for i in range(10):
        item = f"{name}-{i}"
        await queue.put(item)  # Blocks while the queue is full (backpressure)
        print(f"Produced: {item}")
        await asyncio.sleep(random.uniform(0.1, 0.5))

async def consumer(queue: Queue, name: str):
    """Process items from queue until a None sentinel arrives."""
    while True:
        item = await queue.get()
        if item is None:
            queue.task_done()
            break
        print(f"{name} processing: {item}")
        await asyncio.sleep(random.uniform(0.2, 1.0))
        queue.task_done()

async def main():
    queue = Queue(maxsize=5)
    # 1 producer, 3 consumers
    producers = [asyncio.create_task(producer(queue, f"P{i}")) for i in range(1)]
    consumers = [asyncio.create_task(consumer(queue, f"C{i}")) for i in range(3)]
    await asyncio.gather(*producers)
    # Signal completion: one sentinel per consumer, sent after all producers
    # finish. A single sentinel would stop one consumer and hang the others.
    for _ in consumers:
        await queue.put(None)
    await asyncio.gather(*consumers)

asyncio.run(main())
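Another common way to shut down consumers, instead of sending sentinel values, is to wait on queue.join() and then cancel the workers. The sketch below assumes that approach; worker and run_pipeline are hypothetical names, and the short sleep stands in for real processing.

```python
import asyncio


async def worker(queue: asyncio.Queue, processed: list) -> None:
    """Process items forever; shutdown happens through cancellation."""
    while True:
        item = await queue.get()
        try:
            await asyncio.sleep(0.01)  # simulated work
            processed.append(item)
        finally:
            queue.task_done()  # always mark the item done so join() can return


async def run_pipeline(items: list, num_workers: int = 3) -> list:
    queue: asyncio.Queue = asyncio.Queue(maxsize=5)
    processed: list = []
    workers = [asyncio.create_task(worker(queue, processed)) for _ in range(num_workers)]

    for item in items:
        await queue.put(item)  # blocks while the queue is full (backpressure)

    await queue.join()  # returns once every queued item has been marked done
    for task in workers:
        task.cancel()  # workers are idle in queue.get() at this point
    # Collect the cancellations so no task is left pending
    await asyncio.gather(*workers, return_exceptions=True)
    return processed


result = asyncio.run(run_pipeline([f"P0-{i}" for i in range(10)]))
print(sorted(result))
```

With this design the number of workers can change without touching the shutdown logic, since nothing has to count sentinels.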
Pattern 4: Async Context Managers
import asyncio
from contextlib import asynccontextmanager
import aiohttp

@asynccontextmanager
async def get_session():
    """Managed HTTP session."""
    session = aiohttp.ClientSession()
    try:
        yield session
    finally:
        # Runs on normal exit and on exceptions, so the session never leaks
        await session.close()

async def fetch_with_session(url: str):
    # A session per call keeps the demo short; in real code, reuse one
    # session across many requests to benefit from connection pooling.
    async with get_session() as session:
        async with session.get(url) as response:
            return await response.json()

async def main():
    data = await fetch_with_session("https://httpbin.org/json")
    print(data)

asyncio.run(main())
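To see why the try/finally inside the context manager matters, here is a network-free sketch that records the order of events when the body raises. managed_resource and use_resource are hypothetical stand-ins for a real session, and asyncio.sleep(0) stands in for an awaitable close call.

```python
import asyncio
from contextlib import asynccontextmanager

events: list[str] = []


@asynccontextmanager
async def managed_resource(name: str):
    """Hypothetical resource: records open/close instead of doing real I/O."""
    events.append(f"open {name}")
    try:
        yield name
    finally:
        await asyncio.sleep(0)  # stand-in for an awaitable close()
        events.append(f"close {name}")


async def use_resource(fail: bool) -> None:
    async with managed_resource("session") as res:
        events.append(f"use {res}")
        if fail:
            raise RuntimeError("request failed")


async def main() -> None:
    await use_resource(fail=False)
    try:
        await use_resource(fail=True)
    except RuntimeError:
        events.append("error handled")
    print(events)


asyncio.run(main())
```

In both runs the close event is recorded before control returns to the caller, which is the guarantee the finally clause provides.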
Performance Comparison
| Approach | Relative Throughput | Memory | Use Case |
|---|---|---|---|
| Sequential | Low | Low | Simple scripts |
| Threading | Medium | Medium | I/O-bound |
| Multiprocessing | High | High | CPU-bound |
| Async | Very High | Low | I/O-bound, many connections |
Common Pitfalls
- Don't mix sync/async: Use asyncio.to_thread() for blocking I/O
- Handle exceptions: Use try/except in coroutines
- Set timeouts: Always use asyncio.wait_for() or client timeouts
- Avoid blocking: Never call time.sleep() in async code; use await asyncio.sleep() instead
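Two of the pitfalls above can be sketched together: offloading a blocking call with asyncio.to_thread() (available from Python 3.9) and bounding a wait with asyncio.wait_for(). The blocking_read and slow_operation functions are hypothetical placeholders for a synchronous library call and a slow coroutine.

```python
import asyncio
import time


def blocking_read(delay: float) -> str:
    """Hypothetical blocking call (stands in for a sync library or file read)."""
    time.sleep(delay)  # acceptable here: this runs in a worker thread
    return "done"


async def slow_operation() -> str:
    """Hypothetical coroutine that takes longer than we are willing to wait."""
    await asyncio.sleep(1.0)
    return "finished"


async def main() -> tuple:
    # Run blocking code in a thread so the event loop stays responsive
    thread_result = await asyncio.to_thread(blocking_read, 0.05)

    # Bound the wait; wait_for cancels the coroutine when the timeout expires
    try:
        timed_result = await asyncio.wait_for(slow_operation(), timeout=0.05)
    except asyncio.TimeoutError:
        timed_result = "timed out"
    return thread_result, timed_result


print(asyncio.run(main()))
```

Catching the timeout inside the coroutine also covers the exception-handling pitfall: the failure becomes a value the caller can inspect instead of an unhandled error.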
Top comments (1)
The semaphore-limited fetcher is a practical step up from raw asyncio.gather, especially with the httpx.AsyncClient timeout set explicitly. The producer-consumer section also points at the right production concern with Queue(maxsize=5): backpressure matters as much as raw throughput once traffic gets uneven. One small engineering note: with three consumers, I'd usually send one completion sentinel per consumer or cancel the workers after queue.join(); otherwise a demo like this can teach a shutdown pattern that hangs under load.