DEV Community

Cover image for A Systematic Approach to AI Agent Engineering
Shayan Holakouee
Shayan Holakouee

Posted on

A Systematic Approach to AI Agent Engineering

You've shipped microservices that handle millions of requests. You've designed event-driven systems that wake you up at 3am. Now your CTO is asking you to "add AI agents." This is that guide written for engineers who know what a race condition is and have opinions about retry logic.


The Mental Model Shift

Most AI agent frameworks are demos dressed up as infrastructure. Before touching any library, internalize this:

An AI agent is a control loop with a probabilistic step.

That's it. The "intelligence" is just one step in a workflow usually a call to an LLM that produces a decision. Everything around it: state management, tool dispatch, error handling, observability that's engineering. And that's where senior engineers have an actual edge.

while not done:
    observation = perceive(environment)
    action = llm_decide(observation, memory, tools)  # the probabilistic step
    result = execute(action)
    memory.update(result)
Enter fullscreen mode Exit fullscreen mode

Once you see it this way, your existing intuitions about fault tolerance, idempotency, and distributed systems apply directly.


Architecture Patterns That Actually Scale

1. The Supervisor Pattern (Multi-Agent Orchestration)

For complex workflows, a single agent becomes a god object. Instead, build a supervisor that routes to specialized sub-agents.

from dataclasses import dataclass
from typing import Callable, Any
import asyncio

@dataclass
class AgentTask:
    task_id: str
    task_type: str
    payload: dict
    priority: int = 5

class SupervisorAgent:
    def __init__(self):
        self._registry: dict[str, Callable] = {}
        self._queue: asyncio.PriorityQueue = asyncio.PriorityQueue()

    def register(self, task_type: str):
        """Decorator to register specialized sub-agents."""
        def decorator(fn: Callable):
            self._registry[task_type] = fn
            return fn
        return decorator

    async def dispatch(self, task: AgentTask) -> Any:
        handler = self._registry.get(task.task_type)
        if not handler:
            raise ValueError(f"No agent registered for task type: {task.task_type}")
        return await handler(task.payload)

    async def run_parallel(self, tasks: list[AgentTask]) -> list[Any]:
        """Dispatch multiple tasks concurrently with priority ordering."""
        sorted_tasks = sorted(tasks, key=lambda t: t.priority, reverse=True)
        return await asyncio.gather(*[self.dispatch(t) for t in sorted_tasks])


supervisor = SupervisorAgent()

@supervisor.register("code_review")
async def code_review_agent(payload: dict) -> dict:
    # Specialized agent with its own system prompt and tools
    ...

@supervisor.register("test_generation")
async def test_generation_agent(payload: dict) -> dict:
    ...
Enter fullscreen mode Exit fullscreen mode

The key insight: each sub-agent is stateless from the supervisor's perspective. State lives in your message store, not in the agent object.


2. Pipelines vs. Agents: Know Which One You Need

A pipeline is deterministic every input follows the same steps. An agent is non-deterministic the LLM decides what to do next. Don't use an agent when a pipeline will do.

Use a Pipeline when... Use an Agent when...
Steps are known at design time Steps depend on runtime context
Latency matters (low variance needed) Flexibility matters
You need 100% auditability Best-effort is acceptable
Input/output schema is fixed Open-ended task completion
# Pipeline: deterministic, predictable, fast
class DocumentPipeline:
    steps = [extract_text, chunk_document, embed_chunks, store_vectors]

    async def run(self, document: bytes) -> list[str]:
        result = document
        for step in self.steps:
            result = await step(result)
        return result

# Agent: flexible, non-deterministic, slower
class ResearchAgent:
    async def run(self, query: str) -> str:
        # May call web_search 0-5 times, then summarize
        # The LLM decides
        ...
Enter fullscreen mode Exit fullscreen mode

Advanced Python Patterns for AI Systems

Structured Outputs with Pydantic + Runtime Validation

Never parse LLM output with regex. Define your schema, enforce it, validate at runtime.

from pydantic import BaseModel, field_validator, model_validator
from typing import Literal
import instructor
import anthropic

class ActionPlan(BaseModel):
    reasoning: str
    action: Literal["search", "summarize", "delegate", "terminate"]
    confidence: float
    tool_args: dict | None = None

    @field_validator("confidence")
    @classmethod
    def confidence_must_be_probability(cls, v: float) -> float:
        if not 0.0 <= v <= 1.0:
            raise ValueError("Confidence must be between 0 and 1")
        return round(v, 3)

    @model_validator(mode="after")
    def tool_args_required_for_search(self) -> "ActionPlan":
        if self.action == "search" and not self.tool_args:
            raise ValueError("tool_args required when action is 'search'")
        return self

