Alexsandro Souza
Building a Production-Ready AI Agent Harness

Building an AI agent is 10% "The Brain" and 90% "The Plumbing." AI can write your agent in an afternoon. What it can't do is tell you what production looks like.

It won't ask about authentication, rate limiting, or what happens when your LLM hallucinates a credit card number into a response. It won't add guardrails that block prompt injection before the message reaches the model, or redact PII from the output before it reaches the user. It won't wire up tracing so you can replay exactly what the agent did six calls deep, persist conversation state across restarts, or set up the metrics dashboard that tells you why latency spiked at 3 AM. It won't build the eval pipeline that catches quality regressions before your users do, handle context overflow when a tool dumps 80,000 characters into the conversation, or add retry logic with exponential backoff so a single upstream timeout doesn't crash the entire session. That part is still on you.

And yet, most LangGraph and LangChain tutorials stop right before any of this matters -- a working agent in a Jupyter notebook that answers questions, uses tools, and streams tokens. The gap between that demo and a service you'd actually deploy is enormous, and every team fills it by reinventing the same infrastructure from scratch.

An agent harness is the answer to this problem. Think of it like a test harness, but for production concerns instead of assertions. It is the infrastructure shell that wraps around your agent -- handling authentication, guardrails, memory, state persistence, observability, and deployment -- so the agent itself only contains the logic that makes it unique. You write the brain; the harness provides the skeleton, the nervous system, and the immune system.

This matters because AI agents are not stateless functions. They hold conversations across sessions, remember user preferences, call unpredictable external tools, and produce outputs that need safety checks before reaching the user. Without a harness, every one of these concerns leaks into your agent code, turning a clean graph into a tangled mess of infrastructure that is impossible to reuse across agents.

This article walks through the architecture of such a harness -- built with LangGraph, FastAPI, Langfuse, PostgreSQL with pgvector, MCP, and AI Skills. The full implementation is open source: you define the agent logic, and the harness provides everything else.


1. Project Architecture

The codebase separates agent logic from infrastructure through a clean directory structure:

src/
├── app/
│   ├── agents/              # Self-contained agent directories
│   │   ├── chatbot/         # Reference agent implementation
│   │   ├── open_deep_research/  # Multi-agent research workflow
│   │   ├── text_to_sql/     # Text-to-SQL agent
│   │   └── tools/           # Shared tools (search, think)
│   ├── api/
│   │   ├── v1/              # Versioned API routes
│   │   ├── metrics/         # Prometheus middleware
│   │   ├── security/        # Auth and rate limiting
│   │   └── logging_context.py
│   └── core/                # Shared infrastructure
│       ├── guardrails/      # Input/output safety
│       ├── context/         # LLM context overflow prevention
│       ├── memory/          # Long-term memory (mem0)
│       ├── checkpoint/      # State persistence
│       ├── mcp/             # Model Context Protocol
│       ├── metrics/         # Prometheus definitions
│       ├── llm/             # LLM utilities
│       ├── db/              # Database connections
│       └── common/          # Config, logging, models
├── evals/                   # Evaluation framework
├── mcp/                     # Sample MCP server
└── cli/                     # CLI clients

The key design decision: agents are self-contained directories under src/app/agents/. Each agent defines its own graph, system prompt, and tools. Everything else -- auth, memory, checkpointing, guardrails, metrics -- is injected by the harness through src/app/core/.

The harness ships with three reference agents, each demonstrating a different architecture approach:

| Agent | Architecture | Pattern |
| --- | --- | --- |
| Chatbot | Custom graph workflow | Linear pipeline with tool loop and guardrail nodes |
| Deep Research | Multi-agent supervisor/researcher | Nested subgraphs with parallel delegation |
| Text-to-SQL | ReAct (via Deep Agents) | Autonomous reasoning-action loop with skills and memory files |

2. Agent Architecture Approaches

The harness does not prescribe a single agent pattern. Different tasks call for different architectures. The three included agents demonstrate the spectrum from simple to complex.

Approach 1: Custom Graph Workflow (Chatbot)

The chatbot agent uses a hand-crafted LangGraph StateGraph where every node and edge is explicitly defined. This gives full control over the execution flow.

The graph is built in _create_graph():

async def _create_graph(self) -> StateGraph:
    input_guardrail = create_input_guardrail_node(next_node="chat")
    output_guardrail = create_output_guardrail_node()

    graph_builder = StateGraph(GraphState)
    graph_builder.add_node("input_guardrail", input_guardrail, ends=["chat", END])
    graph_builder.add_node("chat", self._chat_node, ends=["tool_call", "output_guardrail"])
    graph_builder.add_node("tool_call", self._tool_call_node, ends=["chat"])
    graph_builder.add_node("output_guardrail", output_guardrail)
    graph_builder.set_entry_point("input_guardrail")
    graph_builder.add_edge("output_guardrail", END)
    return graph_builder

Each node returns a Command that controls routing. The chat node decides whether to route to tools or the output guardrail based on the LLM response:

async def _chat_node(self, state: GraphState, config: RunnableConfig) -> Command:
    system_prompt = load_system_prompt(long_term_memory=state.long_term_memory)
    messages = prepare_messages(state.messages, chatbot_model, system_prompt)

    model = chatbot_model.bind_tools(self._get_all_tools()).with_retry(stop_after_attempt=3)

    response_message = await model_invoke_with_metrics(
        model, dump_messages(messages), settings.DEFAULT_LLM_MODEL, self.name, config
    )
    response_message = process_llm_response(response_message)

    goto = "tool_call" if response_message.tool_calls else "output_guardrail"
    return Command(update={"messages": [response_message]}, goto=goto)

The state schema is minimal -- LangGraph's add_messages reducer handles message accumulation:

class GraphState(MessagesState):
    long_term_memory: str

System prompts are markdown templates with placeholders that get filled at invocation time:

# Name: {agent_name}
# Role: A friendly and professional assistant

## What you know about the user
{long_term_memory}

## Current date and time
{current_date_and_time}
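Filling such a template is plain string formatting. A minimal sketch of the idea -- the template here is abbreviated, and this is a hypothetical stand-in for the repo's `load_system_prompt`, which does more:

```python
from datetime import datetime, timezone

# Hypothetical, simplified stand-in for the repo's load_system_prompt
PROMPT_TEMPLATE = (
    "# Name: {agent_name}\n"
    "## What you know about the user\n{long_term_memory}\n"
    "## Current date and time\n{current_date_and_time}\n"
)

def load_system_prompt(agent_name: str, long_term_memory: str) -> str:
    # Fill placeholders at invocation time so memory and timestamp stay fresh
    return PROMPT_TEMPLATE.format(
        agent_name=agent_name,
        long_term_memory=long_term_memory or "Nothing yet.",
        current_date_and_time=datetime.now(timezone.utc).isoformat(),
    )

prompt = load_system_prompt("chatbot", "* User prefers metric units")
```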

When to use this approach: General-purpose assistants, chatbots, and any agent where you want explicit control over every routing decision. The graph is easy to visualize, debug, and extend with new nodes.

Approach 2: Multi-Agent Supervisor/Researcher (Deep Research)

The deep research agent uses nested subgraphs -- a supervisor that plans research, then delegates to multiple researcher subagents running in parallel. Each subagent is itself a compiled LangGraph with its own ReAct-style tool loop.

The main graph composes these subgraphs as nodes:

def _build_deep_research_graph(self) -> StateGraph:
    deep_researcher_builder = StateGraph(AgentState, input=AgentInputState)

    deep_researcher_builder.add_node("input_guardrail", input_guardrail, ends=["clarify_with_user", END])
    deep_researcher_builder.add_node("clarify_with_user", clarify_with_user)
    deep_researcher_builder.add_node("write_research_brief", write_research_brief)
    deep_researcher_builder.add_node("research_supervisor", self.supervisor_subagent.get_graph())
    deep_researcher_builder.add_node("final_report_generation", final_report_generation)
    deep_researcher_builder.add_node("output_guardrail", output_guardrail)

    deep_researcher_builder.add_edge("research_supervisor", "final_report_generation")
    deep_researcher_builder.add_edge("final_report_generation", "output_guardrail")
    deep_researcher_builder.add_edge("output_guardrail", END)
    return deep_researcher_builder

The supervisor delegates research tasks to sub-researchers in parallel using asyncio.gather:

research_tasks = [
    self._researcher_agent.agent_invoke({
        "researcher_messages": [HumanMessage(content=tool_call["args"]["research_topic"])],
        "research_topic": tool_call["args"]["research_topic"]
    }, "session_id_placeholder", 1)
    for tool_call in allowed_conduct_research_calls
]
tool_results = await asyncio.gather(*research_tasks)
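The fan-out/fan-in shape above is plain asyncio. A self-contained sketch with a stubbed researcher (the `research` coroutine is a stand-in for the real researcher subagent's invoke):

```python
import asyncio

# Stub standing in for a researcher subagent's agent_invoke
async def research(topic: str) -> str:
    await asyncio.sleep(0)  # stands in for real I/O-bound search calls
    return f"findings on {topic}"

async def supervise(topics: list[str]) -> list[str]:
    # One coroutine per delegated topic, awaited together; results keep order
    return await asyncio.gather(*(research(t) for t in topics))

results = asyncio.run(supervise(["pricing", "competitors"]))
```

Because `asyncio.gather` preserves input order, the supervisor can zip results back to the tool calls that requested them.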

Each researcher runs its own ReAct loop -- calling search tools, reflecting with think_tool, and iterating up to MAX_REACT_TOOL_CALLS before compressing findings into a summary.

When to use this approach: Complex research tasks, multi-step analysis, or any workflow that benefits from decomposition into parallel sub-tasks. The subgraph composition lets you scale horizontally by adding more specialized agents.

Approach 3: ReAct with Deep Agents (Text-to-SQL)

The text-to-SQL agent takes a fundamentally different approach: instead of hand-crafting a graph, it uses the Deep Agents framework (deepagents library) to create an autonomous ReAct agent with persistent skills and memory files.

def create_sql_deep_agent():
    db = SQLDatabase.from_uri(f"sqlite:///{db_path}", sample_rows_in_table_info=3)
    model = ChatOpenAI(model="gpt-5-mini", reasoning={"effort": "medium"}, temperature=0)

    toolkit = SQLDatabaseToolkit(db=db, llm=model)
    sql_tools = toolkit.get_tools()

    agent = create_deep_agent(
        model=model,
        memory=["./AGENTS.md"],
        skills=["./skills/"],
        tools=sql_tools,
        subagents=[],
        backend=FilesystemBackend(root_dir=base_dir),
    )
    return agent

The agent is configured through markdown files rather than code:

  • AGENTS.md -- Agent identity, role definition, safety rules (read-only SQL access), and planning strategies
  • skills/ -- Specialized workflows defined as markdown skill files (schema-exploration, query-writing)
  • FilesystemBackend -- Persistent storage for intermediate results and planning artifacts

Skills: Extending Agents with Natural Language

Skills are emerging as a modern pattern for extending agent capabilities -- and for good reason. Instead of writing new Python functions or graph nodes, you define a skill as a markdown file that describes when to activate and how to execute a workflow. The agent reads skill descriptions in its context and loads the full instructions on demand, only when the task requires it.

The text-to-SQL agent ships with two skills. The schema-exploration skill teaches the agent how to discover database structure:

---
name: schema-exploration
description: For discovering and understanding database structure, tables, columns, and relationships
---

## When to Use This Skill
Use this skill when you need to:
- Understand the database structure
- Find which tables contain certain types of data
- Discover column names and data types

## Workflow
### 1. List All Tables
Use `sql_db_list_tables` tool to see all available tables in the database.

### 2. Get Schema for Specific Tables
Use `sql_db_schema` tool with table names to examine columns, data types, and relationships.

### 3. Map Relationships
Identify how tables connect via foreign keys.

The query-writing skill teaches the agent a structured approach to SQL generation -- planning with write_todos for complex queries, validating JOIN conditions, and enforcing safety rules like LIMIT clauses and read-only access.

This is progressive disclosure applied to agent design. The agent always sees the skill names and descriptions, but it only loads the full multi-page instructions when it decides a skill is relevant. This keeps the context window lean for simple questions while providing deep, step-by-step expertise for complex ones. Adding a new capability is as simple as dropping a new .md file into the skills/ directory -- no code changes, no redeployment, no graph rewiring.
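The split between always-visible metadata and on-demand body is easy to see if you parse a skill file yourself. A minimal sketch (not the deepagents loader, just an illustration of the frontmatter/body split):

```python
# Sketch: extract the name/description frontmatter the agent always sees,
# leaving the full body to be loaded only when the skill is activated
def parse_skill(text: str) -> tuple[dict, str]:
    _, frontmatter, body = text.split("---", 2)
    meta = {}
    for line in frontmatter.strip().splitlines():
        key, _, value = line.partition(":")
        meta[key.strip()] = value.strip()
    return meta, body.strip()

skill = """---
name: schema-exploration
description: For discovering database structure
---
## When to Use This Skill
..."""

meta, body = parse_skill(skill)
```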

The wrapper class applies the same harness guardrails (content filter, PII blocking, output safety check) before and after the ReAct loop:

class TextSQLDeepAgent:
    async def agent_invoke(self, agent_input, session_id, user_id=None):
        query = agent_input.get("query", "")

        filter_result = check_content_filter(query)
        if filter_result.is_blocked:
            return [Message(role="assistant", content="I cannot process this request.")]

        pii_findings = detect_pii(query, pii_types=[PIIType.API_KEY, PIIType.SSN, PIIType.CREDIT_CARD])
        if pii_findings:
            return [Message(role="assistant", content="Your message contains sensitive information.")]

        response = await self.agent.ainvoke({"messages": [{"role": "user", "content": query}]}, config=config)
        messages = process_messages(response["messages"])
        await self.process_safe_output(messages)
        return messages

When to use this approach: Domain-specific agents where the reasoning strategy is better expressed in natural language than graph code. The ReAct pattern lets the agent decide its own execution order -- explore schema, write queries, fix errors, and iterate -- without you encoding every possible path.

Architecture Comparison

The three approaches represent different trade-offs:

| Dimension | Custom Graph | Multi-Agent Subgraphs | ReAct (Deep Agents) |
| --- | --- | --- | --- |
| Control | Full -- every edge is explicit | Hierarchical -- control delegation boundaries | Minimal -- agent decides its own path |
| Debuggability | High -- deterministic flow | Medium -- subgraph boundaries are clear | Lower -- emergent behavior |
| Parallelism | Manual | Built-in via supervisor delegation | Sequential by default |
| Flexibility | Add/remove nodes in code | Compose new subgraph combinations | Change behavior via markdown files |
| Best for | Chat, simple workflows | Research, analysis, decomposable tasks | Domain-specific tool use (SQL, APIs) |

All three share the same harness infrastructure: guardrails, Langfuse tracing, Prometheus metrics, structured logging, and authentication. The architecture choice only affects what lives inside src/app/agents/<your_agent>/.


3. Guardrails: Input and Output Safety

The harness provides guardrails as factory functions that return LangGraph-compatible node functions. This lets each agent customize its safety checks without reimplementing the pattern.

Input Guardrails

Input guardrails run deterministic checks before the message reaches the LLM:

def create_input_guardrail_node(
    next_node: str,
    banned_keywords: list[str] | None = None,
    pii_check_enabled: bool = True,
    prompt_injection_check: bool = True,
    block_pii_types: list[PIIType] | None = None,
) -> Callable:
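The factory pattern is worth spelling out: configuration is captured in a closure, and the returned callable is what gets added as a graph node. A stripped-down sketch with a toy state dict (the real node operates on `GraphState` and returns a `Command`):

```python
# Minimal sketch of the guardrail factory pattern: config lives in the
# closure, and the returned function is the graph node
def create_input_guardrail_node(next_node: str, banned_keywords=None):
    banned = [k.lower() for k in (banned_keywords or [])]

    def input_guardrail(state: dict) -> dict:
        text = state["messages"][-1].lower()
        if any(word in text for word in banned):
            return {"goto": "END", "blocked": True}
        return {"goto": next_node, "blocked": False}

    return input_guardrail

node = create_input_guardrail_node("chat", banned_keywords=["forbidden"])
```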

Two layers of validation run in sequence:

1. Content filter -- Banned keyword matching and prompt injection detection using regex patterns:

PROMPT_INJECTION_PATTERNS: list[str] = [
    r"ignore\s+(all\s+)?(previous|prior|above)\s+(instructions|prompts|rules)",
    r"disregard\s+(all\s+)?(previous|prior|above)\s+(instructions|prompts|rules)",
    r"you\s+are\s+now\s+(?:a|an|in)\s+(?:jailbreak|unrestricted|evil|DAN)",
    r"override\s+(?:your|all|system)\s+(?:instructions|rules|constraints)",
    r"reveal\s+(?:your|the|system)\s+(?:prompt|instructions|rules)",
]

2. PII detection -- Regex-based scanning for sensitive data types (SSN, API keys, credit cards) with Luhn validation for credit card numbers:

class PIIType(str, Enum):
    EMAIL = "email"
    CREDIT_CARD = "credit_card"
    IP = "ip"
    API_KEY = "api_key"
    PHONE = "phone"
    SSN = "ssn"
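The Luhn check mentioned above is what keeps random 16-digit strings from triggering false credit-card positives. A self-contained version of the standard algorithm:

```python
def luhn_valid(number: str) -> bool:
    """Luhn checksum: double every second digit from the right,
    subtract 9 from two-digit results, and require sum % 10 == 0."""
    digits = [int(d) for d in number if d.isdigit()]
    if len(digits) < 13:  # shorter than any real card number
        return False
    total = 0
    for i, d in enumerate(reversed(digits)):
        if i % 2 == 1:
            d *= 2
            if d > 9:
                d -= 9
        total += d
    return total % 10 == 0
```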

When input is blocked, the guardrail routes directly to END with a rejection message:

if filter_result.is_blocked:
    return Command(
        update={"messages": [AIMessage(content=BLOCKED_INPUT_MESSAGE)]},
        goto=END,
    )
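The content-filter check itself is a case-insensitive regex scan. A minimal sketch using two of the patterns shown above (the real `check_content_filter` also handles banned keywords and returns a richer result object):

```python
import re

PROMPT_INJECTION_PATTERNS = [
    r"ignore\s+(all\s+)?(previous|prior|above)\s+(instructions|prompts|rules)",
    r"reveal\s+(?:your|the|system)\s+(?:prompt|instructions|rules)",
]

def is_prompt_injection(text: str) -> bool:
    # Case-insensitive scan; any single pattern match blocks the input
    return any(re.search(p, text, re.IGNORECASE) for p in PROMPT_INJECTION_PATTERNS)
```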

Output Guardrails

Output guardrails apply two checks to the agent's response before it reaches the user:

1. Deterministic PII redaction -- Scans the output and replaces detected PII with [REDACTED_EMAIL], [REDACTED_SSN], etc. Supports multiple strategies: redact, mask, hash, or block.

2. LLM-based safety evaluation -- Uses a fast, low-cost model to classify the response as SAFE or UNSAFE. Unsafe responses are replaced with a standard message:

async def evaluate_safety(content: str) -> bool:
    model = _get_safety_model()
    prompt = SAFETY_EVALUATION_PROMPT.format(response=content[:2000])
    result = await model.ainvoke([{"role": "user", "content": prompt}])
    verdict = result.content.strip().upper()
    return "UNSAFE" not in verdict

The output guardrail is fail-open: if the safety check itself errors, the response passes through. This prevents the safety system from becoming a point of failure.
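The deterministic redaction in check 1 is essentially pattern-to-placeholder substitution. A minimal sketch with an abbreviated pattern table (the harness covers more PII types and the mask/hash/block strategies as well):

```python
import re

# Abbreviated pattern table; the harness supports more types and strategies
PII_PATTERNS = {
    "EMAIL": r"[\w.+-]+@[\w-]+\.[\w.]+",
    "SSN": r"\b\d{3}-\d{2}-\d{4}\b",
}

def redact_pii(text: str) -> str:
    # Replace each match with a typed placeholder like [REDACTED_EMAIL]
    for name, pattern in PII_PATTERNS.items():
        text = re.sub(pattern, f"[REDACTED_{name}]", text)
    return text
```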


4. Long-Term Memory: Semantic Memory with mem0 and pgvector

The harness provides per-user semantic memory backed by pgvector. Before every agent invocation, relevant memories are retrieved by semantic similarity. After invocation, new memories are extracted from the conversation.

async def get_relevant_memory(user_id: int, query: str) -> str:
    memory = await get_memory_instance()
    results = await memory.search(user_id=str(user_id), query=query)
    return "\n".join([f"* {result['memory']}" for result in results["results"]])

Memory updates happen in the background to avoid blocking the response:

def bg_update_memory(user_id: int, messages: list[dict], metadata: dict | None = None) -> None:
    # Fire-and-forget: schedule the memory update without blocking the response
    asyncio.create_task(update_memory(user_id, messages, metadata))

The agent orchestrates this in its agent_invoke method -- retrieve first, update after:

async def agent_invoke(self, messages, session_id, user_id=None):
    relevant_memory = (await get_relevant_memory(user_id, messages[-1].content)) or "No relevant memory found."
    agent_input = {"messages": dump_messages(messages), "long_term_memory": relevant_memory}

    response = await self._graph.ainvoke(input=agent_input, config=config)
    # ... process response ...

    bg_update_memory(user_id, messages_dic, {"session_id": session_id, "agent_name": self.name})
    return result

The memory singleton connects to pgvector using the same PostgreSQL instance as the rest of the application, configured through the shared Settings class.


5. Context Management: Preventing LLM Context Overflow

Long-running conversations and large tool results can push the message history past the model's context window. The harness addresses this with a two-layer strategy in src/app/core/context/: evict oversized tool results immediately, and summarize the conversation history before it overflows.

Layer 1: Tool Result Eviction

When a tool returns a result exceeding ~80,000 characters (~20K tokens), the full output is written to disk and the in-state message is replaced with a compact head/tail preview:

MAX_TOOL_RESULT_CHARS = 80_000
PREVIEW_LINES = 5
MAX_LINE_CHARS = 1_000

def truncate_tool_call_if_too_long(tool_message: ToolMessage) -> ToolMessage:
    content = tool_message.content
    if not isinstance(content, str) or len(content) <= MAX_TOOL_RESULT_CHARS:
        return tool_message

    file_path = _write_large_result(tool_message.tool_call_id, content)
    preview = _build_preview(content)

    evicted_content = (
        f"[Tool result too large ({len(content):,} chars). "
        f"Full output saved to: {file_path}]\n\n"
        f"--- Preview (first {PREVIEW_LINES} lines) ---\n"
        f"{preview['head']}\n...\n"
        f"--- Preview (last {PREVIEW_LINES} lines) ---\n"
        f"{preview['tail']}\n\n"
        f'Use read_file("{file_path}") to access the full content.'
    )

    return ToolMessage(content=evicted_content, name=tool_message.name, tool_call_id=tool_message.tool_call_id)

The preview gives the LLM enough context to decide whether it needs the full output. If it does, it can call read_file on the saved path to retrieve the content in chunks. This wrapping is applied at every tool-result creation site -- built-in tools, MCP tools, and parallel tool execution:

# In the tool_call node
outputs.append(truncate_tool_call_if_too_long(
    ToolMessage(content=tool_result, name=tool_name, tool_call_id=tool_call["id"])
))

Layer 2: Conversation Summarization

Even with tool results evicted, conversations accumulate context over many turns. The summarize_if_too_long function runs before every LLM call and triggers when the conversation reaches 85% of the model's context window:

async def _chat_node(self, state: GraphState, config: RunnableConfig) -> Command:
    condensed_messages = await summarize_if_too_long(
        messages=state.messages,
        model_name=f"openai:{settings.DEFAULT_LLM_MODEL}",
        llm=chatbot_model,
        session_id=config["configurable"]["thread_id"],
    )

    system_prompt = load_system_prompt(long_term_memory=state.long_term_memory)
    messages = prepare_messages(condensed_messages, chatbot_model, system_prompt)
    # ... LLM call ...

Token counting uses count_tokens_approximately from langchain-core for the threshold check. The cheaper character-based heuristic (NUM_CHARS_PER_TOKEN = 4) is used for the split-point search and argument truncation to avoid repeated full counts.

The function is a no-op when the context is within budget. When it triggers, it proceeds in two stages:

Stage 1 -- Lightweight truncation (no LLM call). Tool-call arguments in older messages -- like file contents passed to write_file -- are truncated to 2K characters. If this alone brings the count below the threshold, no summarization is needed:

def _truncate_tool_call_args(messages: list[BaseMessage]) -> list[BaseMessage]:
    result = []
    for msg in messages:
        if isinstance(msg, AIMessage) and msg.tool_calls:
            truncated_calls = []
            for tc in msg.tool_calls:
                new_args = {}
                for key, value in tc.get("args", {}).items():
                    if isinstance(value, str) and len(value) > TOOL_ARG_TRUNCATE_CHARS:
                        new_args[key] = value[:TOOL_ARG_TRUNCATE_CHARS] + f"\n... [truncated, {len(value):,} chars total]"
                    else:
                        new_args[key] = value
                truncated_calls.append({**tc, "args": new_args})
            result.append(msg.model_copy(update={"tool_calls": truncated_calls}))
        else:
            result.append(msg)
    return result

Stage 2 -- Full summarization. If truncation was not enough, the conversation is split into "old" and "recent" segments. The split snaps to a HumanMessage boundary so tool-call chains are never broken. The old segment is written in full to a markdown file, and an LLM generates a concise summary that replaces it:

file_path = _write_messages_to_markdown(old_messages, session_id)
summary_text = await _generate_summary(truncated_old, llm)

summary_message = HumanMessage(
    content=(
        f"[Summary of earlier conversation ({len(old_messages)} messages). "
        f"Full transcript: {file_path}]\n\n{summary_text}"
    )
)

return [summary_message] + recent_messages

The split-point algorithm walks backward from the end of the message list, accumulating characters until it fills ~30% of the model's context window for the "recent" portion. It then snaps forward to the next HumanMessage to find a clean boundary:

def _find_safe_split_index(messages, token_limit):
    recent_chars_budget = int(token_limit * RECENT_CONTEXT_RATIO * NUM_CHARS_PER_TOKEN)
    chars_from_end = 0
    raw_split = 0  # default: the whole history fits in the recent budget

    for i in range(len(messages) - 1, -1, -1):
        chars_from_end += _estimate_message_chars(messages[i])
        if chars_from_end >= recent_chars_budget:
            raw_split = i + 1
            break

    # Snap forward to the nearest HumanMessage boundary
    for j in range(raw_split, len(messages)):
        if isinstance(messages[j], HumanMessage):
            return j
    return 0

If the summarization LLM call itself fails, the function falls back to a plain message-role listing so the conversation can continue without crashing. The full old messages are always preserved in the markdown file regardless of whether the LLM summary succeeds.


6. State Persistence: Conversation Checkpointing

LangGraph's AsyncPostgresSaver automatically persists the full graph state after every node execution. This means conversations survive server restarts and can be resumed from exactly where they left off.

async def get_checkpointer():
    connection_pool = await get_connection_pool()
    if connection_pool:
        checkpointer = AsyncPostgresSaver(connection_pool)
        await checkpointer.setup()
    else:
        checkpointer = None
        if settings.ENVIRONMENT != Environment.PRODUCTION:
            raise Exception("Connection pool initialization failed")
    return checkpointer

The checkpointer is compiled into the graph once and reused:

self._graph = graph_builder.compile(checkpointer=self.checkpointer, name=self.name)

Every invocation passes a thread_id (the session ID) so LangGraph knows which conversation to resume:

config["configurable"] = {"thread_id": session_id}

When a user deletes a session, the harness cleans up checkpoint tables:

async def clear_checkpoints(session_id: str) -> None:
    async with conn_pool.connection() as conn:
        for table in settings.CHECKPOINT_TABLES:
            await conn.execute(f"DELETE FROM {table} WHERE thread_id = %s", (session_id,))

7. Authentication and Session Management

The harness uses JWT-based authentication with a session-per-conversation model. Users register, log in to get a user-scoped token, then create sessions -- each session represents a separate conversation thread.

Protected endpoints use FastAPI dependency injection:

@router.post("/chat", response_model=ChatResponse)
@limiter.limit(settings.RATE_LIMIT_ENDPOINTS["chat"][0])
async def chat(
    request: Request,
    chat_request: ChatRequest,
    session: Session = Depends(get_current_session),
):

Rate limiting is configured per-endpoint through environment variables:

default_endpoints = {
    "chat": ["30 per minute"],
    "chat_stream": ["20 per minute"],
    "deep_research": ["10 per minute"],
    "register": ["10 per hour"],
    "login": ["20 per minute"],
}

All user input passes through sanitization that strips HTML, removes <script> tags, and filters null bytes -- applied at both the API and validation layers.


8. Observability: Tracing, Metrics, and Structured Logging

Production observability requires three complementary systems. The harness integrates all three.

Langfuse Tracing

Every LLM call is traced through Langfuse via a callback handler injected into the agent config:

self._config = {
    "callbacks": [langfuse_callback_handler],
    "metadata": {
        "environment": settings.ENVIRONMENT.value,
        "debug": settings.DEBUG,
    },
}

This captures the full chain of LLM calls, tool invocations, token usage, and latency for every conversation turn -- without modifying agent code.

Langfuse tracing view showing the full call hierarchy, tool invocations, and graph visualization for an agent execution

Langfuse dashboard showing traces, model costs, usage by model, and user consumption

Prometheus Metrics

Custom metrics track both infrastructure and LLM-specific performance:

llm_inference_duration_seconds = Histogram(
    "llm_inference_duration_seconds",
    "Time spent processing LLM inference",
    ["model", "agent_name"],
    buckets=[0.1, 0.3, 0.5, 1.0, 2.0, 5.0]
)

tool_executions_total = Counter(
    "tool_executions_total",
    "Total tool executions",
    ["tool_name", "status"]
)

tokens_in_counter = Counter("llm_tokens_in", "Number of input tokens", ["agent_name"])
tokens_out_counter = Counter("llm_tokens_out", "Number of output tokens", ["agent_name"])

Every LLM call flows through model_invoke_with_metrics(), which times the inference and records token usage:

async def model_invoke_with_metrics(model, model_input, model_name, agent_name, config=None):
    with llm_inference_duration_seconds.labels(model=model_name, agent_name=agent_name).time():
        response = await model.ainvoke(model_input, config)
    record_token_usage(response, model_name, agent_name)
    return response
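The `record_token_usage` helper referenced above plausibly reads LangChain's `usage_metadata` field off the response. A sketch with stdlib tallies standing in for the Prometheus counters (the real code increments `tokens_in_counter` / `tokens_out_counter`):

```python
from collections import Counter

# Stand-in tallies; the harness uses prometheus_client Counters instead
tokens_in: Counter = Counter()
tokens_out: Counter = Counter()

class FakeResponse:
    # LangChain chat responses expose token usage via .usage_metadata
    usage_metadata = {"input_tokens": 120, "output_tokens": 45}

def record_token_usage(response, agent_name: str) -> None:
    usage = getattr(response, "usage_metadata", None) or {}
    tokens_in[agent_name] += usage.get("input_tokens", 0)
    tokens_out[agent_name] += usage.get("output_tokens", 0)

record_token_usage(FakeResponse(), "chatbot")
```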

HTTP metrics are captured automatically by MetricsMiddleware, tracking request duration, counts, and status codes by endpoint.

Grafana LLM Observability dashboard showing token usage, error rates, and tool execution metrics

Structured Logging

All logging uses structlog with strict conventions: lowercase_underscore event names and kwargs instead of f-strings, so logs are filterable and machine-parseable.

logger.info("chat_request_received", session_id=session.id, message_count=len(messages))

A LoggingContextMiddleware extracts session_id and user_id from the JWT on every request and binds them to the logging context:

class LoggingContextMiddleware(BaseHTTPMiddleware):
    async def dispatch(self, request, call_next):
        clear_context()
        # Extract the bearer token, then the session_id from the JWT
        token = request.headers.get("Authorization", "").removeprefix("Bearer ").strip()
        if token:
            payload = jwt.decode(token, settings.JWT_SECRET_KEY, algorithms=[settings.JWT_ALGORITHM])
            session_id = payload.get("sub")
            if session_id:
                bind_context(session_id=session_id)
        response = await call_next(request)
        clear_context()
        return response

The logging format switches automatically based on environment: colored console output in development, structured JSON in production.


9. The API Layer: FastAPI Endpoints and Streaming

The API provides both synchronous and streaming (SSE) endpoints for chat. The streaming endpoint uses FastAPI's StreamingResponse with an async generator:

@router.post("/chat/stream")
@limiter.limit(settings.RATE_LIMIT_ENDPOINTS["chat_stream"][0])
async def chat_stream(request, chat_request, session=Depends(get_current_session)):
    agent = await get_agent_example()

    async def event_generator():
        full_response = ""
        with llm_stream_duration_seconds.labels(model=settings.DEFAULT_LLM_MODEL, agent_name=agent.name).time():
            async for chunk in agent.agent_invoke_stream(
                chat_request.messages, session.id, user_id=session.user_id
            ):
                full_response += chunk
                response = StreamResponse(content=chunk, done=False)
                yield f"data: {json.dumps(response.model_dump())}\n\n"

        final_response = StreamResponse(content="", done=True)
        yield f"data: {json.dumps(final_response.model_dump())}\n\n"

    return StreamingResponse(event_generator(), media_type="text/event-stream")
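On the wire, each event is a `data: <json>` line followed by a blank line. A client can reassemble the full response with a few lines of parsing -- a stdlib sketch, assuming only the `content`/`done` fields emitted above (real SSE also allows `event:` and `id:` fields, which this ignores):

```python
import json

def parse_sse_chunks(raw: str) -> str:
    """Reassemble the full response from a raw SSE stream body."""
    full = []
    for frame in raw.split("\n\n"):
        frame = frame.strip()
        if not frame.startswith("data: "):
            continue
        payload = json.loads(frame[len("data: "):])
        if payload.get("done"):
            break
        full.append(payload["content"])
    return "".join(full)
```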

The application wires everything together at startup using FastAPI's lifespan pattern:

@asynccontextmanager
async def lifespan(app: FastAPI):
    logger.info("application_startup", project_name=settings.PROJECT_NAME, version=settings.VERSION)
    await mcp_dependencies_init()
    yield
    await mcp_dependencies_cleanup()
    logger.info("application_shutdown")

app = FastAPI(title=settings.PROJECT_NAME, version=settings.VERSION, lifespan=lifespan)
setup_metrics(app)
setup_rate_limit(app)
app.add_middleware(LoggingContextMiddleware)
app.add_middleware(MetricsMiddleware)

10. MCP Integration: Model Context Protocol

The harness supports MCP (Model Context Protocol) for dynamically loading tools from external servers. MCP sessions are initialized at application startup and persist for the application lifetime.

class MCPSessionManager:
    async def initialize(self) -> Resource:
        self._exit_stack = AsyncExitStack()
        await self._exit_stack.__aenter__()

        tools, sessions = [], []
        for hostname in settings.MCP_HOSTNAMES:
            session = await self._exit_stack.enter_async_context(
                mcp_sse_client(hostname, correlation_id=generate_correlation_id())
            )
            session_tools = await load_mcp_tools(session)
            tools.extend(session_tools)
            sessions.append(session)

        self._resource = Resource(tools=tools, sessions=sessions)
        return self._resource

Key resilience features:

  • Multi-server support via MCP_HOSTNAMES_CSV environment variable
  • Automatic reconnection on ClosedResourceError with configurable retries
  • Graceful degradation -- if MCP servers are unavailable, the agent continues with built-in tools only
  • Correlation IDs on every MCP operation for traceability

MCP tools are loaded once and merged with built-in tools when the graph is compiled:

def _get_all_tools(self) -> list[BaseTool]:
    return self.tools + list(self.mcp_tools_by_name.values())

11. Evaluation Framework: LLM-as-Judge

The harness includes a metric-based evaluation framework that uses Langfuse traces as its data source and an LLM as the judge.

Metrics are defined as markdown files in src/evals/metrics/prompts/. Dropping a new .md file into that directory automatically registers it as a metric. Built-in metrics include relevancy, helpfulness, conciseness, hallucination, and toxicity.

Each metric prompt instructs the evaluator LLM to score the output on a 0-to-1 scale:

Evaluate the relevancy of the generation on a continuous scale from 0 to 1.

## Scoring Criteria
A generation can be considered relevant (Score: 1) if it:
- Directly addresses the user's specific question or request
- Provides information that is pertinent to the query
- Stays on topic without introducing unrelated information

The evaluator fetches un-scored traces from the last 24 hours, runs each metric, and pushes scores back to Langfuse:

class Evaluator:
    async def run(self, generate_report_file=True):
        traces = self.__fetch_traces()
        for trace in tqdm(traces, desc="Evaluating traces"):
            # Extract once per trace, not once per metric
            trace_input, trace_output = get_input_output(trace)
            for metric in metrics:
                score = await self._run_metric_evaluation(metric, trace_input, trace_output)
                if score:
                    self._push_to_langfuse(trace, score, metric)

The framework uses OpenAI's structured outputs to ensure the LLM returns a valid score and reasoning:

from pydantic import BaseModel, Field

class ScoreSchema(BaseModel):
    score: float = Field(ge=0.0, le=1.0)
    reasoning: str

Reports are saved as JSON with per-metric averages and per-trace breakdowns. Run evaluations with:

make eval              # Interactive mode
make eval-quick        # Default settings, no prompts
make eval-no-report    # Skip report generation
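The per-metric averages in the report are a simple group-by over the pushed scores. A sketch (the score-record shape here is an assumption, not the harness's report schema):

```python
from collections import defaultdict

def per_metric_averages(scores: list[dict]) -> dict[str, float]:
    """Average score per metric across all evaluated traces."""
    buckets: dict[str, list[float]] = defaultdict(list)
    for entry in scores:
        buckets[entry["metric"]].append(entry["score"])
    return {metric: sum(vals) / len(vals) for metric, vals in buckets.items()}
```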

12. Configuration and Environment Management

The harness uses environment-specific configuration files loaded in priority order:

.env.{environment}.local  (highest priority, git-ignored)
.env.{environment}
.env.local
.env                      (lowest priority)

Four environments are supported -- development, staging, production, and test -- each with automatic overrides:

env_settings = {
    Environment.DEVELOPMENT: {
        "DEBUG": True, "LOG_LEVEL": "DEBUG", "LOG_FORMAT": "console",
        "RATE_LIMIT_DEFAULT": ["1000 per day", "200 per hour"],
    },
    Environment.PRODUCTION: {
        "DEBUG": False, "LOG_LEVEL": "WARNING",
        "RATE_LIMIT_DEFAULT": ["200 per day", "50 per hour"],
    },
}

Explicit environment variables always take precedence over these defaults, so you can override any setting per-deployment without code changes.
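That precedence is just a last-writer-wins dict merge: code defaults, then the environment preset, then explicit environment variables. An illustrative sketch (the `resolve_settings` helper is not the harness's actual API):

```python
def resolve_settings(defaults: dict, env_overrides: dict, process_env: dict) -> dict:
    """Later sources win: code defaults < environment preset < explicit env vars."""
    resolved = dict(defaults)
    resolved.update(env_overrides)
    resolved.update(process_env)
    return resolved
```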


13. DevOps: Docker, Compose, and CI/CD

The Dockerfile uses a slim Python base with a non-root user for security:

FROM python:3.13.2-slim

# uv is not in the slim base image; install it before creating the venv
RUN pip install --no-cache-dir uv

WORKDIR /app
COPY pyproject.toml .
RUN uv venv && . .venv/bin/activate && uv pip install -e .

COPY . .
RUN useradd -m appuser && chown -R appuser:appuser /app
USER appuser

ENTRYPOINT ["/app/scripts/docker-entrypoint.sh"]
CMD ["/app/.venv/bin/uvicorn", "src.app.main:app", "--host", "0.0.0.0", "--port", "8000"]

Docker Compose provides the full monitoring stack:

  • PostgreSQL with pgvector -- Database for users, sessions, checkpoints, and vector memory
  • Prometheus -- Scrapes /metrics from the FastAPI app
  • Grafana -- Pre-configured dashboards for API latency, LLM inference duration, token usage, and rate limiting
  • cAdvisor -- Container-level resource metrics

The Makefile serves as the single CLI entrypoint:

make dev                              # Start development server with uvloop
make test                             # Run pytest
make eval                             # Run evaluations
make docker-build-env ENV=production  # Build Docker image
make docker-compose-up ENV=development # Full stack
make lint                             # Ruff check and format

14. Building Your Own Agent

Every agent lives in its own directory. Start by choosing the architecture that fits your use case -- refer to the comparison in section 2 -- then follow the pattern from the matching reference agent.

Step 1: Create the agent directory

src/app/agents/my_agent/
  __init__.py          # Factory function
  agent.py             # Agent class with graph definition
  system.md            # System prompt template (or AGENTS.md for ReAct)
  tools/
    __init__.py        # Export tools list
    my_tool.py         # Custom tool implementations
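The factory in `__init__.py` typically constructs and compiles the agent once, then caches it for the process lifetime. A hedged sketch of that pattern (the `MyAgent` stand-in and lock-based caching are illustrative, not the harness's exact factory):

```python
import asyncio

class MyAgent:
    """Placeholder for the real agent class defined in agent.py."""
    def __init__(self):
        self.compiled = False

    async def compile(self):
        self.compiled = True

_agent = None
_lock = asyncio.Lock()

async def get_my_agent():
    """Lazily build and compile the agent exactly once per process."""
    global _agent
    async with _lock:
        if _agent is None:
            agent = MyAgent()
            await agent.compile()
            _agent = agent
    return _agent
```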

Step 2: Choose your architecture

Custom graph workflow (like chatbot) -- Define a StateGraph with explicit nodes and edges. Best when you want full control over execution flow.

Multi-agent subgraphs (like deep research) -- Build separate compiled subgraphs and compose them as nodes in a parent graph. Best for complex tasks with decomposable sub-problems.

ReAct with Deep Agents (like text-to-SQL) -- Use create_deep_agent with markdown memory files, skill directories, and tool kits. Best for domain-specific tool use where the agent should decide its own execution order.

Step 3: Define the system prompt

For graph-based agents, system.md supports {long_term_memory} and {current_date_and_time} placeholders, plus any custom kwargs:

# Name: {agent_name}
# Role: A domain-specific assistant

Your specific instructions here.

## What you know about the user
{long_term_memory}

## Current date and time
{current_date_and_time}
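Rendering that template is plain `str.format` over the placeholders. An illustrative sketch, not the harness's actual renderer (the fallback text and helper name are assumptions):

```python
from datetime import datetime, timezone

SYSTEM_TEMPLATE = """# Name: {agent_name}

## What you know about the user
{long_term_memory}

## Current date and time
{current_date_and_time}
"""

def render_system_prompt(agent_name: str, long_term_memory: str, **extra) -> str:
    # Extra kwargs cover any custom placeholders an agent adds to its template.
    return SYSTEM_TEMPLATE.format(
        agent_name=agent_name,
        long_term_memory=long_term_memory or "Nothing yet.",
        current_date_and_time=datetime.now(timezone.utc).isoformat(),
        **extra,
    )
```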

For ReAct agents, use AGENTS.md to define the agent's identity, rules, and planning strategies in natural language.

Step 4: Build the agent class

Follow the pattern from the reference agent that matches your chosen architecture. At minimum, implement agent_invoke (and optionally agent_invoke_stream for SSE support). Apply guardrails either as graph nodes (custom graph / subgraph approaches) or as wrapper logic (ReAct approach):

class MyAgent:
    def __init__(self, name, tools, checkpointer):
        self.name = name
        self.tools = tools
        self.checkpointer = checkpointer

    async def compile(self):
        graph_builder = await self._create_graph()
        self._graph = graph_builder.compile(checkpointer=self.checkpointer, name=self.name)

Step 5: Wire it to an API endpoint

Create a route in src/app/api/v1/ following the chatbot pattern:

@router.post("/chat", response_model=ChatResponse)
@limiter.limit(settings.RATE_LIMIT_ENDPOINTS["my_endpoint"][0])
async def chat(request: Request, chat_request: ChatRequest, session=Depends(get_current_session)):
    agent = await get_my_agent()
    result = await agent.agent_invoke(chat_request.messages, session.id, user_id=session.user_id)
    return ChatResponse(messages=result)

The harness handles everything else: auth, memory retrieval and update, checkpointing, metrics, tracing, and guardrails.


15. Production Readiness Checklist

Here is a summary of the production concerns addressed by the harness:

  • Authentication -- JWT tokens with session-per-conversation model
  • Input guardrails -- Content filter, prompt injection detection, PII blocking
  • Output guardrails -- PII redaction, LLM-based safety evaluation
  • Long-term memory -- mem0 + pgvector, per-user, non-blocking updates
  • Context management -- Tool-result eviction to disk, two-stage conversation summarization
  • State persistence -- LangGraph AsyncPostgresSaver with automatic checkpointing
  • Observability -- Langfuse tracing, Prometheus metrics, structlog logging
  • Rate limiting -- Per-endpoint limits via slowapi, configurable per environment
  • Error handling -- Retries with exponential backoff, graceful degradation
  • Streaming -- SSE endpoints with token-by-token streaming
  • Evaluation -- LLM-as-judge with auto-discovered metrics and Langfuse integration
  • Deployment -- Docker with non-root user, Compose with full monitoring stack
  • MCP -- Multi-server tool loading with reconnection and graceful fallback
  • Agent architectures -- Custom graph, multi-agent subgraphs, and ReAct -- pick the pattern that fits

The gap between a working agent and a production agent is not the LLM logic -- it is everything around it. A harness approach lets you solve these problems once, then focus on what matters: the agent's behavior. Whether you need a simple chatbot with a tool loop, a multi-agent research system with parallel delegation, or an autonomous ReAct agent reasoning over a database, the same infrastructure wraps them all.

The full source code is available on GitHub. Clone it, swap in your agent, and ship.
