DEV Community

Anna lilith

Python Async Concurrency Patterns for High-Performance Apps

Tags: python, async, concurrency, performance

Why Async?

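Python's GIL lets only one thread execute Python bytecode at a time, so threads do not speed up CPU-bound work. async/await takes a different route: cooperative multitasking on a single thread, where each task hands control back to the event loop whenever it waits on I/O. That makes it a good fit for I/O-bound workloads such as APIs, web scrapers, and real-time apps.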
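To make "cooperative" concrete, here is a minimal sketch. The worker and demo names are made up for illustration, and asyncio.sleep(0) stands in for a real I/O wait. Each worker runs until it reaches an await, then yields to the event loop so the other can make progress.

```python
import asyncio

async def worker(name: str, log: list) -> None:
    """Record progress, yielding control to the event loop at the await."""
    log.append(f"{name} start")
    await asyncio.sleep(0)  # yield point: lets other tasks run
    log.append(f"{name} end")

async def demo() -> list:
    log: list = []
    # Both workers run on one thread and take turns at each await
    await asyncio.gather(worker("A", log), worker("B", log))
    return log

print(asyncio.run(demo()))
```

Both workers start before either finishes, even though there is only one thread.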

Pattern 1: Basic Async HTTP

import asyncio
import httpx
from typing import List

async def fetch_url(client: httpx.AsyncClient, url: str) -> dict:
    """Fetch a single URL."""
    response = await client.get(url, timeout=10.0)
    return {"url": url, "status": response.status_code, "size": len(response.content)}

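async def fetch_all(urls: List[str]) -> list:
    """Fetch multiple URLs concurrently; failed requests come back as exception objects."""
    async with httpx.AsyncClient() as client:
        tasks = [fetch_url(client, url) for url in urls]
        # return_exceptions=True: a failure is returned in the list instead of being raised
        return await asyncio.gather(*tasks, return_exceptions=True)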

# Run it
urls = ["https://api.github.com", "https://httpbin.org/get"]
results = asyncio.run(fetch_all(urls))
print(results)
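Because fetch_all passes return_exceptions=True, the result list can hold exception objects alongside successful results, so callers need to separate the two. The sketch below shows one way to do that. It avoids the network by using a hypothetical might_fail coroutine in place of fetch_url; run_batch is likewise just an illustrative name.

```python
import asyncio

async def might_fail(n: int) -> int:
    """Hypothetical stand-in for a network call: fails on odd inputs."""
    await asyncio.sleep(0)
    if n % 2:
        raise ValueError(f"bad input: {n}")
    return n * 10

async def run_batch(numbers: list) -> tuple:
    results = await asyncio.gather(
        *(might_fail(n) for n in numbers), return_exceptions=True
    )
    # Results keep input order; exceptions appear as objects in the list
    ok = [r for r in results if not isinstance(r, BaseException)]
    failed = [r for r in results if isinstance(r, BaseException)]
    return ok, failed

ok, failed = asyncio.run(run_batch([0, 1, 2, 3]))
print(ok, [str(e) for e in failed])
```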

Pattern 2: Rate-Limited Concurrency

import asyncio
from asyncio import Semaphore
import httpx

class RateLimitedFetcher:
    """Control concurrency to avoid overwhelming APIs."""

    def __init__(self, max_concurrent: int = 10):
        self.semaphore = Semaphore(max_concurrent)
        self.client = httpx.AsyncClient()

    async def fetch(self, url: str) -> dict:
        async with self.semaphore:
            try:
                response = await self.client.get(url, timeout=10.0)
                return {"url": url, "status": response.status_code, "data": response.json()}
            except Exception as e:
                return {"url": url, "error": str(e)}

    async def fetch_all(self, urls: list) -> list:
        tasks = [self.fetch(url) for url in urls]
        return await asyncio.gather(*tasks)

    async def close(self):
        await self.client.aclose()

# Usage
async def main():
    fetcher = RateLimitedFetcher(max_concurrent=5)
    urls = ["https://api.github.com/repos/python/cpython" for _ in range(20)]
    try:
        return await fetcher.fetch_all(urls)
    finally:
        await fetcher.close()  # close the client even if something goes wrong

asyncio.run(main())
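Note that a semaphore caps how many requests are in flight at once, not how many are sent per second. The sketch below checks that cap without touching the network. It assumes a made-up run_limited helper and uses asyncio.sleep as a stand-in for the HTTP call, recording the highest number of jobs that ran at the same time.

```python
import asyncio

async def run_limited(n_jobs: int, max_concurrent: int) -> int:
    """Run n_jobs dummy jobs and return the peak number running at once."""
    semaphore = asyncio.Semaphore(max_concurrent)
    running = 0
    peak = 0

    async def job() -> None:
        nonlocal running, peak
        async with semaphore:  # at most max_concurrent jobs pass this line
            running += 1
            peak = max(peak, running)
            await asyncio.sleep(0.01)  # stand-in for an HTTP request
            running -= 1

    await asyncio.gather(*(job() for _ in range(n_jobs)))
    return peak

peak = asyncio.run(run_limited(n_jobs=20, max_concurrent=5))
print(f"peak concurrency: {peak}")
```

If the API enforces a requests-per-second quota, a concurrency cap alone may not be enough and some form of time-based throttling would be needed on top.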

Pattern 3: Async Producer-Consumer Queue

import asyncio
from asyncio import Queue
import random

async def producer(queue: Queue, name: str):
    """Generate items for processing."""
    for i in range(10):
        item = f"{name}-{i}"
        await queue.put(item)
        print(f"Produced: {item}")
        await asyncio.sleep(random.uniform(0.1, 0.5))
    # No sentinel here: main() sends one None per consumer once all producers finish

async def consumer(queue: Queue, name: str):
    """Process items from queue until a None sentinel arrives."""
    while True:
        item = await queue.get()
        if item is None:
            queue.task_done()  # account for the sentinel too
            break
        print(f"{name} processing: {item}")
        await asyncio.sleep(random.uniform(0.2, 1.0))
        queue.task_done()

async def main():
    queue = Queue(maxsize=5)

    # 1 producer, 3 consumers
    producers = [asyncio.create_task(producer(queue, f"P{i}")) for i in range(1)]
    consumers = [asyncio.create_task(consumer(queue, f"C{i}")) for i in range(3)]

    await asyncio.gather(*producers)
    # One sentinel per consumer; a single None would stop only one worker
    # and leave the others waiting on queue.get() forever
    for _ in consumers:
        await queue.put(None)
    await asyncio.gather(*consumers)

asyncio.run(main())
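An alternative to None sentinels is to wait for the queue to drain with queue.join() and then cancel the workers. The sketch below shows that shutdown style under a few assumptions: the worker and process names are illustrative, and the "work" is just doubling a number.

```python
import asyncio

async def worker(queue: asyncio.Queue, results: list) -> None:
    """Process items forever; process() cancels this task once the queue drains."""
    while True:
        item = await queue.get()
        try:
            await asyncio.sleep(0)  # stand-in for real work
            results.append(item * 2)
        finally:
            queue.task_done()  # always mark the item done, even on error

async def process(items: list, n_workers: int = 3) -> list:
    queue: asyncio.Queue = asyncio.Queue(maxsize=5)
    results: list = []
    workers = [asyncio.create_task(worker(queue, results)) for _ in range(n_workers)]

    for item in items:
        await queue.put(item)  # waits while the queue is full (backpressure)

    await queue.join()  # returns once every item has been marked done
    for w in workers:
        w.cancel()
    # Collect the cancelled tasks so no CancelledError is left unhandled
    await asyncio.gather(*workers, return_exceptions=True)
    return results

results = asyncio.run(process(list(range(10))))
print(sorted(results))
```

With this approach the number of workers can change without having to keep the number of sentinels in sync.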

Pattern 4: Async Context Managers

import asyncio
from contextlib import asynccontextmanager
import aiohttp

@asynccontextmanager
async def get_session():
    """Managed HTTP session."""
    session = aiohttp.ClientSession()
    try:
        yield session
    finally:
        await session.close()

async def fetch_with_session(url: str):
    async with get_session() as session:
        async with session.get(url) as response:
            return await response.json()

async def main():
    data = await fetch_with_session("https://httpbin.org/json")
    print(data)

asyncio.run(main())
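The point of the try/finally inside get_session is that cleanup runs even when the body of the async with block raises. Here is a network-free sketch of that ordering. The managed_resource name and the events list are hypothetical and exist only to record what happens when.

```python
import asyncio
from contextlib import asynccontextmanager

events: list = []

@asynccontextmanager
async def managed_resource(name: str):
    """Hypothetical resource: records open/close to show the ordering."""
    events.append(f"open {name}")
    try:
        yield name
    finally:
        await asyncio.sleep(0)  # stand-in for an async close()
        events.append(f"close {name}")

async def main() -> None:
    try:
        async with managed_resource("session") as res:
            events.append(f"use {res}")
            raise RuntimeError("request failed")
    except RuntimeError:
        events.append("handled error")

asyncio.run(main())
print(events)
```

The resource is closed before the error reaches the except block, which is the guarantee you want for sessions and connections.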

Performance Comparison

Pattern         | Throughput | Memory | Use Case
Sequential      | Low        | Low    | Simple scripts
Threading       | Medium     | Medium | I/O-bound
Multiprocessing | High       | High   | CPU-bound
Async           | Very High  | Low    | I/O-bound, many connections

Common Pitfalls

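  1. Don't mix sync and async: a blocking call stalls the whole event loop, so run blocking I/O with asyncio.to_thread() (Python 3.9+)
  2. Handle exceptions: use try/except inside coroutines, or pass return_exceptions=True to asyncio.gather() and check the results
  3. Set timeouts: always use asyncio.wait_for() or client-level timeouts so a stalled request cannot hang forever
  4. Avoid blocking: never call time.sleep() in async code; use await asyncio.sleep() instead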
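Pitfalls 1, 3, and 4 can be combined in a small sketch. It assumes Python 3.9+ for asyncio.to_thread, and the blocking_io and call_with_timeout names are made up for illustration. The blocking call runs in a worker thread while asyncio.wait_for bounds how long the coroutine waits for it.

```python
import asyncio
import time

def blocking_io(seconds: float) -> str:
    """Hypothetical blocking call, e.g. a sync library or a slow file read."""
    time.sleep(seconds)  # acceptable here: runs in a worker thread, not on the event loop
    return "done"

async def call_with_timeout(seconds: float, timeout: float) -> str:
    try:
        # to_thread moves the blocking call off the event loop;
        # wait_for stops waiting once the timeout expires
        return await asyncio.wait_for(asyncio.to_thread(blocking_io, seconds), timeout)
    except asyncio.TimeoutError:
        return "timed out"

print(asyncio.run(call_with_timeout(0.01, timeout=1.0)))
print(asyncio.run(call_with_timeout(0.5, timeout=0.05)))
```

One caveat: the timeout only stops the coroutine from waiting. The worker thread itself is not interrupted and keeps running until the blocking call returns.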

Top comments (1)

Marcus Kim

The semaphore-limited fetcher is a practical step up from raw asyncio.gather, especially with the httpx.AsyncClient timeout set explicitly. The producer-consumer section also points at the right production concern with Queue(maxsize=5): backpressure matters as much as raw throughput once traffic gets uneven. One small engineering note: with three consumers, I'd usually send one completion sentinel per consumer or cancel the workers after queue.join(), otherwise a demo like this can teach a shutdown pattern that hangs under load.