DEV Community

Kotcherla Murali Krishna

Building Micro Agents as Production-Grade Microservices

Build production-grade AI agent systems using microservices. Covers FastAPI, gRPC, Kafka, Kubernetes, OpenTelemetry, and fault-tolerant orchestration patterns in Python.

Table of Contents

  • Introduction & Motivation
  • Core Architecture Principles
  • Agent Service Design
  • The AgentRunner Loop
  • Inter-Agent Communication
  • Tool Registry Service
  • Memory Architecture
  • Context Window Management
  • Orchestrator & Supervisor Pattern
  • Security & Authorization
  • Observability: Traces, Logs, Metrics
  • Deployment on Kubernetes
  • Scaling Strategies
  • Fault Tolerance & Retry Strategies
  • Testing Agent Microservices
  • CI/CD Pipeline for Agent Services
  • Cost Management & Token Budgeting
  • Production Readiness Checklist
  • Reference Architecture Diagram

Introduction & Motivation

Why monolithic agent systems fail in production

A single-process agent that handles reasoning, tool calls, memory retrieval, and output generation works well in prototypes. In production it breaks in predictable ways:

  • Latency coupling: one slow tool call blocks the entire inference loop
  • Unscalable compute: you cannot scale the summarization workload independently from the search workload
  • Blast radius: a single LLM API timeout or memory corruption takes the whole system down
  • Zero deployment granularity: updating one tool integration requires redeploying everything
  • No isolation for billing: it is impossible to attribute compute cost to individual agent functions

The microservice solution

Each autonomous capability becomes an independently deployable, independently scalable service with:

  • Its own API surface (HTTP/gRPC)
  • Its own health checks and readiness probes
  • Its own memory scope (no shared in-process state)
  • Its own tool bindings (resolved at runtime from a Tool Registry)
  • Its own observability (distributed traces, metrics, structured logs)

What is a Micro Agent?

A micro agent is a bounded autonomous service that:

  1. Accepts a task (prompt + context + session ID) via an API call
  2. Runs a plan → act → observe loop using an LLM backend
  3. Invokes tools via a centralized Tool Registry
  4. Stores and retrieves conversation state from an external memory store
  5. Returns a typed result or emits an event to downstream consumers

Key insight: A micro agent is not a “smart function” — it is a service with its own API contract, memory scope, failure modes, and SLA. Design it accordingly.

Core Architecture Principles

Single Responsibility

Each agent owns exactly one reasoning domain. Examples used throughout this article include a search agent, an email agent, a code agent, and a data agent.

Stateless Reasoning, Stateful Memory

The LLM inference step must be stateless. Memory lives in external stores, such as Redis for recent turns and a vector database for long-term summaries.

No conversation history should ever live in in-process RAM between requests.
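
As a minimal sketch of this rule, assume a hypothetical `InMemorySessionStore` standing in for an external store such as Redis, and a stubbed `fake_llm` in place of a real model call. The handler holds no state of its own, so any replica could serve the next request for the same session:

```python
from typing import Dict, List

class InMemorySessionStore:
    """Hypothetical stand-in for an external store such as Redis."""
    def __init__(self) -> None:
        self._data: Dict[str, List[dict]] = {}

    def load(self, session_id: str) -> List[dict]:
        return list(self._data.get(session_id, []))

    def save(self, session_id: str, messages: List[dict]) -> None:
        self._data[session_id] = list(messages)

def fake_llm(messages: List[dict]) -> str:
    """Stub for the LLM call: reports how many user turns it was shown."""
    return f"seen {sum(m['role'] == 'user' for m in messages)} user turn(s)"

def handle_request(store: InMemorySessionStore, session_id: str, prompt: str) -> str:
    # All state comes from the store; nothing is kept on the handler itself.
    messages = store.load(session_id)
    messages.append({"role": "user", "content": prompt})
    reply = fake_llm(messages)
    messages.append({"role": "assistant", "content": reply})
    store.save(session_id, messages)
    return reply
```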

Schema-First Tool Contracts

Every tool must have a JSON Schema definition published to a shared Tool Registry before any agent can invoke it. No ad-hoc function signatures. This enables:

  • Runtime input validation before LLM output reaches backend services
  • Auto-generated documentation
  • Tool versioning with backwards compatibility checks
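
To illustrate runtime validation, here is a deliberately simplified, standard-library-only sketch. It checks only `required` and basic `type` keywords and rejects unknown parameters, which is stricter than JSON Schema's default; a real registry would more likely use a full validator such as the `jsonschema` package. `validate_params` and `WEB_SEARCH_SCHEMA` are illustrative names, not part of any library:

```python
from typing import Any, Dict, List

# Maps JSON Schema type names to Python types (subset only).
_TYPES = {"string": str, "integer": int, "number": (int, float), "boolean": bool}

def validate_params(params: Dict[str, Any], schema: Dict[str, Any]) -> List[str]:
    """Return a list of validation errors; an empty list means valid."""
    errors = []
    for name in schema.get("required", []):
        if name not in params:
            errors.append(f"missing required parameter: {name}")
    props = schema.get("properties", {})
    for name, value in params.items():
        if name not in props:
            errors.append(f"unknown parameter: {name}")
            continue
        expected = _TYPES.get(props[name].get("type"))
        # bool is a subclass of int in Python, so reject it for numeric types
        if expected and (not isinstance(value, expected)
                         or (isinstance(value, bool) and expected is not bool)):
            errors.append(f"wrong type for {name}")
    return errors

WEB_SEARCH_SCHEMA = {
    "type": "object",
    "properties": {"query": {"type": "string"}, "num_results": {"type": "integer"}},
    "required": ["query"],
}
```

Returning a list of errors rather than raising on the first one lets the registry send the full set back to the agent, which can then correct its tool call in a single retry.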

Idempotent Actions

Any tool call that modifies external state (send email, write to DB, trigger webhook) must be idempotent. Strategies:

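  • Use idempotency keys at the HTTP layer (pass an Idempotency-Key header)
  • Use message deduplication at the queue level (for example, Kafka's idempotent producer and transactions; these guarantees cover reads and writes inside Kafka, not external side effects)
  • Design tool handlers to be safe to retry: check whether the action has already happened before acting, and make that check atomic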
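
The idempotency-key strategy can be sketched as follows. `IdempotentExecutor` is a hypothetical helper with an in-memory store; a production version would need a shared store with an atomic insert-if-absent operation so that concurrent retries cannot both run the action:

```python
from typing import Any, Callable, Dict

class IdempotentExecutor:
    """Runs a side-effecting action at most once per idempotency key."""
    def __init__(self) -> None:
        # Hypothetical in-memory store; production would use Redis or a DB
        # with an atomic "insert if absent" so concurrent retries cannot race.
        self._results: Dict[str, Any] = {}

    def execute(self, key: str, action: Callable[[], Any]) -> Any:
        if key in self._results:
            return self._results[key]  # replay the stored result
        result = action()
        self._results[key] = result
        return result

sent = []  # records each real send so the effect is observable

def send_email() -> str:
    sent.append("welcome")
    return f"message-{len(sent)}"
```

Deriving the key from the task ID and tool name (for example `"task-42:send_email"`) means a retried step replays the stored result instead of sending a second email.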

Async by Default

Long-running agent tasks (multi-step research, code generation + execution) must use async task queues — not synchronous HTTP with long timeouts.

Client ──► POST /tasks ──► Kafka/BullMQ ──► AgentWorker
Client ──► GET /tasks/{id} ──► Redis (status polling)
       ◄── WebSocket/SSE push (optional)
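
The submit-then-poll flow can be sketched in a single process, assuming an `asyncio.Queue` in place of Kafka/BullMQ and a plain dict in place of Redis. All names here are illustrative, and the `upper()` call is a stub for the real agent run:

```python
import asyncio
import uuid
from typing import Dict

task_status: Dict[str, dict] = {}   # stands in for Redis

async def submit_task(queue: asyncio.Queue, prompt: str) -> str:
    """POST /tasks: enqueue and return immediately with a task_id."""
    task_id = str(uuid.uuid4())
    task_status[task_id] = {"status": "pending", "result": None}
    await queue.put((task_id, prompt))
    return task_id

def get_task(task_id: str) -> dict:
    """GET /tasks/{id}: cheap status lookup, no agent work."""
    return task_status[task_id]

async def agent_worker(queue: asyncio.Queue) -> None:
    """Consumes tasks; the upper() call is a stub for the real agent run."""
    while True:
        task_id, prompt = await queue.get()
        task_status[task_id]["status"] = "running"
        task_status[task_id].update(status="completed", result=prompt.upper())
        queue.task_done()

async def demo() -> dict:
    queue: asyncio.Queue = asyncio.Queue()
    worker = asyncio.create_task(agent_worker(queue))
    task_id = await submit_task(queue, "summarize report")
    first_seen = dict(get_task(task_id))   # status right after submit
    await queue.join()                      # wait for the worker to finish
    worker.cancel()
    return {"first": first_seen["status"], "final": get_task(task_id)}
```

The client gets its `task_id` back before any agent work starts, so the HTTP request never has to stay open for the duration of the task.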

Explicit Context Boundaries

Each agent invocation carries a bounded context packet; message histories must never grow without bound. A ContextManager service compresses or summarizes history before injection.
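
One simple way to bound the packet is to keep only the newest turns that fit a token budget. The sketch below assumes a crude four-characters-per-token estimate (`rough_tokens`) rather than a real tokenizer, and `build_context_packet` is an illustrative name:

```python
from typing import List

def rough_tokens(text: str) -> int:
    """Crude estimate (about 4 characters per token); an assumption, not a tokenizer."""
    return max(1, len(text) // 4)

def build_context_packet(history: List[dict], prompt: str, max_tokens: int) -> List[dict]:
    """Keep the newest turns that fit in max_tokens, always including the prompt."""
    budget = max_tokens - rough_tokens(prompt)
    kept: List[dict] = []
    for msg in reversed(history):          # walk newest to oldest
        cost = rough_tokens(msg["content"])
        if cost > budget:
            break                          # older turns are dropped
        kept.append(msg)
        budget -= cost
    kept.reverse()                         # restore chronological order
    return kept + [{"role": "user", "content": prompt}]
```

Dropping old turns is the bluntest option; summarizing them before they fall out of the window preserves more information at the cost of an extra LLM call.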

Agent Service Design

Project Layout

Each agent is a containerized FastAPI or gRPC service with this canonical structure:

agent-search/
├── agent/
│   ├── core.py          # AgentRunner: plan → act → observe loop
│   ├── prompts.py       # System prompt + few-shot templates
│   ├── memory.py        # ContextManager: load/compress/save
│   ├── tools.py         # Tool bindings (calls Tool Registry)
│   └── schemas.py       # Pydantic models for all I/O
├── api/
│   ├── routes.py        # POST /run, POST /tasks, GET /tasks/{task_id}
│   ├── middleware.py    # Auth, rate limiting, request tracing
│   └── deps.py          # Dependency injection: DB, Redis, LLM client
├── tests/
│   ├── unit/
│   ├── integration/
│   └── fixtures/
├── Dockerfile
├── pyproject.toml
└── k8s/
    ├── deployment.yaml
    ├── service.yaml
    ├── hpa.yaml
    └── configmap.yaml

API Contract

Every agent exposes these HTTP endpoints at minimum:

POST /run               Submit a task (sync, short tasks only)
POST /tasks             Submit a task (async, returns task_id)
GET  /tasks/{task_id}   Poll task status and result
GET  /health            Liveness probe
GET  /ready             Readiness probe (checks LLM + memory store)
GET  /metrics           Prometheus metrics endpoint

# agent/schemas.py
from pydantic import BaseModel, Field
from typing import Optional, Dict, Any
from enum import Enum

class TaskStatus(str, Enum):
    PENDING = "pending"
    RUNNING = "running"
    COMPLETED = "completed"
    FAILED = "failed"
    CANCELLED = "cancelled"

class AgentTask(BaseModel):
    id: str
    session_id: str
    prompt: str
    metadata: Dict[str, Any] = Field(default_factory=dict)
    max_steps: int = Field(default=10, ge=1, le=25)
    token_budget: int = Field(default=8192, ge=512, le=32768)

class AgentResult(BaseModel):
    task_id: str
    status: TaskStatus
    output: Optional[str] = None
    steps_used: int = 0
    tokens_used: int = 0
    tool_calls: int = 0
    error: Optional[str] = None
    duration_ms: int = 0

The AgentRunner Loop

Full Implementation

# agent/core.py
import asyncio
import time
from opentelemetry import trace
from tenacity import retry, stop_after_attempt, wait_exponential_jitter

tracer = trace.get_tracer(__name__)
MAX_STEPS = 15

class AgentRunner:
    def __init__(self, agent_id: str, config: AgentConfig):
        self.agent_id = agent_id
        self.llm = LLMClient(model=config.model, timeout=30)
        self.memory = ContextManager(agent_id, max_tokens=config.context_limit)
        self.tools = ToolRegistryClient(config.tool_registry_url)
        self.metrics = AgentMetrics(agent_id)

    async def run(self, task: AgentTask) -> AgentResult:
        start = time.monotonic()

        with tracer.start_as_current_span("agent.run") as span:
            span.set_attribute("agent.id", self.agent_id)
            span.set_attribute("agent.task_id", task.id)
            span.set_attribute("agent.session", task.session_id)

            try:
                result = await self._run_loop(task, span)
            except TokenBudgetExceeded as e:
                result = AgentResult(
                    task_id=task.id,
                    status=TaskStatus.COMPLETED,
                    output=e.partial_output,
                    error="token_budget_exceeded"
                )
            except Exception as e:
                span.record_exception(e)
                result = AgentResult(
                    task_id=task.id,
                    status=TaskStatus.FAILED,
                    error=str(e)
                )
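            # Record duration and metrics on every handled path. A `finally`
            # block here would hit an unbound `result` if the task were cancelled.
            result.duration_ms = int((time.monotonic() - start) * 1000)
            self.metrics.record(result)

            return result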

    async def _run_loop(self, task: AgentTask, span) -> AgentResult:
        # Load available tools from registry
        tool_schemas = await self.tools.fetch(agent_id=self.agent_id)

        # Load and compress conversation history
        context = await self.memory.load(task.session_id)
        messages = build_messages(context, task.prompt)

        total_tokens = 0
        tool_call_count = 0

        for step in range(task.max_steps):
            span.set_attribute("agent.current_step", step)

            with tracer.start_as_current_span("agent.llm_call") as llm_span:
                response = await self._complete_with_retry(messages, tool_schemas)
                llm_span.set_attribute("llm.prompt_tokens", response.usage.prompt_tokens)
                llm_span.set_attribute("llm.completion_tokens", response.usage.completion_tokens)

            total_tokens += response.usage.total_tokens

            if total_tokens > task.token_budget:
                raise TokenBudgetExceeded(
                    partial_output=response.content,
                    tokens_used=total_tokens
                )

            if response.finish_reason == "stop":
                await self.memory.save(task.session_id, messages + [response.message])
                return AgentResult(
                    task_id=task.id,
                    status=TaskStatus.COMPLETED,
                    output=response.content,
                    steps_used=step + 1,
                    tokens_used=total_tokens,
                    tool_calls=tool_call_count
                )

            if response.tool_calls:
                tool_call_count += len(response.tool_calls)
                results = await self._execute_tools(response.tool_calls)
                messages.append(response.message)
                messages.extend(tool_result_messages(results))

        # Hit max steps: return best available output
        return AgentResult(
            task_id=task.id,
            status=TaskStatus.COMPLETED,
            output=response.content,
            steps_used=task.max_steps,
            tokens_used=total_tokens,
            tool_calls=tool_call_count,
            error="max_steps_reached"
        )

    @retry(stop=stop_after_attempt(3), wait=wait_exponential_jitter(max=15))
    async def _complete_with_retry(self, messages, tools):
        return await self.llm.complete(messages=messages, tools=tools)

    async def _execute_tools(self, tool_calls):
        tasks = [self.tools.invoke(tc) for tc in tool_calls]
        return await asyncio.gather(*tasks, return_exceptions=True)

Inter-Agent Communication

Pattern Selection Matrix

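Use gRPC for synchronous sub-agent calls where the caller needs the result immediately, and Kafka events for asynchronous pipeline handoffs between agents.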

gRPC Service Definition

For synchronous sub-agent calls, gRPC provides strong typing, bidirectional streaming, and efficient binary serialization.

// proto/agent_service.proto
syntax = "proto3";
package agents.v1;

service AgentService {
  rpc RunTask (TaskRequest) returns (TaskResponse);
  rpc StreamSteps (TaskRequest) returns (stream StepEvent);
  rpc Health (HealthRequest) returns (HealthResponse);
}

message TaskRequest {
  string task_id = 1;
  string session_id = 2;
  string prompt = 3;
  map<string, string> metadata = 4;
  int32 max_steps = 5;
  int32 token_budget = 6;
}

message TaskResponse {
  string task_id = 1;
  string status = 2;
  string output = 3;
  int32 steps_used = 4;
  int32 tokens_used = 5;
  string error = 6;
}

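message StepEvent {
  int32 step_number = 1;
  string type = 2; // "llm_call" | "tool_call" | "tool_result"
  string content = 3;
}
// Referenced by the Health RPC above; the shape below is an assumed minimum.
message HealthRequest {}
message HealthResponse { string status = 1; }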

Kafka Event Schema

For async pipeline handoffs between agents, use Avro or JSON schemas registered in a Schema Registry.

{
  "schema": {
    "type": "record",
    "name": "AgentTaskEvent",
    "namespace": "com.myco.agents.v1",
    "fields": [
      {"name": "task_id", "type": "string"},
      {"name": "source_agent", "type": "string"},
      {"name": "target_agent", "type": "string"},
      {"name": "session_id", "type": "string"},
      {"name": "prompt", "type": "string"},
      {"name": "context", "type": {"type": "map", "values": "string"}},
      {"name": "created_at", "type": {"type": "long", "logicalType": "timestamp-millis"}}
    ]
  }
}

Kafka Producer (in Orchestrator)

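# In orchestrator when dispatching to agent-search
import json
import time

from aiokafka import AIOKafkaProducer

async def dispatch_to_agent(target_agent: str, task: AgentTask):
    # For brevity this starts a producer per call; in production, create one
    # long-lived producer at service startup and reuse it.
    producer = AIOKafkaProducer(bootstrap_servers=KAFKA_BROKERS)
    await producer.start()
    try:
        event = {
            "task_id": task.id,
            "source_agent": "orchestrator",
            "target_agent": target_agent,
            "session_id": task.session_id,
            "prompt": task.prompt,
            # The event schema declares a string-to-string "context" map
            "context": {k: str(v) for k, v in task.metadata.items()},
            "created_at": int(time.time() * 1000)
        }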
        await producer.send_and_wait(
            topic=f"agent.tasks.{target_agent}",
            value=json.dumps(event).encode(),
            key=task.session_id.encode(), # partition by session
            headers=[("trace-id", get_current_trace_id().encode())]
        )
    finally:
        await producer.stop()

Tool Registry Service

Architecture

The Tool Registry is a centralized FastAPI service that stores, validates, and serves tool definitions. It acts as a typed API gateway for all agent→tool traffic.

Tool Registration Schema

# Tool self-registers on startup
class ToolDefinition(BaseModel):
    name: str
    version: str
    description: str
    parameters: Dict[str, Any] # JSON Schema
    returns: Dict[str, Any] # JSON Schema
    endpoint: str # where registry routes calls
    health_url: str
    auth_type: str # "api_key" | "oauth2" | "none"
    rate_limit: int # calls per minute per agent
    timeout_ms: int = 10000

# Registration call at tool service startup
@app.on_event("startup")
async def register_tool():
    registry = ToolRegistryClient(TOOL_REGISTRY_URL)
    await registry.register(ToolDefinition(
        name="web_search",
        version="2.1.0",
        description="Search the web and return ranked results",
        parameters={
            "type": "object",
            "properties": {
                "query": {"type": "string", "maxLength": 500},
                "num_results": {"type": "integer", "minimum": 1, "maximum": 20}
            },
            "required": ["query"]
        },
        returns={
            "type": "array",
            "items": {
                "type": "object",
                "properties": {
                    "url": {"type": "string"},
                    "title": {"type": "string"},
                    "snippet": {"type": "string"}
                }
            }
        },
        endpoint=f"{SERVICE_URL}/invoke",
        health_url=f"{SERVICE_URL}/health",
        auth_type="api_key",
        rate_limit=60,
        timeout_ms=8000
    ))

Registry Validation Layer

# Tool Registry validates before forwarding
from uuid import uuid4

import httpx
import jsonschema

async def invoke_tool(agent_id: str, tool_name: str, params: dict):
    tool = await db.get_tool(tool_name)

    if not tool:
        raise ToolNotFoundError(tool_name)

    # Validate against JSON Schema
    jsonschema.validate(params, tool.parameters) # raises on invalid input

    # Check rate limit
    if not await rate_limiter.check(agent_id, tool_name, tool.rate_limit):
        raise RateLimitExceeded(f"{tool_name} limit: {tool.rate_limit}/min")

    # Forward to tool service with timeout
    async with httpx.AsyncClient(timeout=tool.timeout_ms / 1000) as client:
        response = await client.post(
            tool.endpoint,
            json={"params": params},
            headers={"X-Agent-Id": agent_id, "X-Request-Id": str(uuid4())}
        )
        response.raise_for_status()
        return response.json()

Memory Architecture

Memory Tier Selection

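Recent conversation turns live in Redis with a 24-hour TTL, and summaries of long sessions live in a vector database (Qdrant in the implementation that follows).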

ContextManager Implementation

# agent/memory.py
import json
from typing import List

from qdrant_client import AsyncQdrantClient
from qdrant_client.models import PointStruct
from redis.asyncio import Redis

PAST_CONTEXT_PREFIX = "[Past context]"

class ContextManager:
    def __init__(self, agent_id: str, max_tokens: int = 4096):
        self.agent_id = agent_id
        self.max_tokens = max_tokens
        self.redis = Redis.from_url(REDIS_URL)
        # The async client is required because every Qdrant call below is awaited
        self.qdrant = AsyncQdrantClient(url=QDRANT_URL)
        self.embedder = EmbeddingClient()

    async def load(self, session_id: str) -> List[dict]:
        # 1. Load recent turns from Redis
        raw = await self.redis.get(f"session:{session_id}:messages")
        messages = json.loads(raw) if raw else []

        # 2. Retrieve semantically relevant past context.
        #    The default avoids StopIteration when no user turn exists yet.
        last_user_msg = next(
            (m for m in reversed(messages) if m["role"] == "user"), None
        )
        if last_user_msg:
            embedding = await self.embedder.embed(last_user_msg["content"])
            relevant = await self.qdrant.search(
                collection_name=f"agent_{self.agent_id}_memory",
                query_vector=embedding,
                limit=3
            )
            # Prepend as system context
            for hit in relevant:
                messages.insert(0, {
                    "role": "system",
                    "content": f"{PAST_CONTEXT_PREFIX} {hit.payload['summary']}"
                })

        # 3. Compress if over token limit
        return await self._compress_if_needed(messages)

    async def save(self, session_id: str, messages: List[dict]):
        # Drop injected past-context messages so they do not accumulate in
        # Redis, then save the last 20 turns
        recent = [
            m for m in messages
            if not (m.get("content") or "").startswith(PAST_CONTEXT_PREFIX)
        ][-20:]
        await self.redis.setex(
            f"session:{session_id}:messages",
            86400, # 24h TTL
            json.dumps(recent)
        )

        # If session is long, generate and store a summary in vector DB
        if len(messages) > 30:
            summary = await self._summarize(messages)
            embedding = await self.embedder.embed(summary)
            # Qdrant point IDs must be unsigned integers or UUIDs, so
            # session_id is assumed to be a UUID string here
            await self.qdrant.upsert(
                collection_name=f"agent_{self.agent_id}_memory",
                points=[PointStruct(
                    id=session_id,
                    vector=embedding,
                    payload={"summary": summary, "session_id": session_id}
                )]
            )

    async def _compress_if_needed(self, messages: List[dict]) -> List[dict]:
        token_count = estimate_tokens(messages)
        if token_count <= self.max_tokens:
            return messages

        # Keep system messages + last N user/assistant turns. Splitting the
        # two groups first avoids duplicating recent system messages.
        system_msgs = [m for m in messages if m["role"] == "system"]
        other_msgs = [m for m in messages if m["role"] != "system"]
        return system_msgs + other_msgs[-12:] # last 6 exchanges

Context Window Management

Token Estimation

import json

import tiktoken

def estimate_tokens(messages: list, model: str = "gpt-4o") -> int:
    enc = tiktoken.encoding_for_model(model)
    total = 0
    for msg in messages:
        total += 4 # per-message overhead
        total += len(enc.encode(msg.get("content", "") or ""))
        if "tool_calls" in msg:
            for tc in msg["tool_calls"]:
                total += len(enc.encode(json.dumps(tc)))
    return total

class TokenBudget:
    def __init__(self, total: int, model: str):
        self.total = total
        self.model = model
        self.used = 0
        self.reserved = 1024 # always reserve for output

    @property
    def available_for_input(self):
        return self.total - self.reserved - self.used

    def consume(self, tokens: int):
        self.used += tokens
        if self.used > self.total - self.reserved:
            raise TokenBudgetExceeded(tokens_used=self.used)

Orchestrator & Supervisor Pattern

Orchestrator: Task Decomposition

The Orchestrator is itself an agent microservice, but its role is planning and coordination rather than execution.

# orchestrator/core.py
import asyncio

class OrchestratorAgent:
    async def execute(self, user_request: str, session_id: str) -> str:
        # Step 1: Decompose into a DAG of sub-tasks
        plan = await self.planner.decompose(user_request)
        # Returns: [{"id": "t1", "agent": "search", "task": "...", "deps": []},
        # {"id": "t2", "agent": "summarize", "task": "...", "deps": ["t1"]},
        # {"id": "t3", "agent": "email", "task": "...", "deps": ["t2"]}]

        # Step 2: Execute in topological order, parallel where possible
        results = {}
        for wave in topological_waves(plan):
            # All tasks in a wave have their deps satisfied
            wave_results = await asyncio.gather(*[
                self.supervisor.dispatch(step, results)
                for step in wave
            ])
            for step, result in zip(wave, wave_results):
                results[step["id"]] = result

        # Step 3: Synthesize final output
        return await self.synthesizer.merge(results, user_request)

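def topological_waves(plan: list) -> list:
    """Return plan steps grouped into parallel execution waves."""
    completed = set()
    waves = []
    remaining = list(plan)
    while remaining:
        wave = [s for s in remaining if all(d in completed for d in s["deps"])]
        if not wave:
            # Nothing is runnable: the plan has a cycle or an unknown dependency.
            # Without this guard the loop would never terminate.
            raise ValueError("plan has cyclic or missing dependencies")
        waves.append(wave)
        completed.update(s["id"] for s in wave)
        remaining = [s for s in remaining if s["id"] not in completed]
    return waves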
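
To make the wave grouping concrete, here is a standalone sketch that runs a cycle-checked `topological_waves` on a small plan. The plan itself is hypothetical: two independent steps, a summary that depends on both, and an email that depends on the summary.

```python
from typing import Dict, List

def topological_waves(plan: List[Dict]) -> List[List[Dict]]:
    """Group plan steps into waves whose dependencies are already complete."""
    completed: set = set()
    waves: List[List[Dict]] = []
    remaining = list(plan)
    while remaining:
        wave = [s for s in remaining if all(d in completed for d in s["deps"])]
        if not wave:
            # No step is runnable: a cycle or a dependency on an unknown id
            raise ValueError("plan has cyclic or missing dependencies")
        waves.append(wave)
        completed.update(s["id"] for s in wave)
        remaining = [s for s in remaining if s["id"] not in completed]
    return waves

plan = [
    {"id": "t1", "agent": "search", "task": "find sources", "deps": []},
    {"id": "t2", "agent": "data", "task": "pull metrics", "deps": []},
    {"id": "t3", "agent": "summarize", "task": "write summary", "deps": ["t1", "t2"]},
    {"id": "t4", "agent": "email", "task": "send summary", "deps": ["t3"]},
]
wave_ids = [[s["id"] for s in wave] for wave in topological_waves(plan)]
# wave_ids == [["t1", "t2"], ["t3"], ["t4"]]
```

Steps `t1` and `t2` share a wave and can be dispatched concurrently, while `t3` and `t4` each wait for the wave before them.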

Supervisor: Retry & Escalation

import asyncio

class Supervisor:
    MAX_ATTEMPTS = 3

    def __init__(self, agent_clients: dict):
        self.agent_clients = agent_clients

    async def dispatch(self, step: dict, context: dict) -> StepResult:
        task_prompt = self._inject_context(step["task"], context, step["deps"])

        for attempt in range(self.MAX_ATTEMPTS):
            is_last = attempt == self.MAX_ATTEMPTS - 1
            try:
                return await asyncio.wait_for(
                    self.agent_clients[step["agent"]].run(task_prompt),
                    timeout=60.0
                )
            except asyncio.TimeoutError:
                if is_last:
                    raise SupervisorEscalation(step, "timeout_after_3_attempts")
            except AgentError as e:
                # Escalate on unrecoverable errors, and on the final attempt so
                # dispatch never falls through and returns None
                if e.is_unrecoverable or is_last:
                    raise SupervisorEscalation(step, str(e))
            await asyncio.sleep(2 ** attempt) # backoff: 1s, then 2s

    def _inject_context(self, task: str, results: dict, dep_ids: list) -> str:
        context_parts = [results[dep_id].output for dep_id in dep_ids if dep_id in results]
        if context_parts:
            return f"Context from previous steps:\n{chr(10).join(context_parts)}\n\nTask: {task}"
        return task

Security & Authorization

Agent Identity & JWT Verification

Each agent service must verify that incoming requests are from authorized callers. Use short-lived JWT tokens signed by an internal auth service.

# api/middleware.py
from fastapi import Request, HTTPException
from jose import jwt, JWTError

ALLOWED_CALLERS = {"orchestrator", "supervisor", "api-gateway"}

async def verify_agent_token(request: Request):
    token = request.headers.get("Authorization", "").removeprefix("Bearer ")
    if not token:
        raise HTTPException(status_code=401, detail="Missing auth token")
    try:
        payload = jwt.decode(token, PUBLIC_KEY, algorithms=["RS256"])
        caller = payload.get("sub")
        if caller not in ALLOWED_CALLERS:
            raise HTTPException(status_code=403, detail=f"Caller {caller} not authorized")
        request.state.caller = caller
    except JWTError as e:
        raise HTTPException(status_code=401, detail=f"Invalid token: {e}")

Secrets Management

Never store API keys in environment literals or ConfigMaps. Use Kubernetes Secrets mounted as environment variables, or preferably HashiCorp Vault with the Vault Agent Sidecar.

# k8s/deployment.yaml (secrets section)
env:
  - name: OPENAI_API_KEY
    valueFrom:
      secretKeyRef:
        name: agent-secrets
        key: openai-api-key
  - name: TOOL_REGISTRY_TOKEN
    valueFrom:
      secretKeyRef:
        name: agent-secrets
        key: tool-registry-token

Tool Call Authorization

The Tool Registry enforces agent-level RBAC: which agents can invoke which tools.

# Tool Registry ACL check
TOOL_ACL = {
    "agent-search": ["web_search", "vector_search", "knowledge_base"],
    "agent-email": ["send_email", "get_email_thread"],
    "agent-code": ["code_exec", "git_read", "package_search"],
    "agent-data": ["sql_query", "csv_read", "chart_generate"],
}

async def check_tool_acl(agent_id: str, tool_name: str):
    allowed_tools = TOOL_ACL.get(agent_id, [])
    if tool_name not in allowed_tools:
        raise PermissionError(f"{agent_id} is not authorized to call {tool_name}")

Observability: Traces, Logs, Metrics

Distributed Tracing Setup (OpenTelemetry)

# observability/tracing.py
from opentelemetry import trace
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
from opentelemetry.sdk.resources import SERVICE_NAME, Resource
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor
from opentelemetry.instrumentation.redis import RedisInstrumentor
from opentelemetry.instrumentation.httpx import HTTPXClientInstrumentor

def setup_tracing(service_name: str):
    provider = TracerProvider(
        resource=Resource(attributes={SERVICE_NAME: service_name})
    )
    provider.add_span_processor(
        BatchSpanProcessor(OTLPSpanExporter(endpoint=OTEL_ENDPOINT))
    )
    trace.set_tracer_provider(provider)

    # Auto-instrument frameworks
    FastAPIInstrumentor().instrument()
    RedisInstrumentor().instrument()
    HTTPXClientInstrumentor().instrument()

Standard Span Attributes for Agent Calls

Always set these attributes on every agent and LLM span:

# In AgentRunner._run_loop:
span.set_attribute("agent.id", self.agent_id)
span.set_attribute("agent.task_id", task.id)
span.set_attribute("agent.session_id", task.session_id)
span.set_attribute("agent.step", step)
span.set_attribute("llm.model", config.model)
span.set_attribute("llm.prompt_tokens", response.usage.prompt_tokens)
span.set_attribute("llm.completion_tokens", response.usage.completion_tokens)
span.set_attribute("llm.finish_reason", response.finish_reason)

# In Tool Registry on invoke:
span.set_attribute("tool.name", tool_name)
span.set_attribute("tool.version", tool.version)
span.set_attribute("tool.caller_agent", agent_id)
span.set_attribute("tool.latency_ms", latency_ms)

Prometheus Metrics

# observability/metrics.py
from prometheus_client import Counter, Histogram, Gauge

agent_tasks_total = Counter(
    "agent_tasks_total",
    "Total tasks processed",
    ["agent_id", "status"]
)

agent_task_duration = Histogram(
    "agent_task_duration_seconds",
    "Task end-to-end latency",
    ["agent_id"],
    buckets=[0.5, 1, 2, 5, 10, 30, 60, 120]
)

agent_llm_tokens = Counter(
    "agent_llm_tokens_total",
    "LLM tokens consumed",
    ["agent_id", "token_type"] # token_type: prompt | completion
)

agent_tool_calls = Counter(
    "agent_tool_calls_total",
    "Tool invocations",
    ["agent_id", "tool_name", "status"]
)

agent_steps_per_task = Histogram(
    "agent_steps_per_task",
    "Number of steps per task (runaway guard)",
    ["agent_id"],
    buckets=[1, 2, 3, 5, 8, 10, 15, 20, 25]
)

orchestrator_queue_depth = Gauge(
    "orchestrator_queue_depth",
    "Pending tasks in orchestrator queue"
)

Alert Rules

# alerting/rules.yaml
groups:
  - name: agent-alerts
    rules:
      - alert: AgentHighErrorRate
        # Ratio of failed tasks to all tasks, per agent
        expr: |
          sum by (agent_id) (rate(agent_tasks_total{status="failed"}[5m]))
            / sum by (agent_id) (rate(agent_tasks_total[5m])) > 0.05
        for: 2m
        annotations:
          summary: "{{ $labels.agent_id }} failure rate above 5%"

      - alert: AgentRunawayTask
        # histogram_quantile needs the _bucket series and the le label
        expr: histogram_quantile(0.99, sum by (le) (rate(agent_steps_per_task_bucket[5m]))) > 15
        for: 5m
        annotations:
          summary: "Agent tasks exceeding 15 steps: possible runaway loop"

      - alert: LLMTokenCostSpike
        # Threshold is in tokens per second, summed across all agents
        expr: sum(rate(agent_llm_tokens_total[10m])) > 50000
        for: 5m
        annotations:
          summary: "Token consumption rate spike: check for loops"

      - alert: AgentLatencyHigh
        expr: histogram_quantile(0.99, sum by (le) (rate(agent_task_duration_seconds_bucket[5m]))) > 10
        for: 5m
        annotations:
          summary: "p99 task latency above 10s"

Structured Logging

# Never log raw prompts or PII. Log task IDs and outcome codes.
import structlog

log = structlog.get_logger()

log.info("agent.task.completed",
    task_id=task.id,
    session_id=task.session_id, # hashed in prod
    agent_id=self.agent_id,
    steps=result.steps_used,
    tokens=result.tokens_used,
    duration_ms=result.duration_ms,
    tool_calls=result.tool_calls,
    status=result.status,
    trace_id=get_current_trace_id()
)

Deployment on Kubernetes

Deployment Manifest

# k8s/deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: agent-search
  labels:
    app: agent-search
    version: v1.4.2
    team: ai-platform
spec:
  replicas: 2
  selector:
    matchLabels:
      app: agent-search
  template:
    metadata:
      labels:
        app: agent-search
        version: v1.4.2
      annotations:
        prometheus.io/scrape: "true"
        prometheus.io/path: "/metrics"
        prometheus.io/port: "8080"
    spec:
      serviceAccountName: agent-search
      containers:
        - name: agent
          image: registry.myco.io/agent-search@sha256:<digest> # Always pin by digest
          ports:
            - containerPort: 8080 # HTTP API
              name: http
            - containerPort: 50051 # gRPC
              name: grpc
          env:
            - name: AGENT_ID
              value: "agent-search"
            - name: TOOL_REGISTRY_URL
              valueFrom: {configMapKeyRef: {name: agent-config, key: tool-registry-url}}
            - name: REDIS_URL
              valueFrom: {secretKeyRef: {name: agent-secrets, key: redis-url}}
            - name: OPENAI_API_KEY
              valueFrom: {secretKeyRef: {name: agent-secrets, key: openai-api-key}}
            - name: OTEL_EXPORTER_OTLP_ENDPOINT
              valueFrom: {configMapKeyRef: {name: observability-config, key: otel-endpoint}}
          resources:
            requests:
              cpu: "500m"
              memory: "512Mi"
            limits:
              cpu: "2"
              memory: "2Gi"
          livenessProbe:
            httpGet:
              path: /health
              port: 8080
            initialDelaySeconds: 10
            periodSeconds: 15
            failureThreshold: 3
          readinessProbe:
            httpGet:
              path: /ready
              port: 8080
            initialDelaySeconds: 5
            periodSeconds: 10
            failureThreshold: 2
          lifecycle:
            preStop:
              exec:
                command: ["/bin/sh", "-c", "sleep 5"] # drain connections before shutdown
      topologySpreadConstraints:
        - maxSkew: 1
          topologyKey: kubernetes.io/hostname
          whenUnsatisfiable: DoNotSchedule
          labelSelector:
            matchLabels: {app: agent-search}

Horizontal Pod Autoscaler (Custom Metrics)

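Scale on workload signals, not just CPU. The manifest below combines Kafka consumer lag with CPU utilization; a p99 task latency metric can be added the same way. Lag is an External metric, so the cluster needs a metrics adapter (for example, KEDA or the Prometheus Adapter) to expose it to the HPA: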

# k8s/hpa.yaml
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
  name: agent-search-hpa
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: agent-search
  minReplicas: 2
  maxReplicas: 20
  behavior:
    scaleUp:
      stabilizationWindowSeconds: 60
      policies:
        - type: Pods
          value: 4
          periodSeconds: 60
    scaleDown:
      stabilizationWindowSeconds: 300 # be conservative scaling down
  metrics:
    - type: External
      external:
        metric:
          name: kafka_consumer_group_lag
          selector:
            matchLabels:
              topic: agent.tasks.search
        target:
          type: AverageValue
          averageValue: "100"
    - type: Resource
      resource:
        name: cpu
        target:
          type: Utilization
          averageUtilization: 70
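
To make the interaction between the two metrics concrete, here is a rough sketch of the replica arithmetic the HPA applies to a manifest like the one above. It is deliberately simplified: it ignores the controller's tolerance band, the stabilization windows, and the scaleUp rate limit of 4 pods per minute. The function name and the sample numbers are illustrative only.

```python
import math


def desired_replicas(current_replicas, total_lag, cpu_utilization,
                     lag_target=100, cpu_target=70,
                     min_replicas=2, max_replicas=20):
    """Simplified HPA estimate; ignores tolerance, stabilization and rate limits."""
    # External metric with an AverageValue target: total metric / per-pod target.
    by_lag = math.ceil(total_lag / lag_target)
    # Resource metric with a Utilization target: scale by current / target.
    by_cpu = math.ceil(current_replicas * cpu_utilization / cpu_target)
    # With several metrics, the largest proposal wins.
    proposal = max(by_lag, by_cpu)
    # Clamp to the minReplicas / maxReplicas bounds.
    return max(min_replicas, min(max_replicas, proposal))


# 4 pods, 1250 messages of total lag, CPU at 35%: lag dominates.
print(desired_replicas(current_replicas=4, total_lag=1250, cpu_utilization=35))  # 13
```

In this example CPU alone would suggest scaling down to 2 pods, while the lag metric asks for 13, which is why queue-based metrics matter for I/O-bound agents that spend most of their time waiting on LLM calls.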

PodDisruptionBudget

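Ensure at least one replica stays available during voluntary disruptions such as node drains and cluster upgrades (rolling updates are governed by the Deployment's own maxUnavailable/maxSurge settings):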

# k8s/pdb.yaml
apiVersion: policy/v1
kind: PodDisruptionBudget
metadata:
  name: agent-search-pdb
spec:
  minAvailable: 1
  selector:
    matchLabels:
      app: agent-search

Scaling Strategies

Per-Agent Scaling Logic

Multi-Model Fallback

If the primary LLM is unavailable or rate-limited, automatically route to a fallback:

class LLMClient:
    MODEL_CASCADE = [
        "gpt-4o", # primary
        "gpt-4o-mini", # cheaper fallback
        "claude-sonnet-4-6", # cross-vendor fallback
    ]

    async def complete(self, messages: list, **kwargs) -> LLMResponse:
        for model in self.MODEL_CASCADE:
            try:
                return await self._call_model(model, messages, **kwargs)
            except (RateLimitError, ModelUnavailable):
                log.warning("llm.fallback", from_model=model, reason="rate_limit_or_unavailable")
                continue
        raise AllModelsUnavailable()

Fault Tolerance & Retry Strategies

Circuit Breaker on LLM Client

from circuitbreaker import circuit

class LLMClientWithCircuitBreaker:
    @circuit(failure_threshold=5, recovery_timeout=30, expected_exception=LLMError)
    async def complete(self, messages: list, **kwargs) -> LLMResponse:
        return await self._raw_complete(messages, **kwargs)

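With this configuration the circuit opens after 5 consecutive failures and stays open for 30 seconds. While it is open, calls fail immediately with CircuitBreakerError instead of reaching the LLM API, so the caller should catch that error and serve a fallback response or route to a secondary model during that window. After 30 seconds one trial call is let through; if it succeeds the circuit closes again.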
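
For readers who want to see the state machine behind the decorator, here is a minimal sketch of the closed, open, and half-open transitions. It is an illustration, not the circuitbreaker package's implementation: the class and exception names are hypothetical, it is synchronous for brevity, and the clock is injectable so the behaviour can be tested without sleeping.

```python
import time


class CircuitOpenError(Exception):
    """Raised when a call is rejected because the circuit is open."""


class SimpleCircuitBreaker:
    """Illustrative breaker; not the `circuitbreaker` package's implementation."""

    def __init__(self, failure_threshold=5, recovery_timeout=30.0, clock=time.monotonic):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self._clock = clock
        self._failures = 0
        self._opened_at = None  # None means the circuit is closed

    def call(self, fn, *args, **kwargs):
        if self._opened_at is not None:
            if self._clock() - self._opened_at < self.recovery_timeout:
                raise CircuitOpenError("circuit open; failing fast")
            # Recovery timeout elapsed: half-open, let one trial call through.
        try:
            result = fn(*args, **kwargs)
        except Exception:
            self._failures += 1
            if self._opened_at is not None or self._failures >= self.failure_threshold:
                self._opened_at = self._clock()  # open, or re-open after a failed trial
            raise
        # A success closes the circuit and resets the consecutive-failure count.
        self._failures = 0
        self._opened_at = None
        return result


def complete_with_fallback(breaker, primary, fallback, prompt):
    """Route to the fallback only while the breaker is rejecting calls."""
    try:
        return breaker.call(primary, prompt)
    except CircuitOpenError:
        return fallback(prompt)
```

Errors from the primary call itself still propagate to the caller, which leaves room for the retry layer to handle them; only the fail-fast rejection is redirected to the fallback.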

Exponential Backoff with Jitter

from tenacity import (
    retry, stop_after_attempt,
    wait_exponential_jitter, retry_if_exception_type
)

@retry(
    stop=stop_after_attempt(3),
    wait=wait_exponential_jitter(initial=1, max=60),
    retry=retry_if_exception_type((RateLimitError, TimeoutError, ServiceUnavailable))
)
async def call_tool_with_retry(tool_name: str, params: dict):
    return await tool_registry.invoke(tool_name, params)
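
To get a feel for the wait times this policy produces, the sketch below reproduces the usual exponential-plus-jitter formula, roughly min(initial * 2^n + uniform(0, jitter), max). The helper name and its defaults (base 2, up to 1 second of jitter) are assumptions for illustration, not a copy of tenacity's internals.

```python
import random


def backoff_delays(attempts=3, initial=1.0, exp_base=2.0, jitter=1.0,
                   max_delay=60.0, rng=random):
    """Waits between attempts: min(initial * exp_base**n + U(0, jitter), max_delay)."""
    delays = []
    for n in range(attempts - 1):  # there is no wait after the final attempt
        delay = initial * exp_base ** n + rng.uniform(0, jitter)
        delays.append(min(delay, max_delay))
    return delays


# Three attempts mean two waits: the first in [1, 2] s, the second in [2, 3] s.
print(backoff_delays())
```

Note that with only three attempts the 60-second cap is never reached; it starts to matter once the attempt count grows to seven or more.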

Dead Letter Queue Handler

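# dlq_handler.py: consumes from the dead-letter topic
import asyncio

class DLQHandler:
    async def process(self, event: AgentTaskEvent):
        log.error("agent.task.dlq",
            task_id=event.task_id,
            target_agent=event.target_agent,
            attempt_count=event.retry_count,
            original_error=event.last_error
        )

        # Alert on-call if error is novel
        if await self.is_novel_error(event.last_error):
            await self.pagerduty.alert(event)

        # Store for human review dashboard
        await self.db.insert_dlq_item(event)

        # Auto-re-queue with modified params after 1 hour (optional).
        # The delay runs as a background task so this consumer is not blocked
        # for an hour per event. A pod restart drops pending timers, so a
        # durable delayed-retry topic may be preferable in production.
        if event.retry_count < 2 and event.auto_retry_eligible:
            task = asyncio.create_task(self._requeue_after(event, delay_seconds=3600))
            # self._pending_requeues: a set() assumed to be created in __init__;
            # holding a reference keeps the task from being garbage-collected
            self._pending_requeues.add(task)
            task.add_done_callback(self._pending_requeues.discard)

    async def _requeue_after(self, event: AgentTaskEvent, delay_seconds: int):
        await asyncio.sleep(delay_seconds)
        event.retry_count += 1
        await self.kafka.send("agent.tasks." + event.target_agent, event)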

Step-Level Checkpointing

import json

class CheckpointedAgentRunner(AgentRunner):
    async def _run_loop(self, task: AgentTask, span) -> AgentResult:
        # Restore from checkpoint if available
        checkpoint = await self.redis.get(f"checkpoint:{task.id}")
        if checkpoint:
            state = json.loads(checkpoint)
            messages = state["messages"]
            total_tokens = state["total_tokens"]
            start_step = state["step"] + 1
            log.info("agent.checkpoint.restored", task_id=task.id, step=start_step)
        else:
            context = await self.memory.load(task.session_id)
            messages = build_messages(context, task.prompt)
            total_tokens = 0
            start_step = 0

        # tool_schemas: this agent's tool definitions from the Tool Registry (lookup omitted here)
        for step in range(start_step, task.max_steps):
            response = await self._complete_with_retry(messages, tool_schemas)
            messages.append(response.message)
            total_tokens += response.usage.total_tokens  # keep the checkpointed total accurate

            # Persist checkpoint after each step (messages must be JSON-serializable)
            await self.redis.setex(
                f"checkpoint:{task.id}",
                3600,
                json.dumps({"messages": messages, "total_tokens": total_tokens, "step": step})
            )

            if response.finish_reason == "stop":
                await self.redis.delete(f"checkpoint:{task.id}")
                break

        return build_result(task, response, total_tokens, step)
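
The resume behaviour of the runner above can be exercised in isolation. The sketch below is a stripped-down stand-in, assuming an in-memory store in place of Redis and plain callables in place of LLM steps; InMemoryStore, run_with_checkpoints, and the crash_at parameter are hypothetical names introduced only for this illustration.

```python
import json


class InMemoryStore:
    """Stand-in for Redis so the sketch runs without external services."""

    def __init__(self):
        self._data = {}

    def get(self, key):
        return self._data.get(key)

    def set(self, key, value):
        self._data[key] = value

    def delete(self, key):
        self._data.pop(key, None)


def run_with_checkpoints(task_id, steps, store, crash_at=None):
    """Run `steps` (a list of callables) in order, resuming after the last saved step."""
    key = f"checkpoint:{task_id}"
    saved = store.get(key)
    state = json.loads(saved) if saved else {"results": [], "step": -1}
    for i in range(state["step"] + 1, len(steps)):
        if i == crash_at:
            raise RuntimeError(f"simulated crash before step {i}")
        state["results"].append(steps[i]())
        state["step"] = i
        store.set(key, json.dumps(state))  # persist only after the step completes
    store.delete(key)  # finished: nothing left to resume
    return state["results"]
```

If a run crashes before step 2, calling the function again with the same store picks up at step 2 and does not repeat steps 0 and 1, which is the property that saves tokens when a long task is interrupted.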

Testing Agent Microservices

Testing Pyramid

# tests/unit/test_agent_runner.py
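import pytest
from unittest.mock import AsyncMock

# Async tests need the pytest-asyncio plugin; this marks every test in the module
# (alternatively, set asyncio_mode = auto in the pytest configuration).
pytestmark = pytest.mark.asyncio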

@pytest.fixture
def mock_llm():
    llm = AsyncMock()
    llm.complete.return_value = LLMResponse(
        content="Here is the search result.",
        finish_reason="stop",
        usage=Usage(prompt_tokens=100, completion_tokens=50, total_tokens=150)
    )
    return llm

async def test_agent_completes_in_one_step(mock_llm):
    runner = AgentRunner("agent-search", test_config)
    runner.llm = mock_llm

    result = await runner.run(AgentTask(id="t1", session_id="s1", prompt="find AI news"))

    assert result.status == TaskStatus.COMPLETED
    assert result.steps_used == 1
    assert result.tokens_used == 150
    mock_llm.complete.assert_called_once()

async def test_agent_respects_token_budget(mock_llm):
    mock_llm.complete.return_value = LLMResponse(
        content="...", finish_reason="tool_calls",
        usage=Usage(prompt_tokens=900, completion_tokens=100, total_tokens=1000)
    )
    task = AgentTask(id="t1", session_id="s1", prompt="...", token_budget=500)
    runner = AgentRunner("agent-search", test_config)
    runner.llm = mock_llm

    result = await runner.run(task)
    assert result.error == "token_budget_exceeded"

Integration Testing with a Mock LLM Server

Use a local mock LLM server (e.g., WireMock or a FastAPI stub) that returns deterministic responses, so tool-call flows can be tested end-to-end without hitting real APIs.

# tests/integration/test_tool_flow.py
import pytest

@pytest.mark.asyncio  # async tests need the pytest-asyncio plugin
async def test_search_agent_calls_web_search_tool(mock_llm_server, real_redis, real_tool_registry):
    # Configure mock LLM to respond with a tool call on first turn
    mock_llm_server.set_response(step=0, response=TOOL_CALL_RESPONSE)
    mock_llm_server.set_response(step=1, response=FINAL_RESPONSE)

    runner = AgentRunner("agent-search", integration_config)
    result = await runner.run(AgentTask(id="t1", session_id="s1", prompt="Search for AI news"))

    assert result.status == TaskStatus.COMPLETED
    assert result.tool_calls == 1
    assert real_tool_registry.was_invoked("web_search")
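
The mock_llm_server fixture used in the test above is not shown in the article. One possible shape for it, using only the standard library rather than WireMock or FastAPI, is sketched below. The class name, the JSON payloads, and the post_json helper are assumptions for illustration; the only detail taken from the test is the set_response(step=..., response=...) signature.

```python
import json
import threading
import urllib.request
from http.server import BaseHTTPRequestHandler, HTTPServer


class MockLLMServer:
    """Hypothetical stub: the Nth POST receives the response scripted for step N."""

    def __init__(self):
        self._responses = {}
        self._step = 0
        self._lock = threading.Lock()
        stub = self

        class Handler(BaseHTTPRequestHandler):
            def do_POST(self):
                length = int(self.headers.get("Content-Length", 0))
                self.rfile.read(length)  # the request body is ignored by this stub
                body = json.dumps(stub._next_response()).encode()
                self.send_response(200)
                self.send_header("Content-Type", "application/json")
                self.send_header("Content-Length", str(len(body)))
                self.end_headers()
                self.wfile.write(body)

            def log_message(self, format, *args):
                pass  # keep test output quiet

        self._httpd = HTTPServer(("127.0.0.1", 0), Handler)  # port 0 picks a free port
        self.url = f"http://127.0.0.1:{self._httpd.server_port}"

    def set_response(self, step, response):
        self._responses[step] = response

    def _next_response(self):
        with self._lock:
            response = self._responses.get(self._step, {"error": "no scripted response"})
            self._step += 1
        return response

    def start(self):
        threading.Thread(target=self._httpd.serve_forever, daemon=True).start()
        return self

    def stop(self):
        self._httpd.shutdown()
        self._httpd.server_close()


def post_json(url, payload):
    """Small client helper for the demo; bypasses any configured HTTP proxy."""
    opener = urllib.request.build_opener(urllib.request.ProxyHandler({}))
    request = urllib.request.Request(
        url, data=json.dumps(payload).encode(),
        headers={"Content-Type": "application/json"}, method="POST")
    with opener.open(request, timeout=5) as resp:
        return json.loads(resp.read())
```

In a test suite this would typically be wrapped in a pytest fixture that calls start() before yielding and stop() afterwards, with the agent's LLM base URL pointed at server.url.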

Chaos Testing

Use Chaos Mesh or Litmus to test resilience:

  • Pod kill: Kill a random agent pod — verify Supervisor retries succeed
  • Network partition: Block agent→tool-registry traffic — verify circuit breaker opens
  • LLM latency injection: Add 15s delay to LLM calls — verify timeout and fallback activate
  • Kafka partition leader election: Simulate Kafka failover — verify no task loss via consumer offset management

CI/CD Pipeline for Agent Services

# .github/workflows/agent-service.yml
name: Agent Service CI/CD

on:
  push:
    paths: ["agents/agent-search/**"]

jobs:
  test:
    runs-on: ubuntu-latest
    services:
      redis:
        image: redis:7-alpine
        ports: ["6379:6379"]
    steps:
      - uses: actions/checkout@v4
      - uses: actions/setup-python@v5
        with: {python-version: "3.12"}
      - run: pip install -e ".[dev]"
      - run: pytest tests/ --cov=agent --cov-fail-under=85

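  security-scan:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      # Build the image in this job so Trivy has something to scan
      - name: Build image for scanning
        run: docker build -t agent-search:${{ github.sha }} agents/agent-search/
      - name: Trivy vulnerability scan
        uses: aquasecurity/trivy-action@master # pin to a release tag or commit SHA in real pipelines
        with: {image-ref: "agent-search:${{ github.sha }}", exit-code: "1"}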

  build-push:
    needs: [test, security-scan]
    runs-on: ubuntu-latest
    outputs:
      # Job outputs cross job boundaries; values written to $GITHUB_ENV do not
      digest: ${{ steps.build.outputs.digest }}
    steps:
      - uses: actions/checkout@v4
      - uses: docker/setup-qemu-action@v3
      - uses: docker/setup-buildx-action@v3
      # Registry login (e.g. docker/login-action) omitted for brevity
      - name: Build and push (pinned by digest)
        id: build
        uses: docker/build-push-action@v6
        with:
          context: agents/agent-search
          platforms: linux/amd64,linux/arm64
          push: true
          tags: registry.myco.io/agent-search:${{ github.sha }}

  deploy-staging:
    needs: build-push
    runs-on: ubuntu-latest
    steps:
      # Cluster credentials (kubeconfig) setup omitted for brevity
      - name: Deploy to staging
        run: |
          kubectl set image deployment/agent-search \
            agent=registry.myco.io/agent-search@${{ needs.build-push.outputs.digest }} \
            -n staging
          kubectl rollout status deployment/agent-search -n staging --timeout=120s

  smoke-test-staging:
    needs: deploy-staging
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - run: python tests/smoke/run_smoke_tests.py --env staging

  deploy-production:
    needs: [build-push, smoke-test-staging] # build-push is listed so its digest output is readable here
    runs-on: ubuntu-latest
    environment: production
    steps:
      - name: Rolling deploy to production
        run: |
          kubectl set image deployment/agent-search \
            agent=registry.myco.io/agent-search@${{ needs.build-push.outputs.digest }} \
            -n production
          kubectl rollout status deployment/agent-search -n production --timeout=300s

Cost Management & Token Budgeting

Per-Agent Token Accounting

Track token usage per agent, per session, and per user to enable chargebacks and anomaly detection.

class TokenAccountant:
    async def record(self, agent_id: str, session_id: str, usage: Usage):
        # Increment per-agent daily counter
        await self.redis.incrby(f"tokens:{agent_id}:{today()}", usage.total_tokens)
        await self.redis.expire(f"tokens:{agent_id}:{today()}", 86400 * 7)

        # Increment per-session counter (for user billing)
        await self.redis.incrby(f"tokens:session:{session_id}", usage.total_tokens)

        # Write to time-series DB for cost dashboards
        await self.influx.write(
            measurement="llm_tokens",
            tags={"agent_id": agent_id, "model": usage.model},
            fields={"prompt": usage.prompt_tokens, "completion": usage.completion_tokens},
        )

async def get_estimated_cost(agent_id: str) -> float:
    tokens = int(await redis.get(f"tokens:{agent_id}:{today()}") or 0)
    # GPT-4o pricing: $2.50/1M prompt, $10/1M completion (example)
    return (tokens / 1_000_000) * 5.0 # blended estimate: $5/1M matches a 2:1 prompt-to-completion mix at these prices
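
Because the accountant already records prompt and completion tokens separately, the blended rate can be replaced by a per-direction calculation when more accuracy is needed. The sketch below reuses the example GPT-4o prices from the comment above; the price table and function name are placeholders, and real prices should come from configuration.

```python
# Example prices (USD per 1M tokens) taken from the comment above; placeholders only.
PRICES_PER_MILLION = {"gpt-4o": {"prompt": 2.50, "completion": 10.00}}


def estimate_cost(model, prompt_tokens, completion_tokens, prices=PRICES_PER_MILLION):
    """Cost in USD computed from separate prompt and completion token counts."""
    rate = prices[model]
    total = prompt_tokens * rate["prompt"] + completion_tokens * rate["completion"]
    return total / 1_000_000


# A prompt-heavy workload: 900k prompt tokens and 100k completion tokens.
print(estimate_cost("gpt-4o", prompt_tokens=900_000, completion_tokens=100_000))  # 3.25
```

For the same one million tokens the blended rate would report $5.00, so prompt-heavy agents (typical for retrieval and search) are overestimated by the blended figure, while generation-heavy agents are underestimated.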

Budget Enforcement at Session Level

MAX_SESSION_TOKENS = 50_000 # hard cap per user session

async def check_session_budget(session_id: str):
    used = int(await redis.get(f"tokens:session:{session_id}") or 0)
    if used > MAX_SESSION_TOKENS:
        raise SessionBudgetExceeded(
            session_id=session_id,
            tokens_used=used,
            limit=MAX_SESSION_TOKENS
        )

Production Readiness Checklist

Service-Level Requirements

  • Agent has /health endpoint that checks LLM client connectivity
  • Agent has /ready endpoint that checks memory store (Redis) and Tool Registry reachability
  • All tool calls are schema-validated by Tool Registry before execution
  • Agent-level RBAC enforced: agent X cannot invoke tools it is not authorized for
  • JWT verification on all inter-agent gRPC and HTTP calls
  • Secrets loaded from Kubernetes Secrets or Vault — never from env literals or ConfigMaps

Reliability Requirements

  • Context window size is bounded — no unbounded message history growth
  • Token budget enforced per task with hard ceiling
  • MAX_STEPS guard in place to prevent runaway loops
  • Exponential backoff with jitter on all LLM calls
  • Circuit breaker configured on LLM client (threshold, recovery timeout)
  • Exponential backoff on all tool calls
  • Failed tasks routed to Dead Letter Queue — not silently dropped
  • Step-level checkpointing for tasks expected to exceed 60 seconds
  • Multi-model fallback cascade configured (primary → cheaper → cross-vendor)

Observability Requirements

  • OpenTelemetry distributed tracing with trace context propagation
  • All LLM completions traced with token counts and latency
  • All tool calls traced with tool name, version, and outcome
  • Prometheus metrics exported: task count, duration, token usage, tool calls, step count
  • Alerts configured: high error rate, runaway steps, token cost spike, high latency
  • Structured logging (JSON) with task_id, session_id (hashed), trace_id — no raw prompt content

Deployment Requirements

  • Agent image pinned to digest, not mutable tag (never :latest)
  • HPA configured with appropriate metrics (queue lag and latency, not just CPU)
  • PodDisruptionBudget set (minAvailable >= 1)
  • Pod topology spread constraints configured for HA across nodes
  • Resource requests and limits set (no QoS class “BestEffort”)
  • Rolling update strategy with preStop sleep for graceful shutdown
  • Integration tests cover “tool call fails → agent recovers” path
  • Load tests simulate 10× expected peak concurrency before go-live

Cost Control Requirements

  • Token usage recorded per agent and per session
  • Session-level budget cap enforced
  • Token cost alerting configured per agent
  • DLQ monitored — no silent retry storms

Reference Architecture Diagram
