DEV Community

Ali Suleyman TOPUZ
Ali Suleyman TOPUZ

Posted on

Agentic Architectures — Article 5: Harness Engineering and the Agent Runtime Layer

Agentic Architectures — Article 5: Harness Engineering and the Agent Runtime Layer

There's a specific kind of frustration that only agent builders know. You've spent two weeks tuning your LLM. Your evals look clean. You demo it to your team and it works beautifully. Then you push it to production and watch it slowly destroy itself — calling the same tool eleven times, confidently returning a result it never actually verified, or hanging indefinitely because a downstream API returned a 401 and nobody taught it what to do next.

That happened to me on a document analysis agent I built earlier this year. The model was fine. Claude Sonnet was doing exactly what I asked. The problem was everything around it — the runtime layer I'd barely thought about. I had a prompt and some tools. I did not have a harness.

This article is about what I learned building one, and how to build yours on AWS Bedrock with LangGraph as the orchestration backbone. I'll be drawing on a pattern LangChain published around their deepagents-cli work — they moved a coding agent from 52.8% to 66.5% on Terminal Bench 2.0 without changing the model. They only changed the harness. That result is worth taking seriously: read their post here.

We'll go further than their benchmark setup. We'll add the production concerns they didn't cover: identity propagation, JWT handling at the tool boundary, retry policies that distinguish model failures from tool failures, and circuit breakers whose state survives agent restarts.


What Is the Harness, Really?

The harness is everything that wraps the model at runtime. It's not your application code. It's not your prompt templates living in a file. It's the live, executing system that receives the model's output, decides what to do with it, prepares the next input, and enforces the constraints your application needs.

Think of it this way. Your model is a very capable but somewhat unpredictable engine. The harness is the drivetrain — it takes raw engine output and translates it into controlled motion. Without it, you have power but no direction.

┌─────────────────────────────────────────────────────────┐
│                    Agent Application                    │
│                                                         │
│  ┌─────────────────────────────────────────────────┐    │
│  │               HARNESS LAYER                     │    │
│  │                                                 │    │
│  │  System Prompt Architecture                     │    │
│  │  ┌───────────────────────────────────────────┐  │    │
│  │  │         Middleware / Hooks                │  │    │
│  │  │  ┌─────────────────────────────────────┐  │  │    │
│  │  │  │         MODEL (Bedrock)             │  │  │    │
│  │  │  │   Claude / Nova / Titan             │  │  │    │
│  │  │  └─────────────────────────────────────┘  │  │    │
│  │  │  ┌─────────────────────────────────────┐  │  │    │
│  │  │  │         Tool Execution              │  │  │    │
│  │  │  │  Auth │ Retry │ Circuit Breaker     │  │  │    │
│  │  │  └─────────────────────────────────────┘  │  │    │
│  │  └───────────────────────────────────────────┘  │    │
│  │  Reasoning Budget Allocator                     │    │
│  └─────────────────────────────────────────────────┘    │
└─────────────────────────────────────────────────────────┘
Enter fullscreen mode Exit fullscreen mode

The LangChain research identified three primary "knobs" in a harness: system prompt, tools, and middleware. I've found it useful to think in four levers, because reasoning compute allocation is distinct enough to warrant its own category:

+---------------------------+-------------------------------------------+
| Lever                     | What It Controls                          |
+---------------------------+-------------------------------------------+
| System Prompt Architecture| Intent, constraints, verification         |
|                           | guidance, env context injection           |
+---------------------------+-------------------------------------------+
| Tool Design & Availability| What the agent can reach, how calls are   |
|                           | structured, auth at the boundary          |
+---------------------------+-------------------------------------------+
| Middleware / Lifecycle    | Pre/post model call hooks, loop detection,|
| Hooks                     | verification intercepts, identity prop.   |
+---------------------------+-------------------------------------------+
| Reasoning Compute         | Which model per phase, thinking budget,   |
| Allocation                | cost-aware routing                        |
+---------------------------+-------------------------------------------+
Enter fullscreen mode Exit fullscreen mode

Let's build each of these out concretely.


Setting Up: Project Structure and Dependencies

Before we get into each layer, here's the baseline project structure this article assumes:

agent-harness/
├── harness/
│   ├── __init__.py
│   ├── graph.py              # LangGraph state machine
│   ├── middleware/
│   │   ├── __init__.py
│   │   ├── verification.py   # Self-verification hooks
│   │   ├── loop_detection.py # Doom loop prevention
│   │   ├── auth.py           # JWT + credential management
│   │   └── retry.py          # Retry + circuit breaker
│   ├── tools/
│   │   ├── __init__.py
│   │   ├── base.py           # ToolWrapper with auth + retry
│   │   └── registry.py       # Tool registry with circuit state
│   ├── context/
│   │   ├── __init__.py
│   │   └── bootstrap.py      # Environment context injection
│   └── routing/
│       ├── __init__.py
│       └── reasoning.py      # Phase-aware model routing
├── infrastructure/
│   └── dynamodb_tables.tf    # Terraform for loop + circuit state
├── tests/
└── requirements.txt
Enter fullscreen mode Exit fullscreen mode

Core dependencies:

# requirements.txt
langchain>=0.3.0
langchain-aws>=0.2.0
langchain-community>=0.3.0
langgraph>=0.2.0
boto3>=1.35.0
pyjwt>=2.8.0
cryptography>=42.0.0
tenacity>=8.2.0
Enter fullscreen mode Exit fullscreen mode

For local development without Bedrock costs, we'll also support Ollama:

# requirements-local.txt
langchain-ollama>=0.2.0
ollama>=0.3.0
Enter fullscreen mode Exit fullscreen mode

Part 1: Self-Verification Loop

This is the highest-leverage change LangChain made. The failure pattern is subtle and very common: the agent writes a solution, re-reads its own code, decides it looks reasonable, and exits. It never ran anything. It verified its logic against its own reasoning, which is circular.

The fix is a PreCompletionChecklistMiddleware — an intercept that fires before the agent signals completion and forces a structured verification pass.

Here's how I implement this in LangGraph with Bedrock.

First, define the agent state to track verification:

# harness/graph.py
from typing import TypedDict, Annotated, List, Optional
from langgraph.graph import StateGraph, END
from langgraph.graph.message import add_messages
from langchain_core.messages import BaseMessage


class AgentState(TypedDict):
    messages: Annotated[List[BaseMessage], add_messages]
    phase: str                        # planning | executing | verifying
    verification_passed: bool
    verification_attempts: int
    tool_call_counts: dict            # for loop detection
    agent_run_id: str
    identity_context: dict            # JWT claims, propagated to tools
    circuit_states: dict              # per-tool circuit breaker state
Enter fullscreen mode Exit fullscreen mode

Now build the verification subgraph. The key insight is that verification is its own reasoning pass — not just "check if the answer is non-empty":