# Using instructor for structured extraction
client = instructor.from_anthropic(anthropic.Anthropic())

def get_action(observation: str, available_tools: list[str]) -> ActionPlan:
    return client.messages.create(
        model="claude-opus-4-5",
        max_tokens=1024,
        response_model=ActionPlan,
        messages=[{
            "role": "user",
            "content": f"Observation: {observation}\nAvailable tools: {available_tools}\nWhat action should we take?"
        }]
    )
Enter fullscreen mode Exit fullscreen mode

Memory Architecture: Don't Reinvent the Wheel

Production agents need layered memory. Model it explicitly:

from abc import ABC, abstractmethod
from datetime import datetime, timedelta
import hashlib

class MemoryLayer(ABC):
    @abstractmethod
    async def store(self, key: str, value: dict) -> None: ...

    @abstractmethod
    async def retrieve(self, query: str, top_k: int = 5) -> list[dict]: ...


class WorkingMemory:
    """Short-term, in-process. Cleared between runs."""
    def __init__(self, max_tokens: int = 8000):
        self._buffer: list[dict] = []
        self._max_tokens = max_tokens

    def add(self, message: dict) -> None:
        self._buffer.append(message)
        self._trim()

    def _trim(self) -> None:
        # Sliding window keep most recent context under token budget
        # In practice, count tokens properly with tiktoken or equivalent
        while len(self._buffer) > 20:
            self._buffer.pop(0)

    def snapshot(self) -> list[dict]:
        return list(self._buffer)


class EpisodicMemory(MemoryLayer):
    """Medium-term. Key episodes from past runs. Stored in vector DB."""

    async def store(self, key: str, value: dict) -> None:
        embedding = await embed(value["content"])
        await vector_db.upsert(id=key, vector=embedding, metadata=value)

    async def retrieve(self, query: str, top_k: int = 5) -> list[dict]:
        query_embedding = await embed(query)
        results = await vector_db.query(vector=query_embedding, top_k=top_k)
        return [r.metadata for r in results]


class AgentMemoryManager:
    """Orchestrates all memory layers."""
    def __init__(self):
        self.working = WorkingMemory()
        self.episodic = EpisodicMemory()

    async def get_context(self, query: str) -> list[dict]:
        working_context = self.working.snapshot()
        episodic_context = await self.episodic.retrieve(query, top_k=3)
        # Merge, deduplicate, and rank
        return self._merge(working_context, episodic_context)

    def _merge(self, working: list, episodic: list) -> list:
        seen = set()
        merged = []
        for item in working + episodic:
            content_hash = hashlib.md5(str(item).encode()).hexdigest()
            if content_hash not in seen:
                seen.add(content_hash)
                merged.append(item)
        return merged
Enter fullscreen mode Exit fullscreen mode

Tool Use: The Sharp Edge

Tool execution is where most agent systems fail in production. Every tool is an external side effect treat it like a database write.

Idempotent Tool Design

import functools
import hashlib
import json
from typing import TypeVar, Callable

T = TypeVar("T")

def idempotent_tool(cache_ttl_seconds: int = 300):
    """
    Makes tool calls idempotent by caching results keyed on input.
    Critical for retry scenarios where the LLM re-calls a tool that already succeeded.
    """
    def decorator(fn: Callable[..., T]) -> Callable[..., T]:
        _cache: dict[str, tuple[T, datetime]] = {}

        @functools.wraps(fn)
        async def wrapper(*args, **kwargs) -> T:
            cache_key = hashlib.sha256(
                json.dumps({"args": args, "kwargs": kwargs}, sort_keys=True).encode()
            ).hexdigest()

            if cache_key in _cache:
                result, cached_at = _cache[cache_key]
                if datetime.utcnow() - cached_at < timedelta(seconds=cache_ttl_seconds):
                    return result

            result = await fn(*args, **kwargs)
            _cache[cache_key] = (result, datetime.utcnow())
            return result

        return wrapper
    return decorator


@idempotent_tool(cache_ttl_seconds=60)
async def web_search(query: str, max_results: int = 5) -> list[dict]:
    """Tool that won't hammer the search API on LLM retries."""
    ...
Enter fullscreen mode Exit fullscreen mode

Tool Registry with Schema Validation

from typing import get_type_hints
import inspect

