Taumai flores
FastAPI + Async Python: Build High-Performance AI Apps That Don't Buckle Under Pressure


You've trained a great model. Your API works perfectly in testing. Then real traffic hits — and everything grinds to a halt. If you've been here before, the problem almost certainly isn't your model. It's your architecture. In this tutorial, we'll walk through how to combine FastAPI with async Python to build AI-powered APIs that stay fast, responsive, and scalable even when inference gets heavy.


Why Async Matters for AI Applications

Traditional synchronous Python is a serious bottleneck for AI workloads. When your app calls an LLM, runs an embedding model, or fetches results from a vector database, it blocks the entire thread while it waits. In a sync framework, that means every other request has to queue up and wait too.

Async Python solves this with the event loop. Instead of blocking, your code yields control while waiting for I/O-bound operations (API calls, database queries, file reads), letting the server handle other requests in the meantime.

FastAPI is built on top of Starlette and uses asyncio natively, making it the ideal choice for modern AI APIs.

Here's a quick illustration of the difference:

import httpx

# Synchronous — blocks the thread while waiting on the network
def get_embedding(text: str) -> list[float]:
    response = httpx.post(
        "https://api.openai.com/v1/embeddings",
        headers={"Authorization": f"Bearer {OPENAI_API_KEY}"},
        json={"input": text, "model": "text-embedding-ada-002"},
    )
    return response.json()["data"][0]["embedding"]

# Asynchronous — yields control to the event loop while waiting
async def get_embedding(text: str) -> list[float]:
    async with httpx.AsyncClient() as client:
        response = await client.post(
            "https://api.openai.com/v1/embeddings",
            headers={"Authorization": f"Bearer {OPENAI_API_KEY}"},
            json={"input": text, "model": "text-embedding-ada-002"}
        )
    return response.json()["data"][0]["embedding"]

The second version allows your server to process dozens of other requests while OpenAI's API is thinking. At scale, this is the difference between 50 concurrent users and 500.
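You don't need a real API to see the effect. The self-contained sketch below simulates five 200 ms I/O waits with asyncio.sleep, standing in for network calls:

```python
import asyncio
import time

async def fake_api_call(i: int) -> int:
    # Stand-in for an I/O-bound request (e.g. an embedding call)
    await asyncio.sleep(0.2)
    return i

async def sequential() -> float:
    start = time.perf_counter()
    for i in range(5):
        await fake_api_call(i)
    return time.perf_counter() - start

async def concurrent() -> float:
    start = time.perf_counter()
    await asyncio.gather(*(fake_api_call(i) for i in range(5)))
    return time.perf_counter() - start

seq = asyncio.run(sequential())   # ~1.0s: each call waits for the previous one
con = asyncio.run(concurrent())   # ~0.2s: all five waits overlap
print(f"sequential: {seq:.2f}s, concurrent: {con:.2f}s")
```

The concurrent version finishes in roughly the time of a single call, because all five waits overlap on one event loop.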


Setting Up Your FastAPI Project Structure

A clean structure saves you from painful refactors later. Here's the layout we'll build around:

my_ai_app/
├── main.py
├── api/
│   ├── routes/
│   │   ├── embeddings.py
│   │   └── chat.py
├── services/
│   ├── llm_service.py
│   └── vector_store.py
├── core/
│   ├── config.py
│   └── dependencies.py
├── models/
│   └── schemas.py
└── requirements.txt

Your main.py bootstraps the app with lifespan management — a FastAPI feature that handles startup and shutdown cleanly:

from contextlib import asynccontextmanager
from fastapi import FastAPI
from api.routes import embeddings, chat
from core.dependencies import init_http_client, close_http_client

@asynccontextmanager
async def lifespan(app: FastAPI):
    # Startup: initialize shared resources
    await init_http_client()
    yield
    # Shutdown: clean up connections
    await close_http_client()

app = FastAPI(title="AI API", lifespan=lifespan)
app.include_router(embeddings.router, prefix="/embeddings")
app.include_router(chat.router, prefix="/chat")

Using the lifespan context manager ensures your HTTP client pool, database connections, and model instances are created once and reused — not spun up fresh on every request.


Building Async AI Service Layers

The golden rule: keep your routes thin and push logic into service classes. This makes testing easier and keeps your async patterns consistent.

Here's a practical LLMService that wraps any OpenAI-compatible endpoint:

# services/llm_service.py
import asyncio

import httpx

from core.config import settings

class LLMService:
    def __init__(self, client: httpx.AsyncClient):
        self.client = client
        self.base_url = settings.OPENAI_BASE_URL

    async def chat_completion(self, messages: list[dict], model: str = "gpt-4o") -> str:
        response = await self.client.post(
            f"{self.base_url}/chat/completions",
            headers={"Authorization": f"Bearer {settings.OPENAI_API_KEY}"},
            json={"model": model, "messages": messages},
            timeout=30.0,
        )
        response.raise_for_status()
        return response.json()["choices"][0]["message"]["content"]

    async def get_embeddings_batch(self, texts: list[str]) -> list[list[float]]:
        # Fire off one request per text and await them all concurrently
        tasks = [self._single_embedding(text) for text in texts]
        return await asyncio.gather(*tasks)

    async def _single_embedding(self, text: str) -> list[float]:
        response = await self.client.post(
            f"{self.base_url}/embeddings",
            headers={"Authorization": f"Bearer {settings.OPENAI_API_KEY}"},
            json={"input": text, "model": "text-embedding-ada-002"},
            timeout=30.0,
        )
        response.raise_for_status()
        return response.json()["data"][0]["embedding"]

Notice asyncio.gather() in get_embeddings_batch. This is one of the most powerful async patterns available — it fires off multiple coroutines concurrently rather than sequentially. Embedding 10 texts takes roughly the same time as embedding one, though for large batches you'll want to cap concurrency so you don't trip provider rate limits.
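One common way to cap that fan-out (a refinement not in the service above) is an asyncio.Semaphore around each task. The sketch below simulates the embedding call with asyncio.sleep so it runs standalone:

```python
import asyncio

async def embed_with_limit(texts: list[str], max_concurrency: int = 8) -> list[int]:
    sem = asyncio.Semaphore(max_concurrency)

    async def one(text: str) -> int:
        async with sem:           # at most max_concurrency calls in flight
            await asyncio.sleep(0.01)  # stand-in for the HTTP request
            return len(text)      # stand-in for the embedding vector

    # gather preserves input order even though tasks finish out of order
    return await asyncio.gather(*(one(t) for t in texts))

results = asyncio.run(embed_with_limit(["hello", "world", "async"]))
print(results)  # [5, 5, 5]
```

Dropping `sem` into `_single_embedding` in the real service works the same way: all tasks are still created at once, but only a bounded number hit the network simultaneously.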


Handling CPU-Bound Inference Without Blocking the Event Loop

Here's a trap that catches many developers: asyncio is great for I/O-bound work, but CPU-bound tasks (like running a local model with PyTorch or transformers) will still block your event loop even in an async function.

The fix is run_in_executor, which offloads blocking work to a thread pool:

# services/local_model_service.py
import asyncio
from concurrent.futures import ThreadPoolExecutor
from transformers import pipeline

class LocalModelService:
    def __init__(self):
        self.classifier = pipeline("sentiment-analysis")
        self._executor = ThreadPoolExecutor(max_workers=4)

    async def classify(self, text: str) -> dict:
        loop = asyncio.get_running_loop()
        # Run blocking inference in a thread pool
        result = await loop.run_in_executor(
            self._executor,
            self.classifier,
            text
        )
        return result[0]

And your route stays clean and async:

# api/routes/chat.py
from fastapi import APIRouter, Depends
from models.schemas import ChatRequest, ChatResponse
from services.llm_service import LLMService
from core.dependencies import get_llm_service

router = APIRouter()

@router.post("/complete", response_model=ChatResponse)
async def chat_complete(
    request: ChatRequest,
    llm_service: LLMService = Depends(get_llm_service)
):
    content = await llm_service.chat_completion(
        messages=request.messages,
        model=request.model
    )
    return ChatResponse(content=content)

FastAPI's dependency injection system, demonstrated here with Depends, is perfect for sharing service instances across routes without global state.
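The get_llm_service factory itself isn't shown above; the pattern is simply a function that hands each request a service built on the shared client. A standalone sketch (the stub classes below stand in for httpx.AsyncClient and the LLMService from earlier):

```python
class StubClient:
    """Stands in for the shared httpx.AsyncClient created at startup."""

class StubLLMService:
    """Stands in for services.llm_service.LLMService."""
    def __init__(self, client: StubClient):
        self.client = client

_http_client = StubClient()  # in the real app, created by init_http_client()

def get_llm_service() -> StubLLMService:
    # FastAPI calls this per request via Depends(get_llm_service);
    # constructing the service is cheap because the pool is shared
    return StubLLMService(_http_client)

a, b = get_llm_service(), get_llm_service()
print(a.client is b.client)  # True — both requests share one connection pool
```

In the real app the factory would return LLMService(get_http_client()); the expensive resource (the connection pool) lives for the app's lifetime while the thin service wrapper is rebuilt per request.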


Streaming Responses for Real-Time AI Output

Users hate waiting 10 seconds for a complete LLM response. Streaming solves this — send tokens as they arrive. FastAPI supports this natively with StreamingResponse:

from fastapi import APIRouter
from fastapi.responses import StreamingResponse
import httpx
import json

from core.config import settings
from models.schemas import ChatRequest

router = APIRouter()

async def stream_llm_tokens(messages: list[dict]):
    async with httpx.AsyncClient() as client:
        async with client.stream(
            "POST",
            "https://api.openai.com/v1/chat/completions",
            headers={"Authorization": f"Bearer {settings.OPENAI_API_KEY}"},
            json={"model": "gpt-4o", "messages": messages, "stream": True},
            timeout=60.0,
        ) as response:
            async for line in response.aiter_lines():
                if line.startswith("data: ") and line != "data: [DONE]":
                    chunk = json.loads(line[6:])
                    delta = chunk["choices"][0]["delta"].get("content", "")
                    if delta:
                        yield f"data: {delta}\n\n"

@router.post("/stream")
async def stream_chat(request: ChatRequest):
    return StreamingResponse(
        stream_llm_tokens(request.messages),
        media_type="text/event-stream"
    )

This pattern keeps memory usage low and dramatically improves perceived performance. Users see the first token in under a second instead of waiting for the entire response to buffer.
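On the consuming side, a client has to peel the `data: ` framing back off. Here's a small stdlib-only parser (independent of any HTTP library) for the event-stream lines an endpoint like this emits:

```python
from typing import Iterable, Iterator

def parse_sse(lines: Iterable[str]) -> Iterator[str]:
    # Extract the payload from each "data: ..." line of an SSE stream,
    # skipping blank keep-alive lines and the OpenAI-style [DONE] sentinel
    for line in lines:
        if line.startswith("data: "):
            payload = line[len("data: "):]
            if payload != "[DONE]":
                yield payload

stream = ["data: Hello", "", "data: , world", "", "data: [DONE]"]
print("".join(parse_sse(stream)))  # Hello, world
```

In a real client you'd feed it the lines from `response.aiter_lines()` on an httpx streaming request, exactly mirroring the server side.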


Conclusion: Ship Architecture That Grows With You

The combination of FastAPI, async Python, and thoughtful service layering isn't just a performance trick — it's the foundation for AI applications that can actually handle production traffic. To recap the key principles:

  • Use async/await for all I/O-bound operations (external API calls, databases)
  • Use run_in_executor for CPU-bound local model inference
  • Use asyncio.gather() to fan out concurrent requests
  • Use lifespan management to share connection pools and model instances
  • Use streaming whenever you're dealing with LLM output

Your next step: take one of your existing FastAPI endpoints and convert it to fully async. Measure the before and after throughput with a tool like Locust or wrk. The numbers will make the case better than any benchmark article can.

The best AI API is the one that's still responding when your product goes viral. Build it right from the start.
