AIbuddy_il

Posted on • Originally published at aibuddy.co.il
Lessons from Running 100+ AI Agents in Production

Running AI agents at scale is different from running AI agents in a demo. The failure modes are subtle, the costs sneak up on you, and the bugs are non-deterministic. After running 100+ production agents at AI Buddy — serving businesses in WhatsApp automation, lead qualification, and customer support — here are the hard lessons.

Lesson 1: Rate Limits Will Destroy You in Unexpected Ways

You know about rate limits. You've read the docs. You're wrong about when they'll hit you.

Anthropic's rate limits are per-account, not per-API-key. If you're running 10 agents and they all get busy at 9am when Israeli businesses open, you'll hit limits across all of them simultaneously.

The fix is a token budget system, not just retry logic:

import asyncio
import time
from collections import deque
from dataclasses import dataclass
import logging

logger = logging.getLogger(__name__)

@dataclass
class TokenBudget:
    """Track and enforce rate limits proactively."""
    requests_per_minute: int = 50
    tokens_per_minute: int = 40000

    def __post_init__(self):
        self._request_times = deque()
        self._token_usage = deque()
        self._lock = asyncio.Lock()

    async def acquire(self, estimated_tokens: int = 500) -> bool:
        """
        Returns True if the request can proceed now.
        Returns False if we should wait or queue.
        """
        async with self._lock:
            now = time.time()
            window = 60.0  # 1 minute window

            # Clean old entries
            while self._request_times and now - self._request_times[0] > window:
                self._request_times.popleft()
            while self._token_usage and now - self._token_usage[0][0] > window:
                self._token_usage.popleft()

            # Check request limit
            if len(self._request_times) >= self.requests_per_minute:
                oldest = self._request_times[0]
                wait_time = window - (now - oldest)
                logger.warning(f"Rate limit approaching, need to wait {wait_time:.1f}s")
                return False

            # Check token limit
            current_tokens = sum(tokens for _, tokens in self._token_usage)
            if current_tokens + estimated_tokens > self.tokens_per_minute:
                logger.warning(f"Token budget exhausted: {current_tokens}/{self.tokens_per_minute}")
                return False

            # Record this request
            self._request_times.append(now)
            self._token_usage.append((now, estimated_tokens))
            return True

# Global budget shared across all agents
_global_budget = TokenBudget(requests_per_minute=45, tokens_per_minute=38000)

async def rate_limited_completion(messages: list, **kwargs) -> str:
    """LLM call with proactive rate limiting."""
    import anthropic

    client = anthropic.AsyncAnthropic()

    # Wait if needed (up to 30 seconds)
    for attempt in range(30):
        if await _global_budget.acquire():
            break
        await asyncio.sleep(1)
    else:
        raise RuntimeError("Rate limit wait exceeded 30 seconds")

    for retry in range(3):
        try:
            response = await client.messages.create(
                model="claude-haiku-4-5",
                max_tokens=kwargs.get("max_tokens", 512),
                messages=messages
            )
            return response.content[0].text
        except anthropic.RateLimitError as e:
            wait = 2 ** retry * 5  # 5s, 10s, 20s
            logger.error(f"Rate limited despite budget tracking (attempt {retry+1}): {e}")
            await asyncio.sleep(wait)

    raise RuntimeError("All retries exhausted")

Lesson 2: Context Windows Are a Billing Trap

A typical conversation agent stores message history and sends it with every request. With Claude Opus at $15/MTok input, a 50-message conversation history (roughly 10,000 tokens) costs $0.15 per request — not per conversation, per request. If your agent sends 20 replies in a conversation, that's $3.00 in input tokens alone.
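To see how this compounds, here's the arithmetic as a tiny sketch (the per-MTok price is the illustrative figure from above, not live pricing):

```python
# Cost of re-sending history with every reply (figures match the example above;
# the per-MTok price is an assumption, not live Anthropic pricing).
OPUS_INPUT_PER_MTOK = 15.00  # USD per million input tokens

def input_cost_usd(history_tokens: int, replies: int,
                   price_per_mtok: float = OPUS_INPUT_PER_MTOK) -> float:
    """Total input-token spend when the full history rides along on each reply."""
    return history_tokens * replies * price_per_mtok / 1_000_000

print(input_cost_usd(10_000, 1))   # one reply: 0.15
print(input_cost_usd(10_000, 20))  # twenty replies: 3.00
```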

The fix: smart context compression.

from typing import List, Dict
import json

def compress_conversation_history(
    messages: List[Dict],
    max_tokens: int = 2000,
    always_keep_last_n: int = 6
) -> List[Dict]:
    """
    Compress old messages to stay within token budget.
    Strategy: summarize messages older than the last N, keep recent verbatim.
    """
    if len(messages) <= always_keep_last_n:
        return messages

    old_messages = messages[:-always_keep_last_n]
    recent_messages = messages[-always_keep_last_n:]

    # Rough token estimate: 1 token ≈ 4 chars
    old_token_estimate = sum(len(m["content"]) // 4 for m in old_messages)

    if old_token_estimate < 500:
        # Not worth compressing, just keep all
        return messages

    # Summarize old messages
    summary = summarize_old_messages(old_messages)

    # Note: Anthropic's Messages API does not accept "system" as a role inside
    # the messages list; inject the summary as a user-role context message
    # (or fold it into the top-level system prompt instead).
    compressed = [
        {
            "role": "user",
            "content": f"[Conversation summary: {summary}]"
        }
    ] + recent_messages

    return compressed

def summarize_old_messages(messages: List[Dict]) -> str:
    """Summarize old conversation messages into a compact context block."""
    import anthropic

    client = anthropic.Anthropic()

    conversation_text = "\n".join([
        f"{m['role'].upper()}: {m['content']}"
        for m in messages
    ])

    response = client.messages.create(
        model="claude-haiku-4-5",  # Use cheap model for summaries
        max_tokens=200,
        messages=[{
            "role": "user",
            "content": f"Summarize this conversation in 3-5 sentences, preserving key facts, customer preferences, and any commitments made:\n\n{conversation_text}"
        }]
    )

    return response.content[0].text

# Usage
async def get_agent_response(phone: str, new_message: str) -> str:
    # ConversationState: Redis-backed conversation store (defined elsewhere)
    state = ConversationState(phone).get()

    # Compress before sending to LLM
    compressed_messages = compress_conversation_history(
        state["messages"],
        max_tokens=2000,
        always_keep_last_n=6
    )

    # Copy rather than append in place: compress may return the stored list itself
    compressed_messages = compressed_messages + [{"role": "user", "content": new_message}]

    return await rate_limited_completion(compressed_messages)

This reduces input tokens by 60-70% for long conversations.

Lesson 3: Hallucinations Are Systematic, Not Random

The first time an agent told a customer a price that didn't exist, we wrote it off as a fluke. The third time, we noticed the pattern: the agent hallucinated when it had no real information and was trained to be helpful.

The fix is explicit "I don't know" training:

SYSTEM_PROMPT_ANTI_HALLUCINATION = """You are a customer service agent for [Business Name].

CRITICAL RULES about what you know:
- You ONLY know what is in the <business_info> section below
- If a customer asks something not covered there, say exactly: "I don't have that information. Let me connect you with our team who can help — they'll reach out within 2 hours."
- NEVER invent prices, policies, or availability
- NEVER say "I think" or "probably" about business specifics
- It is ALWAYS better to admit you don't know than to guess

<business_info>
{business_knowledge_base}
</business_info>

Violation examples — NEVER do these:
❌ "I believe the price is around 200 shekels"
❌ "We probably have that in stock"
❌ "I think our hours are 9-5"

Correct responses when unsure:
✅ "I don't have pricing information. Our team will send you a quote within 2 hours."
✅ "Let me get you connected with someone who can answer that accurately."
"""

Additionally, validate factual claims before sending:

import re
from typing import List, Tuple

# Patterns that often indicate hallucination
SUSPICIOUS_PATTERNS = [
    (r'\d+\s*(?:shekels?|nis|₪)', 'price_claim'),
    (r'(?:open|available|operating)\s+(?:from\s+)?\d+', 'hours_claim'),
    (r'(?:ships?|delivers?|arrives?)\s+(?:in\s+)?\d+\s*(?:day|hour|week)', 'delivery_claim'),
    (r'we\s+(?:definitely|certainly|absolutely|always)\s+', 'certainty_claim'),
]

def flag_potential_hallucinations(
    response: str,
    knowledge_base: str
) -> Tuple[str, List[str]]:
    """
    Detect and flag potentially hallucinated claims.
    Returns (cleaned_response, list_of_flags)
    """
    flags = []

    for pattern, claim_type in SUSPICIOUS_PATTERNS:
        matches = re.findall(pattern, response, re.IGNORECASE)
        if matches:
            # Check if this claim is grounded in the knowledge base
            for match in matches:
                if match.lower() not in knowledge_base.lower():
                    flags.append(f"{claim_type}: '{match}' not found in KB")

    return response, flags

# In your message handler:
async def handle_with_validation(phone: str, text: str, kb: str) -> str:
    response = await get_agent_response(phone, text)
    cleaned, flags = flag_potential_hallucinations(response, kb)

    if flags:
        logger.warning(f"Potential hallucination for {phone}: {flags}")
        # Option 1: Send to human review
        # Option 2: Re-prompt with stricter instructions
        # Option 3: Replace response with fallback

        # We fall back (option 3) when multiple claims look suspicious;
        # a single flag passes through with just the warning logged.
        if len(flags) > 2:
            return "I want to make sure I give you accurate information. Let me have our team follow up with you directly."

    return cleaned

Lesson 4: Monitoring Needs to Be Business-Aware

Generic APM tools (Datadog, Grafana) measure latency and error rates. They won't tell you that your agent is failing to capture lead data, or that it's apologizing too much (a real problem we had — the agent said sorry so often it undermined customer confidence).

Build business-aware metrics:

from dataclasses import dataclass, field
from typing import Dict, List

@dataclass  
class AgentMetrics:
    """Track business-relevant agent metrics, not just technical ones."""

    # Conversation outcomes
    conversations_started: int = 0
    conversations_completed: int = 0
    leads_captured: int = 0
    human_escalations: int = 0

    # Quality signals
    apology_rate: float = 0.0  # % of responses containing apologies
    avg_response_length: float = 0.0
    out_of_scope_rate: float = 0.0

    # Technical
    avg_latency_ms: float = 0.0
    error_rate: float = 0.0
    token_usage_per_conversation: float = 0.0

    # Time tracking
    _latencies: List[float] = field(default_factory=list)

    def record_response(self, response: str, latency_ms: float):
        self._latencies.append(latency_ms)
        self.avg_latency_ms = sum(self._latencies) / len(self._latencies)

        # Detect business quality issues
        apology_words = ['sorry', 'apologize', 'unfortunately', 'regret']
        if any(w in response.lower() for w in apology_words):
            # This is a rolling average, simplified here
            self.apology_rate = (self.apology_rate * 0.95) + 0.05
        else:
            self.apology_rate = self.apology_rate * 0.95

    def lead_captured(self):
        self.leads_captured += 1

    @property
    def lead_capture_rate(self) -> float:
        if self.conversations_started == 0:
            return 0.0
        return self.leads_captured / self.conversations_started

    def to_dict(self) -> Dict:
        return {
            "lead_capture_rate": f"{self.lead_capture_rate:.1%}",
            "human_escalation_rate": f"{self.human_escalations / max(self.conversations_started, 1):.1%}",
            "avg_latency_ms": int(self.avg_latency_ms),
            "apology_rate": f"{self.apology_rate:.1%}",
            "error_rate": f"{self.error_rate:.1%}"
        }

# Global metrics registry
_metrics: Dict[str, AgentMetrics] = {}

def get_metrics(agent_id: str) -> AgentMetrics:
    if agent_id not in _metrics:
        _metrics[agent_id] = AgentMetrics()
    return _metrics[agent_id]

# Alerting
ALERT_THRESHOLDS = {
    "lead_capture_rate_min": 0.15,   # Alert if below 15%
    "apology_rate_max": 0.20,         # Alert if over 20% of responses apologize
    "avg_latency_ms_max": 5000,       # Alert if p50 latency over 5s
    "human_escalation_rate_max": 0.30 # Alert if over 30% escalate
}

def check_alerts(agent_id: str):
    metrics = get_metrics(agent_id)

    alerts = []
    if metrics.lead_capture_rate < ALERT_THRESHOLDS["lead_capture_rate_min"]:
        alerts.append(f"Low lead capture rate: {metrics.lead_capture_rate:.1%}")
    if metrics.apology_rate > ALERT_THRESHOLDS["apology_rate_max"]:
        alerts.append(f"High apology rate: {metrics.apology_rate:.1%} — agent may be confused")
    if metrics.avg_latency_ms > ALERT_THRESHOLDS["avg_latency_ms_max"]:
        alerts.append(f"High latency: {metrics.avg_latency_ms}ms")
    escalation_rate = metrics.human_escalations / max(metrics.conversations_started, 1)
    if escalation_rate > ALERT_THRESHOLDS["human_escalation_rate_max"]:
        alerts.append(f"High escalation rate: {escalation_rate:.1%}")

    if alerts:
        send_alert(agent_id, alerts)

def send_alert(agent_id: str, alerts: List[str]):
    import requests, os
    slack_url = os.environ.get("SLACK_ALERTS_WEBHOOK")
    if slack_url:
        requests.post(slack_url, json={
            "text": f"⚠️ Agent Alert: {agent_id}\n" + "\n".join(f"{a}" for a in alerts)
        }, timeout=5)

Lesson 5: Fallback Architecture Prevents Cascading Failures

When the LLM API goes down (and it will), you need fallback behavior that doesn't just show an error. Design a degradation ladder:

from enum import Enum
import time

class AgentMode(Enum):
    FULL_AI = "full_ai"           # Normal operation
    CACHED_AI = "cached_ai"       # Using cached responses for common queries  
    RULES_ONLY = "rules_only"     # Simple pattern matching, no LLM
    HUMAN_ONLY = "human_only"     # All conversations go to humans

_current_mode = AgentMode.FULL_AI
_mode_changed_at = time.time()
_consecutive_failures = 0

FAILURE_THRESHOLDS = {
    2: AgentMode.CACHED_AI,
    5: AgentMode.RULES_ONLY,
    10: AgentMode.HUMAN_ONLY
}

def record_failure():
    global _consecutive_failures, _current_mode
    _consecutive_failures += 1

    for threshold, mode in sorted(FAILURE_THRESHOLDS.items()):
        if _consecutive_failures >= threshold:
            if _current_mode != mode:
                logger.error(f"Degrading to {mode.value} after {_consecutive_failures} failures")
                _current_mode = mode
                notify_ops(f"Agent degraded to {mode.value}")

def record_success():
    global _consecutive_failures, _current_mode
    _consecutive_failures = 0
    if _current_mode != AgentMode.FULL_AI:
        _current_mode = AgentMode.FULL_AI
        logger.info("Agent recovered to FULL_AI mode")

# Rule-based fallback for when LLM is unavailable
RULE_RESPONSES = {
    r'(?:price|cost|how much|pricing)': 
        "Our team will send you a personalized quote. Please share your name and what you need.",
    r'(?:hours|open|available|when)':
        "We're available Sunday-Thursday 8am-6pm. Would you like to schedule a callback?",
    r'(?:hello|hi|hey|shalom|שלום)':
        "Hello! How can I help you today?",
    r'(?:thanks|thank you|toda|תודה)':
        "Happy to help! Is there anything else you need?",
}

def rules_based_response(text: str) -> str:
    """Simple pattern matching for degraded mode."""
    import re
    text_lower = text.lower()

    for pattern, response in RULE_RESPONSES.items():
        if re.search(pattern, text_lower, re.IGNORECASE):
            return response

    # Default fallback
    return "Thanks for reaching out! Our team will contact you shortly."

async def get_response_with_fallback(phone: str, message: str) -> str:
    """Get agent response with automatic mode degradation."""
    global _current_mode

    if _current_mode == AgentMode.HUMAN_ONLY:
        escalate_to_human(phone, message)
        return "I'm connecting you with our team right now. Someone will reach out within 15 minutes."

    if _current_mode == AgentMode.RULES_ONLY:
        return rules_based_response(message)

    try:
        response = await get_agent_response(phone, message)
        record_success()
        return response
    except Exception as e:
        record_failure()
        logger.error(f"Agent failure (mode: {_current_mode}): {e}")

        if _current_mode == AgentMode.CACHED_AI:
            # Try cached responses
            cached = get_cached_response(message)
            if cached:
                return cached

        return rules_based_response(message)

Lesson 6: Cost Optimization in Production

Our biggest surprise: model costs came in far below what we had budgeted. Here's the actual cost breakdown for a 10-agent deployment serving ~500 conversations/day:

| Component | Monthly Cost |
| --- | --- |
| Claude API (haiku, mostly) | ~$45 |
| Claude API (opus, HOT lead review) | ~$30 |
| Redis (conversation state) | $15 |
| Server hosting | $25 |
| WhatsApp API | $0 (free tier covers 1000 msgs) |
| **Total** | **~$115/month** |
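As a sanity check, the cost table above works out to well under a cent per conversation:

```python
# Unit economics from the cost table above.
monthly_total_usd = 115.0
conversations_per_day = 500
days_per_month = 30

cost_per_conversation = monthly_total_usd / (conversations_per_day * days_per_month)
print(f"${cost_per_conversation:.4f} per conversation")  # well under one cent
```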

Model selection is the highest-leverage cost optimization:

def select_model_for_context(conversation_length: int, is_priority_customer: bool) -> str:
    """
    Use expensive models only when they matter.

    Rules:
    - haiku: regular conversations, short history
    - sonnet: complex queries, returning customers  
    - opus: only for HOT lead qualification review
    """
    if is_priority_customer:
        return "claude-sonnet-4-5"

    if conversation_length > 15:
        # Long conversations: use sonnet for better context handling
        return "claude-sonnet-4-5"

    return "claude-haiku-4-5"  # Default: cheapest model that's good enough

Lesson 7: The Prompt Is Your Most Important Piece of Infrastructure

We've refactored agent prompts more than any other component. Every word matters. Here's our versioning approach:

# prompts/v3_customer_service.py
PROMPT_VERSION = "v3.2"
PROMPT_CHANGELOG = """
v3.2 (2026-01-15): Reduced apology rate by removing "I'm sorry" from default phrasing
v3.1 (2026-01-10): Added Hebrew language detection and auto-switch
v3.0 (2025-12-20): Major rewrite — reduced hallucination rate from 8% to 2%
v2.x (2025-11-xx): Deprecated, too verbose
"""

SYSTEM_PROMPT = """..."""  # The actual prompt

def get_prompt(version: str = "latest") -> tuple[str, str]:
    """Return (prompt, version) for tracking and A/B testing."""
    return SYSTEM_PROMPT, PROMPT_VERSION

Track which prompt version generated which conversation. When a conversation goes wrong, you need to know which prompt was running.
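One way to do that (a minimal sketch; the function and field names are ours, not a fixed API) is to stamp every logged turn with `PROMPT_VERSION`:

```python
import json
import time

def log_conversation_turn(phone: str, user_msg: str, agent_msg: str,
                          prompt_version: str) -> str:
    """Serialize one conversation turn with the prompt version that produced it."""
    record = {
        "ts": time.time(),
        "phone": phone,
        "user": user_msg,
        "agent": agent_msg,
        "prompt_version": prompt_version,  # e.g. "v3.2" from PROMPT_VERSION
    }
    return json.dumps(record, ensure_ascii=False)
```

With the version in every record, "which prompt was running?" becomes a log query instead of an archaeology project.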

The Architecture That Works

After 100+ agents in production, here's the architecture we'd build from scratch today:

Request → Load Balancer
            ↓
         Agent Pool (async workers)
            ↓               ↓
      Redis (state)    Rate Limiter
            ↓               ↓
         LLM API ←──── Model Router
            ↓
         Response Validator
            ↓         ↓
      Send to User   Log to Analytics

Key principles:

  1. Async everything — sync LLM calls block other users
  2. State in Redis, not memory — agents restart; state shouldn't disappear
  3. Model routing — not every request needs your most expensive model
  4. Validate before sending — catch hallucinations before they reach users
  5. Degrade gracefully — a rules-based response beats an error every time
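Principle 1 is the easiest to get wrong, so here is what it buys you in miniature (`handle_message` is a stand-in for the real pipeline, with a sleep simulating network latency):

```python
import asyncio

async def handle_message(phone: str, text: str) -> str:
    # Stand-in for a real LLM call; the sleep simulates network latency.
    await asyncio.sleep(0.1)
    return f"reply to {phone}"

async def main() -> list:
    # Three users served concurrently: wall time is ~one call, not three.
    return await asyncio.gather(*(
        handle_message(p, "hi") for p in ("a", "b", "c")
    ))

print(asyncio.run(main()))
```

With blocking (synchronous) calls, the same three users would wait in line behind each other.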

These lessons come from real production failures. Every one of them happened to us before we fixed it. The code above is what we actually run.

AI Buddy builds and operates these agents for businesses that don't want to manage the infrastructure themselves. If you want to implement this yourself, the code above is a solid starting point. If you'd rather focus on your business, we've already solved these problems.
