Agentic Architectures — Article 5: Harness Engineering and the Agent Runtime Layer
There's a specific kind of frustration that only agent builders know. You've spent two weeks tuning your LLM. Your evals look clean. You demo it to your team and it works beautifully. Then you push it to production and watch it slowly destroy itself — calling the same tool eleven times, confidently returning a result it never actually verified, or hanging indefinitely because a downstream API returned a 401 and nobody taught it what to do next.
That happened to me on a document analysis agent I built earlier this year. The model was fine. Claude Sonnet was doing exactly what I asked. The problem was everything around it — the runtime layer I'd barely thought about. I had a prompt and some tools. I did not have a harness.
This article is about what I learned building one, and how to build yours on AWS Bedrock with LangGraph as the orchestration backbone. I'll be drawing on a pattern LangChain published around their deepagents-cli work: they moved a coding agent from 52.8% to 66.5% on Terminal Bench 2.0 without changing the model. They only changed the harness. That result is worth taking seriously, and their post is worth reading in full.
We'll go further than their benchmark setup. We'll add the production concerns they didn't cover: identity propagation, JWT handling at the tool boundary, retry policies that distinguish model failures from tool failures, and circuit breakers whose state survives agent restarts.
What Is the Harness, Really?
The harness is everything that wraps the model at runtime. It's not your application code. It's not your prompt templates living in a file. It's the live, executing system that receives the model's output, decides what to do with it, prepares the next input, and enforces the constraints your application needs.
Think of it this way. Your model is a very capable but somewhat unpredictable engine. The harness is the drivetrain — it takes raw engine output and translates it into controlled motion. Without it, you have power but no direction.
┌─────────────────────────────────────────────────────────┐
│                    Agent Application                    │
│                                                         │
│   ┌─────────────────────────────────────────────────┐   │
│   │                  HARNESS LAYER                  │   │
│   │                                                 │   │
│   │  System Prompt Architecture                     │   │
│   │  ┌───────────────────────────────────────────┐  │   │
│   │  │            Middleware / Hooks             │  │   │
│   │  │  ┌─────────────────────────────────────┐  │  │   │
│   │  │  │           MODEL (Bedrock)           │  │  │   │
│   │  │  │        Claude / Nova / Titan        │  │  │   │
│   │  │  └─────────────────────────────────────┘  │  │   │
│   │  │  ┌─────────────────────────────────────┐  │  │   │
│   │  │  │           Tool Execution            │  │  │   │
│   │  │  │   Auth │ Retry │ Circuit Breaker    │  │  │   │
│   │  │  └─────────────────────────────────────┘  │  │   │
│   │  └───────────────────────────────────────────┘  │   │
│   │  Reasoning Budget Allocator                     │   │
│   └─────────────────────────────────────────────────┘   │
└─────────────────────────────────────────────────────────┘
The LangChain research identified three primary "knobs" in a harness: system prompt, tools, and middleware. I've found it useful to think in four levers, because reasoning compute allocation is distinct enough to warrant its own category:
+---------------------------+-------------------------------------------+
| Lever                     | What It Controls                          |
+---------------------------+-------------------------------------------+
| System Prompt Architecture| Intent, constraints, verification         |
|                           | guidance, env context injection           |
+---------------------------+-------------------------------------------+
| Tool Design & Availability| What the agent can reach, how calls are   |
|                           | structured, auth at the boundary          |
+---------------------------+-------------------------------------------+
| Middleware / Lifecycle    | Pre/post model call hooks, loop detection,|
| Hooks                     | verification intercepts, identity prop.   |
+---------------------------+-------------------------------------------+
| Reasoning Compute         | Which model per phase, thinking budget,   |
| Allocation                | cost-aware routing                        |
+---------------------------+-------------------------------------------+
Let's build each of these out concretely.
Setting Up: Project Structure and Dependencies
Before we get into each layer, here's the baseline project structure this article assumes:
agent-harness/
├── harness/
│   ├── __init__.py
│   ├── graph.py                  # LangGraph state machine
│   ├── middleware/
│   │   ├── __init__.py
│   │   ├── verification.py       # Self-verification hooks
│   │   ├── loop_detection.py     # Doom loop prevention
│   │   ├── auth.py               # JWT + credential management
│   │   └── retry.py              # Retry + circuit breaker
│   ├── tools/
│   │   ├── __init__.py
│   │   ├── base.py               # ToolWrapper with auth + retry
│   │   └── registry.py           # Tool registry with circuit state
│   ├── context/
│   │   ├── __init__.py
│   │   └── bootstrap.py          # Environment context injection
│   └── routing/
│       ├── __init__.py
│       └── reasoning.py          # Phase-aware model routing
├── infrastructure/
│   └── dynamodb_tables.tf        # Terraform for loop + circuit state
├── tests/
└── requirements.txt
Core dependencies:
# requirements.txt
langchain>=0.3.0
langchain-aws>=0.2.0
langchain-community>=0.3.0
langgraph>=0.2.0
boto3>=1.35.0
pyjwt>=2.8.0
cryptography>=42.0.0
tenacity>=8.2.0
For local development without Bedrock costs, we'll also support Ollama:
# requirements-local.txt
langchain-ollama>=0.2.0
ollama>=0.3.0
Part 1: Self-Verification Loop
This is the highest-leverage change LangChain made. The failure pattern is subtle and very common: the agent writes a solution, re-reads its own code, decides it looks reasonable, and exits. It never ran anything. It verified its logic against its own reasoning, which is circular.
The fix is a PreCompletionChecklistMiddleware — an intercept that fires before the agent signals completion and forces a structured verification pass.
Here's how I implement this in LangGraph with Bedrock.
First, define the agent state to track verification:
# harness/graph.py
from typing import TypedDict, Annotated, List, Optional
from langgraph.graph import StateGraph, END
from langgraph.graph.message import add_messages
from langchain_core.messages import BaseMessage


class AgentState(TypedDict):
    messages: Annotated[List[BaseMessage], add_messages]
    phase: str                  # planning | executing | verifying
    verification_passed: bool
    verification_attempts: int
    tool_call_counts: dict      # for loop detection
    agent_run_id: str
    identity_context: dict      # JWT claims, propagated to tools
    circuit_states: dict        # per-tool circuit breaker state
Now build the verification subgraph. The key insight is that verification is its own reasoning pass — not just "check if the answer is non-empty":
# harness/middleware/verification.py
import json

import boto3
from langchain_aws import ChatBedrock
from langchain_core.messages import SystemMessage, HumanMessage

# graph.py must define AgentState before it imports this module,
# otherwise the two files import each other in a cycle.
from harness.graph import AgentState

VERIFICATION_SYSTEM_PROMPT = """
You are a strict verification agent. Your job is NOT to produce a solution.
Your job is to verify that the solution already produced is correct and complete.
For every verification request you must:
1. Re-read the original task specification carefully
2. Check that the solution addresses ALL requirements, not just the ones that
   seemed easiest
3. Identify any edge cases that were not handled
4. Check that any file paths, function signatures, or API contracts match
   what was specified, not what seemed convenient
5. If the solution involves code, mentally trace through at least two execution
   paths: the happy path and one error path
Return a JSON object with:
{
  "passed": true | false,
  "issues": ["list of specific issues if failed"],
  "recommendation": "COMPLETE | RETRY_WITH_FIXES | ESCALATE"
}
"""


def build_verification_node(model_id: str = "anthropic.claude-3-5-sonnet-20241022-v2:0"):
    """
    Returns a LangGraph node function that performs structured verification.
    Uses a separate model call with a focused verification system prompt.
    """
    bedrock_client = boto3.client("bedrock-runtime", region_name="us-east-1")
    verifier = ChatBedrock(
        client=bedrock_client,
        model_id=model_id,
        model_kwargs={
            "temperature": 0,  # Keep verification as deterministic as possible
            "max_tokens": 1024,
        },
    )

    def verification_node(state: AgentState) -> AgentState:
        # Extract the task spec and proposed solution from message history
        task_spec = extract_task_spec(state["messages"])
        proposed_solution = extract_last_agent_output(state["messages"])
        verification_prompt = f"""
ORIGINAL TASK:
{task_spec}
PROPOSED SOLUTION:
{proposed_solution}
Verify this solution. Return only the JSON object described in your instructions.
"""
        response = verifier.invoke([
            SystemMessage(content=VERIFICATION_SYSTEM_PROMPT),
            HumanMessage(content=verification_prompt),
        ])
        try:
            result = json.loads(response.content)
        except (json.JSONDecodeError, TypeError):
            # Model didn't follow the format: treat as failed verification
            result = {
                "passed": False,
                "issues": ["Verification model returned unparseable output"],
                "recommendation": "RETRY_WITH_FIXES",
            }

        # Use .get() so valid JSON with a missing key fails closed
        # instead of raising KeyError inside the node.
        if result.get("passed") is True:
            return {
                **state,
                "phase": "complete",
                "verification_passed": True,
            }

        # Inject verification feedback back into message history
        # so the agent can see exactly what it missed
        issues = result.get("issues") or ["No specific issues reported"]
        feedback_message = HumanMessage(
            content="Verification failed. Issues found:\n"
            + "\n".join(f"- {issue}" for issue in issues)
            + "\n\nPlease address these issues and try again."
        )
        return {
            **state,
            # The add_messages reducer appends, so return only the new message
            "messages": [feedback_message],
            "phase": "executing",
            "verification_passed": False,
            "verification_attempts": state.get("verification_attempts", 0) + 1,
        }

    return verification_node


def should_verify(state: AgentState) -> str:
    """
    Routing function: intercepts AgentFinish and forces verification.
    This is the PreCompletionChecklistMiddleware equivalent.
    """
    if state.get("verification_passed"):
        return "complete"
    max_attempts = 3
    if state.get("verification_attempts", 0) >= max_attempts:
        # Don't loop forever; escalate after max attempts
        return "escalate"
    return "verify"
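The verification node calls extract_task_spec and extract_last_agent_output, which the listing leaves undefined. Here is a minimal sketch of what they might look like, assuming the first human message carries the task and the most recent AI message carries the proposed solution. The Msg class is a hypothetical stand-in so the example runs without LangChain; real LangChain messages expose the same .type and .content attributes.

```python
from dataclasses import dataclass
from typing import Sequence


@dataclass
class Msg:
    """Hypothetical stand-in for a LangChain message (.type and .content only)."""
    type: str      # "system" | "human" | "ai" | "tool"
    content: str


def extract_task_spec(messages: Sequence) -> str:
    """Return the first human message, assumed to hold the original task."""
    for msg in messages:
        if getattr(msg, "type", None) == "human":
            return str(msg.content)
    return ""


def extract_last_agent_output(messages: Sequence) -> str:
    """Return the most recent AI message, or '' if the agent has not spoken yet."""
    for msg in reversed(messages):
        if getattr(msg, "type", None) == "ai":
            return str(msg.content)
    return ""


history = [
    Msg("system", "You are a coding agent."),
    Msg("human", "Write parse_date() in utils/dates.py"),
    Msg("ai", "Draft 1"),
    Msg("human", "Verification failed. Issues found: ..."),
    Msg("ai", "Draft 2"),
]
task = extract_task_spec(history)
solution = extract_last_agent_output(history)
```

Picking the first human message matters once verification feedback starts landing in the history as further human messages: the task spec stays pinned to the original request.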
For local development, swap the Bedrock client for Ollama — the verification logic is identical:
# Local dev alternative using Ollama
# pip install langchain-ollama
from langchain_ollama import ChatOllama


def build_verification_node_local(model: str = "llama3.1:8b"):
    """
    Same verification logic, Ollama backend for local testing.
    Run: ollama pull llama3.1:8b
    """
    verifier = ChatOllama(
        model=model,
        temperature=0,
        format="json",  # Ollama's native JSON mode
    )
    # ... rest of implementation identical
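In my experience, smaller local models are more likely to wrap the verdict in prose even when asked for JSON only. A tolerant parser is one way to keep the verification node from failing on formatting alone. This is a sketch under my own assumptions: parse_verdict is a hypothetical helper, not part of LangChain or Ollama, and it fails closed whenever the "passed" field is missing or not a boolean.

```python
import json
import re


def _failed_verdict() -> dict:
    """Fresh fail-closed verdict, matching the fallback used in the node above."""
    return {
        "passed": False,
        "issues": ["Verification model returned unparseable output"],
        "recommendation": "RETRY_WITH_FIXES",
    }


def parse_verdict(raw: str) -> dict:
    """Parse the span from the first '{' to the last '}' in raw as a verdict."""
    match = re.search(r"\{.*\}", raw, flags=re.DOTALL)
    if not match:
        return _failed_verdict()
    try:
        verdict = json.loads(match.group(0))
    except json.JSONDecodeError:
        return _failed_verdict()
    # Anything other than an explicit boolean "passed" counts as a failure
    if not isinstance(verdict, dict) or not isinstance(verdict.get("passed"), bool):
        return _failed_verdict()
    verdict.setdefault("issues", [])
    return verdict


wrapped = 'Sure! {"passed": true, "issues": []} Hope that helps.'
ok = parse_verdict(wrapped)
bad = parse_verdict("I think it looks fine.")
```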
Wire it into the LangGraph state machine:
# harness/graph.py
from langgraph.checkpoint.memory import MemorySaver
from langgraph.graph import StateGraph, END
from harness.middleware.verification import build_verification_node, should_verify


def build_agent_graph():
    graph = StateGraph(AgentState)
    graph.add_node("agent", agent_node)
    graph.add_node("tools", tool_node)
    graph.add_node("verify", build_verification_node())
    graph.add_node("escalate", escalation_node)
    graph.set_entry_point("agent")
    graph.add_conditional_edges(
        "agent",
        route_agent_output,  # returns "tools" | "verify" | "end"
        {
            "tools": "tools",
            "verify": "verify",
            "end": END,
        },
    )
    graph.add_conditional_edges(
        "verify",
        should_verify,
        {
            "complete": END,
            "verify": "agent",  # feed issues back and re-run
            "escalate": "escalate",
        },
    )
    graph.add_edge("tools", "agent")
    graph.add_edge("escalate", END)  # escalation is terminal
    return graph.compile(checkpointer=MemorySaver())
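The wiring above relies on route_agent_output, which is not shown. A minimal sketch of how I would write it, assuming the last message is the agent's reply and that a non-empty tool_calls attribute means the model wants a tool run. The "end" branch for an empty history is my own assumption, and SimpleNamespace stands in for real LangChain messages so the example runs on its own.

```python
from types import SimpleNamespace


def route_agent_output(state: dict) -> str:
    """Return "tools", "verify", or "end" based on the agent's last message."""
    messages = state.get("messages", [])
    if not messages:
        return "end"        # nothing to act on (assumed behaviour)
    last = messages[-1]
    if getattr(last, "tool_calls", None):
        return "tools"      # the model asked for at least one tool call
    # No tool calls means the agent thinks it is done: force verification
    return "verify"


wants_tool = SimpleNamespace(content="", tool_calls=[{"name": "run_tests", "args": {}}])
claims_done = SimpleNamespace(content="All tests pass.", tool_calls=[])

route_tool = route_agent_output({"messages": [wants_tool]})
route_done = route_agent_output({"messages": [claims_done]})
```

The point of the design is that the agent never routes itself straight to END; any message without tool calls goes through the verify node first.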
Reference: LangGraph docs on conditional edges and state management: https://langchain-ai.github.io/langgraph/concepts/low_level/
Part 2: Context Injection as Infrastructure
Agents fail in unfamiliar environments for a simple reason: they spend the first several steps discovering context they should have arrived with. Every discovery step is a potential failure point, a source of latency, and a token cost you're paying for nothing.
The fix is a context bootstrap that runs before the first reasoning step and injects a structured environment summary directly into the system prompt.
# harness/context/bootstrap.py
import boto3
import json
from typing import Optional
from langchain_core.messages import SystemMessage


class ContextBootstrap:
    """
    Runs at agent startup. Discovers environment state and injects it
    into the agent's initial context so the first reasoning step
    can focus on the task, not on discovery.
    """

    def __init__(
        self,
        knowledge_base_id: Optional[str] = None,
        region: str = "us-east-1"
    ):
        self.bedrock_agent = boto3.client(
            "bedrock-agent-runtime",
            region_name=region
        )
        self.knowledge_base_id = knowledge_base_id
        self.secrets_client = boto3.client("secretsmanager", region_name=region)

    def build_environment_context(
        self,
        task_spec: str,
        available_tools: list[str],
        constraints: dict,
    ) -> str:
        """
        Assembles the environment context block injected at agent start.
        """
        # Retrieve relevant docs from Knowledge Base if configured
        kb_context = ""
        if self.knowledge_base_id:
            kb_context = self._retrieve_from_knowledge_base(task_spec)
        tool_manifest = self._build_tool_manifest(available_tools)
        constraint_summary = self._format_constraints(constraints)
        return f"""
## Environment Context (injected at startup; do not re-discover)
### Available Tools
{tool_manifest}
### Constraints
{constraint_summary}
### Relevant Knowledge Base Context
{kb_context if kb_context else "No KB context retrieved for this task."}
### Verification Standards
Your work will be evaluated programmatically. This means:
- File paths must match specifications exactly, including case and extension
- Function signatures must match the interface contract, not your preference
- Any output files must be written to the exact paths specified
- Tests must pass in an automated runner, not just look correct to you
### Problem Solving Protocol
1. PLAN: Read the full task. Identify all requirements including edge cases.
2. BUILD: Implement with testability in mind from the first line.
3. VERIFY: Run your solution. Read the full output. Compare against the spec.
4. FIX: If verification fails, re-read the spec before touching the code.
Do not skip from BUILD to reporting completion.
"""

    def _retrieve_from_knowledge_base(self, query: str) -> str:
        """Retrieve relevant context from Bedrock Knowledge Base."""
        try:
            response = self.bedrock_agent.retrieve(
                knowledgeBaseId=self.knowledge_base_id,
                retrievalQuery={"text": query},
                retrievalConfiguration={
                    "vectorSearchConfiguration": {"numberOfResults": 5}
                }
            )
            chunks = [
                r["content"]["text"]
                for r in response.get("retrievalResults", [])
                if r.get("score", 0) > 0.6  # Only high-confidence retrievals
            ]
            return "\n\n".join(chunks)
        except Exception as e:
            # KB retrieval failure should never crash the agent
            return f"[KB retrieval unavailable: {str(e)}]"

    def _build_tool_manifest(self, tool_names: list[str]) -> str:
        lines = []
        for name in tool_names:
            lines.append(f"  - {name}")
        return "\n".join(lines)

    def _format_constraints(self, constraints: dict) -> str:
        lines = []
        for key, value in constraints.items():
            lines.append(f"  - {key}: {value}")
        return "\n".join(lines)
Inject this at graph entry point:
# harness/graph.py
import uuid
from typing import Optional

from langchain_core.messages import SystemMessage
from harness.context.bootstrap import ContextBootstrap


def build_agent_graph(
    task_spec: str,
    knowledge_base_id: Optional[str] = None
):
    bootstrap = ContextBootstrap(knowledge_base_id=knowledge_base_id)
    env_context = bootstrap.build_environment_context(
        task_spec=task_spec,
        available_tools=["read_file", "write_file", "run_tests", "search_docs"],
        constraints={
            "max_execution_time": "300s",
            "max_tool_calls": 50,
            "verification_required": True,
        }
    )
    # This becomes part of the initial state, not a runtime discovery.
    # BASE_SYSTEM_PROMPT is the task-independent system prompt (not shown here).
    initial_state = AgentState(
        messages=[SystemMessage(content=BASE_SYSTEM_PROMPT + env_context)],
        phase="planning",
        verification_passed=False,
        verification_attempts=0,
        tool_call_counts={},
        agent_run_id=str(uuid.uuid4()),
        identity_context={},
        circuit_states={},
    )
    # ... build and compile `graph` exactly as in Part 1 ...
    return graph, initial_state
Reference: AWS Bedrock Knowledge Bases retrieval API: https://docs.aws.amazon.com/bedrock/latest/userguide/knowledge-base-retrieve.html
Part 3: Loop Detection and Doom Loop Prevention
I've seen this pattern in traces more times than I care to admit: the agent makes a small edit to a file, runs a test, sees it fail, makes a nearly identical edit to the same file, runs the test, sees it fail again. Repeat ten times. Each iteration is a variation of the same broken approach.
LangChain calls these "doom loops" and addresses them with a LoopDetectionMiddleware that tracks per-file edit counts. I've extended this to be state-aware and externalized to DynamoDB — which matters in multi-agent and parallel execution scenarios.
First, the DynamoDB table (Terraform):
# infrastructure/dynamodb_tables.tf
resource "aws_dynamodb_table" "agent_loop_state" {
  name         = "agent-loop-state"
  billing_mode = "PAY_PER_REQUEST"
  hash_key     = "run_id"
  range_key    = "resource_key"

  attribute {
    name = "run_id"
    type = "S"
  }

  attribute {
    name = "resource_key"
    type = "S"
  }

  ttl {
    attribute_name = "expires_at"
    enabled        = true
  }

  tags = {
    Environment = "production"
    Component   = "agent-harness"
  }
}
Now the loop detection middleware:
# harness/middleware/loop_detection.py
import boto3
import time
from typing import Optional
from langchain_core.messages import HumanMessage
from harness.graph import AgentState


class LoopDetectionMiddleware:
    """
    Tracks tool call patterns per agent run using DynamoDB.
    External state is critical for multi-agent and parallel execution:
    if two branches of the same agent are both stuck in a loop,
    an in-memory counter wouldn't catch it.
    """
    LOOP_THRESHOLD = 4     # Same resource edited this many times → intervene
    SIMILARITY_WINDOW = 6  # Look at the last N tool calls for pattern detection
    TTL_SECONDS = 3600     # State expires after 1 hour

    def __init__(self, table_name: str = "agent-loop-state", region: str = "us-east-1"):
        self.dynamodb = boto3.resource("dynamodb", region_name=region)
        self.table = self.dynamodb.Table(table_name)

    def record_tool_call(
        self,
        run_id: str,
        tool_name: str,
        resource_key: str,  # file path, API endpoint, etc.
    ) -> int:
        """
        Increments the call count for (run_id, resource_key).
        Returns the new count.
        """
        expires_at = int(time.time()) + self.TTL_SECONDS
        response = self.table.update_item(
            Key={"run_id": run_id, "resource_key": resource_key},
            UpdateExpression=(
                "SET call_count = if_not_exists(call_count, :zero) + :inc, "
                "tool_name = :tool, "
                "expires_at = :ttl"
            ),
            ExpressionAttributeValues={
                ":zero": 0,
                ":inc": 1,
                ":tool": tool_name,
                ":ttl": expires_at,
            },
            ReturnValues="UPDATED_NEW",
        )
        return int(response["Attributes"]["call_count"])

    def check_and_inject(self, state: AgentState) -> AgentState:
        """
        Called as a LangGraph node before each tool execution.
        If a doom loop is detected, injects a reconsideration message.
        """
        last_tool_call = extract_last_tool_call(state["messages"])
        if not last_tool_call:
            return state
        tool_name = last_tool_call.get("name")
        args = last_tool_call.get("args", {})
        # Fall back to the tool name if the call names no specific resource
        resource_key = args.get("path") or args.get("endpoint") or tool_name
        count = self.record_tool_call(
            run_id=state["agent_run_id"],
            tool_name=tool_name,
            resource_key=resource_key,
        )
        if count >= self.LOOP_THRESHOLD:
            intervention_message = HumanMessage(
                content=f"""
[LOOP DETECTION] You have interacted with '{resource_key}' {count} times
in this session. This suggests your current approach may not be working.
Before proceeding:
1. Step back and re-read the original task specification
2. Identify the root cause of the repeated failures, not the symptoms
3. Consider a fundamentally different approach
4. If you're stuck, explain what you've tried and what's blocking you
Do not make another small variation of the same attempt.
"""
            )
            return {
                **state,
                # The add_messages reducer appends, so return only the new message
                "messages": [intervention_message],
            }
        return state

    def get_loop_summary(self, run_id: str) -> dict:
        """
        Returns a summary of loop patterns for a completed run.
        Useful for post-run analysis and harness improvement.
        """
        response = self.table.query(
            KeyConditionExpression="run_id = :rid",
            ExpressionAttributeValues={":rid": run_id},
        )
        return {
            # DynamoDB returns numbers as Decimal; convert for plain reporting
            item["resource_key"]: int(item["call_count"])
            for item in response.get("Items", [])
            if item.get("call_count", 0) > 1
        }
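The listing declares SIMILARITY_WINDOW but never reads it. One way it could be used is a sliding window over the most recent tool calls, which catches a loop that total counts alone would blur. The sketch below is in-memory and single-process purely for illustration; RecentCallWindow is a hypothetical name, and a production version would keep the window in the same DynamoDB table.

```python
from collections import Counter, deque
from typing import Optional, Tuple


class RecentCallWindow:
    """Flags a (tool, resource) pair that dominates the last `window` calls."""

    def __init__(self, window: int = 6, threshold: int = 4):
        self.threshold = threshold
        self.calls = deque(maxlen=window)   # oldest calls fall off automatically

    def record(self, tool_name: str, resource_key: str) -> Optional[Tuple[str, str]]:
        """Record a call; return the looping pair if one hits the threshold."""
        self.calls.append((tool_name, resource_key))
        pair, count = Counter(self.calls).most_common(1)[0]
        return pair if count >= self.threshold else None


window = RecentCallWindow(window=6, threshold=4)
results = [
    window.record("write_file", "src/app.py"),
    window.record("run_tests", "tests/"),
    window.record("write_file", "src/app.py"),
    window.record("write_file", "src/app.py"),
    window.record("write_file", "src/app.py"),   # 4th edit inside the window
]
```

Unlike a lifetime counter, this forgets old calls: four edits to the same file spread across a long, otherwise healthy run would not trip it.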
The loop detector also integrates with the circuit breaker layer — if a tool is in an open circuit state and the agent keeps trying to call it anyway, that registers as a loop. We'll connect these in Part 5.
Part 4: Reasoning Budget Management
This is the lever I was most skeptical about initially. Managing reasoning compute sounds like premature optimization. It isn't. On Bedrock, the cost and latency difference between a fast, no-thinking call for simple execution steps and a slow, extended-thinking call for complex planning steps is substantial enough to matter at scale.
LangChain's finding — a "reasoning sandwich" of max-budget reasoning at planning and verification, medium at execution — maps cleanly to model selection on Bedrock.
# harness/routing/reasoning.py
from enum import Enum
from dataclasses import dataclass
from typing import Optional

import boto3
from langchain_aws import ChatBedrock

from harness.graph import AgentState


class AgentPhase(Enum):
    PLANNING = "planning"
    EXECUTING = "executing"
    VERIFYING = "verifying"
    DEBUGGING = "debugging"


@dataclass
class ModelConfig:
    model_id: str
    max_tokens: int
    temperature: float               # Not sent when thinking is enabled (see get_model)
    thinking_budget: Optional[int]   # For extended thinking on Claude 3.7+
    description: str


# Phase-to-model mapping for Bedrock
# Adjust model IDs as new versions become available
PHASE_MODEL_MAP = {
    AgentPhase.PLANNING: ModelConfig(
        model_id="anthropic.claude-3-7-sonnet-20250219-v1:0",
        max_tokens=16000,
        temperature=0.2,
        thinking_budget=8000,  # Extended thinking for deep planning
        description="Slow, thorough: plan the whole approach before touching tools"
    ),
    AgentPhase.EXECUTING: ModelConfig(
        model_id="anthropic.claude-3-5-sonnet-20241022-v2:0",
        max_tokens=8000,
        temperature=0,
        thinking_budget=None,  # No extended thinking for execution steps
        description="Fast, precise: follow the plan, call tools correctly"
    ),
    AgentPhase.VERIFYING: ModelConfig(
        model_id="anthropic.claude-3-7-sonnet-20250219-v1:0",
        max_tokens=8000,
        temperature=0,
        thinking_budget=4000,  # Moderate thinking for verification
        description="Thorough check: don't rubber-stamp the execution output"
    ),
    AgentPhase.DEBUGGING: ModelConfig(
        model_id="anthropic.claude-3-7-sonnet-20250219-v1:0",
        max_tokens=16000,
        temperature=0.3,
        thinking_budget=10000,  # Maximum reasoning for stuck situations
        description="Deep diagnosis: something is wrong and we need to find it"
    ),
}


class ReasoningRouter:
    """
    Selects the appropriate Bedrock model and configuration
    based on the current agent phase.
    """

    def __init__(self, region: str = "us-east-1"):
        self.bedrock_client = boto3.client("bedrock-runtime", region_name=region)
        self._model_cache: dict[AgentPhase, ChatBedrock] = {}

    def get_model(self, phase: AgentPhase) -> ChatBedrock:
        if phase not in self._model_cache:
            config = PHASE_MODEL_MAP[phase]
            model_kwargs = {"max_tokens": config.max_tokens}
            if config.thinking_budget:
                # Extended thinking only on Claude 3.7+. Anthropic documents that
                # thinking is not compatible with a custom temperature, so the
                # configured temperature is left out for thinking phases.
                # max_tokens must stay larger than budget_tokens.
                model_kwargs["thinking"] = {
                    "type": "enabled",
                    "budget_tokens": config.thinking_budget
                }
            else:
                model_kwargs["temperature"] = config.temperature
            self._model_cache[phase] = ChatBedrock(
                client=self.bedrock_client,
                model_id=config.model_id,
                model_kwargs=model_kwargs,
            )
        return self._model_cache[phase]

    def get_phase_from_state(self, state: AgentState) -> AgentPhase:
        phase_str = state.get("phase", "executing")
        try:
            return AgentPhase(phase_str)
        except ValueError:
            # Unknown phase strings (including "complete") fall back to execution
            return AgentPhase.EXECUTING
Approximate cost/latency profile across phases (AWS Bedrock on-demand pricing, us-east-1, approximate as of mid-2025):
+------------------+---------------------------+----------+-----------+-----------+
| Phase            | Model                     | Approx.  | Approx.   | Thinking  |
|                  |                           | Input    | Output    | Budget    |
|                  |                           | $/1M tok | $/1M tok  | Tokens    |
+------------------+---------------------------+----------+-----------+-----------+
| PLANNING         | Claude 3.7 Sonnet         | $3.00    | $15.00    | 8,000     |
| EXECUTING        | Claude 3.5 Sonnet v2      | $3.00    | $15.00    | None      |
| VERIFYING        | Claude 3.7 Sonnet         | $3.00    | $15.00    | 4,000     |
| DEBUGGING        | Claude 3.7 Sonnet         | $3.00    | $15.00    | 10,000    |
+------------------+---------------------------+----------+-----------+-----------+
Note: Thinking tokens billed at output rate. Budget ≠ tokens used — model
decides how much thinking to apply within the budget ceiling.
The key principle: you're not saving money by using a cheaper model for execution. You're spending budget more intentionally — reserving the deep reasoning capacity for the moments where it actually changes the outcome.
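To make that concrete, here is a back-of-the-envelope sketch of a per-call cost ceiling using the approximate prices in the table. The token counts are illustrative numbers I picked, not measurements, and worst_case_cost is a hypothetical helper that assumes the full thinking budget is consumed and billed at the output rate.

```python
INPUT_USD_PER_MTOK = 3.00     # approximate, from the table above
OUTPUT_USD_PER_MTOK = 15.00   # thinking tokens are billed at this rate too


def worst_case_cost(input_tokens: int, output_tokens: int, thinking_budget: int = 0) -> float:
    """Upper bound in USD for one call, assuming the whole thinking budget is used."""
    billed_output = output_tokens + thinking_budget
    total = input_tokens * INPUT_USD_PER_MTOK + billed_output * OUTPUT_USD_PER_MTOK
    return total / 1_000_000


# Same prompt size and visible answer length, with and without thinking
planning_call = worst_case_cost(10_000, 2_000, thinking_budget=8_000)
executing_call = worst_case_cost(10_000, 2_000)

print(f"planning:  ${planning_call:.2f}")    # planning:  $0.18
print(f"executing: ${executing_call:.2f}")   # executing: $0.06
```

With identical per-token prices, the planning call can cost up to three times the execution call purely because of the thinking budget. That is the spend worth reserving for phases where deeper reasoning changes the outcome.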
Reference: AWS Bedrock pricing and model catalog: https://aws.amazon.com/bedrock/pricing/
Reference: Claude extended thinking on Bedrock: https://docs.aws.amazon.com/bedrock/latest/userguide/inference-extended-thinking.html
Part 5: Production Hardening — Auth, Retry, and Circuit Breakers
This is the part most harness engineering articles skip. They show you how to make the agent smart. They don't show you what happens when the JWT expires mid-run, or when a downstream API starts returning 503s and the agent keeps retrying against it for fifteen minutes.
5.1 Identity Propagation and JWT at the Tool Boundary
The most important architectural decision here: authentication happens at the tool boundary, not inside the agent reasoning loop.
The agent should never see raw credentials. It doesn't need to. It operates on an identity context that was resolved before it started running, and that context gets propagated transparently into every tool call.
# harness/middleware/auth.py
import boto3
import jwt
import time
import requests
import json
from typing import Optional
from dataclasses import dataclass


@dataclass
class IdentityContext:
    """
    Resolved identity for an agent run.
    Attached to agent state, propagated to all tool calls.
    """
    subject: str                    # user ID or service ID
    roles: list[str]                # ["analyst", "data-reader"] etc.
    tenant_id: str
    access_token: str               # short-lived JWT
    refresh_token: Optional[str]
    token_expires_at: float         # Unix timestamp


class CredentialManager:
    """
    Manages JWT lifecycle for a single agent run.
    Tools call this to get a valid token; they never store tokens themselves.
    """
    # Refresh when less than 5 minutes remain on the token
    REFRESH_THRESHOLD_SECONDS = 300

    def __init__(
        self,
        token_endpoint: str,
        client_id_secret: str,  # ARN of Secrets Manager secret
        region: str = "us-east-1"
    ):
        self.token_endpoint = token_endpoint
        self.region = region
        self._secrets_client = boto3.client("secretsmanager", region_name=region)
        self._client_credentials = self._load_client_credentials(client_id_secret)
        self._current_context: Optional[IdentityContext] = None

    def _load_client_credentials(self, secret_arn: str) -> dict:
        """Load client_id/client_secret from Secrets Manager."""
        response = self._secrets_client.get_secret_value(SecretId=secret_arn)
        return json.loads(response["SecretString"])

    def get_valid_token(self) -> str:
        """
        Returns a valid access token. Refreshes automatically if needed.
        This is the only method tools should call.
        """
        if self._current_context is None:
            raise RuntimeError("CredentialManager not initialized. Call initialize() first.")
        # Check if we need to refresh
        if self._should_refresh():
            self._refresh_token()
        return self._current_context.access_token

    def _should_refresh(self) -> bool:
        if not self._current_context:
            return True
        remaining = self._current_context.token_expires_at - time.time()
        return remaining < self.REFRESH_THRESHOLD_SECONDS

    def _refresh_token(self):
        """Refresh using the refresh token or client credentials flow."""
        ctx = self._current_context
        if ctx and ctx.refresh_token:
            # Use refresh token if available
            payload = {
                "grant_type": "refresh_token",
                "refresh_token": ctx.refresh_token,
                "client_id": self._client_credentials["client_id"],
                "client_secret": self._client_credentials["client_secret"],
            }
        else:
            # Fall back to client credentials
            payload = {
                "grant_type": "client_credentials",
                "client_id": self._client_credentials["client_id"],
                "client_secret": self._client_credentials["client_secret"],
                "scope": "agent:execute",
            }
        response = requests.post(self.token_endpoint, data=payload, timeout=10)
        response.raise_for_status()
        token_data = response.json()
        # Decode to extract claims without full verification
        # (we trust our own token endpoint; full verification would re-verify
        # the JWKS on every call which is unnecessarily expensive)
        claims = jwt.decode(
            token_data["access_token"],
            options={"verify_signature": False}
        )
        self._current_context = IdentityContext(
            subject=claims.get("sub", ""),
            roles=claims.get("roles", []),
            tenant_id=claims.get("tenant_id", ""),
            access_token=token_data["access_token"],
            refresh_token=token_data.get("refresh_token"),
            token_expires_at=claims.get("exp", time.time() + 3600),
        )

    def initialize(self, initial_token: str, refresh_token: Optional[str] = None):
        """Call this before the agent run starts."""
        # Claims are read here WITHOUT signature verification. This assumes the
        # caller already validated the token upstream; roles taken from an
        # unvalidated token must not be trusted for authorization.
        claims = jwt.decode(
            initial_token,
            options={"verify_signature": False}
        )
        self._current_context = IdentityContext(
            subject=claims.get("sub", ""),
            roles=claims.get("roles", []),
            tenant_id=claims.get("tenant_id", ""),
            access_token=initial_token,
            refresh_token=refresh_token,
            token_expires_at=claims.get("exp", time.time() + 3600),
        )

    def check_role(self, required_role: str) -> bool:
        if not self._current_context:
            return False
        return required_role in self._current_context.roles
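One way to keep the "agent never sees raw credentials" rule honest is to put only a redacted view of the identity into agent state, since anything in state can end up in a checkpoint or a trace. A minimal sketch, assuming the IdentityContext shape from the listing above; to_state_view and SECRET_FIELDS are hypothetical names of my own.

```python
from dataclasses import dataclass, asdict
from typing import List, Optional


@dataclass
class IdentityContext:
    """Copy of the dataclass above so this example runs on its own."""
    subject: str
    roles: List[str]
    tenant_id: str
    access_token: str
    refresh_token: Optional[str]
    token_expires_at: float


# Fields that must never reach agent state, prompts, or checkpoints
SECRET_FIELDS = {"access_token", "refresh_token"}


def to_state_view(ctx: IdentityContext) -> dict:
    """Return the claims the agent may see, with token material stripped."""
    return {k: v for k, v in asdict(ctx).items() if k not in SECRET_FIELDS}


ctx = IdentityContext(
    subject="user-123",
    roles=["document-reader"],
    tenant_id="acme",
    access_token="header.payload.signature",
    refresh_token="opaque-refresh-value",
    token_expires_at=1_900_000_000.0,
)
identity_for_state = to_state_view(ctx)   # safe to store as state["identity_context"]
```

The tokens themselves stay inside the CredentialManager, and tools fetch them at call time through get_valid_token().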
Now wrap every tool with role checking and credential injection:
# harness/tools/base.py
from typing import Callable, Any, Optional

import requests

from harness.middleware.auth import CredentialManager


class AuthorizedToolWrapper:
    """
    Wraps a tool function with role-based authorization.
    The tool implementation never handles auth; the wrapper does.
    """

    def __init__(
        self,
        tool_fn: Callable,
        required_role: str,
        credential_manager: CredentialManager,
    ):
        self.tool_fn = tool_fn
        self.required_role = required_role
        self.credential_manager = credential_manager
        self.__name__ = tool_fn.__name__

    def __call__(self, *args, **kwargs) -> Any:
        # Role check first: fail fast before any external calls
        if not self.credential_manager.check_role(self.required_role):
            # The context is None if initialize() was never called, so guard
            # the lookup instead of raising AttributeError while reporting.
            ctx = self.credential_manager._current_context
            current_roles = ctx.roles if ctx else []
            raise PermissionError(
                f"Agent identity does not have required role: {self.required_role}. "
                f"Current roles: {current_roles}"
            )
        # Inject the current valid token into kwargs
        kwargs["_auth_token"] = self.credential_manager.get_valid_token()
        return self.tool_fn(*args, **kwargs)


# Example: wrapping a document reader tool
def _read_document_impl(document_id: str, _auth_token: Optional[str] = None) -> str:
    """Internal implementation: receives token as injected kwarg."""
    headers = {"Authorization": f"Bearer {_auth_token}"}
    response = requests.get(
        f"https://docs-api.internal/documents/{document_id}",
        headers=headers,
        timeout=30,
    )
    response.raise_for_status()
    return response.json()["content"]


# Usage: wrap at registration time, not at call time
def build_tool_registry(credential_manager: CredentialManager) -> dict:
    return {
        "read_document": AuthorizedToolWrapper(
            tool_fn=_read_document_impl,
            required_role="document-reader",
            credential_manager=credential_manager,
        ),
        # "write_document": requires "document-writer" role
        # "execute_query": requires "data-analyst" role
    }
Reference: AWS Secrets Manager with Python: https://docs.aws.amazon.com/secretsmanager/latest/userguide/retrieving-secrets_cache-python.html
5.2 Retry Policies: Model Calls vs. Tool Calls
This is a distinction that took me longer to appreciate than it should have. Model call failures and tool call failures require different retry strategies because they have different failure modes.
+---------------------+------------------+---------------------------+--------+
| Failure Source      | Error Type       | Retry Strategy            | Max    |
+---------------------+------------------+---------------------------+--------+
| Bedrock model call  | 429 throttling   | Exp backoff + jitter,     | 5      |
|                     |                  | fallback to smaller model |        |
+---------------------+------------------+---------------------------+--------+
| Bedrock model call  | 503 service      | Exp backoff               | 3      |
+---------------------+------------------+---------------------------+--------+
| External API (tool) | 429 rate limit   | Exp backoff + jitter      | 3      |
+---------------------+------------------+---------------------------+--------+
| External API (tool) | 401 unauthorized | Refresh token first,      | 1      |
|                     |                  | then a single retry       |        |
+---------------------+------------------+---------------------------+--------+
| External API (tool) | 500 server error | Exp backoff               | 2      |
+---------------------+------------------+---------------------------+--------+
| External API (tool) | 400 bad request  | No retry: agent logic     | 0      |
|                     |                  | error, surface immediately|        |
+---------------------+------------------+---------------------------+--------+
| External API (tool) | Connection error | Exp backoff               | 3      |
+---------------------+------------------+---------------------------+--------+
Implementation using tenacity, a widely used retry library in the Python ecosystem:
# harness/middleware/retry.py
import logging
from typing import Callable, TypeVar, Optional
from tenacity import (
    retry,
    stop_after_attempt,
    wait_exponential_jitter,
    retry_if_exception,
    before_sleep_log,
    RetryError,
)
import boto3
import botocore.exceptions
import requests.exceptions

logger = logging.getLogger(__name__)
T = TypeVar("T")

def is_retryable_bedrock_error(exception: Exception) -> bool:
    """Identify transient Bedrock errors worth retrying."""
    if isinstance(exception, botocore.exceptions.ClientError):
        code = exception.response["Error"]["Code"]
        return code in (
            "ThrottlingException",
            "ServiceUnavailableException",
            "InternalServerException",
        )
    return False

def is_retryable_api_error(exception: Exception) -> bool:
    """Identify transient HTTP errors from external APIs."""
    if isinstance(exception, requests.exceptions.ConnectionError):
        return True
    if isinstance(exception, requests.exceptions.HTTPError):
        # Compare against None explicitly: a requests.Response is falsy for
        # any 4xx/5xx status, so a plain truthiness check would always give 0.
        response = exception.response
        status = response.status_code if response is not None else 0
        # 429, 500, 502, 503, 504 are retryable; 400, 401, 403, 404 are not
        return status in (429, 500, 502, 503, 504)
    return False

# Model call retry: more patient, allows fallback model
model_retry = retry(
    retry=retry_if_exception(is_retryable_bedrock_error),
    wait=wait_exponential_jitter(initial=2, max=60, jitter=5),
    stop=stop_after_attempt(5),
    before_sleep=before_sleep_log(logger, logging.WARNING),
    reraise=True,
)

# Tool/API call retry: stricter, less patient
tool_retry = retry(
    retry=retry_if_exception(is_retryable_api_error),
    wait=wait_exponential_jitter(initial=1, max=30, jitter=3),
    stop=stop_after_attempt(3),
    before_sleep=before_sleep_log(logger, logging.WARNING),
    reraise=True,
)
class ModelCallWithFallback:
    """
    Wraps a Bedrock model call with retry and fallback to a cheaper model
    if the primary model is persistently throttled.
    """
    def __init__(
        self,
        primary_model_id: str,
        fallback_model_id: str,
        region: str = "us-east-1",
    ):
        from langchain_aws import ChatBedrock
        client = boto3.client("bedrock-runtime", region_name=region)
        self.primary = ChatBedrock(client=client, model_id=primary_model_id)
        self.fallback = ChatBedrock(client=client, model_id=fallback_model_id)

    @model_retry
    def invoke_primary(self, messages):
        return self.primary.invoke(messages)

    def invoke(self, messages):
        try:
            return self.invoke_primary(messages)
        except botocore.exceptions.ClientError as exc:
            # model_retry sets reraise=True, so exhausted retries surface the
            # original ClientError rather than tenacity.RetryError. This assumes
            # the client passes botocore errors through unchanged.
            if not is_retryable_bedrock_error(exc):
                raise  # Not transient: falling back would only hide the problem
            logger.warning(
                "Primary model exhausted retries, falling back to: %s",
                self.fallback.model_id
            )
            return self.fallback.invoke(messages)
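The 401 row in the table is the one policy that tenacity's backoff does not express, because waiting and resending the same expired token cannot succeed. A minimal sketch of "refresh first, then one retry only" follows. The names `UnauthorizedError`, `call_with_token_refresh`, `get_token`, and `refresh_token` are hypothetical stand-ins for illustration, not part of the harness code above; in a real tool wrapper you would translate an HTTP 401 into whatever exception type you choose.

```python
from typing import Callable, TypeVar

T = TypeVar("T")


class UnauthorizedError(Exception):
    """Hypothetical stand-in for an HTTP 401 raised by a tool call."""


def call_with_token_refresh(
    call: Callable[[str], T],
    get_token: Callable[[], str],
    refresh_token: Callable[[], str],
) -> T:
    """Run call(token); on a 401, refresh the token and retry exactly once."""
    try:
        return call(get_token())
    except UnauthorizedError:
        # A 401 is not transient, so there is no backoff here:
        # the only useful action is to obtain a new token.
        new_token = refresh_token()
    # Second and final attempt. A second 401 propagates to the caller.
    return call(new_token)
```

Keeping this separate from `tool_retry` means a 401 never consumes the backoff budget reserved for genuinely transient failures.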
5.3 Circuit Breakers in Multi-Agent Contexts
The circuit breaker pattern — closed (normal), open (failing, stop trying), half-open (testing recovery) — is well understood in microservices. What's different in agent systems is that circuit state needs to be shared across all concurrent agent activity for a given deployment.
If your agent spawns sub-agents or runs parallel tool execution branches, you do not want five concurrent branches all independently discovering that the same API is down. The first failure should open the circuit and inform all of them.
# harness/middleware/circuit_breaker.py
import boto3
import time
import json
import logging
from decimal import Decimal  # DynamoDB numbers must be written as Decimal
from enum import Enum
from dataclasses import dataclass, asdict
from typing import Any, Callable, Optional

logger = logging.getLogger(__name__)

class CircuitState(Enum):
    CLOSED = "CLOSED"        # Normal operation
    OPEN = "OPEN"            # Failing: reject calls immediately
    HALF_OPEN = "HALF_OPEN"  # Testing recovery: allow one probe call

@dataclass
class CircuitRecord:
    tool_name: str
    state: str
    failure_count: int
    last_failure_time: float
    last_state_change: float
    recovery_probe_sent: bool
class DynamoDBCircuitBreaker:
    """
    Circuit breaker with state in DynamoDB.
    Shared across all agent instances and sub-agents in the same deployment.
    """
    FAILURE_THRESHOLD = 5  # Open circuit after this many failures
    RECOVERY_TIMEOUT = 60  # Seconds before attempting half-open
    SUCCESS_TO_CLOSE = 2   # Successful calls in half-open to re-close

    def __init__(
        self,
        table_name: str = "agent-circuit-state",
        region: str = "us-east-1"
    ):
        self.dynamodb = boto3.resource("dynamodb", region_name=region)
        self.table = self.dynamodb.Table(table_name)

    def get_state(self, tool_name: str) -> CircuitState:
        try:
            response = self.table.get_item(Key={"tool_name": tool_name})
            item = response.get("Item")
            # record_failure writes no "state" attribute until the circuit
            # first opens, so read fields defensively instead of unpacking
            # the item into CircuitRecord (which would fail on missing keys).
            if not item or "state" not in item:
                return CircuitState.CLOSED
            state = CircuitState(item["state"])
            # Check if an open circuit should transition to half-open
            if state == CircuitState.OPEN:
                # DynamoDB returns numbers as Decimal; convert before subtracting
                age = time.time() - float(item.get("last_state_change", 0))
                if age > self.RECOVERY_TIMEOUT:
                    self._transition_to_half_open(tool_name)
                    return CircuitState.HALF_OPEN
            return state
        except Exception as e:
            # Circuit breaker failure should never block execution
            logger.error("Circuit breaker read failed: %s", e)
            return CircuitState.CLOSED

    def record_success(self, tool_name: str):
        try:
            response = self.table.get_item(Key={"tool_name": tool_name})
            item = response.get("Item")
            if not item:
                return
            if item.get("state") == CircuitState.HALF_OPEN.value:
                # Count successes toward re-closing the circuit
                # Simplified: one success closes it (adjust threshold as needed)
                self._transition_to_closed(tool_name)
        except Exception as e:
            logger.error("Circuit breaker success recording failed: %s", e)
    def record_failure(self, tool_name: str):
        try:
            now = time.time()
            response = self.table.update_item(
                Key={"tool_name": tool_name},
                UpdateExpression=(
                    "SET failure_count = if_not_exists(failure_count, :zero) + :inc, "
                    "last_failure_time = :now, "
                    "last_state_change = if_not_exists(last_state_change, :now)"
                ),
                ExpressionAttributeValues={
                    ":zero": 0, ":inc": 1, ":now": Decimal(str(now))
                },
                ReturnValues="UPDATED_NEW",
            )
            new_count = int(response["Attributes"]["failure_count"])
            if new_count >= self.FAILURE_THRESHOLD:
                self._open_circuit(tool_name)
        except Exception as e:
            logger.error("Circuit breaker failure recording failed: %s", e)

    def _open_circuit(self, tool_name: str):
        logger.warning("Opening circuit for tool: %s", tool_name)
        self.table.update_item(
            Key={"tool_name": tool_name},
            UpdateExpression="SET #s = :state, last_state_change = :now",
            ExpressionAttributeNames={"#s": "state"},
            ExpressionAttributeValues={
                ":state": CircuitState.OPEN.value,
                ":now": Decimal(str(time.time())),
            },
        )

    def _transition_to_half_open(self, tool_name: str):
        logger.info("Transitioning circuit to HALF_OPEN for tool: %s", tool_name)
        self.table.update_item(
            Key={"tool_name": tool_name},
            UpdateExpression="SET #s = :state, last_state_change = :now",
            ExpressionAttributeNames={"#s": "state"},
            ExpressionAttributeValues={
                ":state": CircuitState.HALF_OPEN.value,
                ":now": Decimal(str(time.time())),
            },
        )

    def _transition_to_closed(self, tool_name: str):
        logger.info("Closing circuit for tool: %s", tool_name)
        self.table.update_item(
            Key={"tool_name": tool_name},
            UpdateExpression=(
                "SET #s = :state, failure_count = :zero, last_state_change = :now"
            ),
            ExpressionAttributeNames={"#s": "state"},
            ExpressionAttributeValues={
                ":state": CircuitState.CLOSED.value,
                ":zero": 0,
                ":now": Decimal(str(time.time())),
            },
        )
class ProtectedToolExecutor:
    """
    Wraps tool calls with the circuit breaker.
    This is the outermost wrapper. Retry is not applied here: pass in a
    tool_fn already decorated with tool_retry, so that one exhausted retry
    sequence is recorded as a single circuit failure.
    """
    def __init__(
        self,
        circuit_breaker: DynamoDBCircuitBreaker,
        loop_detector: Optional["LoopDetectionMiddleware"] = None,
    ):
        self.circuit_breaker = circuit_breaker
        self.loop_detector = loop_detector

    def execute(self, tool_name: str, tool_fn: Callable, *args, **kwargs) -> Any:
        state = self.circuit_breaker.get_state(tool_name)
        if state == CircuitState.OPEN:
            # Don't call the tool; surface the unavailability to the agent
            raise ToolUnavailableError(
                f"Tool '{tool_name}' circuit is OPEN (too many recent failures). "
                f"Do not retry this tool. Consider an alternative approach "
                f"or report that this capability is currently unavailable."
            )
        try:
            result = tool_fn(*args, **kwargs)
            self.circuit_breaker.record_success(tool_name)
            return result
        except ToolUnavailableError:
            raise  # Raised by a nested executor; do not count it as a failure
        except Exception:
            self.circuit_breaker.record_failure(tool_name)
            raise

class ToolUnavailableError(Exception):
    """
    Raised when a tool's circuit is open.
    The agent should treat this as a signal to reconsider its plan,
    not as a transient error to retry.
    """
    pass
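One gap worth noting in the breaker above: every caller that reads an expired OPEN record is told HALF_OPEN, so parallel branches can all send a "probe" at once. A DynamoDB conditional write can make the OPEN to HALF_OPEN transition a claim that only one caller wins. The sketch below is an assumption about how you might add that, not part of the original class: `try_claim_probe` is a hypothetical helper, `table` is expected to behave like a boto3 DynamoDB `Table` resource, and the attribute names match the ones used above.

```python
import time
from typing import Any


def try_claim_probe(table: Any, tool_name: str, recovery_timeout: float = 60.0) -> bool:
    """Return True only for the single caller that moves OPEN -> HALF_OPEN."""
    now = int(time.time())
    try:
        table.update_item(
            Key={"tool_name": tool_name},
            UpdateExpression="SET #s = :half, last_state_change = :now",
            # The write succeeds only if the circuit is still OPEN and the
            # recovery timeout has elapsed. Concurrent callers fail this check.
            ConditionExpression="#s = :open AND last_state_change < :cutoff",
            ExpressionAttributeNames={"#s": "state"},
            ExpressionAttributeValues={
                ":half": "HALF_OPEN",
                ":open": "OPEN",
                ":now": now,
                ":cutoff": now - int(recovery_timeout),
            },
        )
        return True
    except Exception as exc:
        # With boto3 this is a botocore ClientError whose error code is
        # ConditionalCheckFailedException when the condition does not hold.
        code = getattr(exc, "response", {}).get("Error", {}).get("Code")
        if code == "ConditionalCheckFailedException":
            return False
        raise
```

Callers that get `False` back should treat the circuit as still open and skip the tool; only the winner sends the probe request.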
Connect the circuit breaker to the loop detector — if the agent keeps attempting an open-circuit tool, that's a doom loop and should be caught:
# In loop_detection.py: extend check_and_inject
def check_and_inject(self, state: AgentState, circuit_breaker: DynamoDBCircuitBreaker) -> AgentState:
    last_tool_call = extract_last_tool_call(state["messages"])
    if not last_tool_call:
        return state
    tool_name = last_tool_call.get("name")
    # Check circuit state before recording the call
    circuit_state = circuit_breaker.get_state(tool_name)
    if circuit_state == CircuitState.OPEN:
        message = HumanMessage(
            content=f"[CIRCUIT OPEN] '{tool_name}' is currently unavailable "
            f"(circuit breaker open). You have attempted to call it despite "
            f"this. Please reconsider your approach without relying on this tool."
        )
        return {**state, "messages": state["messages"] + [message]}
    # Normal loop detection continues...
    return self._check_loop_count(state, tool_name)
Reference: AWS DynamoDB conditional writes: https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/WorkingWithItems.html#WorkingWithItems.ConditionalUpdate
Reference: Tenacity retry library: https://tenacity.readthedocs.io/
Part 6: Putting It All Together — The Full Harness Initialization
Here's how all these components assemble into a single agent run:
# harness/runtime.py
import uuid
from typing import Optional
from harness.graph import build_agent_graph, AgentState
from harness.middleware.auth import CredentialManager
from harness.middleware.loop_detection import LoopDetectionMiddleware
from harness.middleware.circuit_breaker import DynamoDBCircuitBreaker, ProtectedToolExecutor
from harness.middleware.verification import build_verification_node
from harness.context.bootstrap import ContextBootstrap
from harness.routing.reasoning import ReasoningRouter
from harness.tools.base import build_tool_registry
# extract_final_result and extract_phase_history are assumed to be helper
# functions defined elsewhere in the harness; they are not shown in this article.

class AgentHarness:
    """
    Top-level harness assembler. One instance per agent deployment.
    Run instances are created via execute().
    """
    def __init__(
        self,
        token_endpoint: str,
        client_id_secret_arn: str,
        knowledge_base_id: Optional[str] = None,
        region: str = "us-east-1",
    ):
        # Credential management
        self.credential_manager = CredentialManager(
            token_endpoint=token_endpoint,
            client_id_secret=client_id_secret_arn,
            region=region,
        )
        # Infrastructure components
        self.loop_detector = LoopDetectionMiddleware(region=region)
        self.circuit_breaker = DynamoDBCircuitBreaker(region=region)
        self.tool_executor = ProtectedToolExecutor(
            circuit_breaker=self.circuit_breaker,
            loop_detector=self.loop_detector,
        )
        self.context_bootstrap = ContextBootstrap(
            knowledge_base_id=knowledge_base_id,
            region=region,
        )
        self.reasoning_router = ReasoningRouter(region=region)
        # Tool registry (wrapped with auth + retry + circuit breaker)
        self.tools = build_tool_registry(self.credential_manager)

    def execute(
        self,
        task_spec: str,
        initial_token: str,
        refresh_token: Optional[str] = None,
    ) -> dict:
        run_id = str(uuid.uuid4())
        # Initialize credentials for this run.
        # Note: the credential manager is shared by the whole harness instance,
        # so concurrent execute() calls would overwrite each other's tokens.
        self.credential_manager.initialize(initial_token, refresh_token)
        # Build context-enriched initial state
        graph, initial_state = build_agent_graph(
            task_spec=task_spec,
            knowledge_base_id=self.context_bootstrap.knowledge_base_id,
            run_id=run_id,
        )
        # Execute with full harness active
        config = {"configurable": {"thread_id": run_id}}
        final_state = graph.invoke(initial_state, config=config)
        return {
            "run_id": run_id,
            "result": extract_final_result(final_state),
            "verification_passed": final_state.get("verification_passed", False),
            "loop_summary": self.loop_detector.get_loop_summary(run_id),
            "phases_used": extract_phase_history(final_state),
        }
Production Reality Check
Every series article has one of these, and this is where I try to be honest about where the ideas fall short.
Harnesses are model-specific. This is the hard truth. The self-verification prompt that works with Claude 3.7 needs adjustment for Nova Pro. The loop detection threshold that prevents doom loops on Sonnet causes premature interruptions on a model that naturally revisits files more. When you upgrade your model, budget time to re-tune the harness. The LangChain team explicitly noted this — they ran a separate improvement loop for each model on their leaderboard. This isn't a corner case, it's the default.
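One way to make that re-tuning explicit is to keep the model-dependent knobs in a per-model profile rather than scattered through the middleware. The sketch below is illustrative only: `HarnessProfile`, `PROFILES`, and `profile_for` are hypothetical names, and every value other than the four-edit threshold mentioned in this article is a placeholder, not a measured setting.

```python
from dataclasses import dataclass
from typing import Dict


@dataclass(frozen=True)
class HarnessProfile:
    """Per-model tuning knobs (hypothetical names, placeholder values)."""
    loop_edit_threshold: int          # edits to one file before intervening
    verification_prompt_version: str  # which verification prompt to load


# The default mirrors the four-edit threshold described in this article.
DEFAULT_PROFILE = HarnessProfile(loop_edit_threshold=4, verification_prompt_version="v1")

# Keys are substrings matched against the model ID. The values are
# placeholders to be replaced by the results of your own tuning runs.
PROFILES: Dict[str, HarnessProfile] = {
    "claude-3-7-sonnet": HarnessProfile(4, "v1"),
    "nova-pro": HarnessProfile(6, "v2"),
}


def profile_for(model_id: str) -> HarnessProfile:
    """Pick the profile whose key appears in the model ID, else the default."""
    for key, profile in PROFILES.items():
        if key in model_id:
            return profile
    return DEFAULT_PROFILE
```

A model upgrade then becomes a new entry in `PROFILES` plus an eval run, which is easier to review than threshold changes spread across several files.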
Middleware creates latency. Every hook I've described adds round-trip time. The verification subgraph is an entire additional model call. The loop detector hits DynamoDB on every tool call. In batch processing this doesn't matter. In a user-facing application where someone is waiting, you need to profile your harness overhead and decide which components are worth it for your latency budget.
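Profiling that overhead does not need much machinery. A minimal sketch, assuming you can wrap each middleware call site in a context manager, is shown below; `HarnessTimer` and the component labels are hypothetical names, and in practice you would probably forward these numbers to whatever tracing backend you already use.

```python
import time
from collections import defaultdict
from contextlib import contextmanager
from typing import Dict, Iterator, List


class HarnessTimer:
    """Collects wall-clock time spent in each named harness component."""

    def __init__(self) -> None:
        self.samples: Dict[str, List[float]] = defaultdict(list)

    @contextmanager
    def measure(self, component: str) -> Iterator[None]:
        start = time.perf_counter()
        try:
            yield
        finally:
            # Record the duration even if the wrapped call raised.
            self.samples[component].append(time.perf_counter() - start)

    def summary(self) -> Dict[str, float]:
        """Total seconds per component across all recorded calls."""
        return {name: sum(values) for name, values in self.samples.items()}


# Usage sketch: wrap each hook where it is invoked.
timer = HarnessTimer()
with timer.measure("loop_detection"):
    pass  # call the loop detector here
with timer.measure("verification"):
    pass  # call the verification subgraph here
```

Comparing `timer.summary()` against total run time shows which components dominate, which is the input you need for the latency-budget decision.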
Your guardrails will eventually work against you. The loop detection middleware I described intervenes when a file is edited four times. That threshold was appropriate for the agent behavior I observed six months ago. As models improve and become more methodically iterative by default, that threshold may interrupt legitimate behavior. Harness components should be versioned and reviewed on the same cadence as model upgrades. Build in the ability to disable individual components via feature flags.
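A feature flag for each component can be as simple as an environment variable check at the point where the middleware is wired in. The sketch below assumes a `HARNESS_<NAME>_ENABLED` naming scheme, which is my own convention for illustration and not something the harness above defines; a parameter store or config service would work the same way.

```python
import os
from typing import Mapping, Optional


def component_enabled(
    name: str,
    env: Optional[Mapping[str, str]] = None,
    default: bool = True,
) -> bool:
    """Read HARNESS_<NAME>_ENABLED; fall back to `default` when it is unset."""
    source = os.environ if env is None else env
    raw = source.get(f"HARNESS_{name.upper()}_ENABLED")
    if raw is None:
        return default
    return raw.strip().lower() in ("1", "true", "yes", "on")
```

For example, the harness constructor could skip building the loop detector when `component_enabled("loop_detection")` is false, which lets on-call engineers switch off a misbehaving guardrail without a deploy.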
The circuit breaker state needs maintenance. Open circuits don't always heal themselves cleanly. Downstream APIs sometimes recover in a way that doesn't trigger the half-open probe. Build a manual override mechanism — a Lambda or admin endpoint that can force a circuit closed — so on-call engineers have a tool when things go sideways at 2 AM.
Token refresh logic is trickier than it looks. The implementation I showed handles the common case cleanly. It does not handle: refresh tokens that are also expired, multi-tenant scenarios where different agent sub-tasks run as different identities, or the race condition where two parallel tool calls both detect an expired token and both try to refresh simultaneously. For a production system, that last one requires a distributed lock or a dedicated token service. Don't discover this issue in production.
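Within a single process, the simultaneous-refresh race can be closed with a lock and a second freshness check after acquiring it. The sketch below shows that pattern under stated assumptions: `SingleFlightTokenCache` is a hypothetical class, and `fetch` stands in for whatever call actually obtains a new token and its expiry time. It does not solve the cross-process case, which still needs a distributed lock or a token service as described above.

```python
import threading
import time
from typing import Callable, Optional, Tuple


class SingleFlightTokenCache:
    """Ensures only one thread refreshes an expired token at a time."""

    def __init__(
        self,
        fetch: Callable[[], Tuple[str, float]],  # returns (token, expires_at epoch seconds)
        skew_seconds: float = 30.0,               # refresh slightly before expiry
    ) -> None:
        self._fetch = fetch
        self._skew = skew_seconds
        self._lock = threading.Lock()
        self._token: Optional[str] = None
        self._expires_at = 0.0

    def _fresh(self) -> bool:
        return self._token is not None and time.time() < self._expires_at - self._skew

    def get(self) -> str:
        if self._fresh():
            return self._token  # fast path: no lock needed
        with self._lock:
            # Re-check after acquiring the lock: another thread may have
            # refreshed the token while this one was waiting.
            if not self._fresh():
                self._token, self._expires_at = self._fetch()
            return self._token
```

With this in place, parallel tool calls that all see an expired token result in one refresh request; the waiting threads reuse the token the first thread fetched.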
Reference Architecture
            ┌──────────────────────────────────────┐
            │          Agent Application           │
            │                                      │
┌───────────┴───────────────────────────────┐      │
│           AgentHarness Runtime            │      │
│                                           │      │
│  ┌─────────────────────────────────────┐  │      │
│  │       LangGraph Orchestrator        │  │      │
│  │  planning → executing → verifying   │  │      │
│  └──────┬─────────────┬────────────────┘  │      │
│         │             │                   │      │
│  ┌──────▼──────┐  ┌───▼──────────────┐    │      │
│  │ Reasoning   │  │ Middleware       │    │      │
│  │ Router      │  │ Loop Detection   │    │      │
│  │ Claude 3.7  │  │ Verification     │    │      │
│  │ Claude 3.5  │  │ Auth Propagate   │    │      │
│  └──────┬──────┘  └──────────────────┘    │      │
│         │                                 │      │
│  ┌──────▼───────────────────────────────┐ │      │
│  │         AWS Bedrock Runtime          │ │      │
│  │ Model Invocation + Extended Thinking │ │      │
│  └──────────────────────────────────────┘ │      │
│                                           │      │
│  ┌──────────────────────────────────────┐ │      │
│  │         Tool Execution Layer         │ │      │
│  │ Auth Wrapper → Retry → Circ. Breaker │ │      │
│  └─────────┬──────────────────┬─────────┘ │      │
└────────────┼──────────────────┼───────────┘      │
             │                  │                  │
┌────────────▼───┐  ┌───────────▼──────────────┐   │
│ AWS Services   │  │ External APIs / Tools    │   │
│ Secrets Mgr    │  │ (protected by circuit    │   │
│ Knowledge Base │  │ breaker + JWT auth)      │   │
│ DynamoDB       │  └──────────────────────────┘   │
│ (loop + circ)  │                                 │
└────────────────┘                                 │
                                                   │
┌──────────────────────────────────────────────────┘
│ Observability (LangSmith / LangFuse / CloudWatch)│
│ Traces: every model call, tool call, and         │
│ middleware event                                 │
└──────────────────────────────────────────────────┘
Reference: LangGraph multi-agent orchestration patterns: https://langchain-ai.github.io/langgraph/concepts/multi_agent/
Reference: AWS Bedrock Guardrails (complementary safety layer): https://docs.aws.amazon.com/bedrock/latest/userguide/guardrails.html
Reference: Original LangChain harness engineering research: https://blog.langchain.com/harness-engineering/
Reference Infrastructure Stack
+---------------------------+---------------------+----------------------------+
| Component                 | Technology          | Role in Harness            |
+---------------------------+---------------------+----------------------------+
| Orchestration             | LangGraph 0.2+      | State machine, graph edges,|
|                           |                     | conditional routing        |
+---------------------------+---------------------+----------------------------+
| Model Runtime             | AWS Bedrock         | Claude 3.7/3.5, Nova Pro,  |
|                           |                     | extended thinking support  |
+---------------------------+---------------------+----------------------------+
| Reasoning Router          | Custom (boto3)      | Phase-aware model + budget |
|                           |                     | selection                  |
+---------------------------+---------------------+----------------------------+
| Context Retrieval         | Bedrock Knowledge   | Pre-run environment        |
|                           | Bases               | bootstrap                  |
+---------------------------+---------------------+----------------------------+
| Loop Detection State      | DynamoDB            | Shared across sub-agents,  |
|                           |                     | TTL-managed                |
+---------------------------+---------------------+----------------------------+
| Circuit Breaker State     | DynamoDB            | Per-tool, survives restarts|
+---------------------------+---------------------+----------------------------+
| Credential Management     | Secrets Manager +   | JWT lifecycle, refresh,    |
|                           | Custom Manager      | role propagation           |
+---------------------------+---------------------+----------------------------+
| Retry Policies            | Tenacity            | Separate policies for model|
|                           |                     | vs tool failures           |
+---------------------------+---------------------+----------------------------+
| Observability             | LangSmith /         | Full trace capture, token  |
|                           | LangFuse + CW Logs  | counts, phase timing       |
+---------------------------+---------------------+----------------------------+
| Evaluation                | AWS AgentCore Evals | Post-run quality scoring   |
|                           | + Ragas/DeepEval    |                            |
+---------------------------+---------------------+----------------------------+
| Local Dev Alternative     | Ollama + Docker     | Run verification + routing |
|                           | Compose             | without Bedrock costs      |
+---------------------------+---------------------+----------------------------+
| Infrastructure as Code    | Terraform           | DynamoDB tables, IAM roles,|
|                           |                     | context bootstrap Lambda   |
+---------------------------+---------------------+----------------------------+
This is Article 5 in the Agentic Architectures series. Previous articles covered the Agentic AI Maturity Model, Advanced Coordination and Reasoning Patterns, AgentOps and Observability, and Agentic Protocols (MCP and A2A).
Other Articles
- Article 1: https://topuzas.medium.com/agentic-architectures-article-1-the-agentic-ai-maturity-model-092f009cf2c0
- Article 2: https://topuzas.medium.com/agentic-architectures-article-1-advanced-coordination-and-reasoning-patterns-2beaa81ce6c3
- Article 3: https://topuzas.medium.com/agentic-architectures-article-3-agentops-861f3ca9eb6f
- Article 4: https://topuzas.medium.com/agentic-architectures-article-4-agentic-protocols-mcp-and-a2a-ca10832365e8
- Article 5: https://medium.com/@topuzas/agentic-architectures-article-5-harness-engineering-and-the-agent-runtime-layer-731414a3f4ed
Tags: AgenticAI, AWSBedrock, LangGraph, SoftwareArchitecture, MachineLearning, LLMOps, AIEngineering, CloudArchitecture, PythonProgramming, ArtificialIntelligence