Introduction
In this post, I will briefly introduce how to utilize coroutines for LLMs. Using asyncio for LLM inference is straightforward because most AI frameworks support asyncio natively nowadays.
Coroutines for LLM
LLM API SDK
LLM API SDKs such as Google GenAI, OpenAI, and Claude (Anthropic) provide coroutine functions alongside their synchronous ones. You can call them like this:
# Synchronous inference function of OpenAI SDK
client = OpenAI()
response = client.chat.completions.create(
    model="gpt-5-mini",
    messages=[{"role": "user", "content": prompt}],
)

# Coroutine function of OpenAI SDK
client = AsyncOpenAI()
response = await client.chat.completions.create(
    model="gpt-5-mini",
    messages=[{"role": "user", "content": prompt}],
)

# Synchronous inference function of Google GenAI SDK
client = genai.Client()
response = client.models.generate_content(
    model="gemini-2.5-flash", contents=prompt
)

# Coroutine function of Google GenAI SDK
client = genai.Client()
response = await client.aio.models.generate_content(
    model="gemini-2.5-flash", contents=prompt
)
The OpenAI SDK provides a dedicated async client, AsyncOpenAI. With Google GenAI, you keep the same client and reach the async methods through client.aio. Other SDKs, such as Claude and Ollama, also support coroutine functions.
Example comparing async and sync functions
Input
import asyncio
import time
import os
import logging
from dotenv import load_dotenv
from google import genai
from openai import OpenAI, AsyncOpenAI
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s', datefmt='%H:%M:%S')
logger = logging.getLogger(__name__)
load_dotenv()
# --- Sync Functions ---
def sync_chat_google(prompt: str):
    logger.info("Starting Sync Google GenAI request...")
    try:
        client = genai.Client()
        response = client.models.generate_content(
            model="gemini-2.5-flash", contents=prompt
        )
        logger.info("Finished Sync Google GenAI request.")
        return f"Google (Sync): {response.text[:50]}..."
    except Exception as e:
        logger.error(f"Sync Google Error: {e}")
        return f"Google (Sync) Error: {e}"

def sync_chat_openai(prompt: str):
    logger.info("Starting Sync OpenAI request...")
    try:
        client = OpenAI()
        response = client.chat.completions.create(
            model="gpt-5-mini",
            messages=[{"role": "user", "content": prompt}],
            reasoning_effort="low"
        )
        logger.info("Finished Sync OpenAI request.")
        return f"OpenAI (Sync): {response.choices[0].message.content[:50]}..."
    except Exception as e:
        logger.error(f"Sync OpenAI Error: {e}")
        return f"OpenAI (Sync) Error: {e}"

def run_sync():
    logger.info("--- Starting Sync Execution ---")
    start_time = time.time()
    prompt = "Explain asyncio in one sentence."
    res_google = sync_chat_google(prompt)
    res_openai = sync_chat_openai(prompt)
    end_time = time.time()
    total_time = end_time - start_time
    print("\n[Sync Results]")
    print(res_google)
    print(res_openai)
    print(f"Total Sync Time: {total_time:.2f} seconds\n")
    return total_time
# --- Async Functions ---
async def async_chat_google(prompt: str):
    logger.info("Starting Async Google GenAI request...")
    try:
        client = genai.Client()
        response = await client.aio.models.generate_content(
            model="gemini-2.5-flash", contents=prompt
        )
        logger.info("Finished Async Google GenAI request.")
        return f"Google (Async): {response.text[:50]}..."
    except Exception as e:
        logger.error(f"Async Google Error: {e}")
        return f"Google (Async) Error: {e}"

async def async_chat_openai(prompt: str):
    logger.info("Starting Async OpenAI request...")
    try:
        client = AsyncOpenAI()
        response = await client.chat.completions.create(
            model="gpt-5-mini",
            messages=[{"role": "user", "content": prompt}],
            reasoning_effort="low"
        )
        logger.info("Finished Async OpenAI request.")
        return f"OpenAI (Async): {response.choices[0].message.content[:50]}..."
    except Exception as e:
        logger.error(f"Async OpenAI Error: {e}")
        return f"OpenAI (Async) Error: {e}"

async def run_async():
    logger.info("--- Starting Async Execution ---")
    start_time = time.time()
    prompt = "Explain asyncio in one sentence."
    # Schedule both calls concurrently
    async with asyncio.TaskGroup() as tg:
        task1 = tg.create_task(async_chat_google(prompt))
        task2 = tg.create_task(async_chat_openai(prompt))
    # The TaskGroup block exits only after both tasks finish, so the results are ready here
    results = [task1.result(), task2.result()]
    end_time = time.time()
    total_time = end_time - start_time
    print("\n[Async Results]")
    for res in results:
        print(res)
    print(f"Total Async Time: {total_time:.2f} seconds\n")
    return total_time
async def main():
    logger.info("Starting Comparison...")
    sync_time = run_sync()
    async_time = await run_async()
    print("-" * 30)
    print(f"Sync Time: {sync_time:.2f}s")
    print(f"Async Time: {async_time:.2f}s")
    if async_time < sync_time:
        print(f"Conclusion: Async was {sync_time / async_time:.2f}x faster!")
    else:
        print("Conclusion: Async was not faster (overhead or network variance).")

if __name__ == "__main__":
    asyncio.run(main())
Output
00:16:17 - INFO - Starting Comparison...
00:16:17 - INFO - --- Starting Sync Execution ---
00:16:17 - INFO - Starting Sync Google GenAI request...
00:16:18 - INFO - AFC is enabled with max remote calls: 10.
00:16:24 - INFO - HTTP Request: POST https://generativelanguage.googleapis.com/v1beta/models/gemini-2.5-flash:generateContent "HTTP/1.1 200 OK"
00:16:24 - INFO - Finished Sync Google GenAI request.
00:16:24 - INFO - Starting Sync OpenAI request...
00:16:28 - INFO - HTTP Request: POST https://api.openai.com/v1/chat/completions "HTTP/1.1 200 OK"
00:16:28 - INFO - Finished Sync OpenAI request.
[Sync Results]
Google (Sync): Asyncio is Python's library for writing concurrent...
OpenAI (Sync): asyncio is Python's library for writing concurrent...
Total Sync Time: 10.36 seconds
00:16:28 - INFO - --- Starting Async Execution ---
00:16:28 - INFO - Starting Async Google GenAI request...
00:16:28 - INFO - AFC is enabled with max remote calls: 10.
00:16:28 - INFO - Starting Async OpenAI request...
00:16:31 - INFO - HTTP Request: POST https://api.openai.com/v1/chat/completions "HTTP/1.1 200 OK"
00:16:31 - INFO - Finished Async OpenAI request.
00:16:35 - INFO - HTTP Request: POST https://generativelanguage.googleapis.com/v1beta/models/gemini-2.5-flash:generateContent "HTTP/1.1 200 OK"
00:16:35 - INFO - Finished Async Google GenAI request.
[Async Results]
Google (Async): asyncio is Python's library for writing concurrent...
OpenAI (Async): asyncio is Python's standard-library framework for...
Total Async Time: 6.91 seconds
------------------------------
Sync Time: 10.36s
Async Time: 6.91s
Conclusion: Async was 1.50x faster!
As you can see, the async version is 1.5x faster (6.91 s versus 10.36 s) because it sends the OpenAI and Google requests concurrently, so the total time is close to that of the slower request. The synchronous version calls the Google GenAI API first, waits for the response, and only then calls the OpenAI API, so the two latencies add up.
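The same effect can be reproduced without API keys. The following is a minimal sketch, not the benchmark above: fake_llm_call is a hypothetical stand-in that uses asyncio.sleep to imitate network latency, and the delays are made-up values rather than measurements.

```python
import asyncio
import time


async def fake_llm_call(name: str, delay: float) -> str:
    """Hypothetical stand-in for an LLM request; sleeps instead of calling an API."""
    await asyncio.sleep(delay)
    return f"{name} done"


async def run_sequential() -> float:
    """Await each call in turn, so the waits add up."""
    start = time.perf_counter()
    await fake_llm_call("google", 0.3)
    await fake_llm_call("openai", 0.2)
    return time.perf_counter() - start


async def run_concurrent() -> float:
    """Start both calls together, so the total is close to the slower one."""
    start = time.perf_counter()
    await asyncio.gather(
        fake_llm_call("google", 0.3),
        fake_llm_call("openai", 0.2),
    )
    return time.perf_counter() - start


async def compare() -> tuple[float, float]:
    """Return (sequential_seconds, concurrent_seconds)."""
    return await run_sequential(), await run_concurrent()


if __name__ == "__main__":
    sequential, concurrent = asyncio.run(compare())
    print(f"Sequential: {sequential:.2f}s, concurrent: {concurrent:.2f}s")
```

With these assumed delays, the sequential run should take roughly the sum of the two waits, while the concurrent run should take roughly the longer of the two.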
Sync vs Async
When you build complex AI inference architectures, such as Agentic AI, you should think carefully about whether to utilize async. It depends on the specific requirements of your architecture.
Sequential Dependencies: When an inference depends on an earlier step, you generally run the steps sequentially. For example, an earlier step may retrieve the context the model needs (as in RAG), or the result of one inference may have to be included in the prompt of the next.
Independent Tasks: When you run independent inferences that do not rely on each other, you can leverage coroutines to run them concurrently.
Example
import asyncio
import time
import os
import logging
from dotenv import load_dotenv
from typing import List
from langchain_google_genai import ChatGoogleGenerativeAI
import ollama
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s', datefmt='%H:%M:%S')
logger = logging.getLogger(__name__)
load_dotenv()
# --- 1. Async Retrieval Simulation ---
async def retrieve_docs_db(query: str) -> str:
    """Simulates retrieving documents from a database (IO bound)."""
    logger.info(f"[DB] Searching for: '{query}'...")
    await asyncio.sleep(2)  # Simulate network/DB latency
    result = "Context from DB: Asyncio is single-threaded but concurrent."
    logger.info(f"[DB] Found: {result}")
    return result

async def retrieve_docs_web(query: str) -> str:
    """Simulates retrieving documents from the web (IO bound)."""
    logger.info(f"[Web] Searching for: '{query}'...")
    await asyncio.sleep(2)  # Simulate network latency
    result = "Context from Web: Asyncio uses an event loop to manage tasks."
    logger.info(f"[Web] Found: {result}")
    return result

# --- 2. Sync Ollama Inference ---
def query_ollama(context: List[str], question: str) -> str:
    combined_context = "\n".join(context)
    prompt = f"Context:\n{combined_context}\n\nQuestion: {question}\n\nAnswer:"
    logger.info("Starting Ollama generation...")
    try:
        client = ollama.Client()
        model = "qwen3:8b"
        response = client.chat(model=model, messages=[
            {'role': 'user', 'content': prompt},
        ])
        answer = response['message']['content']
        logger.info("Finished Ollama generation.")
        return answer
    except Exception as e:
        logger.error(f"Ollama Error: {e}")
        return f"Ollama Error: {e}"

# --- 3. Sync Google Refinement ---
def refine_with_google(draft_answer: str) -> str:
    logger.info("Starting Google Refinement...")
    try:
        prompt = f"Please refine and polish the following text to be more professional:\n\n{draft_answer}"
        genai_llm = ChatGoogleGenerativeAI(
            model="gemini-3-flash-preview",
            thinking_level="low"
        )
        response = genai_llm.invoke(prompt)
        logger.info("Finished Google Refinement.")
        return response.text
    except Exception as e:
        logger.error(f"Google Refinement Error: {e}")
        return f"Google Error: {e}"
# --- Main Flow ---
async def main():
    start_total = time.time()
    query = "How does asyncio work?"
    print("\n=== Starting Concurrent RAG Demo ===")
    print(f"Query: {query}\n")

    # Step 1: Concurrent Retrieval
    logger.info("--- Step 1: Concurrent Retrieval ---")
    start_retrieval = time.time()
    # Launch both retrievals at the same time
    async with asyncio.TaskGroup() as tg:
        task1 = tg.create_task(retrieve_docs_db(query))
        task2 = tg.create_task(retrieve_docs_web(query))
    # The TaskGroup block exits only after both tasks finish, so the results are ready here
    results = [task1.result(), task2.result()]
    retrieval_time = time.time() - start_retrieval
    logger.info(f"Retrieval complete in {retrieval_time:.2f}s")

    # Step 2: Sync Ollama Inference (Dependent on Step 1)
    logger.info("--- Step 2: Sync Ollama Inference ---")
    start_ollama = time.time()
    draft_answer = query_ollama(results, query)
    ollama_time = time.time() - start_ollama
    print(f"\n[Ollama Draft]:\n{draft_answer}\n")

    # Step 3: Sync Google Refinement (Dependent on Step 2)
    logger.info("--- Step 3: Sync Google Refinement ---")
    start_google = time.time()
    refined_answer = refine_with_google(draft_answer)
    google_time = time.time() - start_google
    print(f"\n[Google Refined]:\n{refined_answer}\n")

    total_time = time.time() - start_total
    print("=" * 40)
    print(f"Retrieval Time: {retrieval_time:.2f}s")
    print(f"Ollama Time: {ollama_time:.2f}s")
    print(f"Google Time: {google_time:.2f}s")
    print(f"Total Time: {total_time:.2f}s")
    print("=" * 40)

if __name__ == "__main__":
    asyncio.run(main())
Output
=== Starting Concurrent RAG Demo ===
Query: How does asyncio work?
20:05:38 - INFO - --- Step 1: Concurrent Retrieval ---
20:05:38 - INFO - [DB] Searching for: 'How does asyncio work?'...
20:05:38 - INFO - [Web] Searching for: 'How does asyncio work?'...
20:05:40 - INFO - [DB] Found: Context from DB: Asyncio is single-threaded but concurrent.
20:05:40 - INFO - [Web] Found: Context from Web: Asyncio uses an event loop to manage tasks.
20:05:40 - INFO - Retrieval complete in 2.01s
20:05:40 - INFO - --- Step 2: Sync Ollama Inference ---
20:05:40 - INFO - Starting Ollama generation...
20:05:57 - INFO - HTTP Request: POST http://127.0.0.1:11434/api/chat "HTTP/1.1 200 OK"
20:05:57 - INFO - Finished Ollama generation.
[Ollama Draft]:
Asyncio is a Python library that enables **single-threaded concurrency** through **asynchronous I/O** and an **event loop**. Here's how it works:
### 1. **Single-Threaded, Concurrent Execution**
- Asyncio operates in a **single thread**, avoiding the overhead of multi-threading. Instead of using multiple threads, it leverages **non-blocking I/O** to handle multiple tasks concurrently.
- Concurrency here means tasks can **overlap** in execution, even though they run in the same thread. This is ideal for **I/O-bound tasks** (e.g., network requests, file reads) where waiting for I/O is common.
### 2. **Event Loop as the Core Mechanism**
- The **event loop** is the heart of asyncio. It manages and schedules tasks, handling I/O operations and switching between coroutines when needed.
- When a coroutine (an `async def` function) is started, the event loop schedules it. If the coroutine encounters an I/O operation (e.g., a network call), it **yields control** back to the event loop, allowing other tasks to run in the meantime.
### 3. **Coroutines and `await`**
- Coroutines are defined using `async def`. They can be paused and resumed, enabling **cooperative multitasking**.
- The `await` keyword is used to **delegate control** to another coroutine or I/O operation. While waiting, the event loop can process other tasks, ensuring efficient resource utilization.
### 4. **Non-Blocking I/O**
- Asyncio avoids blocking the thread by using **asynchronous I/O**. For example, when a coroutine makes a network request, it doesn’t wait for the response to complete. Instead, it **registers a callback** with the event loop and continues executing other tasks. Once the I/O operation completes, the event loop resumes the coroutine.
### 5. **Task Scheduling and Collaboration**
- The event loop manages **tasks** (coroutines) and ensures they run in a coordinated manner. It handles:
- Scheduling coroutines to run.
- Handling callbacks for completed I/O operations.
- Managing exceptions and ensuring orderly execution.
### 6. **Use Cases**
- Asyncio is best suited for **I/O-bound applications** (e.g., web servers, APIs, data scraping) where waiting for I/O is a bottleneck.
- It is **not ideal** for CPU-bound tasks, which would benefit more from multiprocessing or threading.
### Summary
Asyncio achieves concurrency by using a **single thread** with an **event loop** to manage **non-blocking I/O** and **coroutines**. This allows multiple tasks to run **simultaneously** without blocking the main thread, making it efficient for handling many I/O operations in applications like web servers or network clients.
20:05:57 - INFO - --- Step 3: Sync Google Refinement ---
20:05:57 - INFO - Starting Google Refinement...
20:05:57 - INFO - AFC is enabled with max remote calls: 10.
20:06:06 - INFO - HTTP Request: POST https://generativelanguage.googleapis.com/v1beta/models/gemini-3-flash-preview:generateContent "HTTP/1.1 200 OK"
20:06:06 - INFO - Finished Google Refinement.
[Google Refined]:
Here are three ways to refine the text, depending on the desired tone and context.
### Option 1: Formal & Technical (Best for Documentation or Whitepapers)
This version uses precise terminology and focuses on the architectural benefits of the library.
**Overview of Asyncio in Python**
Asyncio is a specialized Python library designed to facilitate **single-threaded concurrency** through **asynchronous I/O** and an **event loop** architecture. It is structured around the following core principles:
* **Single-Threaded Concurrency:** Unlike traditional multi-threading, asyncio operates within a single thread. It leverages **non-blocking I/O** to manage multiple execution streams concurrently. This approach eliminates the overhead associated with context switching and thread management, making it highly efficient for **I/O-bound tasks**.
* **The Event Loop:** Serving as the central orchestrator, the event loop manages task scheduling and I/O operations. When a coroutine encounters an I/O bottleneck, it **yields control** back to the loop, which then executes other pending tasks, ensuring optimal CPU utilization.
* **Coroutines and Cooperative Multitasking:** Defined via the `async def` syntax, coroutines are the fundamental units of execution. Through the `await` keyword, these functions practice **cooperative multitasking**, pausing their execution to allow the event loop to process other operations until the awaited result is available.
* **Non-Blocking Operations:** Asyncio prevents thread blocking by registering callbacks for I/O events. This allows the system to initiate a request—such as a network call—and immediately move to the next task, resuming the original coroutine only once the data is ready.
* **Optimized Use Cases:** Asyncio is the industry standard for high-performance **I/O-bound applications**, including web servers, distributed systems, and real-time data streaming. Conversely, for CPU-intensive computations, multiprocessing remains the preferred parallelization strategy.
---
### Option 2: Professional & Concise (Best for a Presentation or Summary)
This version is streamlined for readability while maintaining a professional tone.
**Understanding Asyncio**
Asyncio enables Python developers to handle high-concurrency workloads within a single thread. By utilizing an event loop and non-blocking I/O, it provides a scalable alternative to traditional threading.
1. **Concurrency without Threads:** By overlapping task execution rather than running them in parallel, asyncio avoids the memory overhead of multiple threads while efficiently handling thousands of simultaneous connections.
2. **The Event Loop & Coroutines:** The event loop acts as a scheduler. Coroutines (`async def`) cooperatively yield control using `await`, allowing the loop to switch between tasks seamlessly whenever a program is waiting for external data.
3. **Efficiency through Non-Blocking I/O:** Instead of halting execution during a network or file operation, asyncio registers the operation and continues with other work. The event loop resumes the paused task only after the I/O operation signals completion.
4. **Strategic Application:** Asyncio is ideal for network-heavy applications like APIs and web scrapers. However, for CPU-bound tasks, developers should utilize multiprocessing to bypass the Global Interpreter Lock (GIL).
---
### Option 3: Modern & Direct (Best for a Technical Blog or Internal Memo)
This version is punchy and uses active language to explain the concepts.
**Asyncio: Scaling Python through Asynchronous I/O**
Asyncio is Python's built-in solution for **single-threaded concurrency**. It is designed to maximize resource efficiency by ensuring the CPU never sits idle while waiting for network or disk responses.
* **How it Works:** At the core is the **Event Loop**, which manages **Coroutines** (asynchronous functions).
* **Cooperative Scheduling:** Using the `await` keyword, a coroutine voluntarily pauses itself during I/O operations. This "cooperation" allows the event loop to rotate through other tasks, creating a highly responsive system.
* **Non-Blocking Workflow:** By utilizing asynchronous I/O, the application can trigger multiple network requests simultaneously without blocking the main execution thread.
* **When to use it:** Use Asyncio for I/O-heavy workloads (Web Servers, APIs, Database-heavy apps). Avoid it for heavy mathematical computations, where Multiprocessing is better suited.
**Summary:** Asyncio delivers high-performance concurrency by combining a single-threaded event loop with cooperative multitasking, making it an essential tool for modern, scalable Python development.
---
### Key Improvements Made:
* **Vocabulary:** Changed "heart of" to "central orchestrator," and "ideal" to "industry standard" or "preferred strategy."
* **Clarity:** Clarified the distinction between concurrency (overlapping tasks) and parallelism (simultaneous tasks).
* **Precision:** Replaced general terms with more technical equivalents like "context switching," "resource utilization," and "cooperative multitasking."
========================================
Retrieval Time: 2.01s
Ollama Time: 17.12s
Google Time: 9.58s
Total Time: 28.71s
========================================
This is just a simple LLM example using LangChain (Google GenAI) and the Ollama (Qwen3:8b) SDK. As you can see, the RAG component is just a mock simulation for convenience.
The pipeline retrieves context from two sources, and the two retrievals run concurrently: each simulated lookup sleeps for 2 seconds, yet the retrieval step finishes in 2.01 s rather than about 4 s. However, the subsequent LLM inference needs that context, so it must run after the retrievals. Similarly, the refinement step must be sequential because it takes the draft answer as its input.
Note: I used synchronous LLM clients for LangChain and Ollama since this is just a demonstration. Because they are called directly inside the async main function, they block the event loop while they run. In production-level development, you should implement these parts as coroutines as well. In a production environment, the server may receive high traffic; using async allows the system to handle multiple incoming requests efficiently while waiting for the LLM to respond.
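One way to keep a synchronous client from stalling the server is to offload it to a worker thread with asyncio.to_thread. The sketch below is only an illustration under assumptions: blocking_llm_call is a hypothetical function that uses time.sleep to stand in for a synchronous call such as query_ollama or refine_with_google, and handle_request and serve are made-up names. Switching to the SDKs' own async clients is the other option.

```python
import asyncio
import time


def blocking_llm_call(prompt: str) -> str:
    """Hypothetical synchronous client call; time.sleep stands in for the request."""
    time.sleep(0.3)
    return f"answer to: {prompt}"


async def handle_request(prompt: str) -> str:
    """Run the blocking call in a worker thread so the event loop stays free."""
    return await asyncio.to_thread(blocking_llm_call, prompt)


async def serve(prompts: list[str]) -> tuple[list[str], float]:
    """Handle several simulated incoming requests at once and time the batch."""
    start = time.perf_counter()
    answers = await asyncio.gather(*(handle_request(p) for p in prompts))
    return list(answers), time.perf_counter() - start


if __name__ == "__main__":
    answers, elapsed = asyncio.run(serve(["q1", "q2", "q3"]))
    print(answers, f"{elapsed:.2f}s")
```

Because each simulated call runs in its own thread, the three requests overlap instead of queuing behind one another. Threads carry more overhead than native coroutines, so this is best seen as a bridge for code that has no async API.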
AI Framework
Most AI frameworks, such as LangGraph, the OpenAI Agents SDK, Google AI SDKs, AutoGen, and CrewAI, build agents or workflows to be asynchronous by default.
However, you must ensure that you use coroutine functions within your nodes or agents. Even though the frameworks themselves are asynchronous, performance can crawl if you inadvertently include blocking I/O functions in certain nodes, because a blocking call stalls the event loop and every other task scheduled on it.
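The cost of a blocking call inside an async node can be seen in a framework-free sketch. Here blocking_node and non_blocking_node are hypothetical stand-ins for workflow nodes, with time.sleep and asyncio.sleep imitating a blocking and a non-blocking I/O call; no real framework API is used.

```python
import asyncio
import time


async def blocking_node() -> None:
    """A node that calls a blocking function; nothing else can run meanwhile."""
    time.sleep(0.2)  # blocks the whole event loop


async def non_blocking_node() -> None:
    """A node that awaits, handing control back to the event loop."""
    await asyncio.sleep(0.2)


async def timed(node, count: int = 3) -> float:
    """Run `count` copies of a node concurrently and return the elapsed time."""
    start = time.perf_counter()
    await asyncio.gather(*(node() for _ in range(count)))
    return time.perf_counter() - start


async def compare_nodes() -> tuple[float, float]:
    """Return (seconds with blocking nodes, seconds with awaiting nodes)."""
    return await timed(blocking_node), await timed(non_blocking_node)


if __name__ == "__main__":
    blocked, awaited = asyncio.run(compare_nodes())
    print(f"blocking nodes: {blocked:.2f}s, awaiting nodes: {awaited:.2f}s")
```

Even though both variants are declared with async def and launched concurrently, the blocking nodes effectively run one after another, while the awaiting nodes overlap.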
Conclusion
Implementing coroutines for LLMs is not that difficult. In fact, it is quite straightforward because most SDKs and frameworks now provide native support for coroutines. The most important aspect, however, is using asyncio patterns correctly. Production-level projects can be complex, making it easy to misuse coroutines or introduce bottlenecks. Therefore, when building AI inference projects, you must carefully consider several factors: which parts should be asynchronous versus synchronous, when to offload tasks using to_thread, and what the optimal concurrency limit should be.
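A concurrency limit is commonly enforced with asyncio.Semaphore. The following is a minimal sketch under assumptions: limited_call is a hypothetical stand-in for an LLM request (asyncio.sleep imitates the wait), and the limit of 3 is an arbitrary example value, not a recommendation. The right limit depends on your provider's rate limits and your workload.

```python
import asyncio


async def limited_call(sem: asyncio.Semaphore, stats: dict, i: int) -> int:
    """Hypothetical LLM call guarded by a semaphore; sleep stands in for the request."""
    async with sem:
        stats["active"] += 1
        stats["peak"] = max(stats["peak"], stats["active"])
        await asyncio.sleep(0.05)
        stats["active"] -= 1
        return i


async def run_limited(total: int, limit: int) -> tuple[list[int], int]:
    """Run `total` calls with at most `limit` in flight; return results and the peak."""
    sem = asyncio.Semaphore(limit)
    stats = {"active": 0, "peak": 0}
    results = await asyncio.gather(
        *(limited_call(sem, stats, i) for i in range(total))
    )
    return list(results), stats["peak"]


if __name__ == "__main__":
    results, peak = asyncio.run(run_limited(total=10, limit=3))
    print(results, "peak concurrency:", peak)
```

All ten tasks are created up front, but the semaphore lets only three of them past the async with line at any moment; the rest wait their turn without blocking the event loop.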