This is a condensed, production-focused series. Each part builds on the last. By the end, you'll have a mental model — and working code — for infrastructure that can handle thousands of concurrent AI users without breaking a sweat.
Part 1: The Delusion of "Just Add More Compute"
It's a familiar story: you build an incredible AI application. Locally, it's blazing fast. The responses are snappy, the logic is sound, and you feel ready to conquer the world. Then you launch.
At 10 users, it hums. At 100 users, it stutters. At 1,000 users? It completely melts down.
Why? Because the infrastructure you used to build your prototype is fundamentally different from the infrastructure required to run it at scale.
Many developers fall into the trap of thinking they can just throw more money at the problem — spin up a bigger instance, add more RAM, maybe even spring for a beefier GPU. This approach is called vertical scaling, and it has hard limits.[1]
Why AI Apps Fail Early
AI applications are unique beasts. Unlike a standard web app that simply queries a database, an AI app typically requires:
- Intensive compute — generating text, images, or structured data takes significant processing power.
- Long-running processes — a single request can take seconds or even minutes to complete.
- State management — maintaining context across multiple interactions is essential for chat applications.
When 1,000 users hit your single beefy server simultaneously, the compute queue overflows, memory gets exhausted, and connections time out. Your users are left staring at a spinner, or worse, a 502 Bad Gateway.
❌ Hard truth: You cannot brute-force your way out of a poorly architected AI infrastructure.
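A quick back-of-the-envelope check shows why. By Little's law, the number of requests in flight equals arrival rate times time in the system (L = λW), so a server with a fixed number of generation slots can only finish so many requests per second. A minimal sketch; the slot count, generation time, and traffic pattern are hypothetical placeholders:

```python
# Back-of-the-envelope capacity check based on Little's law (L = λW).
# Every number below is a hypothetical placeholder; use your own measurements.


def max_throughput(concurrent_slots: int, seconds_per_request: float) -> float:
    """Requests per second a server can finish when all its slots are busy."""
    return concurrent_slots / seconds_per_request


def is_overloaded(arrival_rate: float, concurrent_slots: int,
                  seconds_per_request: float) -> bool:
    """True if requests arrive faster than they complete, so the backlog
    (and every user's wait) keeps growing."""
    return arrival_rate > max_throughput(concurrent_slots, seconds_per_request)


# Hypothetical server: 16 generations in parallel, 20 s each.
capacity = max_throughput(concurrent_slots=16, seconds_per_request=20.0)
# 1,000 users each sending one prompt per minute.
arrivals = 1_000 / 60

print(f"Capacity: {capacity:.1f} req/s, demand: {arrivals:.1f} req/s")
print("Overloaded:", is_overloaded(arrivals, 16, 20.0))
```

At these made-up numbers you would need about 21 such servers (16.7 / 0.8) to keep up, a gap no single hardware upgrade is likely to close.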
Vertical vs. Horizontal Scaling: The Showdown
| Feature | Vertical Scaling (Scale Up) | Horizontal Scaling (Scale Out) |
|---|---|---|
| Concept | Add more power (CPU, RAM) to one machine | Add more machines to your pool |
| Limits | Hardware ceiling: there's only so big a server can get | Far higher; bounded mainly by budget and shared state |
| Downtime | Upgrading hardware usually requires downtime | Typically none; new nodes are added while others keep serving |
| Cost growth | Steep: top-end, specialized hardware costs disproportionately more | Roughly linear, often cheaper per compute unit |
| Resilience | Single point of failure | Highly resilient — one node dies, others pick up the slack |
| AI workloads | Quickly hits a ceiling on parallel requests | Ideal for distributing heavy inference tasks |
If you want your AI app to survive the jump from prototype to production, you must embrace horizontal scaling.[2]
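In production a load balancer does this for you, but the idea behind "one node dies, others pick up the slack" fits in a few lines. A minimal round-robin sketch; the node names and the `healthy` flag are hypothetical stand-ins for real health checks:

```python
import itertools
from dataclasses import dataclass


@dataclass
class Node:
    name: str
    healthy: bool = True  # in reality, set by periodic health checks


class RoundRobinPool:
    """Hands out nodes in turn, skipping any currently marked unhealthy."""

    def __init__(self, nodes: list):
        self.nodes = nodes
        self._cycle = itertools.cycle(nodes)

    def next_node(self) -> Node:
        # Try each node at most once per call so we never loop forever.
        for _ in range(len(self.nodes)):
            node = next(self._cycle)
            if node.healthy:
                return node
        raise RuntimeError("No healthy nodes available")


if __name__ == "__main__":
    pool = RoundRobinPool([Node("gpu-1"), Node("gpu-2"), Node("gpu-3")])
    pool.nodes[1].healthy = False  # simulate gpu-2 failing its health check
    print([pool.next_node().name for _ in range(4)])
```

With `gpu-2` down, traffic alternates between `gpu-1` and `gpu-3` and no request is routed to the dead node.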
The Problem with Traditional HTTP for AI
Imagine you ask an AI to write a short story. Using standard HTTP, the client sends the request and the server works on it. While the server works, the connection hangs open.
If generation takes 30 seconds, that connection is blocked for 30 seconds. If a network blip occurs, the connection breaks, the data is lost, and the user has to start over. This is a disaster for user experience.
The Solution: WebSockets + Asynchronous Processing
Here is the standard architecture for robust AI data delivery:
- The client connects via WebSocket — a persistent, two-way connection instead of a one-off request.
- The server immediately acknowledges without starting heavy compute.
- The generation task is placed onto a message queue.
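- Worker nodes pick up the task from the queue, run the model, and stream the response token by token back to the client. When workers run on separate machines, they typically publish tokens to a pub/sub channel, and the server holding the WebSocket relays them to the client.
This gives you no blocked request/response cycles, real-time streamed feedback that reduces perceived latency, and, because the task lives in the queue rather than in the connection, a way to recover when the client disconnects mid-stream (provided you persist the output).[3]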
```python
import asyncio

from fastapi import FastAPI, WebSocket, WebSocketDisconnect

app = FastAPI()


class ConnectionManager:
    def __init__(self):
        self.active_connections: list[WebSocket] = []

    async def connect(self, websocket: WebSocket):
        await websocket.accept()
        self.active_connections.append(websocket)

    def disconnect(self, websocket: WebSocket):
        self.active_connections.remove(websocket)

    async def send_personal_message(self, message: str, websocket: WebSocket):
        await websocket.send_text(message)


manager = ConnectionManager()


# Simulate token-by-token AI generation
async def mock_ai_generation(prompt: str):
    response_tokens = [
        "This ", "is ", "a ", "simulated ", "response ",
        "from ", "our ", "AI ", "model. ", "It ",
        "streams ", "data ", "without ", "loss."
    ]
    for token in response_tokens:
        await asyncio.sleep(0.5)  # Simulate inference delay
        yield token


@app.websocket("/ws/generate")
async def websocket_endpoint(websocket: WebSocket):
    await manager.connect(websocket)
    try:
        while True:
            data = await websocket.receive_text()
            await manager.send_personal_message("Processing your request...", websocket)
            # Stream tokens back as they are "generated"
            async for token in mock_ai_generation(data):
                await manager.send_personal_message(token, websocket)
            await manager.send_personal_message("[DONE]", websocket)
    except WebSocketDisconnect:
        manager.disconnect(websocket)

# Run with: uvicorn your_filename:app --reload
```
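The endpoint above generates tokens inline inside the WebSocket handler, which is fine for a demo. The four-step flow described earlier (acknowledge, enqueue, worker, stream back) can be sketched with plain asyncio: an `asyncio.Queue` plays the job queue, and a per-client "outbox" queue stands in for the WebSocket (or for the pub/sub channel a separate worker process would publish to). Everything here, including `fake_model`, is a hypothetical in-process stand-in, not a production design:

```python
import asyncio


async def fake_model(prompt: str):
    """Hypothetical stand-in for real inference: yields one 'token' per word."""
    for word in prompt.split():
        await asyncio.sleep(0)  # hand control back, as awaiting a GPU would
        yield word + " "


async def worker(jobs: asyncio.Queue, outboxes: dict):
    """Steps 3-4: pull a job, run the model, push tokens to that client's outbox."""
    while True:
        client_id, prompt = await jobs.get()
        out = outboxes[client_id]
        async for token in fake_model(prompt):
            await out.put(token)
        await out.put("[DONE]")
        jobs.task_done()


async def handle_client(client_id: str, prompt: str, jobs: asyncio.Queue,
                        outboxes: dict) -> list:
    """Steps 1-2: acknowledge at once, enqueue, then relay whatever arrives."""
    outboxes[client_id] = asyncio.Queue()  # created before the job is visible
    received = ["[ACK]"]                   # sent before any compute starts
    await jobs.put((client_id, prompt))
    while (msg := await outboxes[client_id].get()) != "[DONE]":
        received.append(msg)
    return received


async def main() -> list:
    jobs: asyncio.Queue = asyncio.Queue()
    outboxes: dict = {}
    workers = [asyncio.create_task(worker(jobs, outboxes)) for _ in range(2)]
    results = await asyncio.gather(
        handle_client("alice", "write a haiku", jobs, outboxes),
        handle_client("bob", "summarize this", jobs, outboxes),
    )
    for w in workers:
        w.cancel()
    return results


if __name__ == "__main__":
    print(asyncio.run(main()))
```

Because the handler only enqueues and then listens, adding more `worker` tasks (or, in a real deployment, more worker machines) raises throughput without touching the connection-handling code.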
Part 2: The "Never Block the User" Rule — Message Queues in AI
If you've used ChatGPT, Claude, or Gemini, you've probably noticed a UX quirk: the input box grays out while the AI is responding. You're locked out until it finishes.
Multi-billion-dollar companies do this deliberately — your second message usually depends on the AI's answer to your first, so they enforce a strict sequential timeline. But not every AI feature is a chatbot.
Consider:
- A legal tech platform where a user uploads 50 contracts and hits "Extract Clauses."
- An AI code reviewer where a developer pushes 10 commits and wants background reviews while they keep coding.
- A bulk content generator where a marketer pastes 20 blog titles to be drafted simultaneously.
In these tools, forcing a user to wait for task 1 before submitting task 2 is terrible UX. This is where message queues become essential.
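For the bulk-generation case the shape differs from a chat: every task can be enqueued at once, and a fixed number of workers drain the queue in parallel so you don't fire all 20 requests at your provider simultaneously. A minimal sketch with `asyncio.Queue`; `draft_post` is a hypothetical stand-in for the real model call, and `max_parallel=5` is an arbitrary cap:

```python
import asyncio


async def draft_post(title: str) -> str:
    """Hypothetical stand-in for an LLM call that drafts one blog post."""
    await asyncio.sleep(0.01)
    return f"Draft for: {title}"


async def run_bulk(titles: list, max_parallel: int = 5) -> list:
    """Enqueue every title at once; a fixed pool of workers drains the queue."""
    queue: asyncio.Queue = asyncio.Queue()
    results: list = [None] * len(titles)
    for i, title in enumerate(titles):
        queue.put_nowait((i, title))  # submission never waits on generation

    async def worker():
        while True:
            try:
                i, title = queue.get_nowait()
            except asyncio.QueueEmpty:
                return  # nothing left to do
            results[i] = await draft_post(title)  # keep results in input order

    await asyncio.gather(*(worker() for _ in range(max_parallel)))
    return results


if __name__ == "__main__":
    titles = [f"Post #{n}" for n in range(1, 21)]
    drafts = asyncio.run(run_bulk(titles))
    print(len(drafts), drafts[0])
```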
Choosing the Right Message Queue
Before writing code, you need to pick your queuing backbone. Here's a practical decision guide:
| Queue | When to Choose | When NOT to Choose | Why |
|---|---|---|---|
| Redis (Celery/RQ) | Standard async AI background tasks where speed and simplicity matter | Massive continuous real-time data streams | In-memory speed makes it a common default for web-app background tasks |
| RabbitMQ | Complex routing logic (e.g. image tasks → GPU A, text → GPU B) | Simple FIFO needs | The AMQP protocol allows intelligent routing via "exchanges" |
| Apache Kafka | Enterprise-scale event streaming that needs replay or multiple independent consumers | Small-to-medium startups | Built for very high throughput, but notoriously complex to operate[4] |
| asyncio.Queue | Prototyping, local testing, transient tasks | Production: the queue lives in one process's memory, so a crash or restart loses every pending task | Zero infrastructure, built into Python |
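To make the RabbitMQ row concrete: with a direct exchange, each message carries a routing key and the broker delivers it to whichever queue is bound to that key. The sketch below imitates that idea in-process; it is not the AMQP or any client library's API, and the queue names `gpu-a` and `gpu-b` are hypothetical:

```python
from collections import deque


class DirectRouter:
    """In-process imitation of a direct exchange: the routing key picks the queue."""

    def __init__(self, routes: dict):
        self.routes = routes  # routing key -> queue name
        self.queues = {name: deque() for name in set(routes.values())}

    def publish(self, routing_key: str, payload: str) -> str:
        queue_name = self.routes.get(routing_key)
        if queue_name is None:
            raise KeyError(f"No route for {routing_key!r}")
        self.queues[queue_name].append(payload)
        return queue_name


if __name__ == "__main__":
    router = DirectRouter({"image": "gpu-a", "text": "gpu-b"})
    router.publish("image", "render a logo")
    router.publish("text", "summarize contract 17")
    router.publish("text", "extract clauses")
    print({name: list(q) for name, q in sorted(router.queues.items())})
```

Workers attached to `gpu-a` only ever see image jobs, which is the property that makes per-hardware routing possible.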
Building It: Real OpenAI Streaming with Asyncio Queues
Prerequisites:

```bash
pip install fastapi uvicorn websockets openai
```
```python
import asyncio
import os

from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from openai import AsyncOpenAI

app = FastAPI()
client = AsyncOpenAI(api_key=os.getenv("OPENAI_API_KEY", "sk-fake-key"))


# ── 1. THE AI WORKER ─────────────────────────────────────
async def process_ai_task(prompt: str):
    """Streams the OpenAI response token by token."""
    try:
        stream = await client.chat.completions.create(
            model="gpt-3.5-turbo",
            messages=[{"role": "user", "content": prompt}],
            stream=True,
        )
        async for chunk in stream:
            # Some chunks carry no text (role-only or empty choices); skip them
            if chunk.choices and chunk.choices[0].delta.content:
                yield chunk.choices[0].delta.content
    except Exception as e:
        yield f"\n[API Error: {e}]\n"


# ── 2. THE QUEUE MANAGER ──────────────────────────────────
class ConnectionManager:
    def __init__(self):
        self.active_connections: list[WebSocket] = []
        self.user_queues: dict[WebSocket, asyncio.Queue] = {}
        self.worker_tasks: dict[WebSocket, asyncio.Task] = {}

    async def connect(self, websocket: WebSocket):
        await websocket.accept()
        self.active_connections.append(websocket)
        queue: asyncio.Queue = asyncio.Queue()
        self.user_queues[websocket] = queue
        # Each user gets their own dedicated background worker
        task = asyncio.create_task(self.queue_worker(websocket, queue))
        self.worker_tasks[websocket] = task

    def disconnect(self, websocket: WebSocket):
        self.active_connections.remove(websocket)
        self.worker_tasks.pop(websocket).cancel()
        self.user_queues.pop(websocket, None)

    async def queue_message(self, websocket: WebSocket, message: str):
        """Enqueues the task and immediately confirms receipt to the user."""
        queue = self.user_queues[websocket]
        await queue.put(message)
        # The magic: we never block; we just confirm it's in line
        await websocket.send_text(
            f"\n[System]: Task queued. {queue.qsize()} item(s) in line.\n"
        )

    async def queue_worker(self, websocket: WebSocket, queue: asyncio.Queue):
        """Processes queued tasks sequentially in the background."""
        try:
            while True:
                message = await queue.get()  # Waits until a task arrives
                try:
                    await websocket.send_text(f"\n[AI processing]: {message}\n")
                    async for token in process_ai_task(message):
                        await websocket.send_text(token)
                    await websocket.send_text("\n[DONE]\n")
                finally:
                    queue.task_done()  # Mark done even if sending failed
        except asyncio.CancelledError:
            pass  # Clean exit on disconnect


manager = ConnectionManager()


# ── 3. THE WEBSOCKET ENDPOINT ─────────────────────────────
@app.websocket("/ws/tasks")
async def websocket_endpoint(websocket: WebSocket):
    await manager.connect(websocket)
    try:
        while True:
            data = await websocket.receive_text()
            # Hand off immediately: the loop is free for the next message
            await manager.queue_message(websocket, data)
    except WebSocketDisconnect:
        manager.disconnect(websocket)

# Run with: uvicorn filename:app --reload
```
By decoupling the receiving of a prompt from the processing of it, you create a system that feels snappy even under load.
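One gap in the manager above: each user's `asyncio.Queue()` is unbounded, so a client (or a looping agent) can pile up hundreds of pending prompts. A possible refinement, assuming you would rather reject work than buffer it, is to give each queue a `maxsize` and use `put_nowait`, which raises `asyncio.QueueFull` instead of waiting. `MAX_BACKLOG` and `try_enqueue` are hypothetical names:

```python
import asyncio

MAX_BACKLOG = 3  # hypothetical per-user cap; tune for your workload


def try_enqueue(queue: asyncio.Queue, message: str) -> str:
    """Non-blocking enqueue that rejects work once the user's backlog is full."""
    try:
        queue.put_nowait(message)
    except asyncio.QueueFull:
        return f"[System]: Queue full ({queue.maxsize} pending). Try again shortly."
    return f"[System]: Task queued. {queue.qsize()} item(s) in line."


async def demo() -> list:
    queue: asyncio.Queue = asyncio.Queue(maxsize=MAX_BACKLOG)
    # Nothing consumes the queue here, so the fourth task is rejected.
    return [try_enqueue(queue, f"task {n}") for n in range(1, 5)]


if __name__ == "__main__":
    for line in asyncio.run(demo()):
        print(line)
```

In `connect`, the queue would be created as `asyncio.Queue(maxsize=MAX_BACKLOG)`, and `queue_message` would send whatever `try_enqueue` returns.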
Part 4: AI-Specific Observability & Benchmarking — The Panopticon
Once your traffic is flowing, how do you actually know if your setup is performing well?
"Without data, you're just another person with an opinion." — W. Edwards Deming
If you swap models, change your caching layer, or move from OpenAI to a self-hosted vLLM instance, you cannot rely on eyeballing the chat UI and thinking "yeah, that seems snappier." Standard web metrics like server uptime, CPU utilization, and HTTP 200 counts tell you very little when debugging an LLM application.
Why Standard Observability Fails
If a standard CRUD endpoint takes 5 seconds to return a JSON payload, that's a clear failure. You check your query plan and add an index.
If an AI endpoint takes 5 seconds to return a response — is that a failure?
- If it generated a 2-word answer: yes.
- If it generated a 500-word essay: no.
Because AI execution time scales with output length, raw request latency on its own tells you little. You must track the Big Two instead:[5]
The Big Two: TTFT and TPOT
TTFT (Time To First Token) is the time, usually measured in milliseconds, between the user hitting Send and the first piece of text arriving. It captures network overhead, load balancer efficiency, queueing at the LLM provider, and the model's prompt-processing (prefill) step, which grows with input length. High TTFT makes your app feel broken.
TPOT (Time Per Output Token) is the average time to generate each subsequent token after the first arrives, also called Inter-Token Latency. It mostly reflects raw decode speed on the GPU. If TTFT is fast but TPOT is slow, your network and queues are probably fine and the model (or the hardware it runs on) is the bottleneck.
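The two metrics combine into a simple latency model: total ≈ TTFT + (N − 1) × TPOT for N output tokens, the same relation the benchmark script below rearranges to compute TPOT. Applied to the 5-second example above, a sketch (the 0.4 s TTFT is hypothetical, and ~667 tokens for 500 words assumes the common rule of thumb of about 0.75 English words per token):

```python
def implied_tpot(total_s: float, ttft_s: float, n_tokens: int) -> float:
    """Average time per output token after the first, from end-to-end timings.
    Rearranges: total = TTFT + (n_tokens - 1) * TPOT."""
    if n_tokens < 2:
        raise ValueError("Need at least two tokens to measure TPOT")
    return (total_s - ttft_s) / (n_tokens - 1)


# Same 5 s wall-clock time, very different verdicts (TTFT of 0.4 s is hypothetical).
short = implied_tpot(5.0, 0.4, n_tokens=2)    # 2-token answer
essay = implied_tpot(5.0, 0.4, n_tokens=667)  # ~500 words
print(f"2-token answer: {short:.3f} s/token")
print(f"Essay:          {essay * 1000:.1f} ms/token")
```

The 2-token answer implies roughly 4.6 s per token, which signals a real problem; the essay works out to a few milliseconds per token, which is fast streaming.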
The Code: A Production Benchmarking Script
```python
import asyncio
import os
import time

from openai import AsyncOpenAI

# Point this at your real key, or at a LiteLLM gateway via base_url=...
client = AsyncOpenAI(api_key=os.getenv("OPENAI_API_KEY", "sk-replace-with-your-key"))


async def benchmark_llm_call(prompt: str):
    print(f"Benchmarking: '{prompt}'...\n")
    start_time = time.perf_counter()  # high-resolution monotonic clock for intervals
    first_token_time = None
    token_count = 0  # counts streamed content chunks, which approximate tokens

    try:
        # MUST use stream=True; otherwise TTFT measurement is impossible
        stream = await client.chat.completions.create(
            model="gpt-4-turbo",
            messages=[{"role": "user", "content": prompt}],
            stream=True,
        )
        async for chunk in stream:
            # Skip chunks without text, e.g. the initial role-only chunk
            if not chunk.choices or not chunk.choices[0].delta.content:
                continue
            if first_token_time is None:
                first_token_time = time.perf_counter()  # first visible text
            token_count += 1
        end_time = time.perf_counter()

        if first_token_time is None:
            print("❌ No content received; cannot compute TTFT/TPOT.")
            return

        # ── THE BENCHMARK MATH ──────────────────────────────
        ttft = first_token_time - start_time
        total_time = end_time - start_time
        tpot = (end_time - first_token_time) / max(token_count - 1, 1)

        print("\n📊 ── BENCHMARK RESULTS ───────────────────────")
        print(f"  Tokens Generated : {token_count}")
        print(f"  TTFT             : {ttft:.3f}s")
        print(f"  TPOT             : {tpot:.3f}s / token")
        print(f"  Total Latency    : {total_time:.3f}s")
        print("───────────────────────────────────────────────\n")

    except Exception as e:
        print(f"❌ Benchmarking failed: {e}")


if __name__ == "__main__":
    test_prompt = "Write a 3-paragraph explanation of vertical vs horizontal scaling."
    asyncio.run(benchmark_llm_call(test_prompt))
```
How to use this data: Run the script before and after every infrastructure change. Did you add a new message queue? Check your TTFT: a jump of several hundred milliseconds suggests the queue is adding overhead. Did you switch from GPT-4 to a local Llama model? Compare TPOT to get hard numbers on the inference speed difference.
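A single run is noisy: one slow provider response can swing the numbers. It helps to collect TTFT from several runs (you would need `benchmark_llm_call` to return its values instead of only printing them) and compare medians and tail latency. A small standard-library sketch; the sample values are made up for illustration:

```python
import statistics


def summarize(samples_s: list) -> dict:
    """Median and p95 of repeated latency measurements, in seconds."""
    if len(samples_s) < 2:
        raise ValueError("Collect at least two runs")
    # 19 cut points (p5..p95); "inclusive" keeps p95 within the observed range
    cuts = statistics.quantiles(samples_s, n=20, method="inclusive")
    return {"p50": statistics.median(samples_s), "p95": cuts[-1]}


# Hypothetical TTFT samples (seconds) from ten runs before and after a change.
before = [0.41, 0.39, 0.44, 0.40, 0.42, 0.38, 0.95, 0.43, 0.40, 0.41]
after = [0.62, 0.60, 0.65, 0.59, 0.61, 1.30, 0.63, 0.60, 0.64, 0.62]
print("before:", summarize(before))
print("after: ", summarize(after))
```

In this made-up data the median TTFT rose by about 0.2 s, a consistent shift across runs rather than one unlucky request.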
Part 5: Token-Aware Rate Limiting & Guardrails — The Bouncer
You can have the fastest queues, the smartest load balancers, and top-tier observability. But without an AI-specific bouncer, a single rogue user — or a broken looping AI agent — will bankrupt your API wallet overnight.
Here's the core problem: standard web rate limiting is completely blind to compute cost.
The Request Asymmetry Problem
| User Type | Requests/min | Payload | Approx. Cost per Minute | Standard Limiter Says |
|---|---|---|---|---|
| User A (light) | 20 | Short chat prompts ("Hi", "Thanks") | ~$0.002 | ❌ BLOCKED (exceeded 15 RPM) |
| User B (heavy/rogue) | 2 | 80k context window document drops | ~$4.50+ | ✅ ALLOWED (well under 15 RPM) |
Standard rate limiters punish your harmless, chatty users while letting heavy resource hogs drain your API quota.[6]
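The asymmetry is easy to check with arithmetic: counting tokens instead of requests flips both verdicts. A minimal sketch; the 15 RPM limit comes from the table, while the 20,000 tokens-per-minute budget and ~10 tokens per short prompt are hypothetical:

```python
# Compare a requests-per-minute limit with a tokens-per-minute limit
# for the two users in the table. Token figures are illustrative.
RPM_LIMIT = 15
TPM_LIMIT = 20_000  # hypothetical per-user token budget per minute

users = {
    "A (light)": {"requests": 20, "tokens_per_request": 10},
    "B (heavy)": {"requests": 2, "tokens_per_request": 80_000},
}


def verdicts(requests: int, tokens_per_request: int) -> tuple:
    """Return (allowed_by_rpm, allowed_by_tpm) for one minute of traffic."""
    return requests <= RPM_LIMIT, requests * tokens_per_request <= TPM_LIMIT


for name, u in users.items():
    by_rpm, by_tpm = verdicts(u["requests"], u["tokens_per_request"])
    print(f"User {name}: RPM limiter {'allows' if by_rpm else 'blocks'}, "
          f"token limiter {'allows' if by_tpm else 'blocks'}")
```

User A spends about 200 tokens a minute and passes the token check; User B's 160,000 tokens are blocked even though two requests sit well under the RPM cap.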
The Solution: Token Bucket Algorithm for LLMs
The fix is inline middleware that evaluates the weight of a request (input tokens) before touching the LLM, tracks it in real time, and reconciles the budget after generation (output tokens).
```python
import asyncio
import time

from fastapi import FastAPI, Header, HTTPException
from pydantic import BaseModel

app = FastAPI()


# ── 1. TOKEN BUCKET ───────────────────────────────────────
class TokenBucket:
    def __init__(self, max_tokens: int, refill_rate_per_sec: float):
        self.max_tokens = max_tokens
        self.refill_rate = refill_rate_per_sec
        self.tokens = max_tokens
        self.last_update = time.monotonic()  # monotonic: immune to wall-clock jumps

    def _refill(self):
        now = time.monotonic()
        elapsed = now - self.last_update
        self.tokens = min(self.max_tokens, self.tokens + (elapsed * self.refill_rate))
        self.last_update = now

    def consume(self, estimated_tokens: int) -> bool:
        self._refill()
        if self.tokens >= estimated_tokens:
            self.tokens -= estimated_tokens
            return True
        return False


# Per-user buckets; swap this into Redis with Lua scripts for production!
USER_LIMITS = {
    "user_premium_123": TokenBucket(max_tokens=50_000, refill_rate_per_sec=100),
    "user_free_456": TokenBucket(max_tokens=5_000, refill_rate_per_sec=10),
}


class GenerationRequest(BaseModel):
    prompt: str


def estimate_token_count(text: str) -> int:
    # Rough rule: 1 token ≈ 4 characters. Use tiktoken for precision in production.
    return max(1, len(text) // 4)


# ── 2. RATE-LIMITED ENDPOINT ──────────────────────────────
@app.post("/v1/chat/completions")
async def secure_chat_endpoint(
    request: GenerationRequest,
    x_user_id: str = Header(...),  # read from the "X-User-Id" request header
):
    if x_user_id not in USER_LIMITS:
        raise HTTPException(status_code=401, detail="Invalid User ID")

    bucket = USER_LIMITS[x_user_id]
    input_estimate = estimate_token_count(request.prompt)

    # Reserve 3× input as a safety buffer for output length unpredictability
    reservation = input_estimate * 3

    if not bucket.consume(reservation):
        raise HTTPException(
            status_code=429,
            detail="Token capacity exceeded. Wait for pool refill."
        )

    # Simulate LLM inference; a real provider response usually reports output tokens
    await asyncio.sleep(0.5)
    actual_output_tokens = 150

    # Reconcile the reservation against actual usage
    actual_used = input_estimate + actual_output_tokens
    difference = reservation - actual_used
    if difference > 0:
        # Model was more concise than reserved: refund the unused part
        bucket.tokens = min(bucket.max_tokens, bucket.tokens + difference)
    else:
        # Output ran past the reservation: charge the overage too. The balance
        # may dip below zero, which simply delays the user's next request.
        bucket.tokens += difference

    return {
        "status": "success",
        "tokens_spent": actual_used,
        "remaining_pool": int(bucket.tokens)
    }

# Run via: uvicorn file_name:app --reload
```
Two things make this powerful. First, abusive payloads are dropped before they touch your expensive GPU or API provider: a free-tier user throwing a context bomb gets a 429 within milliseconds, without costing you any inference. Second, because exact output length is fundamentally unpredictable before inference, the code reserves a buffer upfront and refunds the unused part of the reservation as soon as the AI answers more concisely.
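One refinement to consider: a bare 429 leaves the client guessing when to retry. Because the bucket refills at a known rate, the wait can be computed and returned in the standard `Retry-After` header (FastAPI's `HTTPException` accepts a `headers` argument). `seconds_until_available` is a hypothetical helper, and it ignores concurrent requests that might drain the bucket in the meantime:

```python
import math


def seconds_until_available(current_tokens: float, needed: int,
                            refill_rate_per_sec: float) -> int:
    """Whole seconds until a token bucket holds `needed` tokens (0 if it already does)."""
    deficit = needed - current_tokens
    if deficit <= 0:
        return 0
    return math.ceil(deficit / refill_rate_per_sec)  # round up so the retry succeeds


# Free tier from USER_LIMITS refills 10 tokens/s: a 1,200-token reservation
# against a bucket holding 400 tokens has to wait 80 s.
print(seconds_until_available(400, 1_200, 10))
```

Inside `secure_chat_endpoint`, the 429 branch could then raise `HTTPException(status_code=429, detail=..., headers={"Retry-After": str(wait)})` with `wait = seconds_until_available(bucket.tokens, reservation, bucket.refill_rate)`. If `reservation` exceeds `bucket.max_tokens`, no amount of waiting helps, so a different status such as 413 may be the more honest answer.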
💬 Let's Talk in the Comments
I wrote this series because I watched a lot of smart developers — myself included — build genuinely impressive AI features, then ship them into an infrastructure that was never designed to carry the weight. The app works perfectly in a demo. Then three colleagues use it simultaneously and it falls over. That gap between "it works on my machine" and "it works for a thousand real users" is where most AI projects quietly die, and I think it deserves more honest, code-first attention than the usual "just use a managed service" advice.
If you're building something with LLMs right now, I'd love to know what part of this resonates most with your situation — or where you've hit a wall that none of these patterns fully solved. Drop your questions, your war stories, or your pushback in the comments. A few things I'm especially curious about from you:
- Have you hit a scaling wall that wasn't a compute problem but a design problem? What did the breaking point actually look like?
- Are you running self-hosted models (vLLM, Ollama, etc.) or leaning on managed providers — and how does that change which of these patterns matter most to you?
- Is there a Part 6 you wish existed? Gateway caching, cost allocation per tenant, multi-agent coordination: I'll write the thing people actually need next.
The best technical writing is a conversation, not a lecture. I'm here.
Wrapping Up the Series
Over these five installments, we've fundamentally rearchitected the standard web template to support scale-ready AI systems:
- WebSockets: prevented timeout drops and opened live token streaming.
- Async Message Queues: decoupled user inputs from long-running inference tasks.
- Gateway Load Balancing: routed around provider downtime with usage-aware routing.
- AI Observability (TTFT & TPOT): replaced human guesswork with empirical benchmarking.
- Token-Aware Rate Limiting: built an ironclad economic protection layer.
Build your AI apps on these five pillars, and your infrastructure won't just survive 1,000 concurrent users — it will welcome them seamlessly.
References & Further Reading
1. On vertical vs. horizontal scaling fundamentals: AWS, Scaling Up vs. Scaling Out
2. On horizontal scaling for high-availability systems: Martin Fowler, Patterns of Enterprise Application Architecture
3. On the WebSocket protocol and persistent connections: MDN Web Docs, The WebSocket API
4. On Apache Kafka's architecture and operational complexity: Confluent, Kafka vs. RabbitMQ
5. On TTFT and TPOT as primary LLM inference metrics: Anyscale, LLM Performance Guide
6. On token-based rate limiting for LLM APIs: OpenAI, Rate Limits Documentation