class ToolRegistry:
    def __init__(self):
        self._tools: dict[str, dict] = {}

    def register(self, description: str):
        def decorator(fn: Callable):
            hints = get_type_hints(fn)
            sig = inspect.signature(fn)

            schema = {
                "name": fn.__name__,
                "description": description,
                "input_schema": {
                    "type": "object",
                    "properties": {
                        param: {"type": self._python_type_to_json(hints.get(param))}
                        for param in sig.parameters
                        if param != "return"
                    },
                    "required": [
                        p for p, v in sig.parameters.items()
                        if v.default is inspect.Parameter.empty
                    ]
                }
            }
            self._tools[fn.__name__] = {"fn": fn, "schema": schema}
            return fn
        return decorator

    def get_schemas(self) -> list[dict]:
        return [t["schema"] for t in self._tools.values()]

    async def call(self, name: str, **kwargs) -> Any:
        if name not in self._tools:
            raise KeyError(f"Tool '{name}' not found in registry")
        return await self._tools[name]["fn"](**kwargs)

    @staticmethod
    def _python_type_to_json(t) -> str:
        mapping = {str: "string", int: "integer", float: "number", bool: "boolean"}
        return mapping.get(t, "string")
Enter fullscreen mode Exit fullscreen mode

Observability Is Not Optional

An agent you can't observe is a black box your on-call rotation will hate. Instrument everything.

import time
import uuid
from contextlib import asynccontextmanager
from dataclasses import dataclass, field

@dataclass
class AgentSpan:
    span_id: str = field(default_factory=lambda: str(uuid.uuid4()))
    trace_id: str = ""
    parent_id: str | None = None
    name: str = ""
    start_time: float = field(default_factory=time.time)
    end_time: float | None = None
    attributes: dict = field(default_factory=dict)
    events: list[dict] = field(default_factory=list)
    status: str = "ok"

    def add_event(self, name: str, attrs: dict = None):
        self.events.append({
            "name": name,
            "timestamp": time.time(),
            "attributes": attrs or {}
        })

    def finish(self, status: str = "ok"):
        self.end_time = time.time()
        self.status = status
        return self

class AgentTracer:
    def __init__(self, trace_id: str | None = None):
        self.trace_id = trace_id or str(uuid.uuid4())
        self._spans: list[AgentSpan] = []
        self._current_span: AgentSpan | None = None

    @asynccontextmanager
    async def span(self, name: str, **attrs):
        s = AgentSpan(
            trace_id=self.trace_id,
            parent_id=self._current_span.span_id if self._current_span else None,
            name=name,
            attributes=attrs
        )
        prev = self._current_span
        self._current_span = s
        try:
            yield s
            s.finish("ok")
        except Exception as e:
            s.add_event("exception", {"message": str(e), "type": type(e).__name__})
            s.finish("error")
            raise
        finally:
            self._spans.append(s)
            self._current_span = prev

    def export(self) -> list[dict]:
        return [
            {
                "trace_id": s.trace_id,
                "span_id": s.span_id,
                "parent_id": s.parent_id,
                "name": s.name,
                "duration_ms": round((s.end_time - s.start_time) * 1000, 2) if s.end_time else None,
                "status": s.status,
                "attributes": s.attributes,
                "events": s.events,
            }
            for s in self._spans
        ]


# Usage inside your agent loop
async def agent_step(observation: str, tracer: AgentTracer) -> dict:
    async with tracer.span("llm_call", model="claude-opus-4-5", input_tokens=len(observation)):
        action = await call_llm(observation)

    async with tracer.span("tool_execution", tool=action.tool_name):
        result = await tool_registry.call(action.tool_name, **action.args)

    return result
Enter fullscreen mode Exit fullscreen mode

Feed spans into your existing observability stack (Datadog, Grafana, Honeycomb). AI agents are distributed systems trace them like one.


Failure Modes That Will Bite You

1. The Infinite Loop

LLMs can get stuck in thought loops calling the same tool repeatedly, or alternating between two actions.

from collections import Counter

class LoopDetector:
    def __init__(self, window: int = 10, threshold: int = 3):
        self._window = window
        self._threshold = threshold
        self._action_history: list[str] = []

    def check(self, action: str) -> bool:
        """Returns True if a loop is detected."""
        self._action_history.append(action)
        if len(self._action_history) > self._window:
            self._action_history.pop(0)

        counts = Counter(self._action_history)
        return any(count >= self._threshold for count in counts.values())
Enter fullscreen mode Exit fullscreen mode

2. Context Window Overflow

For long-running agents, the context window fills up. Build compaction in from the start.