# harness/middleware/verification.py
from langchain_aws import ChatBedrock
from langchain_core.messages import SystemMessage, HumanMessage
from harness.graph import AgentState
import boto3


VERIFICATION_SYSTEM_PROMPT = """
You are a strict verification agent. Your job is NOT to produce a solution.
Your job is to verify that the solution already produced is correct and complete.

For every verification request you must:
1. Re-read the original task specification carefully
2. Check that the solution addresses ALL requirements, not just the ones that
   seemed easiest
3. Identify any edge cases that were not handled
4. Check that any file paths, function signatures, or API contracts match
   what was specified, not what seemed convenient
5. If the solution involves code, mentally trace through at least two execution
   paths: the happy path and one error path

Return a JSON object with:
{
  "passed": true | false,
  "issues": ["list of specific issues if failed"],
  "recommendation": "COMPLETE | RETRY_WITH_FIXES | ESCALATE"
}
"""


def build_verification_node(model_id: str = "anthropic.claude-3-5-sonnet-20241022-v2:0"):
    """
    Returns a LangGraph node function that performs structured verification.
    Uses a separate model call with a focused verification system prompt.
    """
    bedrock_client = boto3.client("bedrock-runtime", region_name="us-east-1")

    verifier = ChatBedrock(
        client=bedrock_client,
        model_id=model_id,
        model_kwargs={
            "temperature": 0,       # Verification should be deterministic
            "max_tokens": 1024,
        }
    )

    def verification_node(state: AgentState) -> AgentState:
        # Extract the task spec and proposed solution from message history
        task_spec = extract_task_spec(state["messages"])
        proposed_solution = extract_last_agent_output(state["messages"])

        verification_prompt = f"""
ORIGINAL TASK:
{task_spec}

PROPOSED SOLUTION:
{proposed_solution}

Verify this solution. Return only the JSON object described in your instructions.
"""
        response = verifier.invoke([
            SystemMessage(content=VERIFICATION_SYSTEM_PROMPT),
            HumanMessage(content=verification_prompt)
        ])

        import json
        try:
            result = json.loads(response.content)
        except json.JSONDecodeError:
            # Model didn't follow format — treat as failed verification
            result = {
                "passed": False,
                "issues": ["Verification model returned unparseable output"],
                "recommendation": "RETRY_WITH_FIXES"
            }

        if result["passed"]:
            return {
                **state,
                "phase": "complete",
                "verification_passed": True,
            }
        else:
            # Inject verification feedback back into message history
            # so the agent can see exactly what it missed
            feedback_message = HumanMessage(
                content=f"Verification failed. Issues found:\n" +
                        "\n".join(f"- {issue}" for issue in result["issues"]) +
                        f"\n\nPlease address these issues and try again."
            )
            return {
                **state,
                "messages": state["messages"] + [feedback_message],
                "phase": "executing",
                "verification_passed": False,
                "verification_attempts": state.get("verification_attempts", 0) + 1,
            }

    return verification_node


def should_verify(state: AgentState) -> str:
    """
    Routing function: intercepts AgentFinish and forces verification.
    This is the PreCompletionChecklistMiddleware equivalent.
    """
    if state.get("verification_passed"):
        return "complete"

    max_attempts = 3
    if state.get("verification_attempts", 0) >= max_attempts:
        # Don't loop forever — escalate after max attempts
        return "escalate"

    return "verify"
Enter fullscreen mode Exit fullscreen mode

For local development, swap the Bedrock client for Ollama — the verification logic is identical:

# Local dev alternative using Ollama
# pip install langchain-ollama
from langchain_ollama import ChatOllama

def build_verification_node_local(model: str = "llama3.1:8b"):
    """
    Same verification logic, Ollama backend for local testing.
    Run: ollama pull llama3.1:8b
    """
    verifier = ChatOllama(
        model=model,
        temperature=0,
        format="json",  # Ollama's native JSON mode
    )
    # ... rest of implementation identical
Enter fullscreen mode Exit fullscreen mode

Wire it into the LangGraph state machine:

# harness/graph.py
from langgraph.graph import StateGraph, END
from harness.middleware.verification import build_verification_node, should_verify

def build_agent_graph():
    graph = StateGraph(AgentState)

    graph.add_node("agent", agent_node)
    graph.add_node("tools", tool_node)
    graph.add_node("verify", build_verification_node())
    graph.add_node("escalate", escalation_node)

    graph.set_entry_point("agent")

    graph.add_conditional_edges(
        "agent",
        route_agent_output,           # tools | should_verify | end
        {
            "tools": "tools",
            "verify": "verify",
            "end": END,
        }
    )

    graph.add_conditional_edges(
        "verify",
        should_verify,
        {
            "complete": END,
            "verify": "agent",        # feed issues back and re-run
            "escalate": "escalate",
        }
    )

    graph.add_edge("tools", "agent")

    return graph.compile(checkpointer=MemorySaver())
Enter fullscreen mode Exit fullscreen mode

Reference: LangGraph docs on conditional edges and state management: https://langchain-ai.github.io/langgraph/concepts/low_level/


Part 2: Context Injection as Infrastructure

Agents fail in unfamiliar environments for a simple reason: they spend the first several steps discovering context they should have arrived with. Every discovery step is a potential failure point, a source of latency, and a token cost you're paying for nothing.

The fix is a context bootstrap that runs before the first reasoning step and injects a structured environment summary directly into the system prompt.

# harness/context/bootstrap.py
import boto3
import json
from typing import Optional
from langchain_core.messages import SystemMessage


