AI API calls are I/O-bound — you're waiting on network responses. Async Python lets you run many AI requests concurrently, dramatically improving throughput. Here's how to build high-concurrency AI applications.
## Why Async for AI?
A single AI API call might take 1-3 seconds. If you process 100 requests sequentially, that's 100-300 seconds. With async concurrency, all 100 run at once, so the whole batch finishes in roughly the time of the slowest single call.
```python
import asyncio
import httpx

# Sequential (slow)
async def process_sequential(requests):
    results = []
    for req in requests:
        result = await call_ai(req)  # 2 seconds each
        results.append(result)
    return results
# Total: len(requests) × 2 seconds

# Concurrent (fast)
async def process_concurrent(requests):
    tasks = [call_ai(req) for req in requests]
    results = await asyncio.gather(*tasks)
    return results
# Total: ~2 seconds (all requests run concurrently)
```
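Here, `call_ai` is a placeholder for whatever request function you use. To see the speedup without touching a real API, you can simulate it with `asyncio.sleep` (a self-contained sketch, with `call_ai` standing in for a network call):

```python
import asyncio
import time

async def call_ai(req: str) -> str:
    # Stand-in for a real API call: just wait 2 seconds.
    await asyncio.sleep(2)
    return f"response to {req}"

async def main():
    requests = [f"req-{i}" for i in range(10)]
    start = time.perf_counter()
    results = await asyncio.gather(*(call_ai(r) for r in requests))
    print(f"{len(results)} results in {time.perf_counter() - start:.1f}s")  # ~2.0s, not 20s

asyncio.run(main())
```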
## Basic Async AI Client
```python
import asyncio
import httpx
from typing import Optional

class AsyncAIClient:
    def __init__(self, api_key: str, base_url: str = "https://api.ofox.ai/v1"):
        self.api_key = api_key
        self.base_url = base_url
        self._client: Optional[httpx.AsyncClient] = None

    async def __aenter__(self):
        self._client = httpx.AsyncClient(
            headers={"Authorization": f"Bearer {self.api_key}"},
            timeout=120.0
        )
        return self

    async def __aexit__(self, *args):
        if self._client:
            await self._client.aclose()

    async def chat(self, messages: list[dict], **kwargs) -> str:
        response = await self._client.post(
            f"{self.base_url}/chat/completions",
            json={
                "model": kwargs.get("model", "claude-3-5-sonnet-20241022"),
                "messages": messages,
                "max_tokens": kwargs.get("max_tokens", 1024),
                "temperature": kwargs.get("temperature", 0.7)
            }
        )
        response.raise_for_status()
        data = response.json()
        return data["choices"][0]["message"]["content"]

    async def chat_stream(self, messages: list[dict], **kwargs):
        """Streaming version for real-time output."""
        async with self._client.stream(
            "POST",
            f"{self.base_url}/chat/completions",
            json={
                "model": kwargs.get("model", "claude-3-5-sonnet-20241022"),
                "messages": messages,
                "stream": True,
                "max_tokens": kwargs.get("max_tokens", 1024)
            }
        ) as response:
            async for line in response.aiter_lines():
                if line.startswith("data: "):
                    data = line[6:]
                    if data == "[DONE]":
                        break
                    yield data

# Usage
async def main():
    async with AsyncAIClient("your-api-key") as client:
        response = await client.chat([
            {"role": "user", "content": "Hello, Claude!"}
        ])
        print(response)
```
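Each `data:` payload yielded by `chat_stream` is a raw JSON chunk. Assuming an OpenAI-style streaming format where text arrives under `choices[0].delta.content` (an assumption about the wire format, so adjust to whatever your provider actually sends), consuming it looks like this:

```python
import json

async def stream_demo():
    async with AsyncAIClient("your-api-key") as client:
        async for raw in client.chat_stream([
            {"role": "user", "content": "Tell me a joke"}
        ]):
            chunk = json.loads(raw)
            # Assumes OpenAI-style chunks: choices[0].delta.content
            delta = chunk["choices"][0]["delta"].get("content", "")
            print(delta, end="", flush=True)
        print()
```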
## Concurrent Batch Processing
```python
import asyncio
from dataclasses import dataclass
from typing import List, Optional

@dataclass
class BatchItem:
    id: str
    prompt: str
    metadata: Optional[dict] = None

@dataclass
class BatchResult:
    id: str
    success: bool
    response: str = ""
    error: str = ""

async def process_batch(
    items: List[BatchItem],
    client: AsyncAIClient,
    max_concurrency: int = 10
) -> List[BatchResult]:
    """
    Process items in batches with controlled concurrency.
    """
    semaphore = asyncio.Semaphore(max_concurrency)

    async def process_one(item: BatchItem) -> BatchResult:
        async with semaphore:
            try:
                response = await client.chat([
                    {"role": "user", "content": item.prompt}
                ])
                return BatchResult(id=item.id, success=True, response=response)
            except Exception as e:
                return BatchResult(id=item.id, success=False, error=str(e))

    tasks = [process_one(item) for item in items]
    results = await asyncio.gather(*tasks)
    return list(results)

# Usage
async def main():
    items = [
        BatchItem(id=f"req-{i}", prompt=f"Process item {i}")
        for i in range(100)
    ]
    async with AsyncAIClient("your-api-key") as client:
        results = await process_batch(items, client, max_concurrency=20)
    successes = [r for r in results if r.success]
    failures = [r for r in results if not r.success]
    print(f"Completed: {len(successes)}/{len(items)}")
```
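If you'd rather see progress as results arrive instead of waiting for the whole batch, `asyncio.as_completed` is a small variant on the same pattern (a sketch reusing the types above):

```python
async def process_batch_with_progress(
    items: List[BatchItem],
    client: AsyncAIClient,
    max_concurrency: int = 10
) -> List[BatchResult]:
    semaphore = asyncio.Semaphore(max_concurrency)

    async def process_one(item: BatchItem) -> BatchResult:
        async with semaphore:
            try:
                response = await client.chat([{"role": "user", "content": item.prompt}])
                return BatchResult(id=item.id, success=True, response=response)
            except Exception as e:
                return BatchResult(id=item.id, success=False, error=str(e))

    results = []
    # as_completed yields awaitables in completion order, not submission order.
    for coro in asyncio.as_completed([process_one(i) for i in items]):
        result = await coro
        results.append(result)
        print(f"[{len(results)}/{len(items)}] {result.id}: "
              f"{'ok' if result.success else result.error}")
    return results
```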
## Rate-Limited Concurrency
```python
import asyncio
import time

class RateLimiter:
    """Interval-based rate limiter that spaces out API calls evenly."""

    def __init__(self, calls_per_minute: int = 60):
        self.calls_per_minute = calls_per_minute
        self.interval = 60.0 / calls_per_minute
        self.last_call = 0.0
        self._lock = asyncio.Lock()

    async def acquire(self):
        async with self._lock:
            now = time.time()
            wait_time = self.last_call + self.interval - now
            if wait_time > 0:
                await asyncio.sleep(wait_time)
            self.last_call = time.time()

async def rate_limited_processing(items: List[BatchItem], client: AsyncAIClient):
    limiter = RateLimiter(calls_per_minute=60)  # 60 RPM

    async def process(item: BatchItem) -> BatchResult:
        await limiter.acquire()  # Wait if needed
        try:
            response = await client.chat([{"role": "user", "content": item.prompt}])
            return BatchResult(id=item.id, success=True, response=response)
        except Exception as e:
            return BatchResult(id=item.id, success=False, error=str(e))

    tasks = [process(item) for item in items]
    return await asyncio.gather(*tasks)
```
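The limiter above spaces every call evenly, which wastes headroom if your provider allows short bursts. A true token bucket permits bursts up to a capacity while holding the long-run rate (a sketch, with `rate` and `capacity` as illustrative parameters, not tied to any real provider's limits):

```python
class TokenBucket:
    """Allows bursts up to `capacity`, refilling at `rate` tokens per second."""

    def __init__(self, rate: float, capacity: int):
        self.rate = rate
        self.capacity = capacity
        self.tokens = float(capacity)
        self.updated = time.monotonic()
        self._lock = asyncio.Lock()

    async def acquire(self):
        async with self._lock:
            while True:
                now = time.monotonic()
                # Refill tokens based on elapsed time, capped at capacity.
                self.tokens = min(self.capacity,
                                  self.tokens + (now - self.updated) * self.rate)
                self.updated = now
                if self.tokens >= 1:
                    self.tokens -= 1
                    return
                # Sleep just long enough for one token to accrue.
                await asyncio.sleep((1 - self.tokens) / self.rate)

# 60 requests/minute sustained, with bursts of up to 10:
limiter = TokenBucket(rate=1.0, capacity=10)
```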
## Retry with Exponential Backoff
```python
import asyncio
from typing import Awaitable, Callable, TypeVar

T = TypeVar('T')

async def retry_with_backoff(
    fn: Callable[[], Awaitable[T]],
    max_retries: int = 3,
    base_delay: float = 1.0,
    max_delay: float = 60.0
) -> T:
    for attempt in range(max_retries):
        try:
            return await fn()
        except Exception as e:
            if attempt == max_retries - 1:
                raise
            delay = min(base_delay * (2 ** attempt), max_delay)
            # Check if the error is retryable
            if hasattr(e, 'response') and e.response is not None:
                status = e.response.status_code
                if status not in (429, 500, 502, 503, 504):
                    raise  # Don't retry client errors
            print(f"Attempt {attempt + 1} failed, retrying in {delay}s: {e}")
            await asyncio.sleep(delay)
    raise RuntimeError("Unreachable")

# Usage
async def robust_chat(client: AsyncAIClient, messages: list[dict]) -> str:
    async def call():
        return await client.chat(messages)
    return await retry_with_backoff(call, max_retries=3)
```
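One refinement worth considering: when many concurrent tasks hit 429s together, plain exponential backoff makes them all retry at the same moments. Randomizing the delay ("full jitter", a well-known variation) spreads retries out. A minimal helper you could swap into `retry_with_backoff` in place of the `delay` computation:

```python
import random

def backoff_delay(attempt: int, base_delay: float = 1.0, max_delay: float = 60.0) -> float:
    """Exponential backoff with full jitter: a random delay up to the exponential cap."""
    return random.uniform(0, min(base_delay * (2 ** attempt), max_delay))
```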
## Building a Production AI Queue
```python
import asyncio
import time
import uuid
from dataclasses import dataclass

@dataclass
class AIJob:
    id: str
    messages: list[dict]
    future: asyncio.Future

class AsyncAIQueue:
    def __init__(self, client: AsyncAIClient, workers: int = 5):
        self.client = client
        self.workers = workers
        self.queue: asyncio.Queue[AIJob] = asyncio.Queue()
        self.results: dict[str, str] = {}

    async def worker(self):
        while True:
            job = await self.queue.get()
            try:
                response = await retry_with_backoff(
                    lambda: self.client.chat(job.messages)
                )
                self.results[job.id] = response
                job.future.set_result(response)
            except Exception as e:
                job.future.set_exception(e)
            finally:
                self.queue.task_done()

    async def start(self):
        self.worker_tasks = [
            asyncio.create_task(self.worker())
            for _ in range(self.workers)
        ]

    async def submit(self, messages: list[dict]) -> str:
        future = asyncio.get_running_loop().create_future()
        job = AIJob(id=str(uuid.uuid4()), messages=messages, future=future)
        await self.queue.put(job)
        return job.id

    async def get_result(self, job_id: str, timeout: float = 120.0) -> str:
        # Poll for the result
        start = time.time()
        while time.time() - start < timeout:
            if job_id in self.results:
                return self.results.pop(job_id)
            await asyncio.sleep(0.1)
        raise TimeoutError(f"Job {job_id} timed out")
```
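Wiring it together might look like the sketch below. Note that `get_result` polls the shared dict; since each job also carries a future, you could `await job.future` instead for push-style delivery.

```python
async def main():
    async with AsyncAIClient("your-api-key") as client:
        queue = AsyncAIQueue(client, workers=5)
        await queue.start()

        job_id = await queue.submit([
            {"role": "user", "content": "Summarize asyncio in one sentence."}
        ])
        print(await queue.get_result(job_id))

asyncio.run(main())
```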
## Getting Started
Build high-concurrency AI applications with ofox.ai — their reliable API supports async patterns with competitive pricing for production workloads.
👉 Get started with ofox.ai
This article contains affiliate links.
Tags: python, async, ai, programming, developer