DEV Community

ZNY
ZNY

Posted on

DEV.TO ARTICLE 42: Async Python for AI: Building High-Concurrency AI Applications

Target Keyword: "async python ai api concurrency"
Tags: python,async,ai,programming,developer
Type: Tutorial


Content

Async Python for AI: Building High-Concurrency AI Applications

AI API calls are I/O-bound — you're waiting on network responses. Async Python lets you run many AI requests concurrently, dramatically improving throughput. Here's how to build high-concurrency AI applications.

Why Async for AI?

A single AI API call might take 1-3 seconds. If you process 100 requests sequentially, that's 100-300 seconds. With async concurrency, you can process all 100 in seconds.

import asyncio
import httpx

# Sequential (slow)
async def process_sequential(requests):
    results = []
    for req in requests:
        result = await call_ai(req)  # 2 seconds each
        results.append(result)
    return results
# Total: len(requests) × 2 seconds

# Concurrent (fast)
async def process_concurrent(requests):
    tasks = [call_ai(req) for req in requests]
    results = await asyncio.gather(*tasks)
    return results
# Total: ~2 seconds total (all run in parallel)
Enter fullscreen mode Exit fullscreen mode

Basic Async AI Client

import asyncio
import httpx
from typing import Optional

class AsyncAIClient:
    def __init__(self, api_key: str, base_url: str = "https://api.ofox.ai/v1"):
        self.api_key = api_key
        self.base_url = base_url
        self._client: Optional[httpx.AsyncClient] = None

    async def __aenter__(self):
        self._client = httpx.AsyncClient(
            headers={"Authorization": f"Bearer {self.api_key}"},
            timeout=120.0
        )
        return self

    async def __aexit__(self, *args):
        if self._client:
            await self._client.aclose()

    async def chat(self, messages: list[dict], **kwargs) -> str:
        response = await self._client.post(
            f"{self.base_url}/chat/completions",
            json={
                "model": kwargs.get("model", "claude-3-5-sonnet-20241022"),
                "messages": messages,
                "max_tokens": kwargs.get("max_tokens", 1024),
                "temperature": kwargs.get("temperature", 0.7)
            }
        )
        response.raise_for_status()
        data = response.json()
        return data["choices"][0]["message"]["content"]

    async def chat_stream(self, messages: list[dict], **kwargs):
        """Streaming version for real-time output."""
        async with self._client.stream(
            "POST",
            f"{self.base_url}/chat/completions",
            json={
                "model": kwargs.get("model", "claude-3-5-sonnet-20241022"),
                "messages": messages,
                "stream": True,
                "max_tokens": kwargs.get("max_tokens", 1024)
            }
        ) as response:
            async for line in response.aiter_lines():
                if line.startswith("data: "):
                    data = line[6:]
                    if data == "[DONE]":
                        break
                    yield data

# Usage
async def main():
    async with AsyncAIClient("your-api-key") as client:
        response = await client.chat([
            {"role": "user", "content": "Hello, Claude!"}
        ])
        print(response)
Enter fullscreen mode Exit fullscreen mode

Concurrent Batch Processing

import asyncio
from dataclasses import dataclass
from typing import List

@dataclass
class BatchItem:
    id: str
    prompt: str
    metadata: dict = None

@dataclass
class BatchResult:
    id: str
    success: bool
    response: str = ""
    error: str = ""

async def process_batch(
    items: List[BatchItem],
    client: AsyncAIClient,
    max_concurrency: int = 10
) -> List[BatchResult]:
    """
    Process items in batches with controlled concurrency.
    """
    semaphore = asyncio.Semaphore(max_concurrency)

    async def process_one(item: BatchItem) -> BatchResult:
        async with semaphore:
            try:
                response = await client.chat([
                    {"role": "user", "content": item.prompt}
                ])
                return BatchResult(id=item.id, success=True, response=response)
            except Exception as e:
                return BatchResult(id=item.id, success=False, error=str(e))

    tasks = [process_one(item) for item in items]
    results = await asyncio.gather(*tasks)
    return list(results)

# Usage
async def main():
    items = [
        BatchItem(id=f"req-{i}", prompt=f"Process item {i}")
        for i in range(100)
    ]

    async with AsyncAIClient("your-api-key") as client:
        results = await process_batch(items, client, max_concurrency=20)

        successes = [r for r in results if r.success]
        failures = [r for r in results if not r.success]

        print(f"Completed: {len(successes)}/{len(items)}")