class ContextBootstrap:
    """
    Runs at agent startup. Discovers environment state and injects it
    into the agent's initial context so the first reasoning step
    can focus on the task, not on discovery.
    """

    def __init__(
        self,
        knowledge_base_id: Optional[str] = None,
        region: str = "us-east-1"
    ):
        self.bedrock_agent = boto3.client(
            "bedrock-agent-runtime",
            region_name=region
        )
        self.knowledge_base_id = knowledge_base_id
        self.secrets_client = boto3.client("secretsmanager", region_name=region)

    def build_environment_context(
        self,
        task_spec: str,
        available_tools: list[str],
        constraints: dict,
    ) -> str:
        """
        Assembles the environment context block injected at agent start.
        """
        # Retrieve relevant docs from Knowledge Base if configured
        kb_context = ""
        if self.knowledge_base_id:
            kb_context = self._retrieve_from_knowledge_base(task_spec)

        tool_manifest = self._build_tool_manifest(available_tools)
        constraint_summary = self._format_constraints(constraints)

        return f"""
## Environment Context (injected at startup — do not re-discover)

### Available Tools
{tool_manifest}

### Constraints
{constraint_summary}

### Relevant Knowledge Base Context
{kb_context if kb_context else "No KB context retrieved for this task."}

### Verification Standards
Your work will be evaluated programmatically. This means:
- File paths must match specifications exactly, including case and extension
- Function signatures must match the interface contract, not your preference
- Any output files must be written to the exact paths specified
- Tests must pass in an automated runner, not just look correct to you

### Problem Solving Protocol
1. PLAN: Read the full task. Identify all requirements including edge cases.
2. BUILD: Implement with testability in mind from the first line.
3. VERIFY: Run your solution. Read the full output. Compare against the spec.
4. FIX: If verification fails, re-read the spec before touching the code.

Do not skip from BUILD to reporting completion.
"""

    def _retrieve_from_knowledge_base(self, query: str) -> str:
        """Retrieve relevant context from Bedrock Knowledge Base."""
        try:
            response = self.bedrock_agent.retrieve(
                knowledgeBaseId=self.knowledge_base_id,
                retrievalQuery={"text": query},
                retrievalConfiguration={
                    "vectorSearchConfiguration": {"numberOfResults": 5}
                }
            )
            chunks = [
                r["content"]["text"]
                for r in response.get("retrievalResults", [])
                if r.get("score", 0) > 0.6   # Only high-confidence retrievals
            ]
            return "\n\n".join(chunks)
        except Exception as e:
            # KB retrieval failure should never crash the agent
            return f"[KB retrieval unavailable: {str(e)}]"

    def _build_tool_manifest(self, tool_names: list[str]) -> str:
        lines = []
        for name in tool_names:
            lines.append(f"  - {name}")
        return "\n".join(lines)

    def _format_constraints(self, constraints: dict) -> str:
        lines = []
        for key, value in constraints.items():
            lines.append(f"  - {key}: {value}")
        return "\n".join(lines)
Enter fullscreen mode Exit fullscreen mode

Inject this at graph entry point:

# harness/graph.py
def build_agent_graph(
    task_spec: str,
    knowledge_base_id: Optional[str] = None
):
    bootstrap = ContextBootstrap(knowledge_base_id=knowledge_base_id)

    env_context = bootstrap.build_environment_context(
        task_spec=task_spec,
        available_tools=["read_file", "write_file", "run_tests", "search_docs"],
        constraints={
            "max_execution_time": "300s",
            "max_tool_calls": 50,
            "verification_required": True,
        }
    )

    # This becomes part of the initial state, not a runtime discovery
    initial_state = AgentState(
        messages=[SystemMessage(content=BASE_SYSTEM_PROMPT + env_context)],
        phase="planning",
        verification_passed=False,
        verification_attempts=0,
        tool_call_counts={},
        agent_run_id=str(uuid.uuid4()),
        identity_context={},
        circuit_states={},
    )

    return graph, initial_state
Enter fullscreen mode Exit fullscreen mode

Reference: AWS Bedrock Knowledge Bases retrieval API: https://docs.aws.amazon.com/bedrock/latest/userguide/knowledge-base-retrieve.html


Part 3: Loop Detection and Doom Loop Prevention

I've seen this pattern in traces more times than I care to admit: the agent makes a small edit to a file, runs a test, sees it fail, makes a nearly identical edit to the same file, runs the test, sees it fail again. Repeat ten times. Each iteration is a variation of the same broken approach.

LangChain calls these "doom loops" and addresses them with a LoopDetectionMiddleware that tracks per-file edit counts. I've extended this to be state-aware and externalized to DynamoDB — which matters in multi-agent and parallel execution scenarios.

First, the DynamoDB table (Terraform):

# infrastructure/dynamodb_tables.tf
resource "aws_dynamodb_table" "agent_loop_state" {
  name         = "agent-loop-state"
  billing_mode = "PAY_PER_REQUEST"
  hash_key     = "run_id"
  range_key    = "resource_key"

  attribute {
    name = "run_id"
    type = "S"
  }

  attribute {
    name = "resource_key"
    type = "S"
  }

  ttl {
    attribute_name = "expires_at"
    enabled        = true
  }

  tags = {
    Environment = "production"
    Component   = "agent-harness"
  }
}
Enter fullscreen mode Exit fullscreen mode

Now the loop detection middleware:

# harness/middleware/loop_detection.py
import boto3
import time
from typing import Optional
from langchain_core.messages import HumanMessage
from harness.graph import AgentState


class LoopDetectionMiddleware:
    """
    Tracks tool call patterns per agent run using DynamoDB.
    External state is critical for multi-agent and parallel execution —
    if two branches of the same agent are both stuck in a loop,
    an in-memory counter wouldn't catch it.
    """

    LOOP_THRESHOLD = 4       # Same resource edited this many times → intervene
    SIMILARITY_WINDOW = 6    # Look at the last N tool calls for pattern detection
    TTL_SECONDS = 3600       # State expires after 1 hour

    def __init__(self, table_name: str = "agent-loop-state", region: str = "us-east-1"):
        self.dynamodb = boto3.resource("dynamodb", region_name=region)
        self.table = self.dynamodb.Table(table_name)

    def record_tool_call(
        self,
        run_id: str,
        tool_name: str,
        resource_key: str,     # file path, API endpoint, etc.
    ) -> int:
        """
        Increments the call count for (run_id, resource_key).
        Returns the new count.
        """
        expires_at = int(time.time()) + self.TTL_SECONDS

        response = self.table.update_item(
            Key={"run_id": run_id, "resource_key": resource_key},
            UpdateExpression=(
                "SET call_count = if_not_exists(call_count, :zero) + :inc, "
                "tool_name = :tool, "
                "expires_at = :ttl"
            ),
            ExpressionAttributeValues={
                ":zero": 0,
                ":inc": 1,
                ":tool": tool_name,
                ":ttl": expires_at,
            },
            ReturnValues="UPDATED_NEW",
        )
        return int(response["Attributes"]["call_count"])

    def check_and_inject(self, state: AgentState) -> AgentState:
        """
        Called as a LangGraph node before each tool execution.
        If a doom loop is detected, injects a reconsideration message.
        """
        last_tool_call = extract_last_tool_call(state["messages"])
        if not last_tool_call:
            return state

        tool_name = last_tool_call.get("name")
        resource_key = last_tool_call.get("args", {}).get("path") or \
                       last_tool_call.get("args", {}).get("endpoint") or \
                       tool_name   # Fallback to tool name if no specific resource

        count = self.record_tool_call(
            run_id=state["agent_run_id"],
            tool_name=tool_name,
            resource_key=resource_key,
        )

        if count >= self.LOOP_THRESHOLD:
            intervention_message = HumanMessage(
                content=f"""
[LOOP DETECTION] You have interacted with '{resource_key}' {count} times
in this session. This suggests your current approach may not be working.

Before proceeding:
1. Step back and re-read the original task specification
2. Identify the root cause of the repeated failures — not the symptoms
3. Consider a fundamentally different approach
4. If you're stuck, explain what you've tried and what's blocking you

Do not make another small variation of the same attempt.
"""
            )
            return {
                **state,
                "messages": state["messages"] + [intervention_message],
            }

        return state

    def get_loop_summary(self, run_id: str) -> dict:
        """
        Returns a summary of loop patterns for a completed run.
        Useful for post-run analysis and harness improvement.
        """
        response = self.table.query(
            KeyConditionExpression="run_id = :rid",
            ExpressionAttributeValues={":rid": run_id},
        )
        return {
            item["resource_key"]: item["call_count"]
            for item in response.get("Items", [])
            if item.get("call_count", 0) > 1
        }
