Running AI agents at scale is different from running AI agents in a demo. The failure modes are subtle, the costs sneak up on you, and the bugs are non-deterministic. After running 100+ production agents at AI Buddy — serving businesses in WhatsApp automation, lead qualification, and customer support — here are the hard lessons.
## Lesson 1: Rate Limits Will Destroy You in Unexpected Ways
You know about rate limits. You've read the docs. You're wrong about when they'll hit you.
Anthropic's rate limits are per-account, not per-API-key. If you're running 10 agents and they all get busy at 9am when Israeli businesses open, you'll hit limits across all of them simultaneously.
The fix is a token budget system, not just retry logic:
```python
import asyncio
import time
import logging
from collections import deque
from dataclasses import dataclass

logger = logging.getLogger(__name__)


@dataclass
class TokenBudget:
    """Track and enforce rate limits proactively."""
    requests_per_minute: int = 50
    tokens_per_minute: int = 40000

    def __post_init__(self):
        self._request_times = deque()
        self._token_usage = deque()
        self._lock = asyncio.Lock()

    async def acquire(self, estimated_tokens: int = 500) -> bool:
        """
        Returns True if the request can proceed now.
        Returns False if we should wait or queue.
        """
        async with self._lock:
            now = time.time()
            window = 60.0  # 1-minute sliding window

            # Drop entries older than the window
            while self._request_times and now - self._request_times[0] > window:
                self._request_times.popleft()
            while self._token_usage and now - self._token_usage[0][0] > window:
                self._token_usage.popleft()

            # Check request limit
            if len(self._request_times) >= self.requests_per_minute:
                oldest = self._request_times[0]
                wait_time = window - (now - oldest)
                logger.warning(f"Rate limit approaching, need to wait {wait_time:.1f}s")
                return False

            # Check token limit
            current_tokens = sum(tokens for _, tokens in self._token_usage)
            if current_tokens + estimated_tokens > self.tokens_per_minute:
                logger.warning(f"Token budget exhausted: {current_tokens}/{self.tokens_per_minute}")
                return False

            # Record this request
            self._request_times.append(now)
            self._token_usage.append((now, estimated_tokens))
            return True


# Global budget shared across all agents, set slightly below the real limits
_global_budget = TokenBudget(requests_per_minute=45, tokens_per_minute=38000)


async def rate_limited_completion(messages: list, **kwargs) -> str:
    """LLM call with proactive rate limiting."""
    import anthropic

    client = anthropic.AsyncAnthropic()

    # Wait for budget headroom (up to 30 seconds)
    for attempt in range(30):
        if await _global_budget.acquire():
            break
        await asyncio.sleep(1)
    else:
        raise RuntimeError("Rate limit wait exceeded 30 seconds")

    for retry in range(3):
        try:
            response = await client.messages.create(
                model="claude-haiku-4-5",
                max_tokens=kwargs.get("max_tokens", 512),
                messages=messages,
            )
            return response.content[0].text
        except anthropic.RateLimitError as e:
            wait = 2 ** retry * 5  # 5s, 10s, 20s
            logger.error(f"Rate limited despite budget tracking (attempt {retry + 1}): {e}")
            await asyncio.sleep(wait)

    raise RuntimeError("All retries exhausted")
```
## Lesson 2: Context Windows Are a Billing Trap
A typical conversation agent stores message history and sends it with every request. With Claude Opus at $15/MTok input, a 50-message conversation history (roughly 10,000 tokens) costs $0.15 per request — not per conversation, per request. If your agent sends 20 replies in a conversation, that's $3.00 in input tokens alone.
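The arithmetic is worth sanity-checking. A quick sketch, using the token counts and the $15/MTok Opus input price from the paragraph above:

```python
MTOK = 1_000_000
OPUS_INPUT_PRICE_PER_MTOK = 15.00  # USD per million input tokens


def input_cost(history_tokens: int, replies: int) -> float:
    """Cost of re-sending the same history with every reply."""
    return replies * history_tokens * OPUS_INPUT_PRICE_PER_MTOK / MTOK


# 10,000-token history, sent once: $0.15
per_request = input_cost(10_000, 1)
# Same history re-sent across 20 replies: $3.00 per conversation
per_conversation = input_cost(10_000, 20)
print(f"${per_request:.2f} per request, ${per_conversation:.2f} per conversation")
```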
The fix: smart context compression.
```python
from typing import Dict, List


def compress_conversation_history(
    messages: List[Dict],
    max_tokens: int = 2000,
    always_keep_last_n: int = 6,
) -> List[Dict]:
    """
    Compress old messages to stay within a token budget.
    Strategy: summarize messages older than the last N, keep recent ones verbatim.
    """
    if len(messages) <= always_keep_last_n:
        return messages

    # Rough token estimate: 1 token ≈ 4 chars
    total_estimate = sum(len(m["content"]) // 4 for m in messages)
    if total_estimate <= max_tokens:
        # Already within budget, nothing to compress
        return messages

    old_messages = messages[:-always_keep_last_n]
    recent_messages = messages[-always_keep_last_n:]

    old_token_estimate = sum(len(m["content"]) // 4 for m in old_messages)
    if old_token_estimate < 500:
        # Not worth compressing, just keep all
        return messages

    # Summarize old messages
    summary = summarize_old_messages(old_messages)

    compressed = [
        {
            # The Messages API only accepts user/assistant roles,
            # so the summary is injected as a user turn
            "role": "user",
            "content": f"[Conversation summary: {summary}]",
        }
    ] + recent_messages
    return compressed


def summarize_old_messages(messages: List[Dict]) -> str:
    """Summarize old conversation messages into a compact context block."""
    import anthropic

    client = anthropic.Anthropic()

    conversation_text = "\n".join(
        f"{m['role'].upper()}: {m['content']}" for m in messages
    )

    response = client.messages.create(
        model="claude-haiku-4-5",  # Use a cheap model for summaries
        max_tokens=200,
        messages=[{
            "role": "user",
            "content": (
                "Summarize this conversation in 3-5 sentences, preserving key facts, "
                "customer preferences, and any commitments made:\n\n"
                f"{conversation_text}"
            ),
        }],
    )
    return response.content[0].text


# Usage
async def get_agent_response(phone: str, new_message: str) -> str:
    state = ConversationState(phone).get()  # Redis-backed state store

    # Compress before sending to the LLM
    compressed_messages = compress_conversation_history(
        state["messages"],
        max_tokens=2000,
        always_keep_last_n=6,
    )
    # Copy before appending so we never mutate the stored history
    compressed_messages = compressed_messages + [
        {"role": "user", "content": new_message}
    ]
    return await rate_limited_completion(compressed_messages)
```
This reduces input tokens by 60-70% for long conversations.
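The `ConversationState` class used in `get_agent_response` isn't defined in this post. A minimal Redis-backed sketch of the shape it might take (the key scheme, the 24-hour TTL, and the injectable client are illustrative assumptions, not our exact implementation):

```python
import json


class ConversationState:
    """Per-phone conversation state persisted in Redis (sketch)."""

    def __init__(self, phone: str, client=None, ttl_seconds: int = 86400):
        self.key = f"conv:{phone}"  # illustrative key scheme
        self.ttl = ttl_seconds
        if client is None:
            import redis  # third-party; assumed available in production
            client = redis.Redis(decode_responses=True)
        self.r = client

    def get(self) -> dict:
        raw = self.r.get(self.key)
        return json.loads(raw) if raw else {"messages": []}

    def append_message(self, role: str, content: str) -> None:
        state = self.get()
        state["messages"].append({"role": role, "content": content})
        # Expiry keeps abandoned conversations from accumulating forever
        self.r.set(self.key, json.dumps(state), ex=self.ttl)
```

The injectable `client` also makes the class trivial to test with an in-memory fake.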
## Lesson 3: Hallucinations Are Systematic, Not Random
The first time an agent told a customer a price that didn't exist, we wrote it off as a fluke. The third time, we noticed the pattern: the agent hallucinated when it had no real information and was trained to be helpful.
The fix is explicit "I don't know" instructions in the system prompt:
```python
SYSTEM_PROMPT_ANTI_HALLUCINATION = """You are a customer service agent for [Business Name].

CRITICAL RULES about what you know:
- You ONLY know what is in the <business_info> section below
- If a customer asks something not covered there, say exactly: "I don't have that information. Let me connect you with our team who can help — they'll reach out within 2 hours."
- NEVER invent prices, policies, or availability
- NEVER say "I think" or "probably" about business specifics
- It is ALWAYS better to admit you don't know than to guess

<business_info>
{business_knowledge_base}
</business_info>

Violation examples — NEVER do these:
❌ "I believe the price is around 200 shekels"
❌ "We probably have that in stock"
❌ "I think our hours are 9-5"

Correct responses when unsure:
✅ "I don't have pricing information. Our team will send you a quote within 2 hours."
✅ "Let me get you connected with someone who can answer that accurately."
"""
```
Additionally, validate factual claims before sending:
```python
import logging
import re
from typing import List, Tuple

logger = logging.getLogger(__name__)

# Patterns that often indicate hallucination
SUSPICIOUS_PATTERNS = [
    (r'\d+\s*(?:shekels?|nis|₪)', 'price_claim'),
    (r'(?:open|available|operating)\s+(?:from\s+)?\d+', 'hours_claim'),
    (r'(?:ships?|delivers?|arrives?)\s+(?:in\s+)?\d+\s*(?:day|hour|week)', 'delivery_claim'),
    (r'we\s+(?:definitely|certainly|absolutely|always)\s+', 'certainty_claim'),
]


def flag_potential_hallucinations(
    response: str,
    knowledge_base: str,
) -> Tuple[str, List[str]]:
    """
    Detect and flag potentially hallucinated claims.
    Returns (response, list_of_flags).
    """
    flags = []
    for pattern, claim_type in SUSPICIOUS_PATTERNS:
        matches = re.findall(pattern, response, re.IGNORECASE)
        # Check whether each claim is grounded in the knowledge base
        for match in matches:
            if match.lower() not in knowledge_base.lower():
                flags.append(f"{claim_type}: '{match}' not found in KB")
    return response, flags


# In your message handler:
async def handle_with_validation(phone: str, text: str, kb: str) -> str:
    response = await get_agent_response(phone, text)
    cleaned, flags = flag_potential_hallucinations(response, kb)

    if flags:
        logger.warning(f"Potential hallucination for {phone}: {flags}")
        # Option 1: Send to human review
        # Option 2: Re-prompt with stricter instructions
        # Option 3: Replace response with fallback
        # Multiple suspicious claims trigger the fallback (option 3);
        # single flags are logged for review
        if len(flags) > 2:
            return ("I want to make sure I give you accurate information. "
                    "Let me have our team follow up with you directly.")

    return cleaned
```
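Option 2, re-prompting, can be as simple as one stricter retry. A sketch of how the retry prompt might be assembled (the wording and the single-retry policy are our choices here, not the only reasonable ones):

```python
def build_reprompt(original_question: str,
                   flagged_response: str,
                   flags: list) -> list:
    """Build a stricter retry prompt from hallucination flags (sketch)."""
    flag_lines = "\n".join(f"- {f}" for f in flags)
    instruction = (
        "Your previous draft contained claims not grounded in the knowledge base:\n"
        f"{flag_lines}\n\n"
        "Rewrite your answer. Remove every ungrounded claim; if you cannot answer "
        "from the knowledge base, say you don't have that information.\n\n"
        f"Customer question: {original_question}\n"
        f"Previous draft: {flagged_response}"
    )
    return [{"role": "user", "content": instruction}]
```

The returned message list can be passed straight to `rate_limited_completion` from Lesson 1, and the retry's output run through `flag_potential_hallucinations` once more before sending.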
## Lesson 4: Monitoring Needs to Be Business-Aware
Generic APM tools (Datadog, Grafana) measure latency and error rates. They won't tell you that your agent is failing to capture lead data, or that it's apologizing too much (a real problem we had — the agent said sorry so often it undermined customer confidence).
Build business-aware metrics:
```python
from dataclasses import dataclass, field
from typing import Dict, List


@dataclass
class AgentMetrics:
    """Track business-relevant agent metrics, not just technical ones."""
    # Conversation outcomes
    conversations_started: int = 0
    conversations_completed: int = 0
    leads_captured: int = 0
    human_escalations: int = 0

    # Quality signals
    apology_rate: float = 0.0  # % of responses containing apologies
    avg_response_length: float = 0.0
    out_of_scope_rate: float = 0.0

    # Technical
    avg_latency_ms: float = 0.0
    error_rate: float = 0.0
    token_usage_per_conversation: float = 0.0

    # Time tracking
    _latencies: List[float] = field(default_factory=list)

    def record_response(self, response: str, latency_ms: float):
        self._latencies.append(latency_ms)
        self.avg_latency_ms = sum(self._latencies) / len(self._latencies)

        # Detect business quality issues
        apology_words = ['sorry', 'apologize', 'unfortunately', 'regret']
        if any(w in response.lower() for w in apology_words):
            # Exponentially weighted rolling average, simplified here
            self.apology_rate = (self.apology_rate * 0.95) + 0.05
        else:
            self.apology_rate = self.apology_rate * 0.95

    def lead_captured(self):
        self.leads_captured += 1

    @property
    def lead_capture_rate(self) -> float:
        if self.conversations_started == 0:
            return 0.0
        return self.leads_captured / self.conversations_started

    def to_dict(self) -> Dict:
        return {
            "lead_capture_rate": f"{self.lead_capture_rate:.1%}",
            "human_escalation_rate": f"{self.human_escalations / max(self.conversations_started, 1):.1%}",
            "avg_latency_ms": int(self.avg_latency_ms),
            "apology_rate": f"{self.apology_rate:.1%}",
            "error_rate": f"{self.error_rate:.1%}",
        }


# Global metrics registry
_metrics: Dict[str, AgentMetrics] = {}


def get_metrics(agent_id: str) -> AgentMetrics:
    if agent_id not in _metrics:
        _metrics[agent_id] = AgentMetrics()
    return _metrics[agent_id]


# Alerting
ALERT_THRESHOLDS = {
    "lead_capture_rate_min": 0.15,      # Alert if below 15%
    "apology_rate_max": 0.20,           # Alert if over 20% of responses apologize
    "avg_latency_ms_max": 5000,         # Alert if p50 latency over 5s
    "human_escalation_rate_max": 0.30,  # Alert if over 30% escalate
}


def check_alerts(agent_id: str):
    metrics = get_metrics(agent_id)
    alerts = []

    if metrics.lead_capture_rate < ALERT_THRESHOLDS["lead_capture_rate_min"]:
        alerts.append(f"Low lead capture rate: {metrics.lead_capture_rate:.1%}")
    if metrics.apology_rate > ALERT_THRESHOLDS["apology_rate_max"]:
        alerts.append(f"High apology rate: {metrics.apology_rate:.1%} — agent may be confused")
    if metrics.avg_latency_ms > ALERT_THRESHOLDS["avg_latency_ms_max"]:
        alerts.append(f"High latency: {metrics.avg_latency_ms}ms")

    if alerts:
        send_alert(agent_id, alerts)


def send_alert(agent_id: str, alerts: List[str]):
    import os
    import requests  # third-party HTTP client

    slack_url = os.environ.get("SLACK_ALERTS_WEBHOOK")
    if slack_url:
        requests.post(slack_url, json={
            "text": f"⚠️ Agent Alert: {agent_id}\n" + "\n".join(f"• {a}" for a in alerts)
        }, timeout=5)
```
## Lesson 5: Fallback Architecture Prevents Cascading Failures
When the LLM API goes down (and it will), you need fallback behavior that doesn't just show an error. Design a degradation ladder:
```python
import logging
import re
import time
from enum import Enum

logger = logging.getLogger(__name__)


class AgentMode(Enum):
    FULL_AI = "full_ai"        # Normal operation
    CACHED_AI = "cached_ai"    # Using cached responses for common queries
    RULES_ONLY = "rules_only"  # Simple pattern matching, no LLM
    HUMAN_ONLY = "human_only"  # All conversations go to humans


_current_mode = AgentMode.FULL_AI
_mode_changed_at = time.time()
_consecutive_failures = 0

FAILURE_THRESHOLDS = {
    2: AgentMode.CACHED_AI,
    5: AgentMode.RULES_ONLY,
    10: AgentMode.HUMAN_ONLY,
}


def record_failure():
    global _consecutive_failures, _current_mode
    _consecutive_failures += 1

    for threshold, mode in sorted(FAILURE_THRESHOLDS.items()):
        if _consecutive_failures >= threshold:
            if _current_mode != mode:
                logger.error(f"Degrading to {mode.value} after {_consecutive_failures} failures")
                _current_mode = mode
                notify_ops(f"Agent degraded to {mode.value}")  # ops notifier, defined elsewhere


def record_success():
    global _consecutive_failures, _current_mode
    _consecutive_failures = 0
    if _current_mode != AgentMode.FULL_AI:
        _current_mode = AgentMode.FULL_AI
        logger.info("Agent recovered to FULL_AI mode")


# Rule-based fallback for when the LLM is unavailable
RULE_RESPONSES = {
    r'(?:price|cost|how much|pricing)':
        "Our team will send you a personalized quote. Please share your name and what you need.",
    r'(?:hours|open|available|when)':
        "We're available Sunday-Thursday 8am-6pm. Would you like to schedule a callback?",
    r'(?:hello|hi|hey|shalom|שלום)':
        "Hello! How can I help you today?",
    r'(?:thanks|thank you|toda|תודה)':
        "Happy to help! Is there anything else you need?",
}


def rules_based_response(text: str) -> str:
    """Simple pattern matching for degraded mode."""
    for pattern, response in RULE_RESPONSES.items():
        if re.search(pattern, text, re.IGNORECASE):
            return response
    # Default fallback
    return "Thanks for reaching out! Our team will contact you shortly."


async def get_response_with_fallback(phone: str, message: str) -> str:
    """Get agent response with automatic mode degradation."""
    if _current_mode == AgentMode.HUMAN_ONLY:
        escalate_to_human(phone, message)  # hand-off helper, defined elsewhere
        return "I'm connecting you with our team right now. Someone will reach out within 15 minutes."

    if _current_mode == AgentMode.RULES_ONLY:
        return rules_based_response(message)

    try:
        response = await get_agent_response(phone, message)
        record_success()
        return response
    except Exception as e:
        record_failure()
        logger.error(f"Agent failure (mode: {_current_mode}): {e}")

        if _current_mode == AgentMode.CACHED_AI:
            # Try cached responses first
            cached = get_cached_response(message)
            if cached:
                return cached

        return rules_based_response(message)
```
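The `get_cached_response` helper used in CACHED_AI mode isn't shown above. A minimal sketch assuming normalized exact-match keys; a production version might use embeddings or fuzzy matching instead:

```python
import re

# Responses to common queries, recorded while FULL_AI mode is healthy
_response_cache = {}


def _normalize(text: str) -> str:
    """Lowercase and strip punctuation so near-identical queries share a key."""
    return re.sub(r"[^\w\s]", "", text.lower()).strip()


def cache_response(message: str, response: str) -> None:
    """Store a known-good response for a common query."""
    _response_cache[_normalize(message)] = response


def get_cached_response(message: str):
    """Return a cached response for a previously seen query, else None."""
    return _response_cache.get(_normalize(message))
```

Populating the cache from successful FULL_AI responses to frequent questions means degraded mode still answers the 20% of queries that make up 80% of traffic.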
## Lesson 6: Cost Optimization in Production
Our biggest surprise: model costs were far lower than we had budgeted, and the fixed infrastructure (Redis, hosting) was a larger share of spend than expected. Here's the actual cost breakdown for a 10-agent deployment serving ~500 conversations/day:
| Component | Monthly Cost |
|---|---|
| Claude API (haiku, mostly) | ~$45 |
| Claude API (opus, HOT lead review) | ~$30 |
| Redis (conversation state) | $15 |
| Server hosting | $25 |
| WhatsApp API | $0 (free tier covers 1000 msgs) |
| Total | ~$115/month |
Model selection is the highest-leverage cost optimization:
```python
def select_model_for_context(conversation_length: int, is_priority_customer: bool) -> str:
    """
    Use expensive models only when they matter.

    Rules:
    - haiku: regular conversations, short history
    - sonnet: complex queries, returning customers
    - opus: only for HOT lead qualification review
    """
    if is_priority_customer:
        return "claude-sonnet-4-5"
    if conversation_length > 15:
        # Long conversations: use sonnet for better context handling
        return "claude-sonnet-4-5"
    return "claude-haiku-4-5"  # Default: cheapest model that's good enough
```
## Lesson 7: The Prompt Is Your Most Important Piece of Infrastructure
We've refactored agent prompts more than any other component. Every word matters. Here's our versioning approach:
```python
# prompts/v3_customer_service.py

PROMPT_VERSION = "v3.2"

PROMPT_CHANGELOG = """
v3.2 (2026-01-15): Reduced apology rate by removing "I'm sorry" from default phrasing
v3.1 (2026-01-10): Added Hebrew language detection and auto-switch
v3.0 (2025-12-20): Major rewrite — reduced hallucination rate from 8% to 2%
v2.x (2025-11-xx): Deprecated, too verbose
"""

SYSTEM_PROMPT = """..."""  # The actual prompt


def get_prompt(version: str = "latest") -> tuple[str, str]:
    """Return (prompt, version) for tracking and A/B testing."""
    # Only the latest version ships in code; older versions live in git history
    return SYSTEM_PROMPT, PROMPT_VERSION
```
Track which prompt version generated which conversation. When a conversation goes wrong, you need to know which prompt was running.
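A sketch of what that tracking might look like: every logged turn carries the prompt version that produced it (the JSONL format and field names are illustrative, not our exact schema):

```python
import json
import time


def log_turn(phone: str, user_msg: str, agent_msg: str,
             prompt_version: str, log_path: str = "agent_turns.jsonl") -> dict:
    """Append one conversation turn, tagged with its prompt version, to a JSONL log."""
    record = {
        "ts": time.time(),
        "phone": phone,
        "user": user_msg,
        "agent": agent_msg,
        "prompt_version": prompt_version,  # e.g. the version from get_prompt()
    }
    with open(log_path, "a") as f:
        f.write(json.dumps(record, ensure_ascii=False) + "\n")
    return record
```

When a conversation goes wrong, filtering the log by `prompt_version` immediately shows every other conversation that same prompt produced.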
## The Architecture That Works
After 100+ agents in production, here's the architecture we'd build from scratch today:
```
Request → Load Balancer
               ↓
     Agent Pool (async workers)
          ↓            ↓
   Redis (state)  Rate Limiter
          ↓            ↓
       LLM API ←── Model Router
          ↓
  Response Validator
      ↓           ↓
Send to User   Log to Analytics
```
Key principles:
- Async everything — sync LLM calls block other users
- State in Redis, not memory — agents restart; state shouldn't disappear
- Model routing — not every request needs your most expensive model
- Validate before sending — catch hallucinations before they reach users
- Degrade gracefully — a rules-based response beats an error every time
These lessons come from real production failures. Every one of them happened to us before we fixed it. The code above is what we actually run.
AI Buddy builds and operates these agents for businesses that don't want to manage the infrastructure themselves. If you want to implement this yourself, the code above is a solid starting point. If you'd rather focus on your business, we've already solved these problems.