Build production-grade AI agent systems using microservices. Covers FastAPI, gRPC, Kafka, Kubernetes, OpenTelemetry, and fault-tolerant orchestration patterns in Python.
Table of Contents
- Introduction & Motivation
- Core Architecture Principles
- Agent Service Design
- The AgentRunner Loop
- Inter-Agent Communication
- Tool Registry Service
- Memory Architecture
- Context Window Management
- Orchestrator & Supervisor Pattern
- Security & Authorization
- Observability: Traces, Logs, Metrics
- Deployment on Kubernetes
- Scaling Strategies
- Fault Tolerance & Retry Strategies
- Testing Agent Microservices
- CI/CD Pipeline for Agent Services
- Cost Management & Token Budgeting
- Production Readiness Checklist
- Reference Architecture Diagram
Introduction & Motivation
Why monolithic agent systems fail in production
A single-process agent that handles reasoning, tool calls, memory retrieval, and output generation works well in prototypes. In production it breaks in predictable ways:
- Latency coupling — one slow tool call blocks the entire inference loop
- Unscalable compute — you cannot scale the summarization workload independently from the search workload
- Blast radius — a single LLM API timeout or memory corruption takes the whole system down
- Zero deployment granularity — updating one tool integration requires redeploying everything
- No isolation for billing — impossible to attribute compute cost to individual agent functions
The microservice solution
Each autonomous capability becomes an independently deployable, independently scalable service with:
- Its own API surface (HTTP/gRPC)
- Its own health checks and readiness probes
- Its own memory scope (no shared in-process state)
- Its own tool bindings (resolved at runtime from a Tool Registry)
- Its own observability (distributed traces, metrics, structured logs)
What is a Micro Agent?
A micro agent is a bounded autonomous service that:
- Accepts a task (prompt + context + session ID) via an API call
- Runs a plan → act → observe loop using an LLM backend
- Invokes tools via a centralized Tool Registry
- Stores and retrieves conversation state from an external memory store
- Returns a typed result or emits an event to downstream consumers
Key insight: A micro agent is not a “smart function” — it is a service with its own API contract, memory scope, failure modes, and SLA. Design it accordingly.
Core Architecture Principles
Single Responsibility
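Each agent owns exactly one reasoning domain. The examples in this guide use separate services for search (agent-search), email (agent-email), code execution (agent-code), and data analysis (agent-data).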
Stateless Reasoning, Stateful Memory
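The LLM inference step must be stateless. Memory lives in external stores; the examples in this guide use Redis for recent conversation turns and a Qdrant vector collection for long-term semantic memory.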
No conversation history should ever live in in-process RAM between requests.
Schema-First Tool Contracts
Every tool must have a JSON Schema definition published to a shared Tool Registry before any agent can invoke it. No ad-hoc function signatures. This enables:
- Runtime input validation before LLM output reaches backend services
- Auto-generated documentation
- Tool versioning with backwards compatibility checks
Idempotent Actions
Any tool call that modifies external state (send email, write to DB, trigger webhook) must be idempotent. Strategies:
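- Use idempotency keys at the HTTP layer (pass an Idempotency-Key header)
- Use message deduplication at the queue level (Kafka idempotent producers and transactions give exactly-once semantics within Kafka, but not for side effects in external systems, so consumers still need to deduplicate)
- Design tool handlers to be safe to retry: record each action under a unique key and check that record atomically before acting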
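Here is a minimal sketch of the idempotency-key strategy. It assumes an in-memory dict as a stand-in for a durable store; in production that would be Redis or a database table with a unique constraint. IdempotentHandler and send_email are hypothetical names.

```python
import threading
from typing import Any, Callable, Dict


class IdempotentHandler:
    """Runs a side-effecting action at most once per idempotency key (in-memory sketch)."""

    def __init__(self, action: Callable[[dict], Any]):
        self._action = action
        self._results: Dict[str, Any] = {}  # stand-in for a durable store
        self._lock = threading.Lock()

    def handle(self, idempotency_key: str, payload: dict) -> Any:
        # Holding the lock across check and act stops two concurrent retries
        # from both passing the "not seen yet" check.
        with self._lock:
            if idempotency_key in self._results:
                return self._results[idempotency_key]  # replay: return the stored result
            result = self._action(payload)
            self._results[idempotency_key] = result
            return result


sent = []


def send_email(payload: dict) -> dict:
    sent.append(payload["to"])  # the external side effect
    return {"status": "sent", "to": payload["to"]}


handler = IdempotentHandler(send_email)
first = handler.handle("req-123", {"to": "a@example.com"})
retry = handler.handle("req-123", {"to": "a@example.com"})  # client retried after a timeout
print(first == retry, len(sent))  # True 1
```

A single global lock is the simplest correct choice for a sketch. A real service would lock per key, or rely on an insert that fails on a duplicate key, so that unrelated requests do not serialize.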
Async by Default
Long-running agent tasks (multi-step research, code generation + execution) must use async task queues — not synchronous HTTP with long timeouts.
Client ──► POST /tasks ──► Kafka/BullMQ ──► AgentWorker
Client ──► GET /tasks/{id} ──► Redis (status polling)
◄── WebSocket/SSE push (optional)
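The submit-then-poll flow can be sketched in-process with asyncio. This sketch assumes an asyncio.Queue as a stand-in for Kafka and a dict as a stand-in for the Redis status store. submit_task, get_task and worker are hypothetical names.

```python
import asyncio
import uuid
from typing import Dict

STATUS: Dict[str, dict] = {}  # stand-in for Redis: task_id -> status record


async def submit_task(queue: asyncio.Queue, prompt: str) -> str:
    """POST /tasks: enqueue the work and return a task_id immediately."""
    task_id = str(uuid.uuid4())
    STATUS[task_id] = {"status": "pending", "output": None}
    await queue.put((task_id, prompt))
    return task_id


def get_task(task_id: str) -> dict:
    """GET /tasks/{id}: a cheap status lookup that never waits on the agent."""
    return STATUS[task_id]


async def worker(queue: asyncio.Queue) -> None:
    """AgentWorker: consume tasks and record each status transition."""
    while True:
        task_id, prompt = await queue.get()
        STATUS[task_id]["status"] = "running"
        await asyncio.sleep(0.01)  # placeholder for the multi-step agent loop
        STATUS[task_id].update(status="completed", output=f"done: {prompt}")
        queue.task_done()


async def main() -> dict:
    queue: asyncio.Queue = asyncio.Queue()
    consumer = asyncio.create_task(worker(queue))
    task_id = await submit_task(queue, "research topic X")
    print(get_task(task_id)["status"])  # pending: the client already has its id
    while get_task(task_id)["status"] != "completed":  # client-side polling
        await asyncio.sleep(0.005)
    consumer.cancel()
    return get_task(task_id)


print(asyncio.run(main()))
```

The HTTP request never holds a connection open for the length of the agent run. Only the cheap status lookup is synchronous.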
Explicit Context Boundaries
Each agent invocation carries a bounded context packet — never grow unbounded message histories. A ContextManager service compresses/summarizes history before injection.
Agent Service Design
Project Layout
Each agent is a containerized FastAPI or gRPC service with this canonical structure:
agent-search/
├── agent/
│   ├── core.py        # AgentRunner: plan → act → observe loop
│   ├── prompts.py     # System prompt + few-shot templates
│   ├── memory.py      # ContextManager: load/compress/save
│   ├── tools.py       # Tool bindings (calls Tool Registry)
│   └── schemas.py     # Pydantic models for all I/O
├── api/
│   ├── routes.py      # POST /run, POST /tasks, GET /tasks/{task_id}
│   ├── middleware.py  # Auth, rate limiting, request tracing
│   └── deps.py        # Dependency injection: DB, Redis, LLM client
├── tests/
│   ├── unit/
│   ├── integration/
│   └── fixtures/
├── Dockerfile
├── pyproject.toml
└── k8s/
    ├── deployment.yaml
    ├── service.yaml
    ├── hpa.yaml
    └── configmap.yaml
API Contract
Every agent exposes these HTTP endpoints at minimum:
POST /run Submit a task (sync, short tasks only)
POST /tasks Submit a task (async, returns task_id)
GET /tasks/{task_id} Poll task status and result
GET /health Liveness probe
GET /ready Readiness probe (checks LLM + memory store)
GET /metrics Prometheus metrics endpoint
# agent/schemas.py
from enum import Enum
from typing import Any, Dict, Optional
from pydantic import BaseModel, Field
class TaskStatus(str, Enum):
    PENDING = "pending"
    RUNNING = "running"
    COMPLETED = "completed"
    FAILED = "failed"
    CANCELLED = "cancelled"
class AgentTask(BaseModel):
    id: str
    session_id: str
    prompt: str
    metadata: Dict[str, Any] = Field(default_factory=dict)
    max_steps: int = Field(default=10, ge=1, le=25)
    token_budget: int = Field(default=8192, ge=512, le=32768)
class AgentResult(BaseModel):
    task_id: str
    status: TaskStatus
    output: Optional[str] = None
    steps_used: int = 0
    tokens_used: int = 0
    tool_calls: int = 0
    error: Optional[str] = None
    duration_ms: int = 0
The AgentRunner Loop
Full Implementation
# agent/core.py
import asyncio
import time
from opentelemetry import trace
from tenacity import retry, stop_after_attempt, wait_exponential_jitter
from agent.memory import ContextManager
from agent.schemas import AgentResult, AgentTask, TaskStatus
from agent.tools import ToolRegistryClient
# AgentConfig, LLMClient, AgentMetrics, TokenBudgetExceeded, build_messages and
# tool_result_messages are project helpers assumed to be defined elsewhere in the service.
tracer = trace.get_tracer(__name__)
MAX_STEPS = 15  # service-wide hard cap, applied on top of task.max_steps
class AgentRunner:
    def __init__(self, agent_id: str, config: AgentConfig):
        self.agent_id = agent_id
        self.llm = LLMClient(model=config.model, timeout=30)
        self.memory = ContextManager(agent_id, max_tokens=config.context_limit)
        self.tools = ToolRegistryClient(config.tool_registry_url)
        self.metrics = AgentMetrics(agent_id)
    async def run(self, task: AgentTask) -> AgentResult:
        start = time.monotonic()
        with tracer.start_as_current_span("agent.run") as span:
            span.set_attribute("agent.id", self.agent_id)
            span.set_attribute("agent.task_id", task.id)
            span.set_attribute("agent.session_id", task.session_id)
            try:
                result = await self._run_loop(task, span)
            except TokenBudgetExceeded as e:
                # Budget exhausted: return whatever partial output exists
                result = AgentResult(
                    task_id=task.id,
                    status=TaskStatus.COMPLETED,
                    output=e.partial_output,
                    tokens_used=e.tokens_used,
                    error="token_budget_exceeded",
                )
            except Exception as e:
                span.record_exception(e)
                result = AgentResult(
                    task_id=task.id,
                    status=TaskStatus.FAILED,
                    error=str(e),
                )
            # Deliberately not in a `finally` block: on cancellation (a BaseException)
            # `result` would be unbound, so let CancelledError propagate untouched.
            result.duration_ms = int((time.monotonic() - start) * 1000)
            self.metrics.record(result)
            return result
    async def _run_loop(self, task: AgentTask, span) -> AgentResult:
        # Load available tools from the registry
        tool_schemas = await self.tools.fetch(agent_id=self.agent_id)
        # Load and compress conversation history
        context = await self.memory.load(task.session_id)
        messages = build_messages(context, task.prompt)
        max_steps = min(task.max_steps, MAX_STEPS)
        total_tokens = 0
        tool_call_count = 0
        response = None
        for step in range(max_steps):
            span.set_attribute("agent.current_step", step)
            with tracer.start_as_current_span("agent.llm_call") as llm_span:
                response = await self._complete_with_retry(messages, tool_schemas)
                llm_span.set_attribute("llm.prompt_tokens", response.usage.prompt_tokens)
                llm_span.set_attribute("llm.completion_tokens", response.usage.completion_tokens)
            total_tokens += response.usage.total_tokens
            if total_tokens > task.token_budget:
                raise TokenBudgetExceeded(
                    partial_output=response.content,
                    tokens_used=total_tokens,
                )
            if response.finish_reason == "stop":
                await self.memory.save(task.session_id, messages + [response.message])
                return AgentResult(
                    task_id=task.id,
                    status=TaskStatus.COMPLETED,
                    output=response.content,
                    steps_used=step + 1,
                    tokens_used=total_tokens,
                    tool_calls=tool_call_count,
                )
            # Always record the assistant turn so the next step sees it
            messages.append(response.message)
            if response.tool_calls:
                tool_call_count += len(response.tool_calls)
                results = await self._execute_tools(response.tool_calls)
                messages.extend(tool_result_messages(results))
        # Hit max steps: return the best available output
        return AgentResult(
            task_id=task.id,
            status=TaskStatus.COMPLETED,
            output=response.content if response else None,
            steps_used=max_steps,
            tokens_used=total_tokens,
            tool_calls=tool_call_count,
            error="max_steps_reached",
        )
    @retry(stop=stop_after_attempt(3), wait=wait_exponential_jitter(max=15), reraise=True)
    async def _complete_with_retry(self, messages, tools):
        return await self.llm.complete(messages=messages, tools=tools)
    async def _execute_tools(self, tool_calls):
        # Run independent tool calls concurrently; failures come back as
        # exception objects instead of cancelling the whole batch
        tasks = [self.tools.invoke(tc) for tc in tool_calls]
        return await asyncio.gather(*tasks, return_exceptions=True)
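To see the control flow of _run_loop in isolation, here is a stdlib-only sketch that drives the same plan → act → observe cycle with a scripted fake LLM. FakeLLM, Reply and the lookup tool are hypothetical stand-ins. The real runner adds tracing, retries and memory persistence on top of this skeleton.

```python
import asyncio
from dataclasses import dataclass, field
from typing import Dict, List, Optional


@dataclass
class Reply:
    content: str
    finish_reason: str  # "stop" or "tool_calls"
    tokens: int
    tool_calls: List[dict] = field(default_factory=list)


class FakeLLM:
    """Returns scripted replies in order, like a deterministic mock LLM server."""

    def __init__(self, replies: List[Reply]):
        self.replies = list(replies)

    async def complete(self, messages: List[dict]) -> Reply:
        return self.replies.pop(0)


async def lookup(query: str) -> str:
    """Hypothetical tool."""
    return f"result for {query!r}"


async def run_loop(llm: FakeLLM, prompt: str, max_steps: int, token_budget: int) -> Dict:
    messages = [{"role": "user", "content": prompt}]
    total_tokens = tool_calls = 0
    reply: Optional[Reply] = None

    def result(error: Optional[str], steps: int) -> Dict:
        return {"status": "completed", "error": error,
                "output": reply.content if reply else None,
                "steps": steps, "tool_calls": tool_calls, "tokens": total_tokens}

    for step in range(max_steps):
        reply = await llm.complete(messages)  # plan: ask the model what to do next
        total_tokens += reply.tokens
        if total_tokens > token_budget:
            return result("token_budget_exceeded", step + 1)
        if reply.finish_reason == "stop":
            return result(None, step + 1)
        messages.append({"role": "assistant", "content": reply.content})
        # act: run the requested tools concurrently; observe: feed results back
        outputs = await asyncio.gather(*(lookup(tc["query"]) for tc in reply.tool_calls))
        tool_calls += len(reply.tool_calls)
        messages.extend({"role": "tool", "content": out} for out in outputs)
    return result("max_steps_reached", max_steps)


llm = FakeLLM([
    Reply("", "tool_calls", 120, [{"query": "AI news"}]),
    Reply("Here is a summary of AI news.", "stop", 80),
])
print(asyncio.run(run_loop(llm, "find AI news", max_steps=5, token_budget=1000)))
```

All three exits (stop, budget, step cap) report the same fields. That keeps metrics and alerting uniform regardless of why a task ended.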
Inter-Agent Communication
Pattern Selection Matrix
gRPC Service Definition
For synchronous sub-agent calls, gRPC provides strong typing, bidirectional streaming, and efficient binary serialization.
// proto/agent_service.proto
syntax = "proto3";
package agents.v1;
service AgentService {
  rpc RunTask (TaskRequest) returns (TaskResponse);
  rpc StreamSteps (TaskRequest) returns (stream StepEvent);
  rpc Health (HealthRequest) returns (HealthResponse);
}
message TaskRequest {
  string task_id = 1;
  string session_id = 2;
  string prompt = 3;
  map<string, string> metadata = 4;
  int32 max_steps = 5;
  int32 token_budget = 6;
}
message TaskResponse {
  string task_id = 1;
  string status = 2;
  string output = 3;
  int32 steps_used = 4;
  int32 tokens_used = 5;
  string error = 6;
}
message StepEvent {
  int32 step_number = 1;
  string type = 2;  // "llm_call" | "tool_call" | "tool_result"
  string content = 3;
}
Kafka Event Schema
For async pipeline handoffs between agents, use Avro or JSON schemas registered in a Schema Registry.
{
  "schema": {
    "type": "record",
    "name": "AgentTaskEvent",
    "namespace": "com.myco.agents.v1",
    "fields": [
      {"name": "task_id", "type": "string"},
      {"name": "source_agent", "type": "string"},
      {"name": "target_agent", "type": "string"},
      {"name": "session_id", "type": "string"},
      {"name": "prompt", "type": "string"},
      {"name": "context", "type": {"type": "map", "values": "string"}},
      {"name": "created_at", "type": {"type": "long", "logicalType": "timestamp-millis"}}
    ]
  }
}
Kafka Producer (in Orchestrator)
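# In the orchestrator, when dispatching to a downstream agent (e.g. agent-search)
import json
import time
from aiokafka import AIOKafkaProducer
# KAFKA_BROKERS and get_current_trace_id() are assumed to come from service config
# and observability helpers. In a long-running service, start one producer at
# startup and reuse it; starting a producer per message is expensive.
async def dispatch_to_agent(target_agent: str, task: AgentTask):
    producer = AIOKafkaProducer(bootstrap_servers=KAFKA_BROKERS)
    await producer.start()
    try:
        event = {
            "task_id": task.id,
            "source_agent": "orchestrator",
            "target_agent": target_agent,
            "session_id": task.session_id,
            "prompt": task.prompt,
            # The schema declares `context` as a required map<string, string>
            "context": {k: str(v) for k, v in task.metadata.items()},
            "created_at": int(time.time() * 1000),
        }
        await producer.send_and_wait(
            topic=f"agent.tasks.{target_agent}",
            value=json.dumps(event).encode(),
            key=task.session_id.encode(),  # partition by session to keep per-session ordering
            headers=[("trace-id", get_current_trace_id().encode())],
        )
    finally:
        await producer.stop()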
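Because the producer serializes plain JSON, nothing forces its events to match the registered schema. Here is a sketch of a lightweight guard. It assumes the field list is copied by hand from the AgentTaskEvent schema above; a real setup would let a Schema Registry-aware serializer enforce this. validate_event and REQUIRED_FIELDS are hypothetical names.

```python
import json
import time
from typing import Any, Dict

# Field names copied from the AgentTaskEvent Avro schema above
REQUIRED_FIELDS = {"task_id", "source_agent", "target_agent", "session_id",
                   "prompt", "context", "created_at"}


def validate_event(event: Dict[str, Any]) -> None:
    """Raise ValueError if the event's fields or context values don't match the schema."""
    keys = set(event)
    missing, extra = REQUIRED_FIELDS - keys, keys - REQUIRED_FIELDS
    if missing or extra:
        raise ValueError(f"schema drift: missing={sorted(missing)} extra={sorted(extra)}")
    if not all(isinstance(v, str) for v in event["context"].values()):
        raise ValueError("context must be a map of string values")


event = {
    "task_id": "t1", "source_agent": "orchestrator", "target_agent": "search",
    "session_id": "s1", "prompt": "find AI news",
    "created_at": int(time.time() * 1000),  # timestamp-millis
}
try:
    validate_event(event)  # no "context" key yet
except ValueError as err:
    print(err)  # schema drift: missing=['context'] extra=[]

event["context"] = {k: str(v) for k, v in {"priority": 2}.items()}
validate_event(event)  # passes now
payload = json.dumps(event).encode()
```

Running a check like this in the orchestrator's unit tests catches drift before a consumer with a strict deserializer rejects the message.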
Tool Registry Service
Architecture
The Tool Registry is a centralized FastAPI service that stores, validates, and serves tool definitions. It acts as a typed API gateway for all agent→tool traffic.
Tool Registration Schema
# Tool self-registers on startup
from typing import Any, Dict
from pydantic import BaseModel
class ToolDefinition(BaseModel):
    name: str
    version: str
    description: str
    parameters: Dict[str, Any]  # JSON Schema
    returns: Dict[str, Any]  # JSON Schema
    endpoint: str  # where the registry routes calls
    health_url: str
    auth_type: str  # "api_key" | "oauth2" | "none"
    rate_limit: int  # calls per minute per agent
    timeout_ms: int = 10000
# Registration call at tool service startup. `app`, ToolRegistryClient,
# TOOL_REGISTRY_URL and SERVICE_URL come from the tool service itself;
# recent FastAPI versions prefer a lifespan handler over on_event.
@app.on_event("startup")
async def register_tool():
    registry = ToolRegistryClient(TOOL_REGISTRY_URL)
    await registry.register(ToolDefinition(
        name="web_search",
        version="2.1.0",
        description="Search the web and return ranked results",
        parameters={
            "type": "object",
            "properties": {
                "query": {"type": "string", "maxLength": 500},
                "num_results": {"type": "integer", "minimum": 1, "maximum": 20},
            },
            "required": ["query"],
        },
        returns={
            "type": "array",
            "items": {
                "type": "object",
                "properties": {
                    "url": {"type": "string"},
                    "title": {"type": "string"},
                    "snippet": {"type": "string"},
                },
            },
        },
        endpoint=f"{SERVICE_URL}/invoke",
        health_url=f"{SERVICE_URL}/health",
        auth_type="api_key",
        rate_limit=60,
        timeout_ms=8000,
    ))
Registry Validation Layer
# Tool Registry validates before forwarding
from uuid import uuid4
import httpx
import jsonschema
# db, rate_limiter, ToolNotFoundError and RateLimitExceeded are registry internals.
async def invoke_tool(agent_id: str, tool_name: str, params: dict):
    tool = await db.get_tool(tool_name)
    if not tool:
        raise ToolNotFoundError(tool_name)
    # Validate against the tool's JSON Schema (raises jsonschema.ValidationError on invalid input)
    jsonschema.validate(instance=params, schema=tool.parameters)
    # Check the per-agent rate limit
    if not await rate_limiter.check(agent_id, tool_name, tool.rate_limit):
        raise RateLimitExceeded(f"{tool_name} limit: {tool.rate_limit}/min")
    # Forward to the tool service with the tool's own timeout
    async with httpx.AsyncClient(timeout=tool.timeout_ms / 1000) as client:
        response = await client.post(
            tool.endpoint,
            json={"params": params},
            headers={"X-Agent-Id": agent_id, "X-Request-Id": str(uuid4())},
        )
        response.raise_for_status()
        return response.json()
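The rate_limiter.check(agent_id, tool_name, limit) call above is left abstract. Here is a minimal in-process sketch. It assumes a sliding one-minute window per (agent, tool) pair and an injectable clock so it can be tested. A registry running several replicas would need shared state (for example in Redis) instead of this per-process dict. SlidingWindowRateLimiter is a hypothetical name.

```python
import asyncio
import time
from collections import defaultdict, deque
from typing import Callable, Deque, Dict, Tuple


class SlidingWindowRateLimiter:
    """Allows at most `limit` calls per (agent, tool) pair in any rolling window."""

    def __init__(self, window_s: float = 60.0, clock: Callable[[], float] = time.monotonic):
        self.window_s = window_s
        self.clock = clock
        self._calls: Dict[Tuple[str, str], Deque[float]] = defaultdict(deque)

    async def check(self, agent_id: str, tool_name: str, limit: int) -> bool:
        # No awaits inside, so this runs atomically on the event loop
        now = self.clock()
        calls = self._calls[(agent_id, tool_name)]
        # Evict timestamps that have slid out of the window
        while calls and now - calls[0] >= self.window_s:
            calls.popleft()
        if len(calls) >= limit:
            return False  # over the limit: rejected attempts are not recorded
        calls.append(now)
        return True


fake_now = [0.0]
limiter = SlidingWindowRateLimiter(clock=lambda: fake_now[0])


async def demo() -> list:
    allowed = [await limiter.check("agent-search", "web_search", limit=2) for _ in range(3)]
    fake_now[0] = 61.0  # a minute later the window has emptied
    allowed.append(await limiter.check("agent-search", "web_search", limit=2))
    return allowed


print(asyncio.run(demo()))  # [True, True, False, True]
```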
Memory Architecture
Memory Tier Selection
ContextManager Implementation
# agent/memory.py
import json
import uuid
from typing import List
from qdrant_client import AsyncQdrantClient
from qdrant_client.models import PointStruct
from redis.asyncio import Redis
# REDIS_URL, QDRANT_URL, EmbeddingClient, estimate_tokens and self._summarize
# are assumed to be provided elsewhere in the service.
PAST_CONTEXT_PREFIX = "[Past context] "
class ContextManager:
    def __init__(self, agent_id: str, max_tokens: int = 4096):
        self.agent_id = agent_id
        self.max_tokens = max_tokens
        self.redis = Redis.from_url(REDIS_URL)
        self.qdrant = AsyncQdrantClient(url=QDRANT_URL)  # async client: search/upsert are awaitable
        self.embedder = EmbeddingClient()
    async def load(self, session_id: str) -> List[dict]:
        # 1. Load recent turns from Redis
        raw = await self.redis.get(f"session:{session_id}:messages")
        messages = json.loads(raw) if raw else []
        # 2. Retrieve semantically relevant past context
        last_user_msg = next((m for m in reversed(messages) if m["role"] == "user"), None)
        if last_user_msg:
            embedding = await self.embedder.embed(last_user_msg["content"])
            relevant = await self.qdrant.search(
                collection_name=f"agent_{self.agent_id}_memory",
                query_vector=embedding,
                limit=3,
            )
            # Prepend as system context, keeping the hits in ranked order
            past = [
                {"role": "system", "content": PAST_CONTEXT_PREFIX + hit.payload["summary"]}
                for hit in relevant
            ]
            messages = past + messages
        # 3. Compress if over the token limit
        return await self._compress_if_needed(messages)
    async def save(self, session_id: str, messages: List[dict]):
        # Drop injected past-context messages so they are not persisted and re-injected every turn
        messages = [m for m in messages
                    if not str(m.get("content") or "").startswith(PAST_CONTEXT_PREFIX)]
        # Save the last 20 messages to Redis
        recent = messages[-20:]
        await self.redis.setex(
            f"session:{session_id}:messages",
            86400,  # 24h TTL
            json.dumps(recent),
        )
        # If the session is long, generate and store a summary in the vector DB
        if len(messages) > 30:
            summary = await self._summarize(messages)
            embedding = await self.embedder.embed(summary)
            await self.qdrant.upsert(
                collection_name=f"agent_{self.agent_id}_memory",
                points=[PointStruct(
                    # Qdrant point IDs must be unsigned ints or UUIDs, so derive a stable UUID
                    id=str(uuid.uuid5(uuid.NAMESPACE_URL, session_id)),
                    vector=embedding,
                    payload={"summary": summary, "session_id": session_id},
                )],
            )
    async def _compress_if_needed(self, messages: List[dict]) -> List[dict]:
        token_count = estimate_tokens(messages)
        if token_count <= self.max_tokens:
            return messages
        # Keep system messages + the last 12 non-system messages (about 6 exchanges)
        system_msgs = [m for m in messages if m["role"] == "system"]
        recent_turns = [m for m in messages if m["role"] != "system"][-12:]
        return system_msgs + recent_turns
Context Window Management
Token Estimation
import json
import tiktoken
def estimate_tokens(messages: list, model: str = "gpt-4o") -> int:
    # Approximate count: the exact per-message overhead varies by model and API
    enc = tiktoken.encoding_for_model(model)
    total = 0
    for msg in messages:
        total += 4  # per-message overhead (approximation)
        total += len(enc.encode(msg.get("content", "") or ""))
        if "tool_calls" in msg:
            for tc in msg["tool_calls"]:
                total += len(enc.encode(json.dumps(tc)))
    return total
class TokenBudget:
    def __init__(self, total: int, model: str):
        self.total = total
        self.model = model
        self.used = 0
        self.reserved = 1024  # always reserve room for the model's output
    @property
    def available_for_input(self) -> int:
        return self.total - self.reserved - self.used
    def consume(self, tokens: int):
        self.used += tokens
        if self.used > self.total - self.reserved:
            raise TokenBudgetExceeded(tokens_used=self.used)
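Here is a minimal sketch that combines per-message token counting with a reserved output budget to fit a history into the input budget. It keeps every system message, then adds the newest turns until the budget is spent. To stay dependency-free, it takes the token counter as a parameter. The usage below counts whitespace-separated words as a crude stand-in for tiktoken. fit_to_budget is a hypothetical helper.

```python
from typing import Callable, List

Message = dict


def fit_to_budget(messages: List[Message], total: int, reserved_for_output: int,
                  count: Callable[[Message], int]) -> List[Message]:
    """Keep every system message, then as many of the newest turns as fit."""
    budget = total - reserved_for_output
    system = [m for m in messages if m["role"] == "system"]
    used = sum(count(m) for m in system)
    if used > budget:
        raise ValueError("system messages alone exceed the input budget")
    kept: List[Message] = []
    for m in reversed([m for m in messages if m["role"] != "system"]):
        cost = count(m)
        if used + cost > budget:
            break  # stop at the first turn that doesn't fit so kept turns stay contiguous
        kept.append(m)
        used += cost
    return system + kept[::-1]


def word_count(m: Message) -> int:
    return 4 + len((m.get("content") or "").split())  # 4 = per-message overhead


history = [
    {"role": "system", "content": "You are a search agent."},
    {"role": "user", "content": "one two three four five six"},
    {"role": "assistant", "content": "seven eight"},
    {"role": "user", "content": "nine"},
]
trimmed = fit_to_budget(history, total=30, reserved_for_output=10, count=word_count)
print([m["content"] for m in trimmed])  # ['You are a search agent.', 'seven eight', 'nine']
```

The arithmetic: the input budget is 30 − 10 = 20. The system message costs 9 and the two newest turns cost 5 + 6, giving exactly 20. The oldest turn (cost 10) is dropped.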
Orchestrator & Supervisor Pattern
Orchestrator: Task Decomposition
The Orchestrator is itself an agent microservice, but its role is planning and coordination rather than execution.
# orchestrator/core.py
import asyncio
class OrchestratorAgent:
    # self.planner, self.supervisor and self.synthesizer are injected collaborators
    async def execute(self, user_request: str, session_id: str) -> str:
        # Step 1: Decompose into a DAG of sub-tasks
        plan = await self.planner.decompose(user_request)
        # Returns: [{"id": "t1", "agent": "search", "task": "...", "deps": []},
        #           {"id": "t2", "agent": "summarize", "task": "...", "deps": ["t1"]},
        #           {"id": "t3", "agent": "email", "task": "...", "deps": ["t2"]}]
        # Step 2: Execute in topological order, parallel where possible
        results = {}
        for wave in topological_waves(plan):
            # All tasks in a wave have their deps satisfied
            wave_results = await asyncio.gather(*[
                self.supervisor.dispatch(step, results)
                for step in wave
            ])
            for step, result in zip(wave, wave_results):
                results[step["id"]] = result
        # Step 3: Synthesize final output
        return await self.synthesizer.merge(results, user_request)
def topological_waves(plan: list) -> list:
    """Return plan steps grouped into parallel execution waves."""
    completed = set()
    waves = []
    remaining = list(plan)
    while remaining:
        wave = [s for s in remaining if all(d in completed for d in s["deps"])]
        if not wave:
            # Without this guard, a cycle or unknown dependency would loop forever
            raise ValueError(f"Unsatisfiable dependencies: {[s['id'] for s in remaining]}")
        waves.append(wave)
        completed.update(s["id"] for s in wave)
        remaining = [s for s in remaining if s["id"] not in completed]
    return waves
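Here is a quick check of the wave grouping. It uses a self-contained copy of topological_waves, including a guard that rejects cycles and unknown dependencies, on a hypothetical plan where two searches run in parallel before a summary and an email.

```python
from typing import Dict, List


def topological_waves(plan: List[Dict]) -> List[List[Dict]]:
    """Group plan steps into waves whose dependencies all sit in earlier waves."""
    completed = set()
    waves = []
    remaining = list(plan)
    while remaining:
        wave = [s for s in remaining if all(d in completed for d in s["deps"])]
        if not wave:
            raise ValueError("cycle or unknown dependency in plan")
        waves.append(wave)
        completed.update(s["id"] for s in wave)
        remaining = [s for s in remaining if s["id"] not in completed]
    return waves


plan = [
    {"id": "t1", "agent": "search", "deps": []},
    {"id": "t2", "agent": "search", "deps": []},
    {"id": "t3", "agent": "summarize", "deps": ["t1", "t2"]},
    {"id": "t4", "agent": "email", "deps": ["t3"]},
]
print([[s["id"] for s in wave] for wave in topological_waves(plan)])
# [['t1', 't2'], ['t3'], ['t4']]
```

Wall-clock time is roughly the sum of the slowest step in each wave, not the sum of all steps. Here t1 and t2 overlap.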
Supervisor: Retry & Escalation
import asyncio
# StepResult, AgentError and SupervisorEscalation are orchestrator types defined elsewhere.
MAX_ATTEMPTS = 3
class Supervisor:
    def __init__(self, agent_clients: dict):
        self.agent_clients = agent_clients
    async def dispatch(self, step: dict, context: dict) -> StepResult:
        task_prompt = self._inject_context(step["task"], context, step["deps"])
        last_error = "unknown"
        for attempt in range(MAX_ATTEMPTS):
            try:
                return await asyncio.wait_for(
                    self.agent_clients[step["agent"]].run(task_prompt),
                    timeout=60.0,
                )
            except asyncio.TimeoutError:
                last_error = "timeout"
            except AgentError as e:
                if e.is_unrecoverable:
                    raise SupervisorEscalation(step, str(e))
                last_error = str(e)
            if attempt < MAX_ATTEMPTS - 1:
                await asyncio.sleep(2 ** attempt)  # back off 1s, then 2s
        # All attempts failed: escalate instead of silently returning None
        raise SupervisorEscalation(step, f"{last_error}_after_{MAX_ATTEMPTS}_attempts")
    def _inject_context(self, task: str, results: dict, dep_ids: list) -> str:
        context_parts = [results[dep_id].output for dep_id in dep_ids if dep_id in results]
        if context_parts:
            joined = "\n".join(context_parts)
            return f"Context from previous steps:\n{joined}\n\nTask: {task}"
        return task
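The retry-then-escalate policy can be exercised without real agents. Here is a stdlib sketch that assumes the sleep function is injected so tests don't actually wait. dispatch_with_retry and Escalation are hypothetical names, and every RuntimeError is treated as recoverable.

```python
import asyncio
from typing import Awaitable, Callable, List


class Escalation(Exception):
    """Raised when a step still fails after every attempt."""


async def dispatch_with_retry(call: Callable[[], Awaitable[str]], attempts: int = 3,
                              timeout: float = 60.0,
                              sleep: Callable[[float], Awaitable[None]] = asyncio.sleep) -> str:
    last_error = "unknown"
    for attempt in range(attempts):
        try:
            return await asyncio.wait_for(call(), timeout=timeout)
        except asyncio.TimeoutError:
            last_error = "timeout"
        except RuntimeError as e:  # stand-in for a recoverable AgentError
            last_error = str(e)
        if attempt < attempts - 1:
            await sleep(2 ** attempt)  # 1s, 2s, ...; no pointless sleep after the last attempt
    raise Escalation(f"{last_error} after {attempts} attempts")


async def demo() -> tuple:
    delays: List[float] = []

    async def fake_sleep(seconds: float) -> None:
        delays.append(seconds)

    outcomes = iter([RuntimeError("flaky"), RuntimeError("flaky"), "ok"])

    async def flaky_agent() -> str:
        outcome = next(outcomes)
        if isinstance(outcome, Exception):
            raise outcome
        return outcome

    result = await dispatch_with_retry(flaky_agent, sleep=fake_sleep)
    return result, delays


print(asyncio.run(demo()))  # ('ok', [1, 2])
```

Injecting sleep keeps the test fast and also lets the test assert the backoff schedule itself.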
Security & Authorization
Agent Identity & JWT Verification
Each agent service must verify that incoming requests are from authorized callers. Use short-lived JWT tokens signed by an internal auth service.
# api/middleware.py
from fastapi import HTTPException, Request
from jose import JWTError, jwt
# PUBLIC_KEY is the internal auth service's RS256 public key, loaded at startup.
ALLOWED_CALLERS = {"orchestrator", "supervisor", "api-gateway"}
async def verify_agent_token(request: Request):
    token = request.headers.get("Authorization", "").removeprefix("Bearer ")
    if not token:
        raise HTTPException(status_code=401, detail="Missing auth token")
    try:
        # Verifies the signature and, by default, the exp claim
        payload = jwt.decode(token, PUBLIC_KEY, algorithms=["RS256"])
    except JWTError as e:
        raise HTTPException(status_code=401, detail=f"Invalid token: {e}") from e
    caller = payload.get("sub")
    if caller not in ALLOWED_CALLERS:
        raise HTTPException(status_code=403, detail=f"Caller {caller} not authorized")
    request.state.caller = caller
Secrets Management
Never store API keys in environment literals or ConfigMaps. Use Kubernetes Secrets mounted as environment variables, or preferably HashiCorp Vault with the Vault Agent Sidecar.
# k8s/deployment.yaml (secrets section)
env:
  - name: OPENAI_API_KEY
    valueFrom:
      secretKeyRef:
        name: agent-secrets
        key: openai-api-key
  - name: TOOL_REGISTRY_TOKEN
    valueFrom:
      secretKeyRef:
        name: agent-secrets
        key: tool-registry-token
Tool Call Authorization
The Tool Registry enforces agent-level RBAC: which agents can invoke which tools.
# Tool Registry ACL check
TOOL_ACL = {
    "agent-search": ["web_search", "vector_search", "knowledge_base"],
    "agent-email": ["send_email", "get_email_thread"],
    "agent-code": ["code_exec", "git_read", "package_search"],
    "agent-data": ["sql_query", "csv_read", "chart_generate"],
}
async def check_tool_acl(agent_id: str, tool_name: str):
    # Deny by default: unknown agents get an empty allow-list
    allowed_tools = TOOL_ACL.get(agent_id, [])
    if tool_name not in allowed_tools:
        raise PermissionError(f"{agent_id} is not authorized to call {tool_name}")
Observability: Traces, Logs, Metrics
Distributed Tracing Setup (OpenTelemetry)
# observability/tracing.py
from opentelemetry import trace
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor
from opentelemetry.instrumentation.httpx import HTTPXClientInstrumentor
from opentelemetry.instrumentation.redis import RedisInstrumentor
from opentelemetry.sdk.resources import SERVICE_NAME, Resource
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
# OTEL_ENDPOINT is assumed to come from config (e.g. OTEL_EXPORTER_OTLP_ENDPOINT).
def setup_tracing(service_name: str):
    provider = TracerProvider(
        resource=Resource(attributes={SERVICE_NAME: service_name})
    )
    provider.add_span_processor(
        BatchSpanProcessor(OTLPSpanExporter(endpoint=OTEL_ENDPOINT))
    )
    trace.set_tracer_provider(provider)
    # Auto-instrument frameworks. FastAPIInstrumentor().instrument() patches FastAPI
    # globally, so call setup_tracing() before the app object is created
    # (or use FastAPIInstrumentor.instrument_app(app) for an existing app).
    FastAPIInstrumentor().instrument()
    RedisInstrumentor().instrument()
    HTTPXClientInstrumentor().instrument()
Standard Span Attributes for Agent Calls
Always set these attributes on every agent and LLM span:
# In AgentRunner._run_loop:
span.set_attribute("agent.id", self.agent_id)
span.set_attribute("agent.task_id", task.id)
span.set_attribute("agent.session_id", task.session_id)
span.set_attribute("agent.step", step)
span.set_attribute("llm.model", config.model)
span.set_attribute("llm.prompt_tokens", response.usage.prompt_tokens)
span.set_attribute("llm.completion_tokens", response.usage.completion_tokens)
span.set_attribute("llm.finish_reason", response.finish_reason)
# In Tool Registry on invoke:
span.set_attribute("tool.name", tool_name)
span.set_attribute("tool.version", tool.version)
span.set_attribute("tool.caller_agent", agent_id)
span.set_attribute("tool.latency_ms", latency_ms)
Prometheus Metrics
# observability/metrics.py
from prometheus_client import Counter, Histogram, Gauge
agent_tasks_total = Counter(
"agent_tasks_total",
"Total tasks processed",
["agent_id", "status"]
)
agent_task_duration = Histogram(
"agent_task_duration_seconds",
"Task end-to-end latency",
["agent_id"],
buckets=[0.5, 1, 2, 5, 10, 30, 60, 120]
)
agent_llm_tokens = Counter(
"agent_llm_tokens_total",
"LLM tokens consumed",
["agent_id", "token_type"] # token_type: prompt | completion
)
agent_tool_calls = Counter(
"agent_tool_calls_total",
"Tool invocations",
["agent_id", "tool_name", "status"]
)
agent_steps_per_task = Histogram(
"agent_steps_per_task",
"Number of steps per task (runaway guard)",
["agent_id"],
buckets=[1, 2, 3, 5, 8, 10, 15, 20, 25]
)
orchestrator_queue_depth = Gauge(
"orchestrator_queue_depth",
"Pending tasks in orchestrator queue"
)
Alert Rules
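# alerting/rules.yaml
groups:
  - name: agent-alerts
    rules:
      - alert: AgentHighErrorRate
        # Ratio of failed tasks to all tasks, per agent
        expr: |
          sum by (agent_id) (rate(agent_tasks_total{status="failed"}[5m]))
            / sum by (agent_id) (rate(agent_tasks_total[5m])) > 0.05
        for: 2m
        annotations:
          summary: "{{ $labels.agent_id }} failure rate above 5%"
      - alert: AgentRunawayTask
        # histogram_quantile needs the _bucket series, aggregated by le
        expr: histogram_quantile(0.99, sum by (le, agent_id) (rate(agent_steps_per_task_bucket[5m]))) > 15
        for: 5m
        annotations:
          summary: "Agent tasks exceeding 15 steps, possible runaway loop"
      - alert: LLMTokenCostSpike
        # rate() is per second: this fires above 50k tokens/s for an agent
        expr: sum by (agent_id) (rate(agent_llm_tokens_total[10m])) > 50000
        for: 5m
        annotations:
          summary: "Token consumption rate spike, check for loops"
      - alert: AgentLatencyHigh
        expr: histogram_quantile(0.99, sum by (le, agent_id) (rate(agent_task_duration_seconds_bucket[5m]))) > 10
        for: 5m
        annotations:
          summary: "p99 task latency above 10s"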
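histogram_quantile never sees individual observations. It estimates a quantile from cumulative bucket counts by linear interpolation inside the bucket where the target rank falls, which is why it needs the per-bucket series grouped by the le label. Here is a simplified Python re-implementation for intuition only; it skips Prometheus's handling of NaN and non-monotonic buckets. It is applied to hypothetical counts over the agent_steps_per_task buckets.

```python
import bisect
import math
from typing import List, Tuple


def histogram_quantile(q: float, buckets: List[Tuple[float, float]]) -> float:
    """Estimate quantile q (0 < q <= 1) from cumulative (upper_bound, count) buckets ending at +Inf."""
    total = buckets[-1][1]
    if len(buckets) < 2 or total == 0:
        return math.nan
    rank = q * total
    counts = [count for _, count in buckets]
    b = bisect.bisect_left(counts, rank)  # first bucket whose cumulative count >= rank
    if b == len(buckets) - 1:
        return buckets[-2][0]  # rank is in the +Inf bucket: report the highest finite bound
    if b == 0 and buckets[0][0] <= 0:
        return buckets[0][0]
    start, below = (0.0, 0.0) if b == 0 else buckets[b - 1]
    end, in_bucket = buckets[b][0], buckets[b][1] - below
    # Assume observations are spread uniformly inside the bucket
    return start + (end - start) * (rank - below) / in_bucket


# Hypothetical cumulative counts for 100 tasks over the agent_steps_per_task buckets
steps = [(1, 10), (2, 30), (3, 55), (5, 80), (8, 92), (10, 96),
         (15, 98), (20, 100), (25, 100), (math.inf, 100)]
print(histogram_quantile(0.99, steps))  # 17.5
```

Only 2 of the 100 tasks exceeded 15 steps. Because the 99th-rank task lands halfway through the (15, 20] bucket, the estimate is 17.5, which would trip the AgentRunawayTask threshold. Bucket boundaries therefore directly shape when alerts fire.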
Structured Logging
# Never log raw prompts or PII. Log task IDs and outcome codes.
import structlog
log = structlog.get_logger()
log.info("agent.task.completed",
task_id=task.id,
session_id=task.session_id, # hashed in prod
agent_id=self.agent_id,
steps=result.steps_used,
tokens=result.tokens_used,
duration_ms=result.duration_ms,
tool_calls=result.tool_calls,
status=result.status,
trace_id=get_current_trace_id()
)
Deployment on Kubernetes
Deployment Manifest
# k8s/deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: agent-search
  labels:
    app: agent-search
    version: v1.4.2
    team: ai-platform
spec:
  replicas: 2
  selector:
    matchLabels:
      app: agent-search
  template:
    metadata:
      labels:
        app: agent-search
        version: v1.4.2
      annotations:
        prometheus.io/scrape: "true"
        prometheus.io/path: "/metrics"
        prometheus.io/port: "8080"
    spec:
      serviceAccountName: agent-search
      containers:
        - name: agent
          image: registry.myco.io/agent-search@sha256:<digest>  # Always pin by digest
          ports:
            - containerPort: 8080  # HTTP API
              name: http
            - containerPort: 50051  # gRPC
              name: grpc
          env:
            - name: AGENT_ID
              value: "agent-search"
            - name: TOOL_REGISTRY_URL
              valueFrom: {configMapKeyRef: {name: agent-config, key: tool-registry-url}}
            - name: REDIS_URL
              valueFrom: {secretKeyRef: {name: agent-secrets, key: redis-url}}
            - name: OPENAI_API_KEY
              valueFrom: {secretKeyRef: {name: agent-secrets, key: openai-api-key}}
            - name: OTEL_EXPORTER_OTLP_ENDPOINT
              valueFrom: {configMapKeyRef: {name: observability-config, key: otel-endpoint}}
          resources:
            requests:
              cpu: "500m"
              memory: "512Mi"
            limits:
              cpu: "2"
              memory: "2Gi"
          livenessProbe:
            httpGet:
              path: /health
              port: 8080
            initialDelaySeconds: 10
            periodSeconds: 15
            failureThreshold: 3
          readinessProbe:
            httpGet:
              path: /ready
              port: 8080
            initialDelaySeconds: 5
            periodSeconds: 10
            failureThreshold: 2
          lifecycle:
            preStop:
              exec:
                command: ["/bin/sh", "-c", "sleep 5"]  # drain connections before shutdown
      topologySpreadConstraints:
        - maxSkew: 1
          topologyKey: kubernetes.io/hostname
          whenUnsatisfiable: DoNotSchedule
          labelSelector:
            matchLabels: {app: agent-search}
Horizontal Pod Autoscaler (Custom Metrics)
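Scale on Kafka consumer lag, not just CPU. Because kafka_consumer_group_lag is an External metric, it has to be served to the HPA by an external metrics adapter such as prometheus-adapter: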
# k8s/hpa.yaml
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
  name: agent-search-hpa
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: agent-search
  minReplicas: 2
  maxReplicas: 20
  behavior:
    scaleUp:
      stabilizationWindowSeconds: 60
      policies:
        - type: Pods
          value: 4
          periodSeconds: 60
    scaleDown:
      stabilizationWindowSeconds: 300  # be conservative scaling down
  metrics:
    - type: External
      external:
        metric:
          name: kafka_consumer_group_lag
          selector:
            matchLabels:
              topic: agent.tasks.search
        target:
          type: AverageValue
          averageValue: "100"
    - type: Resource
      resource:
        name: cpu
        target:
          type: Utilization
          averageUtilization: 70
PodDisruptionBudget
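Keep at least one replica available during voluntary disruptions such as node drains and cluster upgrades (the pace of rolling updates is governed by the Deployment's rollout strategy, not by the PDB):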
# k8s/pdb.yaml
apiVersion: policy/v1
kind: PodDisruptionBudget
metadata:
  name: agent-search-pdb
spec:
  minAvailable: 1
  selector:
    matchLabels:
      app: agent-search
Scaling Strategies
Per-Agent Scaling Logic
Multi-Model Fallback
If the primary LLM is unavailable or rate-limited, automatically route to a fallback:
class LLMClient:
    # The cross-vendor entry assumes _call_model() maps each model name to the
    # right provider SDK and message format.
    MODEL_CASCADE = [
        "gpt-4o",             # primary
        "gpt-4o-mini",        # cheaper fallback
        "claude-sonnet-4-6",  # cross-vendor fallback
    ]
    async def complete(self, messages: list, **kwargs) -> LLMResponse:
        for model in self.MODEL_CASCADE:
            try:
                return await self._call_model(model, messages, **kwargs)
            except (RateLimitError, ModelUnavailable):
                log.warning("llm.fallback", from_model=model, reason="rate_limit_or_unavailable")
                continue
        raise AllModelsUnavailable()
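The cascade logic can be tested without any vendor SDK. Here is a minimal sketch that assumes each model is an async callable. RateLimitError, ModelUnavailable and AllModelsUnavailable are defined locally as hypothetical classes, not a specific SDK's exception types. The function also returns which models were skipped, which is the information the llm.fallback log line needs.

```python
import asyncio
from typing import Awaitable, Callable, List, Tuple


class RateLimitError(Exception):
    pass


class ModelUnavailable(Exception):
    pass


class AllModelsUnavailable(Exception):
    pass


ModelFn = Callable[[List[dict]], Awaitable[str]]


async def complete_with_cascade(cascade: List[Tuple[str, ModelFn]],
                                messages: List[dict]) -> Tuple[str, str, List[str]]:
    """Try each model in order; return (model_used, output, models_skipped)."""
    skipped: List[str] = []
    for name, call in cascade:
        try:
            output = await call(messages)
        except (RateLimitError, ModelUnavailable):
            skipped.append(name)  # only these errors trigger a fallback
            continue
        return name, output, skipped
    raise AllModelsUnavailable(f"all models failed: {skipped}")


async def rate_limited(messages: List[dict]) -> str:
    raise RateLimitError("429 Too Many Requests")


async def healthy(messages: List[dict]) -> str:
    return "answer"


cascade = [("primary", rate_limited), ("cheaper-fallback", healthy)]
print(asyncio.run(complete_with_cascade(cascade, [{"role": "user", "content": "hi"}])))
# ('cheaper-fallback', 'answer', ['primary'])
```

Other errors, such as a malformed request, propagate immediately, since a different model would most likely fail the same way.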
Fault Tolerance & Retry Strategies
Circuit Breaker on LLM Client
from circuitbreaker import circuit
class LLMClientWithCircuitBreaker:
    # The decorator's breaker state is shared by every instance of this class
    @circuit(failure_threshold=5, recovery_timeout=30, expected_exception=LLMError)
    async def complete(self, messages: list, **kwargs) -> LLMResponse:
        return await self._raw_complete(messages, **kwargs)
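The circuit opens after 5 consecutive LLMError failures and stays open for 30 seconds. While it is open, calls fail fast with CircuitBreakerError instead of reaching the LLM. Catch that error (or pass a fallback_function to the decorator) to serve a fallback response or route to a secondary model. Once the recovery timeout elapses, a trial call is let through, and a success closes the circuit again.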
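To make the closed → open → half-open cycle concrete, here is a small hand-rolled breaker with an injectable clock. It illustrates the pattern and does not reproduce the circuitbreaker library's internals. CircuitBreaker and CircuitOpenError here are hypothetical names.

```python
import time
from typing import Callable, Optional, TypeVar

T = TypeVar("T")


class CircuitOpenError(Exception):
    pass


class CircuitBreaker:
    def __init__(self, failure_threshold: int = 5, recovery_timeout: float = 30.0,
                 clock: Callable[[], float] = time.monotonic):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.clock = clock
        self.failures = 0
        self.opened_at: Optional[float] = None  # None means the circuit is closed

    @property
    def state(self) -> str:
        if self.opened_at is None:
            return "closed"
        if self.clock() - self.opened_at >= self.recovery_timeout:
            return "half-open"  # let a trial call through
        return "open"

    def call(self, fn: Callable[[], T]) -> T:
        if self.state == "open":
            raise CircuitOpenError("failing fast; caller should use a fallback")
        try:
            result = fn()
        except Exception:
            self.failures += 1
            # A failed trial call re-opens immediately; otherwise open at the threshold
            if self.state == "half-open" or self.failures >= self.failure_threshold:
                self.opened_at = self.clock()
            raise
        self.failures = 0  # any success closes the circuit and resets the count
        self.opened_at = None
        return result


def failing() -> str:
    raise ConnectionError("LLM down")


now = [0.0]
breaker = CircuitBreaker(failure_threshold=2, recovery_timeout=30, clock=lambda: now[0])
for _ in range(2):
    try:
        breaker.call(failing)
    except ConnectionError:
        pass
print(breaker.state)  # open
now[0] = 31.0
print(breaker.state, breaker.call(lambda: "ok"), breaker.state)  # half-open ok closed
```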
Exponential Backoff with Jitter
from tenacity import (
retry, stop_after_attempt,
wait_exponential_jitter, retry_if_exception_type
)
@retry(
stop=stop_after_attempt(3),
wait=wait_exponential_jitter(initial=1, max=60),
retry=retry_if_exception_type((RateLimitError, TimeoutError, ServiceUnavailable))
)
async def call_tool_with_retry(tool_name: str, params: dict):
return await tool_registry.invoke(tool_name, params)
Dead Letter Queue Handler
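# dlq_handler.py: consumes from the dead-letter topic
import time
# retry_count, last_error and auto_retry_eligible are assumed extra fields on the
# DLQ envelope; log, self.pagerduty, self.db and self.kafka are injected clients.
class DLQHandler:
    async def process(self, event: AgentTaskEvent):
        log.error("agent.task.dlq",
            task_id=event.task_id,
            target_agent=event.target_agent,
            attempt_count=event.retry_count,
            original_error=event.last_error
        )
        # Alert on-call if the error is novel
        if await self.is_novel_error(event.last_error):
            await self.pagerduty.alert(event)
        # Store for the human review dashboard
        await self.db.insert_dlq_item(event)
        # Optional: auto-re-queue after 1 hour. Don't asyncio.sleep(3600) here, since that
        # stalls this consumer for an hour; stamp the event and hand it to a delayed-retry
        # topic whose consumer re-publishes it once not_before_ms has passed
        # (topic and field names are illustrative).
        if event.retry_count < 2 and event.auto_retry_eligible:
            event.retry_count += 1
            event.not_before_ms = int(time.time() * 1000) + 3_600_000
            await self.kafka.send("agent.tasks.retry", event)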
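The is_novel_error check above is left abstract. One common approach, sketched here under the assumption that "novel" means "an error fingerprint not seen before", is to normalize away the volatile parts of the message (UUIDs, hex ids, numbers) and hash what remains. The same timeout on two different tasks then pages on-call only once. fingerprint and ErrorFingerprinter are hypothetical helpers, and a real handler would keep the seen set in Redis or the DLQ database rather than in memory.

```python
import hashlib
import re

# Order matters: strip UUIDs before generic hex and digit runs
_VOLATILE = [
    (re.compile(r"\b[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}\b", re.I), "<uuid>"),
    (re.compile(r"\b0x[0-9a-f]+\b", re.I), "<hex>"),
    (re.compile(r"\d+"), "<n>"),
]


def fingerprint(error: str) -> str:
    """Hash of the error text with ids and numbers replaced by placeholders."""
    normalized = error.strip().lower()
    for pattern, placeholder in _VOLATILE:
        normalized = pattern.sub(placeholder, normalized)
    return hashlib.sha256(normalized.encode()).hexdigest()[:16]


class ErrorFingerprinter:
    def __init__(self):
        self._seen = set()  # stand-in for a durable store

    def is_novel(self, error: str) -> bool:
        fp = fingerprint(error)
        if fp in self._seen:
            return False
        self._seen.add(fp)
        return True


detector = ErrorFingerprinter()
print(detector.is_novel("Timeout after 30s on task 3f2b8c1e-0d4a-4c7b-9e2f-1a2b3c4d5e6f"))  # True
print(detector.is_novel("Timeout after 45s on task 9a8b7c6d-5e4f-4a3b-8c2d-1e0f9a8b7c6d"))  # False
print(detector.is_novel("ToolNotFoundError: web_search"))  # True
```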
Step-Level Checkpointing
import json
class CheckpointedAgentRunner(AgentRunner):
    # Assumes self.redis is a redis.asyncio.Redis client and that LLM/tool messages
    # are JSON-serializable dicts; build_result() is a project helper.
    async def _run_loop(self, task: AgentTask, span) -> AgentResult:
        key = f"checkpoint:{task.id}"
        tool_schemas = await self.tools.fetch(agent_id=self.agent_id)
        # Restore from checkpoint if available
        checkpoint = await self.redis.get(key)
        if checkpoint:
            state = json.loads(checkpoint)
            messages = state["messages"]
            total_tokens = state["total_tokens"]
            start_step = state["step"] + 1
            log.info("agent.checkpoint.restored", task_id=task.id, step=start_step)
        else:
            context = await self.memory.load(task.session_id)
            messages = build_messages(context, task.prompt)
            total_tokens = 0
            start_step = 0
        response, step = None, start_step
        for step in range(start_step, task.max_steps):
            response = await self._complete_with_retry(messages, tool_schemas)
            total_tokens += response.usage.total_tokens
            messages.append(response.message)
            if response.finish_reason == "stop":
                await self.redis.delete(key)
                break
            if response.tool_calls:
                results = await self._execute_tools(response.tool_calls)
                messages.extend(tool_result_messages(results))
            # Persist after tool results are appended, so a restart resumes at the
            # next step instead of re-running these tool calls
            await self.redis.setex(
                key,
                3600,
                json.dumps({"messages": messages, "total_tokens": total_tokens, "step": step}),
            )
        return build_result(task, response, total_tokens, step)
Testing Agent Microservices
Testing Pyramid
# tests/unit/test_agent_runner.py
# Requires pytest-asyncio. LLMResponse, Usage, AgentRunner, AgentTask, TaskStatus and
# test_config are imported from the service package and test fixtures.
from unittest.mock import AsyncMock
import pytest
@pytest.fixture
def mock_llm():
    llm = AsyncMock()
    llm.complete.return_value = LLMResponse(
        content="Here is the search result.",
        finish_reason="stop",
        usage=Usage(prompt_tokens=100, completion_tokens=50, total_tokens=150),
    )
    return llm
@pytest.fixture
def runner(mock_llm):
    runner = AgentRunner("agent-search", test_config)
    runner.llm = mock_llm
    # Stub network dependencies so the unit test touches neither Redis nor the registry
    runner.tools = AsyncMock()
    runner.tools.fetch.return_value = []
    runner.memory = AsyncMock()
    runner.memory.load.return_value = []
    return runner
@pytest.mark.asyncio
async def test_agent_completes_in_one_step(runner, mock_llm):
    result = await runner.run(AgentTask(id="t1", session_id="s1", prompt="find AI news"))
    assert result.status == TaskStatus.COMPLETED
    assert result.steps_used == 1
    assert result.tokens_used == 150
    mock_llm.complete.assert_called_once()
@pytest.mark.asyncio
async def test_agent_respects_token_budget(runner, mock_llm):
    mock_llm.complete.return_value = LLMResponse(
        content="...", finish_reason="tool_calls",
        usage=Usage(prompt_tokens=900, completion_tokens=100, total_tokens=1000),
    )
    # token_budget has a schema minimum of 512; one 1000-token call exceeds it
    task = AgentTask(id="t1", session_id="s1", prompt="...", token_budget=512)
    result = await runner.run(task)
    assert result.error == "token_budget_exceeded"
Integration Testing with a Mock LLM Server
Use a local mock LLM server (for example WireMock or a FastAPI stub) that returns deterministic responses, so you can test tool-call flows end-to-end without hitting real APIs.
# tests/integration/test_tool_flow.py
import pytest
# mock_llm_server, real_redis and real_tool_registry are fixtures (e.g. in conftest.py);
# integration_config points the LLM client at the mock server.
@pytest.mark.asyncio
async def test_search_agent_calls_web_search_tool(mock_llm_server, real_redis, real_tool_registry):
    # Configure the mock LLM to request a tool call on the first turn, then finish
    mock_llm_server.set_response(step=0, response=TOOL_CALL_RESPONSE)
    mock_llm_server.set_response(step=1, response=FINAL_RESPONSE)
    runner = AgentRunner("agent-search", integration_config)
    result = await runner.run(AgentTask(id="t1", session_id="s1", prompt="Search for AI news"))
    assert result.status == TaskStatus.COMPLETED
    assert result.tool_calls == 1
    assert real_tool_registry.was_invoked("web_search")
Chaos Testing
Use Chaos Mesh or Litmus to test resilience:
- Pod kill: Kill a random agent pod — verify Supervisor retries succeed
- Network partition: Block agent→tool-registry traffic — verify circuit breaker opens
- LLM latency injection: Add 15s delay to LLM calls — verify timeout and fallback activate
- Kafka partition leader election: Simulate a broker failover and verify that no task is lost; this holds only if consumers commit offsets after a task is processed, not on receipt
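Chaos Mesh injects latency at the network layer, but the behavior the latency experiment checks can also be pinned down in a fast in-process test. A sketch, assuming a hypothetical primary client and fallback (not a real SDK): `asyncio.wait_for` enforces the timeout, and a timeout triggers the fallback. Delays are scaled down from 15 s to fractions of a second so the test stays quick.

```python
import asyncio


async def slow_primary(prompt: str, delay: float) -> str:
    # Hypothetical primary model call; the sleep stands in for injected latency
    await asyncio.sleep(delay)
    return f"primary:{prompt}"


async def fallback_model(prompt: str) -> str:
    # Hypothetical cheaper or cross-vendor fallback
    return f"fallback:{prompt}"


async def complete_with_fallback(prompt: str, delay: float, timeout: float) -> str:
    try:
        # wait_for cancels the primary call if it exceeds the timeout
        return await asyncio.wait_for(slow_primary(prompt, delay), timeout=timeout)
    except asyncio.TimeoutError:
        return await fallback_model(prompt)


fast = asyncio.run(complete_with_fallback("hi", delay=0.0, timeout=0.5))
slow = asyncio.run(complete_with_fallback("hi", delay=0.5, timeout=0.05))
print(fast, slow)  # primary:hi fallback:hi
```

The in-process test covers the timeout logic on every commit; the Chaos Mesh experiment still matters because it exercises real network stacks, connection pools and client-side timeouts configured outside your code.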
CI/CD Pipeline for Agent Services
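# .github/workflows/agent-service.yml
name: Agent Service CI/CD
on:
  push:
    paths: ["agents/agent-search/**"]
jobs:
  test:
    runs-on: ubuntu-latest
    services:
      redis:
        image: redis:7-alpine
        ports: ["6379:6379"]
    steps:
      - uses: actions/checkout@v4
      - uses: actions/setup-python@v5
        with: {python-version: "3.12"}
      - run: pip install -e ".[dev]"
      - run: pytest tests/ --cov=agent --cov-fail-under=85
  security-scan:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      # Build locally first: the image must exist before Trivy can scan it
      - run: docker build -t agent-search:${{ github.sha }} agents/agent-search/
      - name: Trivy vulnerability scan
        uses: aquasecurity/trivy-action@master  # pin to a release tag in practice
        with: {image-ref: "agent-search:${{ github.sha }}", exit-code: "1"}
  build-push:
    needs: [test, security-scan]
    runs-on: ubuntu-latest
    outputs:
      digest: ${{ steps.build.outputs.digest }}
    steps:
      - uses: actions/checkout@v4
      - uses: docker/setup-qemu-action@v3    # emulation for the arm64 build
      - uses: docker/setup-buildx-action@v3
      # Registry login (docker/login-action with your credentials) omitted here
      - name: Build and push (pinned by digest)
        id: build
        run: |
          docker buildx build --platform linux/amd64,linux/arm64 \
            -t registry.myco.io/agent-search:${{ github.sha }} \
            --metadata-file metadata.json \
            --push agents/agent-search/
          # A multi-arch push leaves no local image for `docker inspect`;
          # read the manifest digest (sha256:...) from the buildx metadata instead
          DIGEST=$(jq -r '."containerimage.digest"' metadata.json)
          # Job outputs, unlike $GITHUB_ENV, are visible to downstream jobs
          echo "digest=$DIGEST" >> "$GITHUB_OUTPUT"
  deploy-staging:
    needs: build-push
    runs-on: ubuntu-latest
    steps:
      # Cluster credentials (kubeconfig) setup omitted here
      - name: Deploy to staging
        run: |
          kubectl set image deployment/agent-search \
            agent=registry.myco.io/agent-search@${{ needs.build-push.outputs.digest }} \
            -n staging
          kubectl rollout status deployment/agent-search -n staging --timeout=120s
  smoke-test-staging:
    needs: deploy-staging
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - uses: actions/setup-python@v5
        with: {python-version: "3.12"}
      - run: python tests/smoke/run_smoke_tests.py --env staging
  deploy-production:
    # build-push must be a direct dependency for its outputs to be readable here
    needs: [build-push, smoke-test-staging]
    runs-on: ubuntu-latest
    environment: production  # gate with required reviewers in the environment settings
    steps:
      - name: Rolling deploy to production
        run: |
          kubectl set image deployment/agent-search \
            agent=registry.myco.io/agent-search@${{ needs.build-push.outputs.digest }} \
            -n production
          kubectl rollout status deployment/agent-search -n production --timeout=300s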
Cost Management & Token Budgeting
Per-Agent Token Accounting
Track token usage per agent, per session, and per user to enable chargebacks and anomaly detection.
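from datetime import datetime, timezone

from agent.runner import Usage  # assumed module path

def today() -> str:
    # UTC date bucket for daily counters, e.g. "2025-06-01"
    return datetime.now(timezone.utc).date().isoformat()

class TokenAccountant:
    def __init__(self, redis, influx):
        self.redis = redis    # async client, e.g. redis.asyncio.Redis
        self.influx = influx  # async time-series writer; interface assumed, not a specific SDK
    async def record(self, agent_id: str, session_id: str, model: str, usage: Usage):
        agent_key = f"tokens:{agent_id}:{today()}"
        # Batch the counter updates into one round trip
        pipe = self.redis.pipeline()
        pipe.incrby(agent_key, usage.total_tokens)  # per-agent daily counter
        pipe.expire(agent_key, 86400 * 7)           # keep 7 days of daily counters
        pipe.incrby(f"tokens:session:{session_id}", usage.total_tokens)  # per-session (user billing)
        await pipe.execute()
        # Write to time-series DB for cost dashboards
        await self.influx.write(
            measurement="llm_tokens",
            tags={"agent_id": agent_id, "model": model},
            fields={"prompt": usage.prompt_tokens, "completion": usage.completion_tokens},
        )
    async def get_estimated_cost(self, agent_id: str) -> float:
        tokens = int(await self.redis.get(f"tokens:{agent_id}:{today()}") or 0)
        # Example GPT-4o pricing: $2.50/1M prompt, $10/1M completion.
        # The $5.00/1M blend assumes about 2 prompt tokens per completion token.
        return (tokens / 1_000_000) * 5.0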
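The $5.00 blended rate is only right for one prompt/completion mix. Because the time-series write already records prompt and completion tokens separately, an exact figure is possible. A sketch, with the GPT-4o example prices hard-coded in a hypothetical price table; check current vendor pricing before relying on it:

```python
# Hypothetical price table: model -> (USD per 1M prompt tokens, USD per 1M completion tokens)
PRICES_PER_1M = {"gpt-4o": (2.50, 10.00)}


def exact_cost(model: str, prompt_tokens: int, completion_tokens: int) -> float:
    prompt_price, completion_price = PRICES_PER_1M[model]
    return (prompt_tokens * prompt_price + completion_tokens * completion_price) / 1_000_000


def blended_rate(prompt_price: float, completion_price: float, prompt_per_completion: float) -> float:
    # Average price per 1M tokens when prompt tokens outnumber
    # completion tokens by prompt_per_completion : 1
    return (prompt_per_completion * prompt_price + completion_price) / (prompt_per_completion + 1)


print(exact_cost("gpt-4o", 1_000_000, 500_000))  # 7.5
print(blended_rate(2.50, 10.00, 2))              # 5.0
```

At exactly 2:1 the two agree (1.5M tokens at $5.00/1M is also $7.50). Output-heavy agents, such as summarizers with short inputs, skew toward completions, and there the blended figure underestimates cost.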
Budget Enforcement at Session Level
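MAX_SESSION_TOKENS = 50_000  # hard cap per user session
class SessionBudgetExceeded(Exception):
    def __init__(self, session_id: str, tokens_used: int, limit: int):
        super().__init__(f"session {session_id}: {tokens_used}/{limit} tokens used")
        self.session_id, self.tokens_used, self.limit = session_id, tokens_used, limit
async def check_session_budget(redis, session_id: str) -> None:
    # Call before each LLM request; the counter is maintained by TokenAccountant.record
    used = int(await redis.get(f"tokens:session:{session_id}") or 0)
    if used >= MAX_SESSION_TOKENS:  # at the cap, any further call would exceed it
        raise SessionBudgetExceeded(
            session_id=session_id,
            tokens_used=used,
            limit=MAX_SESSION_TOKENS,
        )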
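`check_session_budget` reads the counter and decides in a separate step, so several concurrent requests in one session can all pass the check and overshoot the cap together. One alternative is to reserve an estimated token count up front with an atomic increment and roll it back if the new total exceeds the limit. The sketch uses an in-memory stand-in for Redis `INCRBY`/`DECRBY` (each Redis command is atomic); `try_reserve` and the 10k-token estimate are hypothetical.

```python
import asyncio


class InMemoryCounters:
    """Stand-in for Redis INCRBY/DECRBY; each method finishes without yielding, so it is atomic here."""

    def __init__(self) -> None:
        self.data: dict[str, int] = {}

    async def incrby(self, key: str, amount: int) -> int:
        self.data[key] = self.data.get(key, 0) + amount
        return self.data[key]

    async def decrby(self, key: str, amount: int) -> int:
        return await self.incrby(key, -amount)


async def try_reserve(store, session_id: str, tokens: int, limit: int) -> bool:
    key = f"tokens:session:{session_id}"
    new_total = await store.incrby(key, tokens)  # add and read back in one atomic step
    if new_total > limit:
        await store.decrby(key, tokens)  # roll back; this request is refused
        return False
    return True


async def demo() -> tuple[list[bool], int]:
    store = InMemoryCounters()
    # Eight concurrent requests, each estimated at 10k tokens, against a 50k cap
    results = await asyncio.gather(
        *(try_reserve(store, "s1", 10_000, 50_000) for _ in range(8))
    )
    return list(results), store.data["tokens:session:s1"]


granted, total = asyncio.run(demo())
print(granted.count(True), total)  # 5 50000
```

Under contention this can refuse a request that would have fit, but it never admits one past the cap. If you adopt it, reconcile after the LLM call by adding `actual - reserved`, so that `TokenAccountant.record` does not count the same tokens twice.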
Production Readiness Checklist
Service-Level Requirements
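- Agent has a /health (liveness) endpoint that checks the process and LLM client initialization; an upstream LLM outage should not fail liveness and trigger pod restarts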
- Agent has /ready endpoint that checks memory store (Redis) and Tool Registry reachability
- All tool calls are schema-validated by Tool Registry before execution
- Agent-level RBAC enforced: agent X cannot invoke tools it is not authorized for
- JWT verification on all inter-agent gRPC and HTTP calls
- Secrets loaded from Kubernetes Secrets or Vault — never from env literals or ConfigMaps
Reliability Requirements
- Context window size is bounded — no unbounded message history growth
- Token budget enforced per task with hard ceiling
- MAX_STEPS guard in place to prevent runaway loops
- Exponential backoff with jitter on all LLM calls
- Circuit breaker configured on LLM client (threshold, recovery timeout)
- Exponential backoff with jitter on tool calls, retrying only tools that are idempotent or otherwise safe to repeat
- Failed tasks routed to Dead Letter Queue — not silently dropped
- Step-level checkpointing for tasks expected to exceed 60 seconds
- Multi-model fallback cascade configured (primary → cheaper → cross-vendor)
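For the backoff items above, one common scheme is "full jitter": sleep a random time between zero and an exponentially growing, capped ceiling, so clients that failed together do not retry in lockstep. A sketch with hypothetical defaults (`base`, `cap`, `max_attempts`, and which exceptions count as retryable are all assumptions to tune per dependency):

```python
import asyncio
import random
from typing import Awaitable, Callable, TypeVar

T = TypeVar("T")


def backoff_delay(attempt: int, base: float = 0.5, cap: float = 30.0) -> float:
    # Full jitter: uniform in [0, min(cap, base * 2**attempt)]
    return random.uniform(0, min(cap, base * 2 ** attempt))


async def retry_with_backoff(
    fn: Callable[[], Awaitable[T]],
    max_attempts: int = 5,
    base: float = 0.5,
    cap: float = 30.0,
    retry_on: tuple[type[BaseException], ...] = (TimeoutError, asyncio.TimeoutError, ConnectionError),
) -> T:
    if max_attempts < 1:
        raise ValueError("max_attempts must be >= 1")
    for attempt in range(max_attempts):
        try:
            return await fn()
        except retry_on:
            if attempt == max_attempts - 1:
                raise  # out of attempts: surface the error (e.g. route the task to the DLQ)
            await asyncio.sleep(backoff_delay(attempt, base, cap))
    raise RuntimeError("unreachable")


calls = 0


async def flaky() -> str:
    # Demo dependency that fails twice with a transient error, then succeeds
    global calls
    calls += 1
    if calls < 3:
        raise ConnectionError("transient")
    return "ok"


result = asyncio.run(retry_with_backoff(flaky, base=0.01, cap=0.05))
print(result, calls)  # ok 3
```

Retries and the circuit breaker work together: the breaker should count each final failure, so a sustained outage opens it instead of multiplying load with retries.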
Observability Requirements
- OpenTelemetry distributed tracing with trace context propagation
- All LLM completions traced with token counts and latency
- All tool calls traced with tool name, version, and outcome
- Prometheus metrics exported: task count, duration, token usage, tool calls, step count
- Alerts configured: high error rate, runaway steps, token cost spike, high latency
- Structured logging (JSON) with task_id, session_id (hashed), trace_id — no raw prompt content
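A stdlib-only sketch of the logging requirement: a `logging.Formatter` that emits one JSON object per record, reads `task_id`, `session_id` and `trace_id` from `extra=`, and replaces the session ID with a keyed hash (HMAC-SHA256, truncated). The field names, the 16-character truncation and the key source are assumptions; a keyed hash is used because plain hashes of guessable IDs can be reversed by brute force.

```python
import hashlib
import hmac
import json
import logging


class JsonFormatter(logging.Formatter):
    """Emit one JSON object per log record; prompt text is never included."""

    def __init__(self, hash_key: bytes) -> None:
        super().__init__()
        self._key = hash_key  # hypothetical secret, e.g. loaded from a Kubernetes Secret

    def _hash(self, value: str) -> str:
        # Stable per key, so one session's log lines still correlate
        return hmac.new(self._key, value.encode(), hashlib.sha256).hexdigest()[:16]

    def format(self, record: logging.LogRecord) -> str:
        session_id = getattr(record, "session_id", None)
        payload = {
            "level": record.levelname,
            "logger": record.name,
            "message": record.getMessage(),
            "task_id": getattr(record, "task_id", None),
            "session_id": self._hash(session_id) if session_id else None,
            "trace_id": getattr(record, "trace_id", None),
        }
        return json.dumps(payload)


formatter = JsonFormatter(hash_key=b"demo-key")
record = logging.LogRecord("agent", logging.INFO, __file__, 0, "task finished", None, None)
record.task_id, record.session_id, record.trace_id = "t1", "s1", "abc123"
line = json.loads(formatter.format(record))
print(line["session_id"] != "s1", len(line["session_id"]))  # True 16
```

In application code the fields arrive through `logger.info("task finished", extra={"task_id": ..., "session_id": ..., "trace_id": ...})` on a logger whose handler uses this formatter.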
Deployment Requirements
- Agent image pinned to digest, not mutable tag (never :latest)
- HPA configured with appropriate metrics (queue lag and latency, not just CPU)
- PodDisruptionBudget set (minAvailable >= 1)
- Pod topology spread constraints configured for HA across nodes
- Resource requests and limits set (no QoS class “BestEffort”)
- Rolling update strategy with preStop sleep for graceful shutdown
- Integration tests cover “tool call fails → agent recovers” path
- Load tests simulate 10× expected peak concurrency before go-live
Cost Control Requirements
- Token usage recorded per agent and per session
- Session-level budget cap enforced
- Token cost alerting configured per agent
- DLQ monitored — no silent retry storms