Enter fullscreen mode Exit fullscreen mode

The loop detector also integrates with the circuit breaker layer — if a tool is in an open circuit state and the agent keeps trying to call it anyway, that registers as a loop. We'll connect these in Part 5.


Part 4: Reasoning Budget Management

This is the lever I was most skeptical about initially. Managing reasoning compute sounds like premature optimization. It isn't. On Bedrock, the cost and latency difference between a fast model for simple execution steps and a slow model for complex planning steps is substantial enough to matter at scale.

LangChain's finding — a "reasoning sandwich" of max-budget reasoning at planning and verification, medium at execution — maps cleanly to model selection on Bedrock.

# harness/routing/reasoning.py
from enum import Enum
from dataclasses import dataclass
from langchain_aws import ChatBedrock
import boto3


class AgentPhase(Enum):
    PLANNING = "planning"
    EXECUTING = "executing"
    VERIFYING = "verifying"
    DEBUGGING = "debugging"


@dataclass
class ModelConfig:
    model_id: str
    max_tokens: int
    temperature: float
    thinking_budget: Optional[int]    # For extended thinking on Claude 3.7+
    description: str


# Phase-to-model mapping for Bedrock
# Adjust model IDs as new versions become available
PHASE_MODEL_MAP = {
    AgentPhase.PLANNING: ModelConfig(
        model_id="anthropic.claude-3-7-sonnet-20250219-v1:0",
        max_tokens=16000,
        temperature=0.2,
        thinking_budget=8000,          # Extended thinking for deep planning
        description="Slow, thorough — plan the whole approach before touching tools"
    ),
    AgentPhase.EXECUTING: ModelConfig(
        model_id="anthropic.claude-3-5-sonnet-20241022-v2:0",
        max_tokens=8000,
        temperature=0,
        thinking_budget=None,          # No extended thinking for execution steps
        description="Fast, precise — follow the plan, call tools correctly"
    ),
    AgentPhase.VERIFYING: ModelConfig(
        model_id="anthropic.claude-3-7-sonnet-20250219-v1:0",
        max_tokens=8000,
        temperature=0,
        thinking_budget=4000,          # Moderate thinking for verification
        description="Thorough check — don't rubber-stamp the execution output"
    ),
    AgentPhase.DEBUGGING: ModelConfig(
        model_id="anthropic.claude-3-7-sonnet-20250219-v1:0",
        max_tokens=16000,
        temperature=0.3,
        thinking_budget=10000,         # Maximum reasoning for stuck situations
        description="Deep diagnosis — something is wrong and we need to find it"
    ),
}


class ReasoningRouter:
    """
    Selects the appropriate Bedrock model and configuration
    based on the current agent phase.
    """

    def __init__(self, region: str = "us-east-1"):
        self.bedrock_client = boto3.client("bedrock-runtime", region_name=region)
        self._model_cache: dict[AgentPhase, ChatBedrock] = {}

    def get_model(self, phase: AgentPhase) -> ChatBedrock:
        if phase not in self._model_cache:
            config = PHASE_MODEL_MAP[phase]
            model_kwargs = {
                "temperature": config.temperature,
                "max_tokens": config.max_tokens,
            }
            # Extended thinking only on Claude 3.7+
            if config.thinking_budget:
                model_kwargs["thinking"] = {
                    "type": "enabled",
                    "budget_tokens": config.thinking_budget
                }

            self._model_cache[phase] = ChatBedrock(
                client=self.bedrock_client,
                model_id=config.model_id,
                model_kwargs=model_kwargs,
            )
        return self._model_cache[phase]

    def get_phase_from_state(self, state: AgentState) -> AgentPhase:
        phase_str = state.get("phase", "executing")
        try:
            return AgentPhase(phase_str)
        except ValueError:
            return AgentPhase.EXECUTING
Enter fullscreen mode Exit fullscreen mode

Approximate cost/latency profile across phases (AWS Bedrock on-demand pricing, us-east-1, approximate as of mid-2025):

+------------------+---------------------------+----------+-----------+-----------+
| Phase            | Model                     | Approx.  | Approx.   | Thinking  |
|                  |                           | Input    | Output    | Budget    |
|                  |                           | $/1M tok | $/1M tok  | Tokens    |
+------------------+---------------------------+----------+-----------+-----------+
| PLANNING         | Claude 3.7 Sonnet         | $3.00    | $15.00    | 8,000     |
| EXECUTING        | Claude 3.5 Sonnet v2      | $3.00    | $15.00    | None      |
| VERIFYING        | Claude 3.7 Sonnet         | $3.00    | $15.00    | 4,000     |
| DEBUGGING        | Claude 3.7 Sonnet         | $3.00    | $15.00    | 10,000    |
+------------------+---------------------------+----------+-----------+-----------+
Note: Thinking tokens billed at output rate. Budget ≠ tokens used — model
decides how much thinking to apply within the budget ceiling.
Enter fullscreen mode Exit fullscreen mode

The key principle: you're not saving money by using a cheaper model for execution. You're spending budget more intentionally — reserving the deep reasoning capacity for the moments where it actually changes the outcome.

Reference: AWS Bedrock pricing and model catalog: https://aws.amazon.com/bedrock/pricing/
Reference: Claude extended thinking on Bedrock: https://docs.aws.amazon.com/bedrock/latest/userguide/inference-extended-thinking.html


Part 5: Production Hardening — Auth, Retry, and Circuit Breakers

This is the part most harness engineering articles skip. They show you how to make the agent smart. They don't show you what happens when the JWT expires mid-run, or when a downstream API starts returning 503s and the agent keeps retrying against it for fifteen minutes.

5.1 Identity Propagation and JWT at the Tool Boundary

The most important architectural decision here: authentication happens at the tool boundary, not inside the agent reasoning loop.

The agent should never see raw credentials. It doesn't need to. It operates on an identity context that was resolved before it started running, and that context gets propagated transparently into every tool call.

# harness/middleware/auth.py
import boto3
import jwt
import time
import requests
import json
from typing import Optional
from dataclasses import dataclass
from functools import lru_cache


@dataclass
class IdentityContext:
    """
    Resolved identity for an agent run.
    Attached to agent state, propagated to all tool calls.
    """
    subject: str              # user ID or service ID
    roles: list[str]          # ["analyst", "data-reader"] etc.
    tenant_id: str
    access_token: str         # short-lived JWT
    refresh_token: Optional[str]
    token_expires_at: float   # Unix timestamp


