DEV Community

MEROLINE LIZLENT

Python asyncio

asyncio is one of the most powerful modules in Python's standard library, and also one of the most misunderstood. Developers reach for threads or processes when asyncio would serve them better, or they sprinkle async/await everywhere without understanding what it actually does.

This article builds a real mental model from the ground up, then covers the practical patterns you'll use day to day.

The Core Problem: I/O Is Slow

Your CPU can execute billions of operations per second. A network request takes 50–500 milliseconds. Reading from disk takes milliseconds. If your code does this:

result1 = fetch_from_database(query1)  # wait 100ms
result2 = call_external_api(url)       # wait 200ms
result3 = read_large_file(path)        # wait 50ms
# Total: ~350ms

Your CPU is idle for almost all of that time. It is just waiting.

Threading is one way to reclaim that time: run multiple operations concurrently. But threads come with costs: per-thread memory overhead, context-switching, and subtle race-condition bugs. There is also the GIL (Global Interpreter Lock), which prevents threads from running Python bytecode in parallel.

asyncio takes a different approach: cooperative multitasking on a single thread. Instead of doing many things at once, it does one thing at a time, and whenever that thing is waiting on I/O, it switches to something else. Because everything runs on one thread and tasks yield control voluntarily, the overhead stays minimal.

The Event Loop

At the heart of asyncio is the event loop, which acts as a scheduler. Here is what it does:

  1. It runs a coroutine until it hits an await.
  2. It starts the I/O operation.
  3. It moves on to another coroutine that is ready to run.
  4. It comes back to the first coroutine when its I/O completes.
import asyncio

async def main():
    print("Start")
    await asyncio.sleep(1)  # "I'm waiting — go run something else"
    print("Done")

asyncio.run(main())  # Create an event loop and run main()

asyncio.run() is the modern entry point (Python 3.7+). It creates an event loop, runs your coroutine, and cleans up.

Coroutines: async def and await

A coroutine is a function defined with async def.
Calling it does not run it. Instead, it returns a coroutine object, and the body only executes once you await that object or schedule it as a task. Until you do one of those things, nothing happens.
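A quick sketch makes this concrete (using a throwaway `compute` coroutine, not part of the example below):

```python
import asyncio

async def compute():
    return 42

coro = compute()              # nothing has executed yet — just an object
print(type(coro).__name__)    # coroutine
result = asyncio.run(coro)    # the body runs only when the loop drives it
print(result)                 # 42
```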

import asyncio

async def greet(name: str):
    print(f"Hello, {name}")
    await asyncio.sleep(0.5)
    print(f"Goodbye, {name}")

async def main():
    # This runs sequentially — each greet waits for the previous to finish
    await greet("Alice")
    await greet("Bob")

asyncio.run(main())
Hello, Alice
Goodbye, Alice
Hello, Bob
Goodbye, Bob

Sequential, not concurrent. To get concurrency, you need asyncio.gather() or tasks.

asyncio.gather(): Run Coroutines Concurrently

import asyncio

async def fetch_user(user_id: int):
    print(f"Fetching user {user_id}...")
    await asyncio.sleep(1)  # Simulate network call
    return {"id": user_id, "name": f"User {user_id}"}

async def main():
    # Run all three concurrently — total time ~1s, not ~3s
    results = await asyncio.gather(
        fetch_user(1),
        fetch_user(2),
        fetch_user(3),
    )
    print(results)

asyncio.run(main())
Fetching user 1...
Fetching user 2...
Fetching user 3...
[{'id': 1, ...}, {'id': 2, ...}, {'id': 3, ...}]

All three started immediately; all three finished in ~1 second total.
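You can verify the timing claim yourself. Here's a minimal sketch that measures elapsed wall time with time.perf_counter (the ~1s figure assumes the simulated 1-second sleep above):

```python
import asyncio
import time

async def fetch_user(user_id: int):
    await asyncio.sleep(1)  # simulate a 1-second network call
    return {"id": user_id}

async def main():
    start = time.perf_counter()
    # All three sleeps overlap, so total time is ~1s rather than ~3s
    results = await asyncio.gather(*(fetch_user(i) for i in range(1, 4)))
    elapsed = time.perf_counter() - start
    print(f"{len(results)} results in {elapsed:.2f}s")
    return elapsed

elapsed = asyncio.run(main())
```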

Handling Errors in gather()

results = await asyncio.gather(
    fetch_user(1),
    fetch_user(2),
    fetch_user(999),
    return_exceptions=True,  # Don't raise — return exceptions as values
)

for result in results:
    if isinstance(result, Exception):
        print(f"Error: {result}")
    else:
        print(result)

Tasks: Fire and Don't Wait

asyncio.create_task() schedules a coroutine to run concurrently without immediately awaiting it.

import asyncio

async def background_job(name: str, delay: float):
    await asyncio.sleep(delay)
    print(f"{name} completed after {delay}s")

async def main():
    # Schedule tasks without waiting for them
    task1 = asyncio.create_task(background_job("Job A", 2))
    task2 = asyncio.create_task(background_job("Job B", 1))

    print("Tasks started, doing other work...")
    await asyncio.sleep(0.5)
    print("Still doing other work...")

    # Wait for both tasks to finish
    await task1
    await task2
    print("All done")

asyncio.run(main())
Tasks started, doing other work...
Still doing other work...
Job B completed after 1s
Job A completed after 2s
All done

asyncio.wait() and as_completed()

More control over how you wait for multiple tasks:

import asyncio

async def slow_operation(n: int):
    await asyncio.sleep(n * 0.5)
    return n * 10

async def main():
    tasks = [asyncio.create_task(slow_operation(i)) for i in range(1, 5)]

    # Process results as they complete (not in order)
    for coro in asyncio.as_completed(tasks):
        result = await coro
        print(f"Got result: {result}")

asyncio.run(main())
Got result: 10   ← task 1 finished first
Got result: 20
Got result: 30
Got result: 40

Timeouts

import asyncio

async def slow_api_call():
    await asyncio.sleep(10)  # Takes forever
    return {"data": "..."}

async def main():
    try:
        # Raise TimeoutError if it takes more than 2 seconds
        result = await asyncio.wait_for(slow_api_call(), timeout=2.0)
    except asyncio.TimeoutError:
        print("Request timed out")

asyncio.run(main())

Python 3.11+ adds asyncio.timeout() as a context manager:

async def main():
    try:
        async with asyncio.timeout(2.0):
            result = await slow_api_call()
            other_result = await another_call()
    except asyncio.TimeoutError:
        print("Timed out")

Queues: Producer-Consumer Pattern

asyncio.Queue is perfect for worker pool patterns:

import asyncio
import random

async def producer(queue: asyncio.Queue, num_items: int):
    for i in range(num_items):
        await asyncio.sleep(random.uniform(0.1, 0.5))
        item = f"task_{i}"
        await queue.put(item)
        print(f"Produced: {item}")
    # Signal workers to stop
    for _ in range(3):  # one sentinel per worker
        await queue.put(None)

async def worker(name: str, queue: asyncio.Queue):
    while True:
        item = await queue.get()
        if item is None:
            break
        print(f"  {name} processing {item}")
        await asyncio.sleep(0.3)  # simulate work
        queue.task_done()
    print(f"  {name} shutting down")

async def main():
    queue = asyncio.Queue(maxsize=5)  # Buffer of 5 items

    # Start 3 workers
    workers = [
        asyncio.create_task(worker(f"Worker-{i}", queue))
        for i in range(3)
    ]

    await producer(queue, 10)
    await asyncio.gather(*workers)

asyncio.run(main())

Semaphores: Limit Concurrency

When you have 1000 URLs to fetch but don't want to hammer a server with 1000 simultaneous connections:

import asyncio
import httpx

async def fetch(client: httpx.AsyncClient, url: str, semaphore: asyncio.Semaphore):
    async with semaphore:  # Only N coroutines can enter at once
        response = await client.get(url)
        return response.status_code

async def main():
    urls = [f"https://httpbin.org/delay/1?n={i}" for i in range(20)]
    semaphore = asyncio.Semaphore(5)  # Max 5 concurrent requests

    async with httpx.AsyncClient() as client:
        tasks = [fetch(client, url, semaphore) for url in urls]
        results = await asyncio.gather(*tasks)

    print(results)

asyncio.run(main())

Locks and Synchronization

When multiple coroutines access shared state:

import asyncio

class Counter:
    def __init__(self):
        self.value = 0
        self._lock = asyncio.Lock()

    async def increment(self):
        async with self._lock:
            current = self.value
            await asyncio.sleep(0)  # Yield to event loop (simulate real work)
            self.value = current + 1

async def main():
    counter = Counter()
    tasks = [asyncio.create_task(counter.increment()) for _ in range(100)]
    await asyncio.gather(*tasks)
    print(f"Final count: {counter.value}")  # 100, not some race-condition number

asyncio.run(main())

Other synchronization primitives:

  • asyncio.Event — signal between coroutines
  • asyncio.Condition — wait for a condition to be true
  • asyncio.Semaphore — limit concurrent access
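As a minimal sketch of the first of these, asyncio.Event lets one coroutine suspend until another signals it:

```python
import asyncio

async def waiter(event: asyncio.Event) -> str:
    await event.wait()        # suspends until event.set() is called
    return "signaled"

async def setter(event: asyncio.Event):
    await asyncio.sleep(0.1)  # simulate some work before signaling
    event.set()               # wakes every coroutine blocked on event.wait()

async def main():
    event = asyncio.Event()
    results = await asyncio.gather(waiter(event), setter(event))
    return results[0]

print(asyncio.run(main()))  # signaled
```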

Running Blocking Code in asyncio

Sometimes you need to call a blocking function (CPU-intensive or a non-async library) without blocking the event loop:

import asyncio
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor

# For blocking I/O (database drivers without async support, etc.)
def read_file(path: str) -> str:
    with open(path) as f:  # open and read inside the worker thread, not the event loop
        return f.read()

async def read_file_async(path: str) -> str:
    loop = asyncio.get_running_loop()
    with ThreadPoolExecutor() as pool:
        return await loop.run_in_executor(pool, read_file, path)

# For CPU-heavy work (data processing, image resizing, etc.)
def cpu_intensive_task(data: list) -> int:
    return sum(x ** 2 for x in data)

async def main():
    loop = asyncio.get_running_loop()
    data = list(range(1_000_000))

    with ProcessPoolExecutor() as pool:
        result = await loop.run_in_executor(pool, cpu_intensive_task, data)
    print(result)

asyncio.run(main())

Async Context Managers and Iterators

import asyncio

# Async context manager
class AsyncDatabase:
    async def __aenter__(self):
        print("Connecting to DB...")
        await asyncio.sleep(0.1)
        return self

    async def __aexit__(self, *args):
        print("Closing DB connection")
        await asyncio.sleep(0.05)

    async def fetch(self, query: str):
        await asyncio.sleep(0.1)
        return [{"id": 1}, {"id": 2}]

# Async generator (async iterator)
async def paginate(db, query: str, page_size: int = 10):
    page = 0
    while True:
        results = await db.fetch(f"{query} LIMIT {page_size} OFFSET {page * page_size}")
        if not results:
            break
        for row in results:
            yield row
        page += 1

async def main():
    async with AsyncDatabase() as db:
        async for row in paginate(db, "SELECT * FROM users"):
            print(row)

asyncio.run(main())

Common Mistakes

❌ Calling a coroutine without awaiting it

async def main():
    fetch_data()  # Never runs! Just creates a coroutine object (and a "never awaited" RuntimeWarning).
    await fetch_data()  # Correct

❌ Using time.sleep() instead of asyncio.sleep()

import time

async def bad():
    time.sleep(2)  # ❌ Blocks the ENTIRE event loop for 2 seconds

async def good():
    await asyncio.sleep(2)  # ✅ Yields control — other coroutines run

❌ Forgetting asyncio.run() or running in wrong context

# ❌ This does nothing at the top level
async def main():
    ...

main()  # Just creates a coroutine object

# ✅ Correct
asyncio.run(main())

❌ CPU-bound work on the event loop

async def bad():
    result = sum(x**2 for x in range(10_000_000))  # ❌ Blocks event loop
    return result

def square_sum() -> int:
    return sum(x**2 for x in range(10_000_000))

async def good():
    loop = asyncio.get_running_loop()
    with ProcessPoolExecutor() as pool:
        # The callable must be picklable: a module-level function, not a lambda
        result = await loop.run_in_executor(pool, square_sum)
    return result

When to Use asyncio (and When Not To)

Use asyncio when:

  • Making many concurrent network requests
  • Building web servers or APIs (FastAPI, Starlette, aiohttp)
  • Working with WebSockets or streaming data
  • I/O-heavy scripts (database queries, file operations, API calls)

Don't use asyncio when:

  • Your code is CPU-bound — use multiprocessing instead
  • Your entire stack uses synchronous libraries that don't support async
  • The script is simple and sequential — it just adds boilerplate

Wrapping Up

asyncio is not about running things faster on a single CPU — it's about keeping your program productive while waiting for I/O. The mental model is simple: one thread, cooperative switching at every await.
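That model is easy to see in a tiny sketch: two coroutines sharing one thread, each handing control back at every await (asyncio.sleep(0) yields to the loop without actually sleeping):

```python
import asyncio

order = []

async def ping():
    for _ in range(3):
        order.append("ping")
        await asyncio.sleep(0)  # yield to the event loop without sleeping

async def pong():
    for _ in range(3):
        order.append("pong")
        await asyncio.sleep(0)

async def main():
    await asyncio.gather(ping(), pong())

asyncio.run(main())
print(order)  # the two coroutines interleave on a single thread
```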

Master gather(), create_task(), Semaphore, and Queue, and you'll have the tools to handle virtually any concurrent I/O workload Python throws at you.
