asyncio is one of the most powerful modules in Python's standard library, and also one of the most misunderstood. Developers reach for threads or processes where asyncio would serve better, or they sprinkle async/await everywhere without knowing what it actually does.
This article builds a real mental model from the ground up, then walks through the practical patterns you'll use every day.
The Core Problem: I/O Is Slow
Your CPU can execute billions of operations per second. A network request takes 50–500 milliseconds. Reading from disk takes milliseconds. If your code does this:
result1 = fetch_from_database(query1) # wait 100ms
result2 = call_external_api(url) # wait 200ms
result3 = read_large_file(path) # wait 50ms
# Total: ~350ms
The CPU sits idle for most of that time. It is just waiting.
Threading is one way to reclaim that time: while one thread waits, another runs. But threads have costs. Each one needs its own stack memory, the operating system has to context-switch between them, and shared state invites race conditions. In standard CPython builds, the GIL (global interpreter lock) also prevents threads from executing Python bytecode in parallel.
asyncio takes a different approach. Instead of running many things at the same time, it runs one thing at a time, and when that thing has to wait, it switches to something else. Everything happens on a single thread, and tasks cooperate by handing control back voluntarily, so the overhead per task stays small.
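To make the difference concrete, here is a minimal sketch of the three waits from the snippet above overlapped on one thread. The async versions of fetch_from_database, call_external_api, and read_large_file are hypothetical stand-ins that only sleep, and asyncio.gather() is covered in detail later in the article.

```python
import asyncio
import time

# Hypothetical async stand-ins: asyncio.sleep() simulates the I/O wait.
async def fetch_from_database(query: str) -> str:
    await asyncio.sleep(0.10)  # ~100 ms database round trip
    return f"rows for {query}"

async def call_external_api(url: str) -> str:
    await asyncio.sleep(0.20)  # ~200 ms HTTP call
    return f"response from {url}"

async def read_large_file(path: str) -> str:
    await asyncio.sleep(0.05)  # ~50 ms disk read
    return f"contents of {path}"

async def main() -> float:
    start = time.perf_counter()
    # All three waits overlap, so the total is close to the longest one.
    await asyncio.gather(
        fetch_from_database("query1"),
        call_external_api("https://example.com"),
        read_large_file("data.bin"),
    )
    return time.perf_counter() - start

elapsed = asyncio.run(main())
print(f"Elapsed: {elapsed:.2f}s")
```

On an otherwise idle machine the elapsed time should land near 0.2 seconds (the longest wait) rather than 0.35 seconds (the sum).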
The Event Loop
At the heart of asyncio is the event loop, which acts as a scheduler. Here is what it does:
- It runs a coroutine until that coroutine reaches an await that actually has to wait.
- It registers the pending I/O operation.
- It moves on to another coroutine that is ready to run.
- It resumes the first coroutine once its I/O operation has finished.
import asyncio

async def main():
    print("Start")
    await asyncio.sleep(1)  # "I'm waiting, go run something else"
    print("Done")

asyncio.run(main())  # Create an event loop and run main()
asyncio.run() is the modern entry point (Python 3.7+). It creates an event loop, runs your coroutine, and cleans up.
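The scheduling steps above can be observed directly. The sketch below is illustrative only: step and the events list are hypothetical names, and asyncio.sleep(0) is used as a way to hand control back to the loop without a real delay.

```python
import asyncio

events: list[str] = []

async def step(name: str) -> None:
    events.append(f"{name}: before await")
    await asyncio.sleep(0)  # hand control back to the event loop
    events.append(f"{name}: after await")

async def main() -> None:
    # gather() schedules both coroutines; the loop alternates between them
    await asyncio.gather(step("A"), step("B"))

asyncio.run(main())
print(events)
```

The recorded order shows A pausing at its await, B getting a turn, and then each one resuming in sequence.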
Coroutines: async def and await
A coroutine is a function defined with async def.
Calling it does not run its body right away. Instead, the call returns a coroutine object. Nothing executes until you await that object or schedule it on the event loop, for example with asyncio.create_task().
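A quick way to convince yourself of this is to inspect what the call returns. In this small sketch, answer is a hypothetical coroutine function used only for illustration.

```python
import asyncio

async def answer() -> int:
    return 42

coro = answer()                     # the body has not run yet
kind = type(coro).__name__          # "coroutine"
is_coro = asyncio.iscoroutine(coro) # True

result = asyncio.run(coro)          # now the body actually executes
print(kind, is_coro, result)
```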
import asyncio

async def greet(name: str):
    print(f"Hello, {name}")
    await asyncio.sleep(0.5)
    print(f"Goodbye, {name}")

async def main():
    # This runs sequentially: each greet waits for the previous one to finish
    await greet("Alice")
    await greet("Bob")

asyncio.run(main())
Hello, Alice
Goodbye, Alice
Hello, Bob
Goodbye, Bob
Sequential, not concurrent. To get concurrency, you need asyncio.gather() or tasks.
asyncio.gather(): Run Coroutines Concurrently
import asyncio

async def fetch_user(user_id: int):
    print(f"Fetching user {user_id}...")
    await asyncio.sleep(1)  # Simulate network call
    return {"id": user_id, "name": f"User {user_id}"}

async def main():
    # Run all three concurrently: total time ~1s, not ~3s
    results = await asyncio.gather(
        fetch_user(1),
        fetch_user(2),
        fetch_user(3),
    )
    print(results)

asyncio.run(main())
Fetching user 1...
Fetching user 2...
Fetching user 3...
[{'id': 1, ...}, {'id': 2, ...}, {'id': 3, ...}]
All three started immediately; all three finished in ~1 second total.
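One detail worth knowing: gather() returns results in the order the awaitables were passed in, not the order they finished. A minimal sketch, with delayed as a hypothetical helper:

```python
import asyncio

async def delayed(value: str, delay: float) -> str:
    await asyncio.sleep(delay)
    return value

async def main() -> list:
    # "fast" finishes first, but the result list follows argument order
    return await asyncio.gather(
        delayed("slow", 0.2),
        delayed("fast", 0.05),
    )

results = asyncio.run(main())
print(results)  # ['slow', 'fast']
```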
Handling Errors in gather()
results = await asyncio.gather(
    fetch_user(1),
    fetch_user(2),
    fetch_user(999),
    return_exceptions=True,  # Don't raise; return exceptions as values
)
for result in results:
    if isinstance(result, Exception):
        print(f"Error: {result}")
    else:
        print(result)
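The fragment above assumes fetch_user can fail. Here is a self-contained sketch of that idea, under the assumption (not from the original example) that user ID 999 does not exist and the lookup raises LookupError.

```python
import asyncio

async def fetch_user(user_id: int) -> dict:
    await asyncio.sleep(0.05)  # simulate a network call
    if user_id == 999:  # assumption: 999 is an unknown user
        raise LookupError(f"user {user_id} not found")
    return {"id": user_id, "name": f"User {user_id}"}

async def main() -> list:
    # With return_exceptions=True the failure arrives as a value in the list
    return await asyncio.gather(
        fetch_user(1),
        fetch_user(999),
        return_exceptions=True,
    )

results = asyncio.run(main())
for result in results:
    if isinstance(result, Exception):
        print(f"Error: {result}")
    else:
        print(result)
```

Without return_exceptions=True, the first exception would propagate out of the await and the successful results would be lost to the caller.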
Tasks: Fire and Don't Wait
asyncio.create_task() schedules a coroutine to run concurrently without immediately awaiting it.
import asyncio

async def background_job(name: str, delay: float):
    await asyncio.sleep(delay)
    print(f"{name} completed after {delay}s")

async def main():
    # Schedule tasks without waiting for them
    task1 = asyncio.create_task(background_job("Job A", 2))
    task2 = asyncio.create_task(background_job("Job B", 1))
    print("Tasks started, doing other work...")
    await asyncio.sleep(0.5)
    print("Still doing other work...")
    # Wait for both tasks to finish
    await task1
    await task2
    print("All done")

asyncio.run(main())
Tasks started, doing other work...
Still doing other work...
Job B completed after 1s
Job A completed after 2s
All done
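The example keeps task1 and task2 in variables, and that matters: the asyncio documentation notes that the event loop holds only weak references to tasks, so a task nobody references can be garbage-collected before it finishes. One common pattern is sketched below. The spawn helper and the background_tasks set are hypothetical names, not part of asyncio.

```python
import asyncio

background_tasks: set = set()
finished: list[str] = []

async def background_job(name: str, delay: float) -> None:
    await asyncio.sleep(delay)
    finished.append(name)

def spawn(coro) -> asyncio.Task:
    task = asyncio.create_task(coro)
    background_tasks.add(task)                        # keep a strong reference
    task.add_done_callback(background_tasks.discard)  # drop it once finished
    return task

async def main() -> None:
    spawn(background_job("Job A", 0.1))
    spawn(background_job("Job B", 0.05))
    # Wait for whatever is still in flight before the loop shuts down
    await asyncio.gather(*background_tasks)

asyncio.run(main())
print(finished)
```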
asyncio.wait() and as_completed()
More control over how you wait for multiple tasks:
import asyncio

async def slow_operation(n: int):
    await asyncio.sleep(n * 0.5)
    return n * 10

async def main():
    tasks = [asyncio.create_task(slow_operation(i)) for i in range(1, 5)]
    # Process results as they complete (not in order)
    for coro in asyncio.as_completed(tasks):
        result = await coro
        print(f"Got result: {result}")

asyncio.run(main())
Got result: 10 ← task 1 finished first
Got result: 20
Got result: 30
Got result: 40
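The example above covers as_completed(). For asyncio.wait(), here is a minimal sketch that takes the first finished task and cancels the rest. It reuses slow_operation with shorter delays, and cancelling the stragglers is a choice made for this illustration, not something wait() does for you.

```python
import asyncio

async def slow_operation(n: int) -> int:
    await asyncio.sleep(n * 0.05)
    return n * 10

async def main() -> tuple:
    tasks = [asyncio.create_task(slow_operation(i)) for i in range(1, 5)]
    # wait() returns two sets of tasks: those finished and those still running
    done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
    first_results = sorted(task.result() for task in done)
    for task in pending:
        task.cancel()  # wait() never cancels anything on its own
    await asyncio.gather(*pending, return_exceptions=True)  # let cancellations settle
    return first_results, len(pending)

first_results, pending_count = asyncio.run(main())
print(first_results, pending_count)
```

Unlike gather(), wait() hands back the task objects themselves, so you call .result() on each one to get its value.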
Timeouts
import asyncio

async def slow_api_call():
    await asyncio.sleep(10)  # Takes forever
    return {"data": "..."}

async def main():
    try:
        # Raise TimeoutError if it takes more than 2 seconds
        result = await asyncio.wait_for(slow_api_call(), timeout=2.0)
    except asyncio.TimeoutError:
        print("Request timed out")

asyncio.run(main())
Python 3.11+ adds asyncio.timeout() as a context manager:
async def main():
    try:
        async with asyncio.timeout(2.0):
            result = await slow_api_call()
            other_result = await another_call()
    except asyncio.TimeoutError:
        print("Timed out")
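When a timeout fires, the awaited coroutine is cancelled: asyncio.CancelledError is raised inside it at the point where it was waiting. The sketch below shows one way to run cleanup in that case. The cleanup_ran flag is a hypothetical stand-in for real work such as closing a connection.

```python
import asyncio

cleanup_ran = False

async def slow_api_call() -> dict:
    global cleanup_ran
    try:
        await asyncio.sleep(10)
        return {"data": "..."}
    except asyncio.CancelledError:
        cleanup_ran = True  # stand-in for releasing resources
        raise               # re-raise so the cancellation completes

async def main() -> str:
    try:
        await asyncio.wait_for(slow_api_call(), timeout=0.1)
    except asyncio.TimeoutError:
        return "timed out"
    return "finished"

outcome = asyncio.run(main())
print(outcome, cleanup_ran)
```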
Queues: Producer-Consumer Pattern
asyncio.Queue is perfect for worker pool patterns:
import asyncio
import random

async def producer(queue: asyncio.Queue, num_items: int):
    for i in range(num_items):
        await asyncio.sleep(random.uniform(0.1, 0.5))
        item = f"task_{i}"
        await queue.put(item)
        print(f"Produced: {item}")
    # Signal workers to stop
    for _ in range(3):  # one sentinel per worker
        await queue.put(None)

async def worker(name: str, queue: asyncio.Queue):
    while True:
        item = await queue.get()
        if item is None:
            break
        print(f"  {name} processing {item}")
        await asyncio.sleep(0.3)  # simulate work
        queue.task_done()
    print(f"  {name} shutting down")

async def main():
    queue = asyncio.Queue(maxsize=5)  # Buffer of 5 items
    # Start 3 workers
    workers = [
        asyncio.create_task(worker(f"Worker-{i}", queue))
        for i in range(3)
    ]
    await producer(queue, 10)
    await asyncio.gather(*workers)

asyncio.run(main())
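The example above shuts workers down with sentinel values. An alternative, sketched here as one possible variant rather than the better option, is to wait on queue.join() and then cancel the workers. This is where task_done() earns its keep, because join() returns only after every item has been marked done.

```python
import asyncio

async def worker(queue: asyncio.Queue, processed: list) -> None:
    while True:
        item = await queue.get()
        try:
            await asyncio.sleep(0.01)  # simulate work
            processed.append(item)
        finally:
            queue.task_done()  # join() counts these calls

async def main() -> list:
    queue: asyncio.Queue = asyncio.Queue()
    processed: list = []
    workers = [asyncio.create_task(worker(queue, processed)) for _ in range(3)]
    for i in range(10):
        queue.put_nowait(f"task_{i}")
    await queue.join()  # blocks until every item has been marked done
    for w in workers:
        w.cancel()      # workers are idle in queue.get() at this point
    await asyncio.gather(*workers, return_exceptions=True)
    return processed

processed = asyncio.run(main())
print(len(processed))
```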
Semaphores: Limit Concurrency
When you have 1000 URLs to fetch but don't want to hammer a server with 1000 simultaneous connections:
import asyncio
import httpx

async def fetch(client: httpx.AsyncClient, url: str, semaphore: asyncio.Semaphore):
    async with semaphore:  # Only N coroutines can enter at once
        response = await client.get(url)
        return response.status_code

async def main():
    urls = [f"https://httpbin.org/delay/1?n={i}" for i in range(20)]
    semaphore = asyncio.Semaphore(5)  # Max 5 concurrent requests
    async with httpx.AsyncClient() as client:
        tasks = [fetch(client, url, semaphore) for url in urls]
        results = await asyncio.gather(*tasks)
    print(results)

asyncio.run(main())
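If you want to check the limit without touching the network, the following sketch replaces the HTTP call with a short sleep and records how many coroutines are inside the semaphore at once. The limited function and the state dictionary are hypothetical names for this illustration.

```python
import asyncio

async def limited(semaphore: asyncio.Semaphore, state: dict) -> None:
    async with semaphore:
        state["active"] += 1
        state["peak"] = max(state["peak"], state["active"])
        await asyncio.sleep(0.01)  # stands in for the HTTP request
        state["active"] -= 1

async def main() -> int:
    semaphore = asyncio.Semaphore(5)
    state = {"active": 0, "peak": 0}
    await asyncio.gather(*(limited(semaphore, state) for _ in range(20)))
    return state["peak"]

peak = asyncio.run(main())
print(f"Peak concurrency: {peak}")
```

All 20 coroutines are started, but the recorded peak never exceeds the semaphore's limit of 5.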
Locks and Synchronization
When multiple coroutines access shared state:
import asyncio

class Counter:
    def __init__(self):
        self.value = 0
        self._lock = asyncio.Lock()

    async def increment(self):
        async with self._lock:
            current = self.value
            await asyncio.sleep(0)  # Yield to event loop (simulate real work)
            self.value = current + 1

async def main():
    counter = Counter()
    tasks = [asyncio.create_task(counter.increment()) for _ in range(100)]
    await asyncio.gather(*tasks)
    print(f"Final count: {counter.value}")  # 100, not some race-condition number

asyncio.run(main())
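To see why the lock matters, here is the same counter with the lock removed. UnsafeCounter is a hypothetical name for this sketch. Because every task reads the value before yielding, and writes only after it resumes, most increments are lost.

```python
import asyncio

class UnsafeCounter:
    def __init__(self):
        self.value = 0

    async def increment(self):
        current = self.value
        await asyncio.sleep(0)    # other tasks run here and read the same value
        self.value = current + 1  # overwrites whatever they wrote

async def main() -> int:
    counter = UnsafeCounter()
    await asyncio.gather(*(counter.increment() for _ in range(100)))
    return counter.value

final = asyncio.run(main())
print(f"Final count: {final}")  # far fewer than 100
```

Single-threaded does not mean race-free: any await between a read and a write is a point where another coroutine can interleave.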
Other synchronization primitives:
- asyncio.Event: signal between coroutines
- asyncio.Condition: wait for a condition to be true
- asyncio.Semaphore: limit concurrent access
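As a small illustration of the first primitive in that list, the sketch below has one coroutine wait on an asyncio.Event until another sets it. The waiter function and the log list are hypothetical names.

```python
import asyncio

async def waiter(ready: asyncio.Event, log: list) -> None:
    log.append("waiting")
    await ready.wait()  # suspends until someone calls ready.set()
    log.append("released")

async def main() -> list:
    ready = asyncio.Event()
    log: list = []
    task = asyncio.create_task(waiter(ready, log))
    await asyncio.sleep(0.05)  # give the waiter time to start waiting
    log.append("setting")
    ready.set()
    await task
    return log

log = asyncio.run(main())
print(log)
```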
Running Blocking Code in asyncio
Sometimes you need to call a blocking function (CPU-intensive or a non-async library) without blocking the event loop:
import asyncio
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor

def read_file(path: str) -> str:
    # Open, read, and close the file entirely inside the worker thread
    with open(path) as f:
        return f.read()

# For blocking I/O (database drivers without async support, etc.)
async def read_file_async(path: str) -> str:
    loop = asyncio.get_running_loop()
    with ThreadPoolExecutor() as pool:
        return await loop.run_in_executor(pool, read_file, path)

# For CPU-heavy work (data processing, image resizing, etc.)
def cpu_intensive_task(data: list) -> int:
    return sum(x ** 2 for x in data)

async def main():
    loop = asyncio.get_running_loop()
    data = list(range(1_000_000))
    with ProcessPoolExecutor() as pool:
        result = await loop.run_in_executor(pool, cpu_intensive_task, data)
    print(result)

# The guard is needed on platforms that start worker processes by re-importing this module
if __name__ == "__main__":
    asyncio.run(main())
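On Python 3.9 and later, asyncio.to_thread() is a shorter way to push a blocking call onto a worker thread without managing an executor yourself. A minimal sketch, with blocking_lookup as a hypothetical stand-in for a synchronous library call:

```python
import asyncio
import time

def blocking_lookup(key: str) -> str:
    time.sleep(0.1)  # a blocking call that would otherwise stall the loop
    return key.upper()

async def main() -> list:
    # Each call runs in its own worker thread, so the two sleeps overlap
    return await asyncio.gather(
        asyncio.to_thread(blocking_lookup, "a"),
        asyncio.to_thread(blocking_lookup, "b"),
    )

results = asyncio.run(main())
print(results)  # ['A', 'B']
```

This helps with blocking I/O only. CPU-bound work still belongs in a process pool.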
Async Context Managers and Iterators
import asyncio

# Async context manager
class AsyncDatabase:
    # Stand-in data, so that pagination eventually runs out of rows
    _rows = [{"id": i} for i in range(1, 6)]

    async def __aenter__(self):
        print("Connecting to DB...")
        await asyncio.sleep(0.1)
        return self

    async def __aexit__(self, *args):
        print("Closing DB connection")
        await asyncio.sleep(0.05)

    async def fetch(self, query: str, limit: int, offset: int):
        await asyncio.sleep(0.1)
        return self._rows[offset:offset + limit]  # empty list once past the end

# Async generator (async iterator)
async def paginate(db, query: str, page_size: int = 2):
    page = 0
    while True:
        results = await db.fetch(query, limit=page_size, offset=page * page_size)
        if not results:
            break  # without an empty page this loop would never end
        for row in results:
            yield row
        page += 1

async def main():
    async with AsyncDatabase() as db:
        async for row in paginate(db, "SELECT * FROM users"):
            print(row)

asyncio.run(main())
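For simple cases you may not need a class at all. The standard library's contextlib.asynccontextmanager turns an async generator into an async context manager. The connect function and the log list below are hypothetical names used for illustration.

```python
import asyncio
from contextlib import asynccontextmanager

log: list[str] = []

@asynccontextmanager
async def connect(name: str):
    log.append(f"open {name}")
    await asyncio.sleep(0.01)  # simulate connection setup
    try:
        yield name             # the value bound by "as"
    finally:
        log.append(f"close {name}")  # runs even if the body raises

async def main() -> None:
    async with connect("db") as conn:
        log.append(f"using {conn}")

asyncio.run(main())
print(log)
```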
Common Mistakes
❌ Calling a coroutine without awaiting it
async def main():
    fetch_data()        # Body never runs; Python warns "coroutine 'fetch_data' was never awaited"
    await fetch_data()  # Correct
❌ Using time.sleep() instead of asyncio.sleep()
import time
async def bad():
    time.sleep(2)  # ❌ Blocks the ENTIRE event loop for 2 seconds

async def good():
    await asyncio.sleep(2)  # ✅ Yields control, so other coroutines run
❌ Forgetting asyncio.run() or running in wrong context
# ❌ This does nothing at the top level
async def main():
    ...

main()  # Just creates a coroutine object
# ✅ Correct
asyncio.run(main())
❌ CPU-bound work on the event loop
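import asyncio
from concurrent.futures import ProcessPoolExecutor

async def bad():
    result = sum(x**2 for x in range(10_000_000))  # ❌ Blocks event loop
    return result

# A lambda cannot be pickled and sent to a worker process, so use a module-level function
def sum_of_squares(n: int) -> int:
    return sum(x**2 for x in range(n))

async def good():
    loop = asyncio.get_running_loop()
    with ProcessPoolExecutor() as pool:
        result = await loop.run_in_executor(pool, sum_of_squares, 10_000_000)
    return result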
When to Use asyncio (and When Not To)
Use asyncio when:
- Making many concurrent network requests
- Building web servers or APIs (FastAPI, Starlette, aiohttp)
- Working with WebSockets or streaming data
- I/O-heavy scripts (database queries, file operations, API calls)
Don't use asyncio when:
- Your code is CPU-bound: use multiprocessing instead
- Your entire stack uses synchronous libraries that don't support async
- The script is simple and sequential: it just adds boilerplate
Wrapping Up
asyncio is not about running things faster on a single CPU. It's about keeping your program productive while waiting for I/O. The mental model is simple: one thread, with cooperative switching at await points.
Master gather(), create_task(), Semaphore, and Queue, and you'll have the tools to handle virtually any concurrent I/O workload Python throws at you.