Introduction
In many cases, we have to run several jobs concurrently. Most developers are likely familiar with multi-threading or multi-processing, both of which Python supports through ThreadPoolExecutor and ProcessPoolExecutor.
However, there is another powerful tool: the ‘coroutine’. While multi-threading and multi-processing are Hardware/OS-based methods, coroutines provide concurrency at the software level.
Python introduced coroutines through the asyncio library (Python 3.4, 2014) and the native async/await syntax (Python 3.5, 2015). For a long time, they were a niche tool primarily for backend developers. However, since the LLM boom of the 2020s, coroutines have become an essential tool for AI engineers.
About this series
Covering everything about coroutines in a single post would be difficult, which is why I created this coroutine series. I will explore the features of coroutines and their specific applications in AI step by step.
What is a Coroutine?
While multi-processing is hardware-based parallelism and multi-threading is OS-level concurrency, coroutines are software-based concurrency.
| Method | Multi-Processing | Multi-Threading | Coroutines (Asyncio) |
|---|---|---|---|
| Type | Hardware Parallelism | Kernel Concurrency | Software Concurrency |
| The Scheduler | The OS Kernel | The OS Kernel | The Python Event Loop |
| Switching Style | Preemptive (Forceful) | Preemptive (Forceful) | Cooperative (Polite) |
| Resource Cost | High (Separate Memory) | Medium (Stack + Kernel Objects) | Low (Tiny Object on Heap) |
| Awareness | The CPU knows. | The OS knows. | Only Python knows. |
Multi-processing requires multiple logical CPU cores, while multi-threading runs multiple threads within a single process. However, as you may know, Python multi-threading is not truly capable of executing bytecode simultaneously due to the Global Interpreter Lock (GIL). Consequently, it is better suited for I/O-bound concurrency rather than true parallelism.
For CPU-bound tasks: Threads must hold the GIL to execute bytecode. They contend for the lock, causing overhead. A multi-threaded CPU-bound program in Python is often slower than a single-threaded one due to lock contention and context switch overheads.
For I/O-bound tasks: The GIL is released when a thread performs a blocking I/O operation (e.g., `socket.recv`, `time.sleep`, file I/O). This allows other threads to run Python code while the first thread waits in the OS kernel. Consequently, standard `threading` is effective for concurrent I/O in Python.
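To see this in practice, here is a minimal sketch (the `fetch` name is illustrative) where `time.sleep` stands in for a blocking network call. Because sleeping threads release the GIL, the three one-second waits overlap:

```python
import time
from concurrent.futures import ThreadPoolExecutor

def fetch(n: int) -> int:
    # time.sleep releases the GIL, standing in for blocking I/O
    time.sleep(1)
    return n * 2

start = time.time()
# Three threads wait concurrently, so total time is ~1s, not ~3s
with ThreadPoolExecutor(max_workers=3) as pool:
    results = list(pool.map(fetch, range(3)))
elapsed = time.time() - start

print(results)
print(f"{elapsed:.2f} seconds")
```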
But what if we don't even need multiple threads? In most I/O-bound scenarios, having several threads is unnecessary, especially when you are making API calls and simply waiting for a response. This is precisely why Python introduced support for coroutines.
Why has it become essential?
As mentioned above, when coroutines were first introduced in Python, they were a niche tool used by only a small group of developers. Today, however, we rely heavily on LLMs, most of which are accessed through API calls. Unless you are running a model locally on your own PC, you must request inference from a server, even in an on-premises environment. (What developer would write code on an H100 while it is already serving an LLM?)
When working with LLM APIs, over 90% of your execution time is spent waiting for a response. Instead of sitting idle during that wait, your process could be handling the next task.
If you have ten concurrent requests, you don’t need to wait for the first user's prompt to finish. While waiting for the first response, you can send the second, third, and eventually all ten prompts almost simultaneously (provided your LLM server has enough throughput).
Furthermore, in agentic AI architectures, many inferences are independent of one another, so sending prompts to LLM servers concurrently can significantly reduce your total latency.
In this context, your Python process is not the Calculator; it is the Traffic Controller.
How can I leverage coroutines?
Python has a built-in library called asyncio that allows you to implement this.
Imagine you are making several cups of coffee and have multiple coffee machines, but you always use only one machine and do nothing while waiting for each cup because you are too lazy to manage tasks concurrently. In code, it looks like this:
Input
```python
import time

def brew_coffee(n: int) -> None:
    print(f"Start brewing coffee #{n}...")
    # This blocks the entire thread. Nothing else can happen.
    time.sleep(5)
    print(f"Coffee #{n} is ready!")

def main() -> None:
    start = time.time()
    # Sequential execution
    for i in range(3):
        brew_coffee(i + 1)
    end = time.time()
    print(f"Total time: {end - start:.2f} seconds")

if __name__ == "__main__":
    main()
```
Output
```
Start brewing coffee #1...
Coffee #1 is ready!
Start brewing coffee #2...
Coffee #2 is ready!
Start brewing coffee #3...
Coffee #3 is ready!
Total time: 15.00 seconds
```
But as your cafe grows, you have to become more efficient; you can no longer keep customers waiting by using only one machine at a time. This is where you, the barista, act as a coroutine. You start one machine, and while it's brewing, you immediately start the second and third.
Input
```python
import asyncio
import time
from typing import List, Coroutine, Any

# 1. Define a coroutine using 'async def'
async def brew_coffee(n: int) -> None:
    print(f"Start brewing coffee #{n}...")
    # 2. 'await' hands control back to the Event Loop
    # This is a non-blocking sleep.
    await asyncio.sleep(5)
    print(f"Coffee #{n} is ready!")

async def main() -> None:
    start = time.time()
    # 3. Create a list of coroutine objects (they haven't run yet!)
    tasks: List[Coroutine[Any, Any, None]] = [brew_coffee(i + 1) for i in range(3)]
    # 4. Schedule them concurrently and wait for all to finish
    await asyncio.gather(*tasks)
    end = time.time()
    print("Coffee ready!")
    print(f"Total time: {end - start:.2f} seconds")

if __name__ == "__main__":
    # 5. The entry point for the async world
    asyncio.run(main())
```
Output
```
Start brewing coffee #1...
Start brewing coffee #2...
Start brewing coffee #3...
Coffee #1 is ready!
Coffee #2 is ready!
Coffee #3 is ready!
Coffee ready!
Total time: 5.00 seconds
```
Now you can make three cups of coffee in just five seconds. You didn’t hire more workers, you didn’t move faster, and you didn’t buy extra equipment. You just took advantage of the idle time you previously wasted.
So, what exactly are coroutines, asyncio, await, and gather? Let’s dive into them one by one.
Coroutine functions
In Python, a standard block of code starting with def is simply called a function. But if you add async in front of it like async def, it becomes a coroutine function. This tells Python that the function is intended to run asynchronously. ‘Asynchronous’ means that tasks can be handled independently rather than waiting for one to finish before starting the next.
Asynchronous vs Synchronous
The dictionary definitions of these words can sometimes be confusing. Some people might think ‘synchronous jobs? Does it mean the jobs would be run at the same time?’ It helps to think of "synchronous" like "syncing" a device. When you sync two devices, they must be "in step" or connected to share data. Therefore, synchronous programming means tasks are executed in a strict sequence; the previous task must finish before the next one begins. On the other hand, asynchronous means tasks are decoupled, allowing the program to move on to other work without waiting for a specific operation to complete.
- Synchronous tasks (Dependent): Washing your clothes and then hanging them out to dry. You cannot hang the laundry until the washer has finished its cycle.
- Asynchronous tasks (Independent): Running the dishwasher and microwaving your lunch. You don't need to wait for the dishwasher to finish to start the microwave; both can happen at the same time.
async def and await
As mentioned earlier, the ‘async’ keyword allows you to define a coroutine. Creating one is simple, but running it requires a slightly different approach than a standard function.
```python
import asyncio

async def brew_coffee(n):
    print(f"Start brewing coffee #{n}...")
    await asyncio.sleep(5)
    print(f"Coffee #{n} is ready!")
```
This is the coffee example again, but how do we run this coroutine? Can we just call it like a normal function, as in `brew_coffee(1)`? If you try, you will see this output:
```
<coroutine object brew_coffee at 0x000002957D795BE0>
```
Unfortunately, it doesn't start making coffee; calling a coroutine function only creates a coroutine object. To actually run it, you have to `await` it:
```python
await brew_coffee(1)
```
```
Start brewing coffee #1...
Coffee #1 is ready!
```
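A quick caveat of my own: a bare top-level `await` only works inside another coroutine or in environments like the asyncio REPL and Jupyter. In a plain script, you drive the coroutine with `asyncio.run`. A minimal sketch, with the brew time shortened and the result returned instead of printed:

```python
import asyncio

async def brew_coffee(n: int) -> str:
    await asyncio.sleep(0.1)  # shortened brew time for the demo
    return f"Coffee #{n} is ready!"

# asyncio.run starts an event loop, runs the coroutine to completion,
# and closes the loop
result = asyncio.run(brew_coffee(1))
print(result)
```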
Now you just made one cup of coffee. `await` means the coroutine pauses until the awaited job finishes. But wait a minute... it waits until the job is done? Then what happens if I start two async jobs like this?
```python
await brew_coffee(1)
await brew_coffee(2)
```
Then it will return this:
```
Start brewing coffee #1...
Coffee #1 is ready!
Start brewing coffee #2...
Coffee #2 is ready!
```
Right, this did not run concurrently. If `await` pauses at each call, how can we start several jobs concurrently? By scheduling them as tasks with `asyncio.create_task`, or by using `asyncio.gather`:
```python
# Inside a running coroutine: schedule both as tasks, then await them
task1 = asyncio.create_task(brew_coffee(1))
task2 = asyncio.create_task(brew_coffee(2))
await task1
await task2

# Or, as in the first example, with gather
works = [brew_coffee(1), brew_coffee(2)]
await asyncio.gather(*works)
```
This returns
```
Start brewing coffee #1...
Start brewing coffee #2...
Coffee #1 is ready!
Coffee #2 is ready!
```
Now it works!
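One detail worth knowing: `asyncio.gather` also collects the return values of the coroutines, in the same order you passed them in. A small sketch (the `brew_label` name is mine):

```python
import asyncio
from typing import List

async def brew_label(n: int) -> str:
    await asyncio.sleep(0.1)  # non-blocking wait
    return f"coffee #{n}"

async def main() -> List[str]:
    # gather preserves input order in its result list
    return await asyncio.gather(*(brew_label(i) for i in range(3)))

cups = asyncio.run(main())
print(cups)
```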
What is await exactly?
When a coroutine encounters an await expression, it follows a three-step process:
1. Suspend: When `await` is encountered, the coroutine saves its current instruction pointer and stack variables.
2. Yield: Control is returned to the Event Loop.
3. Resume: When the awaited operation completes, the Event Loop restores the coroutine's state and resumes execution at the instruction following the `await`.
Long story short, when a task hits `await`, it says, "I'm waiting for something; go ahead and run someone else," and the Event Loop kicks off another task. Only after the awaited operation is complete will the loop return to the original task to finish the job.
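You can watch this suspend/yield/resume cycle in a small sketch (the `worker` coroutine and `log` list are my additions): A starts first and suspends, B starts and suspends, and then they resume in the order their sleeps finish:

```python
import asyncio
from typing import List

log: List[str] = []

async def worker(name: str, delay: float) -> None:
    log.append(f"{name} suspending")
    await asyncio.sleep(delay)  # yields control to the Event Loop
    log.append(f"{name} resumed")

async def main() -> None:
    # A sleeps longer, so B resumes first even though A started first
    await asyncio.gather(worker("A", 0.2), worker("B", 0.1))

asyncio.run(main())
print(log)
```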
But a word of caution: async and await are not a silver bullet! You must avoid using "blocking" I/O functions inside a coroutine.
The Danger of Blocking I/O
Consider this version of our coffee example. It looks almost identical to our successful coroutine code, but I have replaced asyncio.sleep(5) with time.sleep(5).
```python
import asyncio
import time
from typing import List, Coroutine, Any

async def brew_coffee(n: int) -> None:
    print(f"Start brewing coffee #{n}...")
    time.sleep(5)
    print(f"Coffee #{n} is ready!")

async def main() -> None:
    start = time.time()
    tasks: List[Coroutine[Any, Any, None]] = [brew_coffee(i + 1) for i in range(3)]
    await asyncio.gather(*tasks)
    end = time.time()
    print("Coffee ready!")
    print(f"Total time: {end - start:.2f} seconds")

if __name__ == "__main__":
    asyncio.run(main())
```
Output
```
Start brewing coffee #1...
Coffee #1 is ready!
Start brewing coffee #2...
Coffee #2 is ready!
Start brewing coffee #3...
Coffee #3 is ready!
Coffee ready!
Total time: 15.00 seconds
```
This version gains nothing from coroutines. Why? Because `time.sleep(5)` is a blocking call that never yields control back to the Event Loop. If you use blocking functions (like `time.sleep`, `requests.get`, or standard file reading) inside a coroutine, you effectively stop the entire "Traffic Controller" from working. For coroutines to be effective, you must use non-blocking equivalents that know how to `await`.
- Note: You might think this mistake is too obvious to mention. But in production-level environments, you should be very careful about this issue. Real-world projects often consist of complex codebases with dozens or even hundreds of files. You must carefully audit your coroutines to ensure that no blocking I/O is hidden anywhere downstream. One small blocking call can bring your entire high-performance system to a crawl.
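If you cannot replace a blocking call (for example, a third-party SDK with no async API), one escape hatch is `asyncio.to_thread` (Python 3.9+), which runs the blocking function in a worker thread so the Event Loop stays free. A sketch with a hypothetical `legacy_blocking_call`:

```python
import asyncio
import time
from typing import List

def legacy_blocking_call(n: int) -> str:
    time.sleep(1)  # imagine third-party code you cannot rewrite
    return f"result {n}"

async def main() -> List[str]:
    # Each blocking call runs in its own worker thread;
    # the event loop is free to schedule the others meanwhile
    return await asyncio.gather(
        *(asyncio.to_thread(legacy_blocking_call, i) for i in range(3))
    )

start = time.time()
results = asyncio.run(main())
elapsed = time.time() - start
print(results)
print(f"{elapsed:.2f} seconds")  # ~1s: the waits overlap in threads
```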
Conclusion
In this post, we have explored the fundamentals of coroutines and how to implement them at a basic level. I will cover more advanced features of asyncio and demonstrate how to leverage them for AI inference in the next post of this series.