class CredentialManager:
    """
    Manages JWT lifecycle for a single agent run.
    Tools call this to get a valid token — they never store tokens themselves.
    """

    # Refresh when less than 5 minutes remain on the token
    REFRESH_THRESHOLD_SECONDS = 300

    def __init__(
        self,
        token_endpoint: str,
        client_id_secret: str,    # ARN of Secrets Manager secret
        region: str = "us-east-1"
    ):
        self.token_endpoint = token_endpoint
        self.region = region
        self._secrets_client = boto3.client("secretsmanager", region_name=region)
        self._client_credentials = self._load_client_credentials(client_id_secret)
        self._current_context: Optional[IdentityContext] = None

    def _load_client_credentials(self, secret_arn: str) -> dict:
        """Load client_id/client_secret from Secrets Manager."""
        response = self._secrets_client.get_secret_value(SecretId=secret_arn)
        return json.loads(response["SecretString"])

    def get_valid_token(self) -> str:
        """
        Returns a valid access token. Refreshes automatically if needed.
        This is the only method tools should call.
        """
        if self._current_context is None:
            raise RuntimeError("CredentialManager not initialized. Call initialize() first.")

        # Check if we need to refresh
        if self._should_refresh():
            self._refresh_token()

        return self._current_context.access_token

    def _should_refresh(self) -> bool:
        if not self._current_context:
            return True
        remaining = self._current_context.token_expires_at - time.time()
        return remaining < self.REFRESH_THRESHOLD_SECONDS

    def _refresh_token(self):
        """Refresh using the refresh token or client credentials flow."""
        ctx = self._current_context

        if ctx and ctx.refresh_token:
            # Use refresh token if available
            payload = {
                "grant_type": "refresh_token",
                "refresh_token": ctx.refresh_token,
                "client_id": self._client_credentials["client_id"],
                "client_secret": self._client_credentials["client_secret"],
            }
        else:
            # Fall back to client credentials
            payload = {
                "grant_type": "client_credentials",
                "client_id": self._client_credentials["client_id"],
                "client_secret": self._client_credentials["client_secret"],
                "scope": "agent:execute",
            }

        response = requests.post(self.token_endpoint, data=payload, timeout=10)
        response.raise_for_status()
        token_data = response.json()

        # Decode to extract claims without full verification
        # (we trust our own token endpoint; full verification would re-verify
        # the JWKS on every call which is unnecessarily expensive)
        claims = jwt.decode(
            token_data["access_token"],
            options={"verify_signature": False}
        )

        self._current_context = IdentityContext(
            subject=claims.get("sub", ""),
            roles=claims.get("roles", []),
            tenant_id=claims.get("tenant_id", ""),
            access_token=token_data["access_token"],
            refresh_token=token_data.get("refresh_token"),
            token_expires_at=claims.get("exp", time.time() + 3600),
        )

    def initialize(self, initial_token: str, refresh_token: Optional[str] = None):
        """Call this before the agent run starts."""
        claims = jwt.decode(
            initial_token,
            options={"verify_signature": False}
        )
        self._current_context = IdentityContext(
            subject=claims.get("sub", ""),
            roles=claims.get("roles", []),
            tenant_id=claims.get("tenant_id", ""),
            access_token=initial_token,
            refresh_token=refresh_token,
            token_expires_at=claims.get("exp", time.time() + 3600),
        )

    def check_role(self, required_role: str) -> bool:
        if not self._current_context:
            return False
        return required_role in self._current_context.roles
Enter fullscreen mode Exit fullscreen mode

Now wrap every tool with role checking and credential injection:

# harness/tools/base.py
from functools import wraps
from typing import Callable, Any
from harness.middleware.auth import CredentialManager


class AuthorizedToolWrapper:
    """
    Wraps a tool function with role-based authorization.
    The tool implementation never handles auth — the wrapper does.
    """

    def __init__(
        self,
        tool_fn: Callable,
        required_role: str,
        credential_manager: CredentialManager,
    ):
        self.tool_fn = tool_fn
        self.required_role = required_role
        self.credential_manager = credential_manager
        self.__name__ = tool_fn.__name__

    def __call__(self, *args, **kwargs) -> Any:
        # Role check first — fail fast before any external calls
        if not self.credential_manager.check_role(self.required_role):
            raise PermissionError(
                f"Agent identity does not have required role: {self.required_role}. "
                f"Current roles: {self.credential_manager._current_context.roles}"
            )

        # Inject the current valid token into kwargs
        kwargs["_auth_token"] = self.credential_manager.get_valid_token()

        return self.tool_fn(*args, **kwargs)


# Example: wrapping a document reader tool
def _read_document_impl(document_id: str, _auth_token: str = None) -> str:
    """Internal implementation — receives token as injected kwarg."""
    headers = {"Authorization": f"Bearer {_auth_token}"}
    response = requests.get(
        f"https://docs-api.internal/documents/{document_id}",
        headers=headers,
        timeout=30,
    )
    response.raise_for_status()
    return response.json()["content"]


# Usage: wrap at registration time, not at call time
def build_tool_registry(credential_manager: CredentialManager) -> dict:
    return {
        "read_document": AuthorizedToolWrapper(
            tool_fn=_read_document_impl,
            required_role="document-reader",
            credential_manager=credential_manager,
        ),
        # "write_document": requires "document-writer" role
        # "execute_query": requires "data-analyst" role
    }
Enter fullscreen mode Exit fullscreen mode

Reference: AWS Secrets Manager with Python: https://docs.aws.amazon.com/secretsmanager/latest/userguide/retrieving-secrets_cache-python.html


5.2 Retry Policies: Model Calls vs. Tool Calls

This is a distinction that took me longer to appreciate than it should have. Model call failures and tool call failures require different retry strategies because they have different failure modes.

+---------------------+------------------+---------------------------+--------+
| Failure Source      | Error Type       | Retry Strategy            | Max    |
+---------------------+------------------+---------------------------+--------+
| Bedrock model call  | 429 throttling   | Exp backoff + jitter,     | 5      |
|                     |                  | fallback to smaller model |        |
+---------------------+------------------+---------------------------+--------+
| Bedrock model call  | 503 service      | Exp backoff               | 3      |
+---------------------+------------------+---------------------------+--------+
| External API (tool) | 429 rate limit   | Exp backoff + jitter      | 3      |
+---------------------+------------------+---------------------------+--------+
| External API (tool) | 401 unauthorized | NO RETRY — refresh token, | 1      |
|                     |                  | then single retry         |        |
+---------------------+------------------+---------------------------+--------+
| External API (tool) | 500 server error | Exp backoff               | 2      |
+---------------------+------------------+---------------------------+--------+
| External API (tool) | 400 bad request  | NO RETRY — agent logic    | 0      |
|                     |                  | error, surface immediately|        |
+---------------------+------------------+---------------------------+--------+
| External API (tool) | Connection error | Exp backoff               | 3      |
+---------------------+------------------+---------------------------+--------+
Enter fullscreen mode Exit fullscreen mode