async def compact_context(messages: list[dict], llm_client) -> list[dict]:
    """Summarize older messages to free context space."""
    if len(messages) < 20:
        return messages

    old_messages = messages[:-10]
    recent_messages = messages[-10:]

    summary_response = await llm_client.messages.create(
        model="claude-haiku-4-5",  # Use cheaper model for summarization
        max_tokens=512,
        messages=[
            *old_messages,
            {"role": "user", "content": "Summarize the key facts and decisions from this conversation in under 200 words."}
        ]
    )

    summary_message = {
        "role": "assistant",
        "content": f"[Context Summary]: {summary_response.content[0].text}"
    }

    return [summary_message] + recent_messages
Enter fullscreen mode Exit fullscreen mode

3. Prompt Injection in Tool Outputs

External data (web pages, user files, database rows) can contain adversarial instructions. Sanitize before feeding back to the LLM.

import re

INJECTION_PATTERNS = [
    r"ignore\s+previous\s+instructions",
    r"new\s+system\s+prompt",
    r"you\s+are\s+now\s+",
    r"disregard\s+all",
]

def sanitize_tool_output(raw_output: str) -> str:
    """Basic prompt injection mitigation for tool outputs."""
    sanitized = raw_output
    for pattern in INJECTION_PATTERNS:
        sanitized = re.sub(pattern, "[REDACTED]", sanitized, flags=re.IGNORECASE)

    # Wrap in a context boundary
    return f"<tool_result>\n{sanitized}\n</tool_result>"
Enter fullscreen mode Exit fullscreen mode

Putting It Together: A Production Agent Loop

async def run_agent(
    task: str,
    max_iterations: int = 20,
    tracer: AgentTracer | None = None,
) -> str:
    tracer = tracer or AgentTracer()
    memory = AgentMemoryManager()
    loop_detector = LoopDetector()

    memory.working.add({"role": "user", "content": task})

    for iteration in range(max_iterations):
        async with tracer.span("agent_iteration", iteration=iteration):

            # Build context
            context = await memory.get_context(task)
            if len(context) > 15:
                context = await compact_context(context, llm_client)

            # Get action from LLM
            async with tracer.span("llm_decision"):
                action = get_action(
                    observation=json.dumps(context[-5:]),
                    available_tools=tool_registry.get_schemas()
                )

            # Loop detection
            if loop_detector.check(action.action):
                tracer._current_span.add_event("loop_detected")
                return "Agent halted: repetitive action loop detected."

            # Terminal condition
            if action.action == "terminate":
                async with tracer.span("final_answer"):
                    return await generate_final_answer(context)

            # Execute tool
            async with tracer.span("tool_call", tool=action.action, confidence=action.confidence):
                tool_result = await tool_registry.call(action.action, **(action.tool_args or {}))
                sanitized_result = sanitize_tool_output(str(tool_result))

            # Update memory
            memory.working.add({"role": "assistant", "content": str(action)})
            memory.working.add({"role": "tool", "content": sanitized_result})

    return "Agent halted: max iterations reached."
Enter fullscreen mode Exit fullscreen mode

Framework Landscape (2025 State of Play)

Don't build everything from scratch. Know when to reach for existing tools:

Framework Best For Watch Out For
LangGraph Stateful, graph-based workflows Steep learning curve; abstraction leaks
CrewAI Multi-agent role-playing Can be too "magic" for complex logic
Pydantic AI Type-safe, structured agent outputs Younger ecosystem
Temporal + custom Long-running, durable workflows Infrastructure overhead
Raw API + your code Full control, complex routing You own the bugs

The right answer for most teams: use LangGraph for the graph/state machine, instructor for structured outputs, Pydantic for validation, and write tool execution yourself.


Closing Thoughts

AI agents are not magic. They're non-deterministic state machines wrapped around probabilistic function calls and they demand the same rigor as any other distributed system.

Your unfair advantage as a senior engineer isn't knowing the hottest framework. It's knowing that:

  • Idempotency matters. Tools get retried.
  • Observability is load-bearing. You will debug this at midnight.
  • Failure modes compound. Design for them explicitly, not as an afterthought.
  • Simpler is almost always better. A pipeline beats an agent when you know the steps.

The teams shipping reliable AI systems aren't the ones with the most sophisticated prompts. They're the ones who treated AI components like infrastructure with circuit breakers, retries, schema validation, and on-call runbooks.

Build accordingly.


Found a bug or a better pattern? Drop a comment. This is production code, not a tutorial corrections welcome.

If this was useful, follow for more on AI systems engineering, Python architecture, and the sharp edges that don't make it into the demos.

Top comments (0)