Enter fullscreen mode Exit fullscreen mode

Rate-Limited Concurrency

import asyncio
import time

class RateLimiter:
    """Token bucket rate limiter for API calls."""

    def __init__(self, calls_per_minute: int = 60):
        self.calls_per_minute = calls_per_minute
        self.interval = 60.0 / calls_per_minute
        self.last_call = 0.0
        self._lock = asyncio.Lock()

    async def acquire(self):
        async with self._lock:
            now = time.time()
            wait_time = self.last_call + self.interval - now
            if wait_time > 0:
                await asyncio.sleep(wait_time)
            self.last_call = time.time()

async def rate_limited_processing(items: List[BatchItem], client: AsyncAIClient):
    limiter = RateLimiter(calls_per_minute=60)  # 60 RPM

    async def process(item: BatchItem) -> BatchResult:
        await limiter.acquire()  # Wait if needed
        try:
            response = await client.chat([{"role": "user", "content": item.prompt}])
            return BatchResult(id=item.id, success=True, response=response)
        except Exception as e:
            return BatchResult(id=item.id, success=False, error=str(e))

    tasks = [process(item) for item in items]
    return await asyncio.gather(*tasks)
Enter fullscreen mode Exit fullscreen mode

Retry with Exponential Backoff

import asyncio
from typing import TypeVar

T = TypeVar('T')

async def retry_with_backoff(
    fn,
    max_retries: int = 3,
    base_delay: float = 1.0,
    max_delay: float = 60.0
) -> T:
    for attempt in range(max_retries):
        try:
            return await fn()
        except Exception as e:
            if attempt == max_retries - 1:
                raise

            delay = min(base_delay * (2 ** attempt), max_delay)
            # Check if error is retryable
            if hasattr(e, 'response') and e.response:
                status = e.response.status_code
                if status not in (429, 500, 502, 503, 504):
                    raise  # Don't retry client errors

            print(f"Attempt {attempt + 1} failed, retrying in {delay}s: {e}")
            await asyncio.sleep(delay)

    raise RuntimeError("Unreachable")

# Usage
async def robust_chat(client: AsyncAIClient, messages: list[dict]) -> str:
    async def call():
        return await client.chat(messages)

    return await retry_with_backoff(call, max_retries=3)
Enter fullscreen mode Exit fullscreen mode

Building a Production AI Queue

import asyncio
from collections import deque
from dataclasses import dataclass
import uuid

@dataclass
class AIJob:
    id: str
    messages: list[dict]
    future: asyncio.Future

class AsyncAIQueue:
    def __init__(self, client: AsyncAIClient, workers: int = 5):
        self.client = client
        self.workers = workers
        self.queue: asyncio.Queue[AIJob] = asyncio.Queue()
        self.results: dict[str, str] = {}

    async def worker(self):
        while True:
            job = await self.queue.get()
            try:
                response = await retry_with_backoff(
                    lambda: self.client.chat(job.messages)
                )
                self.results[job.id] = response
                job.future.set_result(response)
            except Exception as e:
                job.future.set_exception(e)
            finally:
                self.queue.task_done()

    async def start(self):
        self.worker_tasks = [
            asyncio.create_task(self.worker())
            for _ in range(self.workers)
        ]

    async def submit(self, messages: list[dict]) -> str:
        future = asyncio.get_event_loop().create_future()
        job = AIJob(id=str(uuid.uuid4()), messages=messages, future=future)
        await self.queue.put(job)
        return job.id

    async def get_result(self, job_id: str, timeout: float = 120.0) -> str:
        # Poll for result
        start = time.time()
        while time.time() - start < timeout:
            if job_id in self.results:
                return self.results.pop(job_id)
            await asyncio.sleep(0.1)
        raise TimeoutError(f"Job {job_id} timed out")
Enter fullscreen mode Exit fullscreen mode

Getting Started

Build high-concurrency AI applications with ofox.ai — their reliable API supports async patterns with competitive pricing for production workloads.

👉 Get started with ofox.ai


This article contains affiliate links.


Tags: python,async,ai,programming,developer
Canonical URL: https://dev.to/zny10289

Top comments (0)