Implementation using tenacity — the most production-tested retry library in the Python ecosystem:

# harness/middleware/retry.py
import time
import random
import logging
from typing import Callable, TypeVar, Optional
from tenacity import (
    retry,
    stop_after_attempt,
    wait_exponential_jitter,
    retry_if_exception,
    before_sleep_log,
    RetryError,
)
import boto3
import botocore.exceptions
import requests.exceptions

logger = logging.getLogger(__name__)

T = TypeVar("T")


def is_retryable_bedrock_error(exception: Exception) -> bool:
    """Identify transient Bedrock errors worth retrying."""
    if isinstance(exception, botocore.exceptions.ClientError):
        code = exception.response["Error"]["Code"]
        return code in (
            "ThrottlingException",
            "ServiceUnavailableException",
            "InternalServerException",
        )
    return False


def is_retryable_api_error(exception: Exception) -> bool:
    """Identify transient HTTP errors from external APIs."""
    if isinstance(exception, requests.exceptions.ConnectionError):
        return True
    if isinstance(exception, requests.exceptions.HTTPError):
        status = exception.response.status_code if exception.response else 0
        # 429, 500, 502, 503, 504 are retryable; 400, 401, 403, 404 are not
        return status in (429, 500, 502, 503, 504)
    return False


# Model call retry — more patient, allows fallback model
model_retry = retry(
    retry=retry_if_exception(is_retryable_bedrock_error),
    wait=wait_exponential_jitter(initial=2, max=60, jitter=5),
    stop=stop_after_attempt(5),
    before_sleep=before_sleep_log(logger, logging.WARNING),
    reraise=True,
)

# Tool/API call retry — stricter, less patient
tool_retry = retry(
    retry=retry_if_exception(is_retryable_api_error),
    wait=wait_exponential_jitter(initial=1, max=30, jitter=3),
    stop=stop_after_attempt(3),
    before_sleep=before_sleep_log(logger, logging.WARNING),
    reraise=True,
)


class ModelCallWithFallback:
    """
    Wraps a Bedrock model call with retry and fallback to a cheaper model
    if the primary model is persistently throttled.
    """

    def __init__(
        self,
        primary_model_id: str,
        fallback_model_id: str,
        region: str = "us-east-1",
    ):
        client = boto3.client("bedrock-runtime", region_name=region)
        from langchain_aws import ChatBedrock
        self.primary = ChatBedrock(client=client, model_id=primary_model_id)
        self.fallback = ChatBedrock(client=client, model_id=fallback_model_id)

    @model_retry
    def invoke_primary(self, messages):
        return self.primary.invoke(messages)

    def invoke(self, messages):
        try:
            return self.invoke_primary(messages)
        except RetryError:
            logger.warning(
                "Primary model exhausted retries, falling back to: %s",
                self.fallback.model_id
            )
            return self.fallback.invoke(messages)
Enter fullscreen mode Exit fullscreen mode

5.3 Circuit Breakers in Multi-Agent Contexts

The circuit breaker pattern — closed (normal), open (failing, stop trying), half-open (testing recovery) — is well understood in microservices. What's different in agent systems is that circuit state needs to be shared across all concurrent agent activity for a given deployment.

If your agent spawns sub-agents or runs parallel tool execution branches, you do not want five concurrent branches all independently discovering that the same API is down. The first failure should open the circuit and inform all of them.

# harness/middleware/circuit_breaker.py
import boto3
import time
import json
import logging
from enum import Enum
from dataclasses import dataclass, asdict
from typing import Optional

logger = logging.getLogger(__name__)


class CircuitState(Enum):
    CLOSED = "CLOSED"        # Normal operation
    OPEN = "OPEN"            # Failing — reject calls immediately
    HALF_OPEN = "HALF_OPEN"  # Testing recovery — allow one probe call


@dataclass
class CircuitRecord:
    tool_name: str
    state: str
    failure_count: int
    last_failure_time: float
    last_state_change: float
    recovery_probe_sent: bool


class DynamoDBCircuitBreaker:
    """
    Circuit breaker with state in DynamoDB.
    Shared across all agent instances and sub-agents in the same deployment.
    """

    FAILURE_THRESHOLD = 5         # Open circuit after this many failures
    RECOVERY_TIMEOUT = 60         # Seconds before attempting half-open
    SUCCESS_TO_CLOSE = 2          # Successful calls in half-open to re-close

    def __init__(
        self,
        table_name: str = "agent-circuit-state",
        region: str = "us-east-1"
    ):
        self.dynamodb = boto3.resource("dynamodb", region_name=region)
        self.table = self.dynamodb.Table(table_name)

    def get_state(self, tool_name: str) -> CircuitState:
        try:
            response = self.table.get_item(Key={"tool_name": tool_name})
            if "Item" not in response:
                return CircuitState.CLOSED

            record = CircuitRecord(**response["Item"])

            # Check if an open circuit should transition to half-open
            if record.state == CircuitState.OPEN.value:
                age = time.time() - record.last_state_change
                if age > self.RECOVERY_TIMEOUT:
                    self._transition_to_half_open(tool_name)
                    return CircuitState.HALF_OPEN

            return CircuitState(record.state)
        except Exception as e:
            # Circuit breaker failure should never block execution
            logger.error("Circuit breaker read failed: %s", e)
            return CircuitState.CLOSED

    def record_success(self, tool_name: str):
        try:
            response = self.table.get_item(Key={"tool_name": tool_name})
            if "Item" not in response:
                return

            record = CircuitRecord(**response["Item"])
            if record.state == CircuitState.HALF_OPEN.value:
                # Count successes toward re-closing the circuit
                # Simplified: one success closes it (adjust threshold as needed)
                self._transition_to_closed(tool_name)
        except Exception as e:
            logger.error("Circuit breaker success recording failed: %s", e)

    def record_failure(self, tool_name: str):
        try:
            now = time.time()
            response = self.table.update_item(
                Key={"tool_name": tool_name},
                UpdateExpression=(
                    "SET failure_count = if_not_exists(failure_count, :zero) + :inc, "
                    "last_failure_time = :now, "
                    "last_state_change = if_not_exists(last_state_change, :now)"
                ),
                ExpressionAttributeValues={
                    ":zero": 0, ":inc": 1, ":now": Decimal(str(now))
                },
                ReturnValues="UPDATED_NEW",
            )
            new_count = int(response["Attributes"]["failure_count"])

            if new_count >= self.FAILURE_THRESHOLD:
                self._open_circuit(tool_name)
        except Exception as e:
            logger.error("Circuit breaker failure recording failed: %s", e)

    def _open_circuit(self, tool_name: str):
        logger.warning("Opening circuit for tool: %s", tool_name)
        self.table.update_item(
            Key={"tool_name": tool_name},
            UpdateExpression="SET #s = :state, last_state_change = :now",
            ExpressionAttributeNames={"#s": "state"},
            ExpressionAttributeValues={
                ":state": CircuitState.OPEN.value,
                ":now": Decimal(str(time.time())),
            },
        )

    def _transition_to_half_open(self, tool_name: str):
        logger.info("Transitioning circuit to HALF_OPEN for tool: %s", tool_name)
        self.table.update_item(
            Key={"tool_name": tool_name},
            UpdateExpression="SET #s = :state, last_state_change = :now",
            ExpressionAttributeNames={"#s": "state"},
            ExpressionAttributeValues={
                ":state": CircuitState.HALF_OPEN.value,
                ":now": Decimal(str(time.time())),
            },
        )

    def _transition_to_closed(self, tool_name: str):
        logger.info("Closing circuit for tool: %s", tool_name)
        self.table.update_item(
            Key={"tool_name": tool_name},
            UpdateExpression=(
                "SET #s = :state, failure_count = :zero, last_state_change = :now"
            ),
            ExpressionAttributeNames={"#s": "state"},
            ExpressionAttributeValues={
                ":state": CircuitState.CLOSED.value,
                ":zero": 0,
                ":now": Decimal(str(time.time())),
            },
        )


class ProtectedToolExecutor:
    """
    Wraps tool calls with both circuit breaker and retry logic.
    This is the outermost wrapper — one place where both concerns live.
    """

    def __init__(
        self,
        circuit_breaker: DynamoDBCircuitBreaker,
        loop_detector: Optional["LoopDetectionMiddleware"] = None,
    ):
        self.circuit_breaker = circuit_breaker
        self.loop_detector = loop_detector

    def execute(self, tool_name: str, tool_fn: Callable, *args, **kwargs) -> Any:
        state = self.circuit_breaker.get_state(tool_name)

        if state == CircuitState.OPEN:
            # Don't call the tool — surface the unavailability to the agent
            raise ToolUnavailableError(
                f"Tool '{tool_name}' circuit is OPEN (too many recent failures). "
                f"Do not retry this tool. Consider an alternative approach "
                f"or report that this capability is currently unavailable."
            )

        try:
            result = tool_fn(*args, **kwargs)
            self.circuit_breaker.record_success(tool_name)
            return result

        except ToolUnavailableError:
            raise  # Already handled above

        except Exception as e:
            self.circuit_breaker.record_failure(tool_name)
            raise


class ToolUnavailableError(Exception):
    """
    Raised when a tool's circuit is open.
    The agent should treat this as a signal to reconsider its plan,
    not as a transient error to retry.
    """
    pass
Enter fullscreen mode Exit fullscreen mode

Connect the circuit breaker to the loop detector — if the agent keeps attempting an open-circuit tool, that's a doom loop and should be caught:

# In loop_detection.py — extend check_and_inject
def check_and_inject(self, state: AgentState, circuit_breaker: DynamoDBCircuitBreaker) -> AgentState:
    last_tool_call = extract_last_tool_call(state["messages"])
    if not last_tool_call:
        return state

    tool_name = last_tool_call.get("name")

    # Check circuit state before recording the call
    circuit_state = circuit_breaker.get_state(tool_name)
    if circuit_state == CircuitState.OPEN:
        message = HumanMessage(
            content=f"[CIRCUIT OPEN] '{tool_name}' is currently unavailable "
                    f"(circuit breaker open). You have attempted to call it despite "
                    f"this. Please reconsider your approach without relying on this tool."
        )
        return {**state, "messages": state["messages"] + [message]}

    # Normal loop detection continues...
    return self._check_loop_count(state, tool_name)
Enter fullscreen mode Exit fullscreen mode

Reference: AWS DynamoDB conditional writes: https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/WorkingWithItems.html#WorkingWithItems.ConditionalUpdate
Reference: Tenacity retry library: https://tenacity.readthedocs.io/


Part 6: Putting It All Together — The Full Harness Initialization

Here's how all these components assemble into a single agent run:

# harness/runtime.py
import uuid
from typing import Optional
from harness.graph import build_agent_graph, AgentState
from harness.middleware.auth import CredentialManager
from harness.middleware.loop_detection import LoopDetectionMiddleware
from harness.middleware.circuit_breaker import DynamoDBCircuitBreaker, ProtectedToolExecutor
from harness.middleware.verification import build_verification_node
from harness.context.bootstrap import ContextBootstrap
from harness.routing.reasoning import ReasoningRouter
from harness.tools.base import build_tool_registry


class AgentHarness:
    """
    Top-level harness assembler. One instance per agent deployment.
    Run instances are created via execute().
    """

    def __init__(
        self,
        token_endpoint: str,
        client_id_secret_arn: str,
        knowledge_base_id: Optional[str] = None,
        region: str = "us-east-1",
    ):
        # Credential management
        self.credential_manager = CredentialManager(
            token_endpoint=token_endpoint,
            client_id_secret=client_id_secret_arn,
            region=region,
        )

        # Infrastructure components
        self.loop_detector = LoopDetectionMiddleware(region=region)
        self.circuit_breaker = DynamoDBCircuitBreaker(region=region)
        self.tool_executor = ProtectedToolExecutor(
            circuit_breaker=self.circuit_breaker,
            loop_detector=self.loop_detector,
        )
        self.context_bootstrap = ContextBootstrap(
            knowledge_base_id=knowledge_base_id,
            region=region,
        )
        self.reasoning_router = ReasoningRouter(region=region)

        # Tool registry (wrapped with auth + retry + circuit breaker)
        self.tools = build_tool_registry(self.credential_manager)

    def execute(
        self,
        task_spec: str,
        initial_token: str,
        refresh_token: Optional[str] = None,
    ) -> dict:
        run_id = str(uuid.uuid4())

        # Initialize credentials for this run
        self.credential_manager.initialize(initial_token, refresh_token)

        # Build context-enriched initial state
        graph, initial_state = build_agent_graph(
            task_spec=task_spec,
            knowledge_base_id=self.context_bootstrap.knowledge_base_id,
            run_id=run_id,
        )

        # Execute with full harness active
        config = {"configurable": {"thread_id": run_id}}
        final_state = graph.invoke(initial_state, config=config)

        return {
            "run_id": run_id,
            "result": extract_final_result(final_state),
            "verification_passed": final_state.get("verification_passed", False),
            "loop_summary": self.loop_detector.get_loop_summary(run_id),
            "phases_used": extract_phase_history(final_state),
        }
Enter fullscreen mode Exit fullscreen mode

Production Reality Check

Every series article has one of these, and this is where I try to be honest about where the ideas fall short.

Harnesses are model-specific. This is the hard truth. The self-verification prompt that works with Claude 3.7 needs adjustment for Nova Pro. The loop detection threshold that prevents doom loops on Sonnet causes premature interruptions on a model that naturally revisits files more. When you upgrade your model, budget time to re-tune the harness. The LangChain team explicitly noted this — they ran a separate improvement loop for each model on their leaderboard. This isn't a corner case, it's the default.

Middleware creates latency. Every hook I've described adds round-trip time. The verification subgraph is an entire additional model call. The loop detector hits DynamoDB on every tool call. In batch processing this doesn't matter. In a user-facing application where someone is waiting, you need to profile your harness overhead and decide which components are worth it for your latency budget.

Your guardrails will eventually work against you. The loop detection middleware I described intervenes when a file is edited four times. That threshold was appropriate for the agent behavior I observed six months ago. As models improve and become more methodically iterative by default, that threshold may interrupt legitimate behavior. Harness components should be versioned and reviewed on the same cadence as model upgrades. Build in the ability to disable individual components via feature flags.

The circuit breaker state needs maintenance. Open circuits don't always heal themselves cleanly. Downstream APIs sometimes recover in a way that doesn't trigger the half-open probe. Build a manual override mechanism — a Lambda or admin endpoint that can force a circuit closed — so on-call engineers have a tool when things go sideways at 2 AM.

Token refresh logic is trickier than it looks. The implementation I showed handles the common case cleanly. It does not handle: refresh tokens that are also expired, multi-tenant scenarios where different agent sub-tasks run as different identities, or the race condition where two parallel tool calls both detect an expired token and both try to refresh simultaneously. For a production system, that last one requires a distributed lock or a dedicated token service. Don't discover this issue in production.


Reference Architecture

                        ┌──────────────────────────────────────┐
                        │           Agent Application           │
                        │                                       │
            ┌───────────┴───────────────────────────────┐       │
            │             AgentHarness Runtime          │       │
            │                                           │       │
            │  ┌─────────────────────────────────────┐  │       │
            │  │        LangGraph Orchestrator       │  │       │
            │  │  planning → executing → verifying   │  │       │
            │  └──────┬─────────────┬────────────────┘  │       │
            │         │             │                   │       │
            │  ┌──────▼──────┐  ┌───▼──────────────┐    │       │
            │  │  Reasoning  │  │   Middleware     │    │       │
            │  │   Router    │  │   Loop Detection │    │       │
            │  │  Claude 3.7 │  │   Verification   │    │       │
            │  │  Claude 3.5 │  │   Auth Propagate │    │       │
            │  └──────┬──────┘  └──────────────────┘    │       │
            │         │                                  │      │
            │  ┌──────▼───────────────────────────────┐  │      │
            │  │         AWS Bedrock Runtime          │  │      │
            │  │  Model Invocation + ExtendedThinking │  │      │
            │  └──────────────────────────────────────┘  │      │
            │                                            │      │
            │  ┌──────────────────────────────────────┐  │      │
            │  │       Tool Execution Layer           │  │      │
            │  │  Auth Wrapper → Retry → Circ.Breaker │  │      │
            │  └─────────┬──────────────────┬─────────┘  │      │
            └────────────┼──────────────────┼────────────┘      │
                         │                  │                   │
            ┌────────────▼───┐  ┌───────────▼──────────────┐    │
            │  AWS Services  │  │   External APIs / Tools  │    │
            │  Secrets Mgr   │  │   (protected by circuit  │    │
            │  Knowledge Base│  │    breaker + JWT auth)   │    │
            │  DynamoDB      │  └──────────────────────────┘    │
            │  (loop + circ) │                                  │
            └────────────────┘                                  │
                                                                │
            ┌───────────────────────────────────────────────────┘
            │  Obser. (LangSmith / LangFuse / CloudWatch)       │
            │  Traces: everymodelcall, toolcall, midl. event.   │
            └───────────────────────────────────────────────────┘
Enter fullscreen mode Exit fullscreen mode

Reference: LangGraph multi-agent orchestration patterns: https://langchain-ai.github.io/langgraph/concepts/multi_agent/
Reference: AWS Bedrock Guardrails (complementary safety layer): https://docs.aws.amazon.com/bedrock/latest/userguide/guardrails.html
Reference: Original LangChain harness engineering research: https://blog.langchain.com/harness-engineering/


Reference Infrastructure Stack

+---------------------------+---------------------+----------------------------+
| Component                 | Technology          | Role in Harness            |
+---------------------------+---------------------+----------------------------+
| Orchestration             | LangGraph 0.2+      | State machine, graph edges,|
|                           |                     | conditional routing        |
+---------------------------+---------------------+----------------------------+
| Model Runtime             | AWS Bedrock         | Claude 3.7/3.5, Nova Pro,  |
|                           |                     | extended thinking support  |
+---------------------------+---------------------+----------------------------+
| Reasoning Router          | Custom (boto3)      | Phase-aware model + budget |
|                           |                     | selection                  |
+---------------------------+---------------------+----------------------------+
| Context Retrieval         | Bedrock Knowledge   | Pre-run environment        |
|                           | Bases               | bootstrap                  |
+---------------------------+---------------------+----------------------------+
| Loop Detection State      | DynamoDB            | Shared across sub-agents,  |
|                           |                     | TTL-managed                |
+---------------------------+---------------------+----------------------------+
| Circuit Breaker State     | DynamoDB            | Per-tool, survives restarts|
+---------------------------+---------------------+----------------------------+
| Credential Management     | Secrets Manager +   | JWT lifecycle, refresh,    |
|                           | Custom Manager      | role propagation           |
+---------------------------+---------------------+----------------------------+
| Retry Policies            | Tenacity            | Separate policies for model|
|                           |                     | vs tool failures           |
+---------------------------+---------------------+----------------------------+
| Observability             | LangSmith /         | Full trace capture, token  |
|                           | LangFuse + CW Logs  | counts, phase timing       |
+---------------------------+---------------------+----------------------------+
| Evaluation                | AWS AgentCore Evals | Post-run quality scoring   |
|                           | + Ragas/DeepEval    |                            |
+---------------------------+---------------------+----------------------------+
| Local Dev Alternative     | Ollama + Docker     | Run verification + routing |
|                           | Compose             | without Bedrock costs      |
+---------------------------+---------------------+----------------------------+
| Infrastructure as Code    | Terraform           | DynamoDB tables, IAM roles,|
|                           |                     | Lambda for context Lambda  |
+---------------------------+---------------------+----------------------------+
Enter fullscreen mode Exit fullscreen mode

This is Article 5 in the Agentic Architectures series. Previous articles covered the Agentic AI Maturity Model, Advanced Coordination and Reasoning Patterns, AgentOps and Observability, and Agentic Protocols (MCP and A2A).

Other Articles


Tags: AgenticAI, AWSBedrock, LangGraph, SoftwareArchitecture, MachineLearning, LLMOps, AIEngineering, CloudArchitecture, PythonProgramming, ArtificialIntelligence

Top comments (0)