The terminal hung at 94% for the third time that afternoon. Claude Code had been "thinking" for four minutes, spinning up a subprocess that spawned a subprocess that spawned another subprocess, each one inheriting a context window bloated with 140,000 tokens of hallucinated file paths, hallucinated imports, and the ghost of a requirements.txt that hadn't existed since Tuesday. I killed it. Again. Switched to Codex. Same repo. Same prompt. Codex spent three minutes writing a 400-line migration script for a table that didn't exist, hallucinating a created_at column with a default that would've violated a NOT NULL constraint on a 40-million-row production table. Killed it. Switched to OpenCode. Same repo. Same prompt. OpenCode spent six minutes writing a test suite for a function that didn't exist, mocking a dependency that had been deleted in the refactor I'd asked it to do.
Three agents. Three different failure modes. Same root cause: none of them knew what the codebase actually looked like right now.
That's the frontier agent OS problem. Not "which model is smartest." Not "which framework has the prettiest dashboard." The frontier agent OS problem is: how do you build a runtime that maintains a coherent, accurate, queryable model of a living codebase—across 200k tokens of context, 50+ tool calls, 47 file edits, and three agent handoffs—so that the next agent action is grounded in reality, not hallucination?
I've spent the last 18 months building agent orchestration runtimes. Not demos. Production systems: ones that ship code to live environments, manage Kubernetes clusters, refactor 200k-line monorepos, and debug incidents at 3 AM. The gap between "Claude Code works great on this 500-line repo" and "Claude Code deploys a migration to production without dropping a column" is not a model quality problem. It's an operating system problem.
Here's what a frontier agent OS actually looks like when you build it for production.
The Architecture Nobody Talks About
Everyone builds "agent frameworks." LangGraph, LangChain, AutoGen, CrewAI, AutoGPT, BabyAGI, AutoGPT-Next, AutoGPT-Next-Next. They're all frameworks. Frameworks are libraries. Libraries don't manage state across 50 tool calls. Libraries don't survive agent crashes. Libraries don't checkpoint context windows. Libraries don't enforce invariants.
An agent OS is a runtime. It has a kernel. It has a memory manager. It has a filesystem abstraction. It has a device driver layer for tools. And it has a scheduler that decides which agent runs next, with what context, with what tools, with what timeout, with what rollback capability.
Here's the architecture that actually works in production:
┌─────────────────────────────────────────────────────────────────┐
│                         AGENT OS KERNEL                         │
├─────────────────────────────────────────────────────────────────┤
│   ┌─────────────┐    ┌─────────────┐    ┌─────────────┐         │
│   │  SCHEDULER  │    │ MEMORY MGR  │    │ CONTEXT MGR │         │
│   │  (policy)   │    │ (vector +   │    │ (window +   │         │
│   │             │    │  graph +    │    │ checkpoint) │         │
│   │             │    │  kv)        │    │             │         │
│   └──────┬──────┘    └──────┬──────┘    └──────┬──────┘         │
│          │                  │                  │                │
│          └──────────────────┼──────────────────┘                │
│                             ▼                                   │
│                 ┌───────────────────────┐                       │
│                 │     TOOL RUNTIME      │                       │
│                 │  (sandbox + drivers)  │                       │
│                 └───────────┬───────────┘                       │
│                             │                                   │
│          ┌──────────────────┼──────────────────┐                │
│          ▼                  ▼                  ▼                │
│   ┌─────────────┐    ┌─────────────┐    ┌─────────────┐         │
│   │ FILESYSTEM  │    │    SHELL    │    │     LSP     │         │
│   │   DRIVER    │    │   DRIVER    │    │   DRIVER    │         │
│   └─────────────┘    └─────────────┘    └─────────────┘         │
└─────────────────────────────────────────────────────────────────┘
The kernel doesn't call models. The kernel manages the runtime in which agents execute. The model is just another device driver—a compute device that takes context and returns tool calls. The kernel decides which model, which context, which tools, what timeout, what rollback point.
The Context Manager: The Actual Kernel
This is where every framework fails. They treat context as "stuff we stuff into the prompt." It's not. Context is a memory hierarchy.
┌────────────────────────────────────────────────────────────────┐
│ CONTEXT HIERARCHY │
├────────────────────────────────────────────────────────────────┤
│ │
│ L1: IMMEDIATE CONTEXT (8k-32k tokens) │
│ ├─ Current task spec │
│ ├─ Active file handles (open files, cursor positions) │
│ ├─ Recent tool results (last 5-10 calls) │
│ ├─ Active error state (stack traces, test failures) │
│ └─ Scratchpad (agent's working memory) │
│ │
│ L2: WORKING CONTEXT (32k-128k tokens) │
│ ├─ Relevant file contents (AST-summarized + key sections) │
│ ├─ Symbol index (definitions, references, call graphs) │
│ ├─ Recent git history (commits, diffs, blame for touched) │
│ ├─ Test results (passing/failing, coverage gaps) │
│ └─ Dependency graph (imports, exports, circular deps) │
│ │
│ L3: ARCHIVAL CONTEXT (vector + graph DB, unbounded) │
│ ├─ Full codebase embeddings (semantic search) │
│ ├─ Knowledge graph (entities: files, functions, tables, │
│ │ services, configs, infra, docs, decisions, people) │
│ ├─ Historical episodes (past tasks, decisions, outcomes) │
│ ├─ Documentation, ADRs, RFCs, runbooks │
│ └─ Production telemetry (traces, logs, metrics, profiles) │
│ │
│ L4: PERSISTENT STATE (durable, transactional) │
│ ├─ Checkpoints (full context snapshots at decision points) │
│ ├─ Agent state machines (paused/resumed workflows) │
│ ├─ Tool execution logs (for replay/debugging) │
│ └─ Evaluation traces (for RLHF) │
│ │
└────────────────────────────────────────────────────────────────┘
The context manager's job: promote, demote, evict, summarize, and fuse across these tiers on every scheduler tick. This is not "stuff tokens into context window." This is virtual memory management for LLMs.
The Context Manager Kernel Loop
# Pseudocode for the context manager loop
L1_TOKEN_BUDGET = 32_000
L2_TOKEN_BUDGET = 128_000

class ContextManager:
    def __init__(self, kernel):
        self.kernel = kernel
        self.l1 = LRUCache(max_tokens=L1_TOKEN_BUDGET)
        self.l2 = LRUCache(max_tokens=L2_TOKEN_BUDGET)
        self.l3 = VectorGraphDB()
        self.l4 = CheckpointStore()

    def prepare_context(self, agent_id: str, task: Task) -> ContextBundle:
        # 1. Load checkpoint if resuming
        checkpoint = self.l4.load_latest(agent_id)
        if checkpoint:
            self.restore(checkpoint)
        # 2. Promote L3 → L2 based on task relevance
        relevant = self.l3.query(task.query, k=50)
        self.promote_to_l2(relevant)
        # 3. Promote L2 → L1 based on recency + relevance
        active = self.select_for_l1(task)
        self.promote_to_l1(active)
        # 4. Inject runtime state (open files, errors, test results)
        runtime_state = self.kernel.runtime.get_state(agent_id)
        self.inject_l1(runtime_state)
        # 5. Compress if over budget (same limit the L1 cache was built with)
        if self.l1.tokens > L1_TOKEN_BUDGET:
            self.compress_l1()
        # 6. Build context bundle with provenance
        return ContextBundle(
            l1=self.l1.serialize(),
            l2=self.l2.serialize(),
            provenance=self.track_provenance(),
            token_budget=self.calculate_budget(),
        )

    def on_tool_result(self, agent_id: str, tool: Tool, result: ToolResult):
        # Immediate promotion to L1
        self.l1.put(f"tool:{tool.name}:result", result.summary())
        # If file read, promote to L2 with AST summary
        if tool.name == "read_file":
            ast_summary = self.ast_summarize(result.content)
            self.l2.put(f"file:{tool.args.path}:ast", ast_summary)
        # If error, pin to L1 until resolved
        if result.is_error:
            self.l1.pin(f"error:{tool.name}", result.error)
This is virtual memory. Tier L1 is the register file. Tier L2 is the CPU cache. Tier L3 is RAM. Tier L4 is disk. The context manager is the MMU.
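To make the promote/evict mechanics concrete, here is a minimal sketch of a single tier: a token-budgeted LRU store with pinning. The `TokenLRU` class, its method names, and the whitespace-based token count are assumptions for illustration, not the `LRUCache` referenced in the loop above; a real tier would count tokens with the model's tokenizer.

```python
from collections import OrderedDict
from typing import List, Optional, Set


def count_tokens(text: str) -> int:
    # Crude stand-in for a real tokenizer: one token per whitespace-separated word.
    return len(text.split())


class TokenLRU:
    """A context tier that evicts least-recently-used entries once over budget."""

    def __init__(self, max_tokens: int):
        self.max_tokens = max_tokens
        self.entries: "OrderedDict[str, str]" = OrderedDict()
        self.pinned: Set[str] = set()

    @property
    def tokens(self) -> int:
        return sum(count_tokens(v) for v in self.entries.values())

    def put(self, key: str, value: str, pin: bool = False) -> List[str]:
        """Insert or refresh an entry; return the keys evicted to stay in budget."""
        self.entries[key] = value
        self.entries.move_to_end(key)
        if pin:
            self.pinned.add(key)
        return self._evict()

    def get(self, key: str) -> Optional[str]:
        if key in self.entries:
            self.entries.move_to_end(key)  # mark as recently used
            return self.entries[key]
        return None

    def unpin(self, key: str) -> None:
        self.pinned.discard(key)

    def _evict(self) -> List[str]:
        evicted = []
        # Walk from least to most recently used, skipping pinned entries.
        for key in list(self.entries):
            if self.tokens <= self.max_tokens:
                break
            if key in self.pinned:
                continue
            del self.entries[key]
            evicted.append(key)
        return evicted


l1 = TokenLRU(max_tokens=8)
l1.put("error:pytest", "AssertionError in test_auth", pin=True)    # 3 tokens, pinned
l1.put("tool:read_file", "def login(user): ...")                   # 3 tokens
evicted = l1.put("tool:grep", "auth.py:12 auth.py:40 auth.py:77")  # 3 more: over budget
print(evicted)  # ['tool:read_file']
```

The pinned error survives even though it is the oldest entry. The keys returned by `put` are the ones a fuller implementation would summarize and demote to the next tier rather than drop.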
The Memory Manager: Vector + Graph + KV, Not Just RAG
RAG is not memory. RAG is "similarity search over chunks." That works for "find me the doc about auth." It fails for "find me all the places where user_id flows from the API gateway to the billing service." That's a graph query, not a similarity query.
The memory manager maintains three coordinated stores:
from pathlib import Path

class MemoryManager:
    def __init__(self):
        self.vector = VectorStore(embedding_model="text-embedding-3-large")
        self.graph = KnowledgeGraph()
        self.kv = KVStore()

    def ingest_codebase(self, repo_path: Path):
        for file in repo_path.rglob("*"):
            if file.suffix not in SUPPORTED_EXTENSIONS:
                continue
            # pathlib.Path has no .path attribute; key entities by repo-relative path
            rel_path = file.relative_to(repo_path).as_posix()
            # 1. Parse AST for every supported file
            ast = self.parse_ast(file)
            # 2. Extract entities to graph
            for node in ast.nodes:
                self.graph.upsert(Entity(
                    id=f"{rel_path}:{node.name}",
                    type=node.type,  # function, class, table, config, etc.
                    file=rel_path,
                    span=node.span,
                    signature=node.signature,
                    docstring=node.docstring,
                ))
            # 3. Extract relationships
            # NOTE: dst is keyed to the current file, so cross-file calls and
            # imports need the parser to supply the resolved target file here.
            for edge in ast.edges:
                self.graph.upsert(Relation(
                    src=f"{rel_path}:{edge.src}",
                    dst=f"{rel_path}:{edge.dst}",
                    type=edge.type,  # calls, imports, inherits, references
                ))
            # 4. Embed semantic chunks for vector search
            for chunk in self.chunk_semantically(ast):
                self.vector.upsert(VectorRecord(
                    id=f"{rel_path}:{chunk.id}",
                    text=chunk.text,
                    embedding=self.embed(chunk.text),
                    metadata={
                        "file": rel_path,
                        "symbols": chunk.symbols,
                        "layer": self.classify_layer(rel_path),
                    },
                ))
        # 5. Cross-reference with infra/config/docs (once, after all files)
        self.ingest_infra()
        self.ingest_docs()
        self.ingest_git_history()

    def query(self, task: Task) -> MemoryResult:
        # Hybrid query: vector + graph + kv
        vector_results = self.vector.search(task.query, k=20)
        graph_results = self.graph.query(task.query_graph())
        kv_results = self.kv.lookup(task.key_lookups())
        # Fuse with reciprocal rank fusion
        return self.fuse(vector_results, graph_results, kv_results)
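The `fuse` step names reciprocal rank fusion without showing it. A minimal sketch, assuming each store returns an ordered list of ids: every id scores the sum of 1 / (k + rank) over the lists it appears in. The function name, the sample ids, and the choice of k=60 (the value commonly used for RRF) are illustrative assumptions.

```python
from collections import defaultdict


def reciprocal_rank_fusion(rankings, k=60):
    """Fuse several ranked lists of ids into one list, best first.

    Each id scores sum(1 / (k + rank)) over the lists it appears in,
    with rank starting at 1.
    """
    scores = defaultdict(float)
    for ranking in rankings:
        for rank, item_id in enumerate(ranking, start=1):
            scores[item_id] += 1.0 / (k + rank)
    # Sort by score descending; break ties by id for deterministic output.
    return sorted(scores, key=lambda item_id: (-scores[item_id], item_id))


vector_hits = ["auth/tokens.py", "docs/auth.md", "billing/invoice.py"]
graph_hits = ["billing/invoice.py", "auth/tokens.py"]
kv_hits = ["billing/invoice.py"]

fused = reciprocal_rank_fusion([vector_hits, graph_hits, kv_hits])
print(fused)  # ['billing/invoice.py', 'auth/tokens.py', 'docs/auth.md']
```

RRF only looks at ranks, so the three stores never need comparable scores. An item that two or three stores agree on beats an item that one store ranks first.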
The knowledge graph entities aren't just code symbols. They include:
Entity Types:
├── Code: function, class, method, variable, type, interface
├── Data: table, column, index, migration, schema
├── Infra: service, deployment, configmap, secret, ingress, cronjob
├── Config: env_var, feature_flag, yaml, json, toml
├── Docs: adr, rfc, runbook, readme, changelog
├── People: owner, reviewer, oncall, author
├── Decision: adr_id, rationale, alternatives, status
└── Incident: incident_id, root_cause, resolution, action_items
The graph edges capture data flow, control flow, dependency, ownership, temporal relationships:
function:auth:validate_token ──calls──► function:db:get_user
function:api:create_user ──writes──► table:users
table:users ──owned_by──► team:backend
service:billing ──depends_on──► service:payments
adr:042 ──decided_by──► person:sarah
incident:INC-2024-0842 ──caused_by──► migration:2024_08_15_add_idx
When an agent asks "where does user_id flow from the gateway to billing?", the graph query is:
MATCH path = (src:Entity {name: "api_gateway"})-[:CALLS*1..5]->(dst:Entity {name: "billing"})
WHERE ANY(n IN nodes(path) WHERE n.name CONTAINS "user_id")
RETURN path
RAG returns "the auth docs mention user_id." The graph returns the actual data flow path. The agent OS needs both.
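For readers without a graph database at hand, the same bounded path search can be sketched over an in-memory edge list. The edges below are hypothetical, and the breadth-first search stands in for the variable-length `CALLS*1..5` match; it is a sketch of the idea, not how a graph engine executes the query.

```python
from collections import deque

# Hypothetical edges: (source, relation, destination)
EDGES = [
    ("service:api_gateway", "calls", "function:api:create_user"),
    ("function:api:create_user", "calls", "function:auth:validate_token"),
    ("function:api:create_user", "calls", "function:billing:open_account"),
    ("function:billing:open_account", "calls", "service:billing"),
    ("function:auth:validate_token", "calls", "function:db:get_user"),
]


def find_paths(edges, src, dst, relation="calls", max_hops=5):
    """Return every simple path from src to dst using only `relation` edges."""
    adjacency = {}
    for a, rel, b in edges:
        if rel == relation:
            adjacency.setdefault(a, []).append(b)

    paths = []
    queue = deque([[src]])
    while queue:
        path = queue.popleft()
        node = path[-1]
        if node == dst:
            paths.append(path)
            continue
        if len(path) - 1 >= max_hops:
            continue  # hop limit reached
        for nxt in adjacency.get(node, []):
            if nxt not in path:  # avoid cycles
                queue.append(path + [nxt])
    return paths


paths = find_paths(EDGES, "service:api_gateway", "service:billing")
for p in paths:
    print(" -> ".join(p))
```

The result is an ordered chain of entities, which is exactly what a similarity search over chunks cannot produce.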
The Tool Runtime: Sandbox + Drivers, Not "Tools"
Frameworks treat tools as functions. read_file(path) -> str. run_command(cmd) -> str. This is wrong. Tools are devices. They have state. They have side effects. They need drivers. They need sandboxes. They need resource limits. They need observability.
import asyncio

class ToolRuntime:
    def __init__(self, kernel):
        self.kernel = kernel
        self.sandbox = SandboxManager()
        self.drivers = {}
        self.registry = ToolRegistry()

    def register_driver(self, name: str, driver: ToolDriver):
        self.drivers[name] = driver
        self.registry.register(ToolSpec(
            name=name,
            driver=name,
            schema=driver.schema,
            capabilities=driver.capabilities,
            resource_limits=driver.limits,
        ))

    async def execute(self, agent_id: str, tool_call: ToolCall) -> ToolResult:
        # Unknown tools are an error result, not a KeyError inside the kernel
        driver = self.drivers.get(tool_call.tool)
        if driver is None:
            return ToolResult.error(f"Unknown tool: {tool_call.tool}")
        # 1. Check capabilities against agent policy
        if not self.kernel.policy.check(agent_id, driver.capabilities):
            return ToolResult.error("Capability denied by policy")
        # 2. Prepare sandbox (filesystem, network, process namespace)
        sandbox = await self.sandbox.prepare(agent_id, driver.requirements)
        # 3. Execute with timeout, resource limits, observability
        with self.kernel.observability.trace(f"tool:{tool_call.tool}") as span:
            span.set_attribute("agent_id", agent_id)
            span.set_attribute("tool_args", tool_call.args)
            try:
                result = await asyncio.wait_for(
                    driver.execute(sandbox, tool_call.args),
                    timeout=driver.limits.timeout_seconds,
                )
                # 4. Capture side effects (file changes, process spawns, network)
                side_effects = await sandbox.capture_effects()
                # 5. Update context manager with results
                self.kernel.context.on_tool_result(agent_id, tool_call, result)
                # 6. Update memory manager with new knowledge
                self.kernel.memory.ingest_tool_result(tool_call, result, side_effects)
                return ToolResult.ok(result, side_effects)
            except asyncio.TimeoutError:
                await sandbox.terminate()
                return ToolResult.error("Tool timeout")
            except Exception as e:
                await sandbox.capture_state_for_debugging()
                return ToolResult.error(str(e))
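The two guards that matter most in that loop, the capability check and the hard timeout, can be exercised without any of the surrounding machinery. This is a sketch under stated assumptions: `Outcome`, `run_with_limits`, and the capability strings are made-up names, and the only real API used is `asyncio.wait_for`.

```python
import asyncio
from dataclasses import dataclass
from typing import Any, Optional


@dataclass
class Outcome:
    ok: bool
    value: Any = None
    error: Optional[str] = None


async def run_with_limits(tool, args: dict, allowed: set, required: set, timeout_s: float) -> Outcome:
    # 1. Capability check: every capability the tool needs must be granted.
    missing = required - allowed
    if missing:
        return Outcome(ok=False, error=f"capability denied: {sorted(missing)}")
    # 2. Run with a hard timeout; wait_for cancels the coroutine on expiry.
    try:
        value = await asyncio.wait_for(tool(**args), timeout=timeout_s)
        return Outcome(ok=True, value=value)
    except asyncio.TimeoutError:
        return Outcome(ok=False, error="tool timeout")
    except Exception as exc:  # driver bugs become error results, not kernel crashes
        return Outcome(ok=False, error=str(exc))


async def slow_tool(delay: float) -> str:
    await asyncio.sleep(delay)
    return "done"


async def demo():
    fast = await run_with_limits(slow_tool, {"delay": 0.01}, {"fs.read"}, {"fs.read"}, 1.0)
    slow = await run_with_limits(slow_tool, {"delay": 1.0}, {"fs.read"}, {"fs.read"}, 0.05)
    denied = await run_with_limits(slow_tool, {"delay": 0.0}, {"fs.read"}, {"net"}, 1.0)
    return fast, slow, denied


fast, slow, denied = asyncio.run(demo())
print(fast.ok, slow.error, denied.error)
```

Every failure mode comes back as a value the scheduler can reason about, which is what lets policy decide between retry, escalation, and rollback.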
The filesystem driver doesn't just read_file. It maintains open file handles, cursor positions, watch notifications, diff tracking:
class FilesystemDriver(ToolDriver):
    def __init__(self, kernel, sandbox):
        self.kernel = kernel  # needed for the context manager callbacks below
        self.sandbox = sandbox
        self.open_files = {}  # agent_id -> {path: FileHandle}
        self.watchers = {}    # path -> [agent_id]

    async def read_file(self, agent_id: str, path: str, range: Range = None) -> ToolResult:
        # Track open file for context promotion
        handle = FileHandle(
            path=path,
            content=await self.sandbox.read(path),
            cursor=0,
            version=await self.sandbox.get_version(path),
        )
        self.open_files.setdefault(agent_id, {})[path] = handle
        if range:
            content = handle.content[range.start:range.end]
            handle.cursor = range.end
        else:
            content = handle.content
        # Notify context manager: this file is now "active"
        self.kernel.context.promote_file(agent_id, path, handle.version)
        return ToolResult.ok(content, metadata={
            "path": path,
            "version": handle.version,
            "size": len(content),
            "mime": self.detect_mime(path),
        })

    async def write_file(self, agent_id: str, path: str, content: str, mode: WriteMode) -> ToolResult:
        # Create checkpoint before write
        checkpoint = await self.sandbox.checkpoint(path)
        # Write with atomic replace
        await self.sandbox.write(path, content, mode)
        # Notify watchers
        for watcher_id in self.watchers.get(path, []):
            self.kernel.context.notify_file_changed(watcher_id, path)
        # Update open handle
        if agent_id in self.open_files and path in self.open_files[agent_id]:
            self.open_files[agent_id][path].content = content
            self.open_files[agent_id][path].version += 1
        return ToolResult.ok(None, metadata={
            "path": path,
            "bytes_written": len(content),
            "checkpoint_id": checkpoint.id,
        })
The shell driver maintains persistent PTY sessions, not fire-and-forget commands. It tracks working directory, environment, process tree, output history:
import asyncio

class ShellDriver(ToolDriver):
    def __init__(self, kernel, sandbox):
        self.kernel = kernel
        self.sandbox = sandbox
        self.sessions = {}  # agent_id -> PTYSession

    async def execute(self, agent_id: str, command: str, timeout: int = 300) -> ToolResult:
        # Reuse the agent's session; create it only on first use
        session = self.sessions.get(agent_id)
        if session is None:
            session = self.sessions[agent_id] = PTYSession(
                cwd=self.sandbox.workdir,
                env=dict(self.sandbox.env),  # copy so agents don't share one env
                history=[],
            )
        # Inject agent context into shell env
        session.env["AGENT_ID"] = agent_id
        session.env["AGENT_TASK"] = self.kernel.context.get_task(agent_id).summary()
        process = await session.spawn(command, timeout=timeout)

        # Stream output to context manager in real time. Drain both streams
        # concurrently: reading stdout to EOF first can stall a process that
        # is blocked writing to a full stderr buffer.
        async def pump(stream, sink):
            async for chunk in stream:
                sink(agent_id, chunk)

        await asyncio.gather(
            pump(process.stdout, self.kernel.context.append_stdout),
            pump(process.stderr, self.kernel.context.append_stderr),
        )
        result = await process.wait()
        # Capture final state
        session.cwd = await session.get_cwd()
        session.env = await session.get_env()
        return ToolResult.ok({
            "stdout": result.stdout,
            "stderr": result.stderr,
            "exit_code": result.exit_code,
            "cwd": session.cwd,
            "duration_ms": result.duration_ms,
        })
The LSP driver is the secret weapon. It doesn't just "go to definition." It maintains a live language server per language per workspace, giving agents semantic navigation, not grep:
class LSPDriver(ToolDriver):
    def __init__(self, kernel, sandbox):
        self.kernel = kernel  # needed for context promotion below
        self.servers = {}     # language -> LSPServer
        self.sandbox = sandbox

    async def ensure_server(self, language: str) -> LSPServer:
        if language not in self.servers:
            self.servers[language] = await LSPServer.start(
                language=language,
                workspace=self.sandbox.workdir,
                capabilities=[
                    "textDocument/definition",
                    "textDocument/references",
                    "textDocument/hover",
                    "textDocument/typeDefinition",
                    "textDocument/implementation",
                    "textDocument/documentSymbol",
                    "workspace/symbol",
                    "workspace/executeCommand",
                    "textDocument/codeAction",
                    "textDocument/rename",
                    "textDocument/semanticTokens/full",
                ],
            )
        return self.servers[language]

    async def find_references(self, agent_id: str, symbol: SymbolRef) -> ToolResult:
        server = await self.ensure_server(symbol.language)
        refs = await server.text_document_references(
            uri=symbol.file_uri,
            position=symbol.position,
            include_declaration=True,
        )
        # Enrich with context
        enriched = []
        for ref in refs:
            content = await self.sandbox.read_range(ref.uri, ref.range)
            enriched.append(Reference(
                file=ref.uri,
                range=ref.range,
                context=content,
                kind=self.classify_reference(ref, symbol),
            ))
        # Promote each referenced file to L2 context once, not once per reference
        for file in {ref.file for ref in enriched}:
            self.kernel.context.promote_file(agent_id, file)
        return ToolResult.ok(enriched)
The Scheduler: Policy, Not Heuristics
Frameworks use heuristics. "If error, retry." "If task done, next task." An agent OS uses policy. Policy is code. Policy is versioned. Policy is testable. Policy is auditable.
class SchedulerPolicy:
    def __init__(self, kernel):
        self.kernel = kernel

    def decide_next(self, agent_id: str) -> SchedulingDecision:
        agent = self.kernel.agents[agent_id]
        task = agent.current_task
        context = self.kernel.context.get_state(agent_id)
        # 1. Check for hard stops
        if context.token_usage > self.kernel.config.max_tokens * 0.9:
            return SchedulingDecision.CHECKPOINT_AND_PAUSE
        if context.error_count > self.kernel.config.max_errors:
            return SchedulingDecision.ESCALATE_TO_HUMAN
        if context.wall_time > self.kernel.config.max_wall_time:
            return SchedulingDecision.CHECKPOINT_AND_PAUSE
        # 2. Check for completion signals
        if task.is_complete(context):
            return SchedulingDecision.COMPLETE_TASK
        # 3. Check for handoff conditions
        if task.requires_specialist(context):
            specialist = self.kernel.registry.find_specialist(task.required_capability)
            return SchedulingDecision.HANDOFF(specialist)
        # 4. Check for tool necessity
        if context.needs_external_info():
            return SchedulingDecision.USE_TOOL(context.suggested_tool)
        # 5. Check for context refresh
        if context.staleness > self.kernel.config.max_staleness:
            return SchedulingDecision.REFRESH_CONTEXT
        # 6. Default: continue with current model
        return SchedulingDecision.CONTINUE(model=self.select_model(task, context))

    def select_model(self, task: Task, context: Context) -> ModelSpec:
        # Policy: use cheapest model that meets capability requirements
        required = task.required_capabilities
        candidates = self.kernel.model_registry.filter(capabilities=required)
        # Prefer specialized models for specialized tasks
        if task.type == "code_generation":
            return candidates.best_for("coding")
        elif task.type == "reasoning":
            return candidates.best_for("reasoning")
        elif task.type == "tool_use":
            return candidates.best_for("function_calling")
        else:
            return candidates.cheapest()
The scheduler doesn't just pick "next action." It decides which agent runs, with which model, with which tools, with which context, with which timeout, with which checkpoint policy. This is an OS scheduler. It manages processes (agents), not threads (tool calls).
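"Policy is testable" is easiest to see when the decision logic is a pure function of a state snapshot. The sketch below mirrors a subset of the rules in `decide_next`; the `Snapshot` and `Limits` dataclasses and their default thresholds are assumptions made for the example.

```python
from dataclasses import dataclass
from enum import Enum, auto


class Decision(Enum):
    CHECKPOINT_AND_PAUSE = auto()
    ESCALATE_TO_HUMAN = auto()
    COMPLETE_TASK = auto()
    REFRESH_CONTEXT = auto()
    CONTINUE = auto()


@dataclass(frozen=True)
class Limits:
    max_tokens: int = 200_000
    max_errors: int = 5
    max_staleness_s: float = 300.0


@dataclass(frozen=True)
class Snapshot:
    token_usage: int
    error_count: int
    staleness_s: float
    task_complete: bool


def decide_next(s: Snapshot, limits: Limits = Limits()) -> Decision:
    # Hard stops first, in the same order as the policy above.
    if s.token_usage > limits.max_tokens * 0.9:
        return Decision.CHECKPOINT_AND_PAUSE
    if s.error_count > limits.max_errors:
        return Decision.ESCALATE_TO_HUMAN
    if s.task_complete:
        return Decision.COMPLETE_TASK
    if s.staleness_s > limits.max_staleness_s:
        return Decision.REFRESH_CONTEXT
    return Decision.CONTINUE


# Each rule can be pinned by a unit test, and rule ordering is part of the contract:
# a finished task that is also over the token limit still checkpoints first.
assert decide_next(Snapshot(190_000, 0, 0.0, True)) is Decision.CHECKPOINT_AND_PAUSE
assert decide_next(Snapshot(10_000, 6, 0.0, False)) is Decision.ESCALATE_TO_HUMAN
assert decide_next(Snapshot(10_000, 0, 600.0, False)) is Decision.REFRESH_CONTEXT
assert decide_next(Snapshot(10_000, 0, 0.0, False)) is Decision.CONTINUE
```

Versioning the policy then reduces to versioning this function and its tests.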
Checkpointing: The Save/Resume That Actually Works
This is where every framework fails. They serialize "messages." Messages are not state. State is: open file handles, shell sessions, LSP connections, context manager tiers, memory manager indexes, tool execution history, error state, retry counters, human feedback, policy decisions.
from datetime import datetime, timezone
from uuid import uuid4

class CheckpointManager:
    def __init__(self, kernel):
        self.kernel = kernel
        self.store = CheckpointStore()

    async def checkpoint(self, agent_id: str, reason: CheckpointReason) -> Checkpoint:
        agent = self.kernel.agents[agent_id]
        # 1. Checkpoint context manager (all 4 tiers)
        context_state = await self.kernel.context.checkpoint(agent_id)
        # 2. Checkpoint tool runtime (sandbox state)
        sandbox_state = await self.kernel.tools.sandbox.checkpoint(agent_id)
        # 3. Checkpoint agent state machine
        agent_state = agent.checkpoint()
        # 4. Checkpoint memory manager indexes
        memory_state = await self.kernel.memory.checkpoint(agent_id)
        # 5. Checkpoint scheduler state
        scheduler_state = self.kernel.scheduler.checkpoint(agent_id)
        checkpoint = Checkpoint(
            id=uuid4(),
            agent_id=agent_id,
            task_id=agent.current_task.id,
            reason=reason,
            # timezone-aware; datetime.utcnow() is deprecated since Python 3.12
            timestamp=datetime.now(timezone.utc),
            context=context_state,
            sandbox=sandbox_state,
            agent=agent_state,
            memory=memory_state,
            scheduler=scheduler_state,
            provenance=self.build_provenance(agent_id),
        )
        # 6. Store atomically
        await self.store.write(checkpoint)
        # 7. Notify observability
        self.kernel.observability.checkpoint_created(checkpoint)
        return checkpoint

    async def resume(self, checkpoint_id: str) -> Agent:
        checkpoint = await self.store.read(checkpoint_id)
        # 1. Restore sandbox (filesystem, processes, network)
        await self.kernel.tools.sandbox.restore(checkpoint.sandbox)
        # 2. Restore context manager tiers
        await self.kernel.context.restore(checkpoint.context)
        # 3. Restore memory indexes
        await self.kernel.memory.restore(checkpoint.memory)
        # 4. Restore agent state
        agent = await self.kernel.agents.restore(checkpoint.agent)
        # 5. Restore scheduler state
        self.kernel.scheduler.restore(checkpoint.scheduler)
        # 6. Verify consistency
        await self.verify_consistency(agent, checkpoint)
        return agent
Checkpointing is not optional. It's not "save for later." It's a transactional boundary. Every tool call that mutates state creates a checkpoint. Every model call that exceeds 50k tokens creates a checkpoint. Every error creates a checkpoint. Every human intervention creates a checkpoint.
This is how you get replay debugging. Not "look at the logs." Replay the exact agent execution from any checkpoint. Same model. Same context. Same tools. Same sandbox. Deterministic replay, as long as model and tool outputs are served from the recorded log rather than sampled again.
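A toy version of record-and-replay shows why the execution log has to be part of the checkpoint. In this sketch every nondeterministic call goes through a recorder; on replay the recorded results are served in order, and any divergence raises. `Recorder`, `Replayer`, and the clock-reading "agent" are illustrative assumptions, with the clock standing in for model and tool calls.

```python
import hashlib
import json
import time
from dataclasses import dataclass, field


@dataclass
class Recorder:
    """Records every nondeterministic call so a run can be replayed exactly."""
    log: list = field(default_factory=list)

    def call(self, name: str, fn, *args):
        result = fn(*args)
        self.log.append({"name": name, "args": list(args), "result": result})
        return result


@dataclass
class Replayer:
    """Serves recorded results in order instead of calling the real function."""
    log: list
    cursor: int = 0

    def call(self, name: str, fn, *args):
        entry = self.log[self.cursor]
        if entry["name"] != name or entry["args"] != list(args):
            raise RuntimeError(f"replay diverged at step {self.cursor}: {name}{args}")
        self.cursor += 1
        return entry["result"]


def agent_steps(io, read_clock):
    # A toy "agent": two nondeterministic calls whose results drive the output.
    t1 = io.call("clock", read_clock)
    t2 = io.call("clock", read_clock)
    return {"elapsed": t2 - t1}


def digest(state: dict) -> str:
    # Content hash of the final state, used to verify the replay matches.
    return hashlib.sha256(json.dumps(state, sort_keys=True).encode()).hexdigest()


recorder = Recorder()
original = agent_steps(recorder, time.monotonic)
replayed = agent_steps(Replayer(recorder.log), time.monotonic)
print(digest(original) == digest(replayed))  # True
```

The replayed run never touches the real clock. Swap the clock for a model call and the same structure reproduces an agent run without re-sampling the model.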
The Model Router: Not "Claude vs GPT," "Right Tool for Right Job"
A frontier agent OS doesn't "use Claude Code." It routes to models based on capability requirements, cost budgets, latency SLAs, and policy constraints.
from typing import List

class ModelRouter:
    def __init__(self, kernel):
        self.kernel = kernel
        self.registry = ModelRegistry()

    def route(self, request: ModelRequest) -> ModelHandle:
        # 1. Filter by capabilities
        candidates = self.registry.filter(
            capabilities=request.required_capabilities,
            max_latency=request.latency_budget,
            max_cost=request.cost_budget,
        )
        # 2. Apply routing policy (looked up via policy_for below)
        policy = self.policy_for(request.task_type)
        selected = policy.select(candidates, request.context)
        # 3. Reserve capacity
        handle = selected.reserve(request.estimated_tokens)
        # 4. Wrap with observability
        return ObservedModelHandle(handle, self.kernel.observability)

    def policy_for(self, task_type: TaskType) -> RoutingPolicy:
        return {
            TaskType.CODE_GENERATION: CodingPolicy(),
            TaskType.REASONING: ReasoningPolicy(),
            TaskType.TOOL_USE: FunctionCallingPolicy(),
            TaskType.CONTEXT_RICH: LargeContextPolicy(),
            TaskType.LOW_LATENCY: SpeedPolicy(),
            TaskType.HIGH_RELIABILITY: ConsensusPolicy(),
        }[task_type]

class CodingPolicy(RoutingPolicy):
    def select(self, candidates: List[Model], context: Context) -> Model:
        # Prefer models with strong coding benchmarks.
        # The cost term is normalized to (0, 1] as cheapest/cost; a raw 1/cost
        # would swamp the benchmark terms for cheap models. Assumes cost > 0.
        cheapest = min(m.cost_per_1k_tokens for m in candidates)
        scored = []
        for m in candidates:
            score = (
                m.benchmark_score("swe_bench") * 0.4 +
                m.benchmark_score("human_eval") * 0.3 +
                m.benchmark_score("mbpp") * 0.2 +
                (cheapest / m.cost_per_1k_tokens) * 0.1
            )
            scored.append((score, m))
        return max(scored, key=lambda x: x[0])[1]

class ConsensusPolicy(RoutingPolicy):
    """For high-reliability tasks: run multiple models, vote."""
    def select(self, candidates: List[Model], context: Context) -> Model:
        # Return a composite model that runs 3 models and aggregates
        return ConsensusModel(
            models=candidates[:3],
            aggregator=MajorityVoteAggregator(),
            timeout=context.latency_budget,
        )
The router is policy-driven, observable, and auditable. Every routing decision is logged with: candidates considered, policy applied, scores, selected model, estimated cost, actual cost, latency, outcome.
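What a logged routing decision might contain can be shown with a two-model example. The model names, prices, benchmark numbers, and the 0.9/0.1 weighting are made up for illustration, and the cost term is normalized as cheapest/cost so that it stays on the same 0-to-1 scale as the benchmark term.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Model:
    name: str
    swe_bench: float           # fraction in [0, 1]
    cost_per_1k_tokens: float  # dollars, must be > 0


def score(model: Model, cheapest: float) -> float:
    # Cost term is cheapest/cost, so it stays in (0, 1] like the benchmark term.
    return 0.9 * model.swe_bench + 0.1 * (cheapest / model.cost_per_1k_tokens)


def route(candidates):
    cheapest = min(m.cost_per_1k_tokens for m in candidates)
    scored = {m.name: round(score(m, cheapest), 4) for m in candidates}
    selected = max(candidates, key=lambda m: score(m, cheapest))
    # The returned record is what would be written to the audit log.
    return {"candidates": scored, "selected": selected.name}


decision = route([
    Model("model-a", swe_bench=0.60, cost_per_1k_tokens=0.015),
    Model("model-b", swe_bench=0.45, cost_per_1k_tokens=0.003),
])
print(decision)
```

Here the stronger model wins despite costing five times as much, and the log shows the margin (roughly 0.56 against 0.505), which is the evidence needed to argue about the weights later.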
Evaluation: The Product Is the Eval Harness
This is the part nobody builds. They build the agent. They don't build the evaluation harness that tells you if the agent is getting better or worse.
The agent OS is the eval harness. Every agent run produces an evaluation trace. Every checkpoint is an eval artifact. Every human intervention is a labeled example.
from datetime import datetime, timezone

class EvaluationHarness:
    def __init__(self, kernel):
        self.kernel = kernel
        self.store = EvalStore()
        self.judges = JudgeRegistry()

    async def evaluate_task(self, task: Task, agent_run: AgentRun) -> EvalResult:
        # 1. Collect all artifacts
        artifacts = self.collect_artifacts(agent_run)
        # 2. Run automated judges
        auto_results = await self.run_automated_judges(task, artifacts)
        # 3. Queue for human review if needed
        if auto_results.needs_human_review:
            human_review = await self.queue_human_review(task, artifacts)
            auto_results.human_labels = human_review
        # 4. Compute metrics
        metrics = self.compute_metrics(task, auto_results)
        # 5. Store for training/analysis
        eval_result = EvalResult(
            task_id=task.id,
            run_id=agent_run.id,
            # timezone-aware; datetime.utcnow() is deprecated since Python 3.12
            timestamp=datetime.now(timezone.utc),
            metrics=metrics,
            auto_judgments=auto_results,
            artifacts=artifacts,
        )
        await self.store.write(eval_result)
        # 6. Feed back to memory manager for few-shot learning
        if metrics.success:
            await self.kernel.memory.add_positive_example(task, artifacts)
        else:
            await self.kernel.memory.add_negative_example(task, artifacts)
        return eval_result

    async def run_automated_judges(self, task: Task, artifacts: Artifacts) -> JudgeResults:
        judges = self.judges.for_task_type(task.type)
        results = {}
        for judge in judges:
            with self.kernel.observability.trace(f"judge:{judge.name}") as span:
                result = await judge.evaluate(task, artifacts)
                results[judge.name] = result
                span.set_attribute("score", result.score)
        return JudgeResults(results)
Automated judges for code tasks:
import asyncio

class CodeCorrectnessJudge(Judge):
    async def evaluate(self, task: Task, artifacts: Artifacts) -> Judgment:
        # 1. Run tests in sandbox
        test_result = await self.run_tests(artifacts.sandbox)
        # 2. Static analysis
        lint_result = await self.run_linter(artifacts.changed_files)
        # 3. Type checking
        type_result = await self.run_type_checker(artifacts.changed_files)
        # 4. Security scan
        sec_result = await self.run_security_scan(artifacts.changed_files)
        # 5. Semantic diff vs requirements
        semantic_result = await self.semantic_diff(task.requirements, artifacts)
        # Weights sum to 1.0, so the score stays in [0, 1]
        score = (
            test_result.pass_rate * 0.4 +
            (1 - lint_result.error_rate) * 0.15 +
            (1 - type_result.error_rate) * 0.15 +
            (1 - sec_result.finding_rate) * 0.1 +
            semantic_result.similarity * 0.2
        )
        return Judgment(
            score=score,
            passed=score > 0.8,
            details={
                "tests": test_result,
                "lint": lint_result,
                "types": type_result,
                "security": sec_result,
                "semantic": semantic_result,
            },
        )

class ProductionReadinessJudge(Judge):
    async def evaluate(self, task: Task, artifacts: Artifacts) -> Judgment:
        checks = await asyncio.gather(
            self.check_migrations(artifacts),
            self.check_config_changes(artifacts),
            self.check_feature_flags(artifacts),
            self.check_observability(artifacts),
            self.check_rollback_plan(artifacts),
            self.check_load_test(artifacts),
        )
        return Judgment(
            score=sum(c.score for c in checks) / len(checks),
            passed=all(c.passed for c in checks),
            details={c.name: c for c in checks},
        )
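It is worth working the weighted score through once by hand, because a pure weighted average has a property that is easy to miss: strong scores elsewhere can offset failing tests. The sketch below reuses the weights from `CodeCorrectnessJudge`; the hard gate on test pass rate is an assumption added for the example, not part of the judge above.

```python
WEIGHTS = {"tests": 0.4, "lint": 0.15, "types": 0.15, "security": 0.1, "semantic": 0.2}


def weighted_score(components: dict) -> float:
    return sum(WEIGHTS[name] * components[name] for name in WEIGHTS)


def judge(components: dict, threshold: float = 0.8, min_test_pass_rate: float = 1.0):
    score = weighted_score(components)
    # Hard gate (an assumption, not in the judge above): failing tests cannot
    # be offset by clean lint or a good semantic match.
    gated = components["tests"] >= min_test_pass_rate
    return round(score, 4), score > threshold and gated


# Every component is a "goodness" value in [0, 1], e.g. 1 - error_rate.
run = {"tests": 0.6, "lint": 1.0, "types": 1.0, "security": 1.0, "semantic": 1.0}
print(judge(run, min_test_pass_rate=0.0))  # (0.84, True): passes with 40% of tests failing
print(judge(run))                          # (0.84, False) once the gate is applied
```

With 0.4 x 0.6 + 0.15 + 0.15 + 0.1 + 0.2 = 0.84, the ungated judge clears the 0.8 threshold. Whether that is acceptable is a policy decision, which is one more reason to keep judges in code.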
The eval harness produces training data for the router, few-shot examples for the memory manager, regression detection for the scheduler policy, and reward signals for RLHF.
The Agent OS API: What Agents Actually See
Agents don't talk to the kernel directly. They talk to an agent runtime API that abstracts the OS:
from typing import Any, List

class AgentRuntime:
    """The interface every agent sees. Hides the OS complexity."""
    def __init__(self, kernel, agent_id: str):
        self.kernel = kernel
        self.agent_id = agent_id
        self.context = kernel.context.view(agent_id)
        self.tools = kernel.tools.view(agent_id)
        self.memory = kernel.memory.view(agent_id)
        self.state = kernel.scheduler.state(agent_id)

    # Context operations
    def get_context(self, budget: TokenBudget = None) -> ContextBundle:
        return self.kernel.context.prepare(self.agent_id, budget)

    def focus_on(self, target: FocusTarget):
        """Tell context manager: this is what matters now."""
        self.kernel.context.set_focus(self.agent_id, target)

    def remember(self, key: str, value: Any, tier: MemoryTier = MemoryTier.L2):
        self.kernel.memory.store(self.agent_id, key, value, tier)

    def recall(self, query: str, k: int = 10) -> List[MemoryItem]:
        return self.kernel.memory.query(self.agent_id, query, k)

    # Tool operations
    async def call(self, tool: str, **args) -> ToolResult:
        # The tool runtime's execute() takes a single ToolCall
        return await self.kernel.tools.execute(self.agent_id, ToolCall(tool=tool, args=args))

    async def read_file(self, path: str, range: Range = None) -> ToolResult:
        return await self.call("read_file", path=path, range=range)

    async def write_file(self, path: str, content: str, mode: WriteMode = "replace") -> ToolResult:
        return await self.call("write_file", path=path, content=content, mode=mode)

    async def run_command(self, cmd: str, timeout: int = 300) -> ToolResult:
        return await self.call("shell", command=cmd, timeout=timeout)

    async def find_references(self, symbol: SymbolRef) -> ToolResult:
        return await self.call("lsp_references", symbol=symbol)

    async def search_code(self, query: str, filters: SearchFilters = None) -> ToolResult:
        return await self.call("code_search", query=query, filters=filters)

    # Control flow
    async def checkpoint(self, reason: str = "manual") -> Checkpoint:
        # Checkpointing is async in the kernel, so it is awaited here
        return await self.kernel.checkpoint(self.agent_id, reason)

    def handoff(self, specialist: str, task: Task) -> HandoffResult:
        return self.kernel.scheduler.handoff(self.agent_id, specialist, task)

    def request_human(self, question: str, context: dict) -> HumanResponse:
        return self.kernel.scheduler.request_human(self.agent_id, question, context)

    def complete(self, result: TaskResult) -> CompletionResult:
        return self.kernel.scheduler.complete(self.agent_id, result)
An agent implementation becomes remarkably simple:
class RefactoringAgent:
    def __init__(self, runtime: AgentRuntime):
        self.rt = runtime

    async def run(self, task: RefactoringTask) -> TaskResult:
        # 1. Load context
        self.rt.focus_on(FocusTarget(files=task.affected_files))
        # 2. Explore codebase
        references = await self.rt.find_references(task.target_symbol)
        self.rt.remember("references", references)
        # 3. Understand impact
        impact = await self.analyze_impact(references)
        # 4. Plan refactoring
        plan = await self.create_plan(impact)
        # 5. Execute with checkpoints
        for step in plan.steps:
            # Kernel checkpoints are async, so await the result
            checkpoint = await self.rt.checkpoint(f"before_step_{step.id}")
            try:
                await self.execute_step(step)
            except Exception:
                # Rollback and try alternative
                # (assumes the runtime also exposes a rollback call)
                await self.rt.rollback(checkpoint)
                await self.try_alternative(step)
        # 6. Verify: report what the checks returned instead of assuming success
        tests = await self.rt.run_command("pytest -xvs")
        types = await self.rt.run_command("mypy .")
        # exit_code is in the shell driver's result; the .payload accessor is an assumption
        tests_passed = tests.payload["exit_code"] == 0
        types_passed = types.payload["exit_code"] == 0
        # 7. Complete
        return self.rt.complete(TaskResult(
            success=tests_passed and types_passed,
            changes=plan.changes,
            tests_passed=tests_passed,
        ))
Production Patterns That Actually Work
Pattern 1: Context Budgeting as First-Class Concern
Every agent run has a token budget. The context manager enforces it. The scheduler respects it. The model router optimizes for it.
class TokenBudget:
    def __init__(self, total: int, reserves: dict):
        self.total = total
        self.reserves = reserves  # {"system": 2000, "tools": 8000, "output": 4000}
        self.allocated = 0

    @property
    def available_for_context(self) -> int:
        return self.total - sum(self.reserves.values()) - self.allocated

    def allocate(self, purpose: str, tokens: int) -> bool:
        # available_for_context already subtracts self.allocated, so compare the
        # new request against it directly; adding self.allocated again would
        # count earlier allocations twice.
        if tokens <= self.available_for_context:
            self.allocated += tokens
            return True
        return False
The context manager never exceeds budget. It compresses, summarizes, evicts, promotes—whatever it takes. The agent never sees "context too long" errors. It just sees less context.
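The arithmetic is simple but easy to get wrong, so here is a worked example on a standalone copy of the budget logic. The `Budget` class and the specific numbers are assumptions for illustration: a 32,000-token window with 14,000 tokens reserved leaves 18,000 for context.

```python
class Budget:
    def __init__(self, total, reserves):
        self.total = total
        self.reserves = reserves
        self.allocated = 0

    @property
    def available(self):
        # Tokens still free for context after reserves and earlier allocations.
        return self.total - sum(self.reserves.values()) - self.allocated

    def allocate(self, tokens):
        if tokens <= self.available:
            self.allocated += tokens
            return True
        return False


budget = Budget(32_000, {"system": 2_000, "tools": 8_000, "output": 4_000})
print(budget.available)        # 18000
print(budget.allocate(10_000)) # True  -> 8000 left
print(budget.allocate(9_000))  # False -> would exceed the remaining 8000
print(budget.allocate(8_000))  # True  -> exactly fills the budget
print(budget.available)        # 0
```

A rejected allocation is the signal for the context manager to compress or evict before retrying, rather than an error surfaced to the agent.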
Pattern 2: Speculative Execution with Rollback
For expensive operations (large refactors, migrations), the agent OS runs speculative execution:
async def speculative_execute(self, agent_id: str, plan: Plan) -> SpeculativeResult:
    # 1. Checkpoint current state
    checkpoint = await self.checkpoint(agent_id, "pre_speculative")
    # 2. Fork sandbox
    forked_sandbox = await self.sandbox.fork(checkpoint.sandbox)
    # 3. Execute in forked sandbox with relaxed limits
    forked_agent = await self.spawn_agent(
        sandbox=forked_sandbox,
        policy=SpeculativePolicy(max_tokens=500000, max_time=1800),
    )
    result = await forked_agent.run(plan)
    # 4. Evaluate result (evaluate_task and metrics.success as defined on the eval harness)
    eval_result = await self.eval_harness.evaluate_task(plan.task, result)
    if eval_result.metrics.success:
        # 5. Merge sandbox changes back
        await self.sandbox.merge(forked_sandbox, checkpoint.sandbox)
        return SpeculativeResult.success(result)
    else:
        # 6. Discard fork, learn from failure
        # (assumes the sandbox manager exposes a discard call)
        await self.sandbox.discard(forked_sandbox)
        await self.memory.store_failure(plan, eval_result)
        return SpeculativeResult.failed(eval_result)
This is how you get agents to do big, risky refactors without blowing up production. They rehearse in a fork. Only successful rehearsals merge.
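To make the fork-then-merge idea concrete, here's a toy version that uses a plain directory copy as the "fork". It's a sketch under loud assumptions: speculative_apply is a hypothetical helper, the merge is a naive overwrite (it doesn't propagate deletions), and a real sandbox would use copy-on-write snapshots rather than copying files.

```python
import shutil
import tempfile
from pathlib import Path
from typing import Callable


def speculative_apply(
    workdir: Path,
    change: Callable[[Path], object],
    check: Callable[[Path], bool],
) -> bool:
    """Run `change` in a copy of workdir; copy back only if `check` passes."""
    with tempfile.TemporaryDirectory() as tmp:
        fork = Path(tmp) / "fork"
        shutil.copytree(workdir, fork)
        try:
            change(fork)
            ok = check(fork)
        except Exception:
            ok = False  # A crashed rehearsal counts as a failed rehearsal
        if ok:
            # Naive merge: overwrite originals with the fork's files
            shutil.copytree(fork, workdir, dirs_exist_ok=True)
        return ok


with tempfile.TemporaryDirectory() as d:
    repo = Path(d)
    (repo / "app.py").write_text("VERSION = 1\n")
    good = speculative_apply(
        repo,
        change=lambda p: (p / "app.py").write_text("VERSION = 2\n"),
        check=lambda p: "VERSION = 2" in (p / "app.py").read_text(),
    )
    bad = speculative_apply(
        repo,
        change=lambda p: (p / "app.py").write_text("broken"),
        check=lambda p: "VERSION" in (p / "app.py").read_text(),
    )
    final = (repo / "app.py").read_text()
```

The failed rehearsal never touches the real working directory. Only the change that passed its check lands.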
Pattern 3: Human-in-the-Loop as First-Class Scheduler Primitive
Humans aren't "fallback." They're policy decision points.
class HumanInTheLoop:
    def __init__(self, kernel):
        self.kernel = kernel
        self.queue = HumanReviewQueue()

    async def request_decision(self, agent_id: str, decision: DecisionRequest) -> Decision:
        # 1. Checkpoint agent
        checkpoint = await self.kernel.checkpoint(agent_id, "human_review")
        # 2. Prepare context for human
        context = await self.prepare_human_context(agent_id, decision)
        # 3. Queue for review (Slack, GitHub, PagerDuty, custom UI)
        review_id = await self.queue.submit(HumanReview(
            agent_id=agent_id,
            checkpoint_id=checkpoint.id,
            decision=decision,
            context=context,
            timeout=decision.timeout,
        ))
        # 4. Wait for response (with agent paused)
        response = await self.queue.wait(review_id)
        # 5. Resume agent with decision
        await self.kernel.resume_with_decision(agent_id, checkpoint, response)
        return response
Decision types:
- ApprovePlan(plan): "Is this refactoring plan correct?"
- ChooseOption(options): "Which migration strategy?"
- ProvideContext(question): "What's the business rule for this edge case?"
- AuthorizeAction(action): "Deploy this to production?"
- LabelOutcome(outcome): "Was this fix correct?"
Every human decision becomes training data. Every decision is logged with context. The system learns when to ask, what to ask, and how to present context.
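One detail the review flow has to pin down is what happens when nobody answers before the timeout. Here's a small in-memory sketch that fails closed. ReviewQueue is a hypothetical stand-in for a Slack- or GitHub-backed queue, and defaulting to "reject" is my assumption, not something the design above specifies.

```python
import asyncio


class ReviewQueue:
    """In-memory stand-in for an external review queue (hypothetical)."""

    def __init__(self):
        self._pending: dict[str, asyncio.Future] = {}

    def submit(self, review_id: str) -> None:
        self._pending[review_id] = asyncio.get_running_loop().create_future()

    def respond(self, review_id: str, decision: str) -> None:
        self._pending[review_id].set_result(decision)

    async def wait(self, review_id: str, timeout: float, default: str = "reject") -> str:
        try:
            return await asyncio.wait_for(self._pending[review_id], timeout)
        except asyncio.TimeoutError:
            return default  # Fail closed: no answer means no approval
        finally:
            self._pending.pop(review_id, None)


async def demo():
    queue = ReviewQueue()
    # A reviewer answers the first request after 10 ms
    queue.submit("deploy-1")
    asyncio.get_running_loop().call_later(0.01, queue.respond, "deploy-1", "approve")
    answered = await queue.wait("deploy-1", timeout=1.0)
    # Nobody answers the second request
    queue.submit("deploy-2")
    unanswered = await queue.wait("deploy-2", timeout=0.05)
    return answered, unanswered


answered, unanswered = asyncio.run(demo())
print(answered, unanswered)  # approve reject
```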
Pattern 4: Multi-Agent Workflows as State Machines, Not Chains
LangGraph et al. model workflows as graphs. A production agent OS models them as hierarchical state machines with checkpoints at every transition.
class MigrationWorkflow(StateMachine):
    states = [
        "analyzing",
        "planning",
        "reviewing_plan",
        "executing_migration",
        "validating",
        "canarying",
        "promoting",
        "completed",
        "rolled_back",
        "failed",
    ]
    transitions = {
        "analyzing": ["planning", "failed"],
        "planning": ["reviewing_plan", "failed"],
        "reviewing_plan": ["executing_migration", "planning", "failed"],
        "executing_migration": ["validating", "rolled_back", "failed"],
        "validating": ["canarying", "rolled_back", "failed"],
        "canarying": ["promoting", "rolled_back", "failed"],
        "promoting": ["completed", "rolled_back", "failed"],
    }

    def on_enter_analyzing(self, context):
        # Spawn codebase analysis agent
        agent = self.spawn_agent("codebase_analyst", context)
        return agent.run(AnalyzeMigrationTask(...))

    def on_enter_planning(self, context):
        # Spawn planning agent with analysis results
        agent = self.spawn_agent("migration_planner", context)
        return agent.run(CreateMigrationPlanTask(...))

    def on_enter_reviewing_plan(self, context):
        # Human reviews plan
        return self.request_human(PlanReviewDecision(...))

    def on_enter_executing_migration(self, context):
        # Spawn execution agent with speculative execution
        agent = self.spawn_agent("migration_executor", context)
        return agent.run_speculative(ExecuteMigrationTask(...))

    def on_enter_validating(self, context):
        # Run validation suite
        return self.run_validation(context)

    def on_enter_canarying(self, context):
        # Deploy to 1% traffic
        return self.deploy_canary(context)

    def on_enter_promoting(self, context):
        # Human approves full rollout
        return self.request_human(PromotionDecision(...))
Each state transition creates a checkpoint. Failure at any state → automatic rollback to last good checkpoint. Human review at critical states. Full observability at every step.
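The StateMachine base class isn't shown here, so this is a guess at the smallest thing it has to do: reject transitions that aren't in the table, and snapshot context before every legal one. CheckpointedStateMachine and InvalidTransition are hypothetical names for illustration.

```python
class InvalidTransition(Exception):
    pass


class CheckpointedStateMachine:
    def __init__(self, initial: str, transitions: dict[str, list[str]]):
        self.state = initial
        self.transitions = transitions
        # Each entry is (state we left, snapshot of context at that moment)
        self.checkpoints: list[tuple[str, dict]] = []

    def transition(self, target: str, context: dict) -> None:
        if target not in self.transitions.get(self.state, []):
            raise InvalidTransition(f"{self.state} -> {target}")
        # Checkpoint before leaving the current state
        self.checkpoints.append((self.state, dict(context)))
        self.state = target

    def rollback(self) -> dict:
        """Return to the most recently checkpointed state and its context."""
        self.state, context = self.checkpoints.pop()
        return context


sm = CheckpointedStateMachine("analyzing", {
    "analyzing": ["planning", "failed"],
    "planning": ["reviewing_plan", "failed"],
})
sm.transition("planning", {"files": 12})
try:
    sm.transition("promoting", {})  # Not reachable from "planning"
    skipped_review = True
except InvalidTransition:
    skipped_review = False
restored = sm.rollback()
print(sm.state, restored, skipped_review)  # analyzing {'files': 12} False
```

The point of the transition table is that an agent can't talk its way from planning straight to promoting. The only way forward is through the states in between.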
The Observability Stack: You Can't Debug What You Can't See
Agent OS observability isn't "logs." It's distributed tracing across model calls, tool calls, context changes, memory operations, scheduler decisions, and human interactions.
import json
import time
from contextlib import contextmanager


class AgentObservability:
    def __init__(self):
        self.tracer = Tracer("agent-os")
        self.metrics = MetricsCollector()
        self.logger = StructuredLogger()
        # Updated after each model call (assumed to be set by the model gateway)
        self.last_input_tokens = 0
        self.last_output_tokens = 0
        self.last_cost = 0.0

    @contextmanager
    def trace_agent_run(self, agent_id: str, task: Task):
        with self.tracer.start_span(f"agent.run.{task.type}") as span:
            span.set_attribute("agent.id", agent_id)
            span.set_attribute("task.id", task.id)
            span.set_attribute("task.type", task.type)
            start_time = time.time()
            token_start = self.get_token_usage(agent_id)
            try:
                yield span
                span.set_attribute("status", "success")
            except Exception as e:
                span.set_attribute("status", "error")
                span.set_attribute("error.type", type(e).__name__)
                span.set_attribute("error.message", str(e))
                raise
            finally:
                # Record duration and token usage for failed runs too
                span.set_attribute("duration_ms", (time.time() - start_time) * 1000)
                span.set_attribute("tokens_used", self.get_token_usage(agent_id) - token_start)

    @contextmanager
    def trace_model_call(self, agent_id: str, model: ModelSpec, purpose: str):
        with self.tracer.start_span(f"model.call.{purpose}") as span:
            span.set_attribute("agent.id", agent_id)
            span.set_attribute("model.name", model.name)
            span.set_attribute("model.provider", model.provider)
            span.set_attribute("purpose", purpose)
            start = time.time()
            try:
                yield span
            finally:
                # Latency and usage are recorded even if the call raises
                span.set_attribute("latency_ms", (time.time() - start) * 1000)
                span.set_attribute("input_tokens", self.last_input_tokens)
                span.set_attribute("output_tokens", self.last_output_tokens)
                span.set_attribute("cost_usd", self.last_cost)

    @contextmanager
    def trace_tool_call(self, agent_id: str, tool: str, args: dict):
        with self.tracer.start_span(f"tool.call.{tool}") as span:
            span.set_attribute("agent.id", agent_id)
            span.set_attribute("tool.name", tool)
            span.set_attribute("tool.args", json.dumps(args, default=str))
            start = time.time()
            try:
                yield span
                span.set_attribute("status", "success")
            except Exception as e:
                span.set_attribute("status", "error")
                span.set_attribute("error", str(e))
                raise
            finally:
                span.set_attribute("duration_ms", (time.time() - start) * 1000)

    def trace_context_change(self, agent_id: str, change: ContextChange):
        self.metrics.increment("context.changes", tags={
            "agent": agent_id,
            "tier": change.tier,
            "operation": change.operation,
        })
        if change.tokens_delta > 1000:
            self.logger.warning("Large context change",
                                agent_id=agent_id,
                                delta=change.tokens_delta,
                                operation=change.operation)

    def trace_scheduler_decision(self, agent_id: str, decision: SchedulingDecision):
        self.metrics.increment("scheduler.decisions", tags={
            "agent": agent_id,
            "decision": decision.type,
            "model": decision.model.name if decision.model else "none",
        })
Dashboards that matter:
- Token efficiency: tokens per successful task completion
- Tool success rate: by tool, by agent type, by task type
- Context churn: how often L1/L2/L3 are refreshed
- Checkpoint frequency: per task type, per agent type
- Human intervention rate: when, why, outcome
- Model routing accuracy: predicted vs actual cost/latency/quality
- Speculative execution success rate: how often forks merge
- Rollback frequency: how often, and why
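As one example of turning traces into a dashboard number, here's a rough sketch of the token-efficiency metric. The run records are made-up sample data, and charging failed runs' tokens against the successes is a definitional choice I'm assuming, not the only reasonable one.

```python
from collections import defaultdict

# Hypothetical run records, as they might be exported from trace spans
runs = [
    {"task": "refactor", "tokens": 40_000, "success": True},
    {"task": "refactor", "tokens": 60_000, "success": False},
    {"task": "migration", "tokens": 30_000, "success": True},
]


def tokens_per_success(runs: list[dict]) -> dict[str, float]:
    """Total tokens spent (failed runs included) per successful completion."""
    spent: dict[str, int] = defaultdict(int)
    wins: dict[str, int] = defaultdict(int)
    for run in runs:
        spent[run["task"]] += run["tokens"]
        wins[run["task"]] += int(run["success"])
    # Task types with zero successes are omitted rather than dividing by zero
    return {task: spent[task] / wins[task] for task in spent if wins[task]}


efficiency = tokens_per_success(runs)
print(efficiency)  # {'refactor': 100000.0, 'migration': 30000.0}
```

Counting the failed refactor's 60k tokens is what makes the number honest: a task type that succeeds cheaply but fails expensively should look expensive.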
Building It: The Actual Stack
You don't build this from scratch. You compose it from primitives that exist:
| Layer | Technology Choices | Why |
|---|---|---|
| Sandbox | gVisor / Firecracker / Kata Containers / nsjail | Strong isolation (microVM, user-space kernel, or namespace-based), fast snapshots |
| Filesystem | OverlayFS + FUSE + inotify | Copy-on-write, instant checkpoints, change notifications |
| Shell | asyncssh + asyncio PTY | Persistent sessions, stream output, process control |
| LSP | pyright, typescript-language-server, gopls, rust-analyzer, clangd | Semantic code intelligence per language |
| Vector DB | Qdrant / Weaviate / Pinecone / pgvector | Hybrid search, filtering, scalable |
| Graph DB | Kuzu (embedded) / FalkorDB / Neo4j | Property graph, Cypher, embedded option |
| KV Store | Redis / Dragonfly / Valkey / etcd | Low-latency, pub/sub for cache invalidation |
| Checkpoint Store | S3 / GCS / MinIO + SQLite index | Immutable, versioned, queryable |
| Task Queue | Temporal / Hatchet / Celery + Redis | Durable execution, retries, scheduling |
| Model Gateway | LiteLLM / Portkey / Helicone / custom | Routing, fallbacks, observability, budgets |
| Observability | OpenTelemetry + Jaeger/Tempo + Prometheus/Grafana | Vendor-neutral, distributed tracing |
| Policy Engine | OPA / Cedar / custom Rust | Decidable, auditable, testable policies |
The kernel itself is ~3,000 lines of Rust or Go. The drivers are ~500 lines each. The scheduler policy is ~1,000 lines of Python (for iteration speed). The context manager is ~2,000 lines. The memory manager is ~3,000 lines. The eval harness is ~2,000 lines.
Total: ~15,000 lines of core OS code. The rest is drivers, policies, judges, agents.
The Hard Parts Nobody Tells You
1. Context Compression Is Lossy Compression
You will lose information. The question is what you can afford to lose.
Compression Strategy by Tier:
┌─────────────┬─────────────────────────────────────────────────────────┐
│ L1 → L2 │ Keep: exact tool results, error messages, open files │
│ │ Summarize: model reasoning, verbose tool output │
│ │ Drop: duplicate context, old scratchpad │
├─────────────┼─────────────────────────────────────────────────────────┤
│ L2 → L3 │ Keep: AST summaries, symbol defs, key config │
│ │ Embed: file contents, doc sections, conversation │
│ │ Graph: relationships, data flow, ownership │
│ │ Drop: verbatim source (retrievable from git) │
├─────────────┼─────────────────────────────────────────────────────────┤
│ L3 → L4 │ Keep: checkpoints, decisions, outcomes, labels │
│ │ Archive: full context bundles for replay │
│ │ Index: for retrieval │
└─────────────┴─────────────────────────────────────────────────────────┘
The compression algorithm is the product. Get it wrong and agents hallucinate. Get it right and they seem omniscient.
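The L1 → L2 row of that table translates almost directly into code. This is a sketch only: the entry kinds are labels I made up to mirror the table, and summarize is a truncation placeholder where a real system would call a model or an extractive summarizer.

```python
from dataclasses import dataclass


@dataclass
class Entry:
    kind: str  # e.g. "tool_result", "error", "reasoning", "scratchpad"
    text: str


KEEP = {"tool_result", "error", "open_file"}  # Preserved verbatim
SUMMARIZE = {"reasoning", "verbose_output"}   # Shortened


def summarize(text: str, limit: int = 80) -> str:
    # Placeholder: truncation stands in for a real summarizer
    return text if len(text) <= limit else text[:limit] + " [truncated]"


def demote_l1_to_l2(entries: list[Entry]) -> list[Entry]:
    out, seen = [], set()
    for entry in entries:
        if entry.text in seen:
            continue  # Drop duplicate context
        seen.add(entry.text)
        if entry.kind in KEEP:
            out.append(entry)
        elif entry.kind in SUMMARIZE:
            out.append(Entry(entry.kind, summarize(entry.text)))
        # Anything else (old scratchpad, etc.) is dropped
    return out


l1 = [
    Entry("error", "KeyError: 'user_id'"),
    Entry("reasoning", "a" * 200),
    Entry("scratchpad", "todo: check imports"),
    Entry("error", "KeyError: 'user_id'"),  # Duplicate
]
l2 = demote_l1_to_l2(l1)
print([e.kind for e in l2])  # ['error', 'reasoning']
```

Note what survives untouched: the exact error message. Summarize that and you've thrown away the one string the agent needs to grep for.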
2. The Sandbox Is the Attack Surface
Every tool is a potential escape. The filesystem driver, shell driver, network driver—they all need capability-based security, not ACLs.
// Capability-based sandbox policy
struct SandboxPolicy {
    // Filesystem capabilities
    fs_read: CapabilitySet<PathPattern>,
    fs_write: CapabilitySet<PathPattern>,
    fs_exec: CapabilitySet<PathPattern>,
    // Network capabilities
    net_egress: CapabilitySet<CidrBlock>,
    net_ingress: CapabilitySet<PortRange>,
    // Process capabilities
    proc_spawn: CapabilitySet<CommandPattern>,
    proc_ptrace: bool,
    // Resource limits
    max_memory_mb: u64,
    max_cpu_seconds: u64,
    max_pids: u32,
    max_open_files: u32,
    // Seccomp profile
    seccomp_profile: SeccompProfile,
}

// Each agent gets a policy derived from its role.
// Fields not listed fall back to Default, which is assumed here to be deny-all
// with conservative resource limits.
fn policy_for_agent(role: AgentRole) -> SandboxPolicy {
    match role {
        AgentRole::CodeAnalyzer => SandboxPolicy {
            fs_read: CapabilitySet::allow_all(),
            fs_write: CapabilitySet::deny_all(),
            net_egress: CapabilitySet::deny_all(),
            proc_spawn: CapabilitySet::allow(["git", "grep", "rg", "ast-grep"]),
            max_memory_mb: 2048,
            seccomp_profile: SeccompProfile::ReadOnly,
            ..Default::default()
        },
        AgentRole::Refactorer => SandboxPolicy {
            fs_read: CapabilitySet::allow_all(),
            fs_write: CapabilitySet::allow(["**/*.rs", "**/*.py", "**/*.ts", "**/*.go"]),
            net_egress: CapabilitySet::deny_all(),
            proc_spawn: CapabilitySet::allow(["cargo", "pytest", "mypy", "go", "npm"]),
            max_memory_mb: 8192,
            seccomp_profile: SeccompProfile::BuildTools,
            ..Default::default()
        },
        AgentRole::Deployer => SandboxPolicy {
            fs_read: CapabilitySet::allow(["**/*.yaml", "**/*.yml", "**/Dockerfile*"]),
            fs_write: CapabilitySet::deny_all(),
            net_egress: CapabilitySet::allow(["kubernetes.default.svc", "registry.internal"]),
            proc_spawn: CapabilitySet::allow(["kubectl", "helm", "docker", "skaffold"]),
            max_memory_mb: 4096,
            seccomp_profile: SeccompProfile::DeployTools,
            ..Default::default()
        },
    }
}
No agent runs as root. No agent has unrestricted network. No agent spawns arbitrary processes. The sandbox is the security boundary.
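The same default-deny idea fits in a few lines of Python if you want to prototype policies before writing the Rust. This is a sketch of the policy lookup only: Capabilities is a hypothetical type, suffix matching is a simplification of the glob patterns above, and actual enforcement still has to live in the sandbox (seccomp, namespaces), not in a check the agent's own process performs.

```python
import shlex
from dataclasses import dataclass


@dataclass(frozen=True)
class Capabilities:
    spawn: frozenset = frozenset()  # Executables this role may launch
    write_suffixes: tuple = ()      # File suffixes this role may write

    def may_spawn(self, command: str) -> bool:
        argv = shlex.split(command)
        # Default deny: only explicitly granted executables pass
        return bool(argv) and argv[0] in self.spawn

    def may_write(self, path: str) -> bool:
        # str.endswith with an empty tuple is False, so no grants means no writes
        return path.endswith(self.write_suffixes)


analyzer = Capabilities(spawn=frozenset({"git", "grep", "rg", "ast-grep"}))
refactorer = Capabilities(
    spawn=frozenset({"cargo", "pytest", "mypy", "go", "npm"}),
    write_suffixes=(".rs", ".py", ".ts", ".go"),
)

print(analyzer.may_spawn("rg -n TODO src"))    # True
print(analyzer.may_spawn("curl http://x"))     # False
print(analyzer.may_write("src/app.py"))        # False
print(refactorer.may_write("src/app.py"))      # True
print(refactorer.may_write("deploy.yaml"))     # False
```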
3. Model Non-Determinism Breaks Replay
You checkpoint everything. You replay. The model gives a different answer. Replay breaks.
Solutions:
- Temperature = 0 for all production model calls (greedy decoding; this narrows run-to-run variance but does not eliminate it)
- Seed fixation where providers support it (OpenAI seed param, Anthropic doesn't yet)
- Logprobs capture: store top-k logprobs at each step, verify replay matches
- Consensus verification: run 3 models, require 2/3 agreement for critical decisions
- Deterministic tool schemas: no optional fields, strict validation, no "creative" tool use
class DeterministicModelWrapper:
    def __init__(self, model: ModelHandle):
        self.model = model

    async def complete(self, request: CompletionRequest) -> CompletionResponse:
        # Force deterministic parameters (a seed of 0 is valid, so test for None)
        seed = request.seed if request.seed is not None else self.generate_seed(request)
        request = request.model_copy(update={
            "temperature": 0.0,
            "top_p": 1.0,
            "seed": seed,
        })
        response = await self.model.complete(request)
        # Capture logprobs for replay verification
        if request.logprobs:
            self.store_logprobs(request.trace_id, response.logprobs)
        return response

    def verify_replay(self, trace_id: str, replay_response: CompletionResponse) -> bool:
        original_logprobs = self.get_logprobs(trace_id)
        replay_logprobs = replay_response.logprobs
        # zip() stops at the shorter sequence, so a length mismatch must fail explicitly
        if len(original_logprobs) != len(replay_logprobs):
            return False
        # Verify top token matches at each position
        for orig, replay in zip(original_logprobs, replay_logprobs):
            if orig.top_token != replay.top_token:
                return False
            if abs(orig.top_logprob - replay.top_logprob) > 0.01:
                return False
        return True
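The consensus-verification item from the list is the easiest one to prototype. Here's a minimal sketch, assuming each model's answer has already been reduced to a short decision string. The consensus helper and its normalization (strip and lowercase) are illustrative assumptions.

```python
from collections import Counter
from typing import Optional


def consensus(answers: list[str], quorum: int = 2) -> Optional[str]:
    """Return the answer at least `quorum` voters agree on, else None."""
    if not answers:
        return None
    normalized = [a.strip().lower() for a in answers]
    winner, votes = Counter(normalized).most_common(1)[0]
    return winner if votes >= quorum else None


print(consensus(["approve", "Approve ", "reject"]))  # approve
print(consensus(["approve", "reject", "defer"]))     # None
```

A None result is a signal in its own right: three models disagreeing on a critical decision is a good reason to route it to a human.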
4. The Context Manager Is a Real-Time System
It has hard deadlines. The model call must receive context within the latency budget. Context promotion/demotion/compression cannot block the scheduler.
// Rust context manager for real-time guarantees
pub struct ContextManager {
    l1: Arc<Mutex<LruCache<ContextKey, ContextEntry>>>,
    l2: Arc<Mutex<LruCache<ContextKey, ContextEntry>>>,
    l3: Arc<VectorGraphDb>,
    l4: Arc<CheckpointStore>,
    // Background workers
    promotion_tx: mpsc::Sender<PromotionTask>,
    compression_tx: mpsc::Sender<CompressionTask>,
    checkpoint_tx: mpsc::Sender<CheckpointTask>,
}

impl ContextManager {
    // MUST complete in < 5ms
    pub fn prepare_context(&self, req: &ContextRequest) -> ContextBundle {
        let mut l1 = self.l1.lock().unwrap();
        // Fast path: L1 only (L2 is never locked on this path)
        if req.budget < L1_BUDGET {
            return ContextBundle::from_l1(l1.get_relevant(&req.query));
        }
        let mut l2 = self.l2.lock().unwrap();
        // Medium path: L1 + promoted L2
        self.promote_l2_to_l1(&mut l1, &mut l2, req);
        // Slow path: async L3 promotion (fire and forget)
        if req.budget > L2_BUDGET {
            self.promotion_tx.try_send(PromotionTask {
                agent_id: req.agent_id.clone(),
                query: req.query.clone(),
                target_tier: Tier::L2,
            }).ok(); // Drop if busy - non-blocking
        }
        ContextBundle::from_tiers(&l1, &l2, req.budget)
    }
}
The context manager never blocks on I/O. L3 promotion happens async. Compression happens async. Checkpointing happens async. The synchronous path is L1 + L2 only, bounded by token budget.
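The fire-and-forget pattern is the same in any language: enqueue without waiting, and drop the request if the background worker is behind. Here's a rough Python equivalent of the try_send call, using a bounded asyncio.Queue. PromotionQueue is a hypothetical name, and counting drops is my addition so the loss is at least visible.

```python
import asyncio


class PromotionQueue:
    def __init__(self, maxsize: int = 2):
        self.queue: asyncio.Queue = asyncio.Queue(maxsize=maxsize)
        self.dropped = 0

    def request(self, query: str) -> bool:
        """Enqueue without waiting; drop the request if the queue is full."""
        try:
            self.queue.put_nowait(query)
            return True
        except asyncio.QueueFull:
            self.dropped += 1  # Surface this as a metric in a real system
            return False


async def demo():
    pq = PromotionQueue(maxsize=2)
    accepted = [pq.request(f"q{i}") for i in range(3)]
    processed = []
    while not pq.queue.empty():
        processed.append(pq.queue.get_nowait())
    return accepted, pq.dropped, processed


accepted, dropped, processed = asyncio.run(demo())
print(accepted, dropped, processed)  # [True, True, False] 1 ['q0', 'q1']
```

Dropping a promotion is safe because it's only an optimization: the next request for the same context will ask again. Dropping a checkpoint would not be, so that queue needs a different overflow policy.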
What This Actually Buys You
An agent OS built this way doesn't just "work better." It enables workflows that are impossible with frameworks:
| Workflow | Framework Approach | Agent OS Approach |
|---|---|---|
| 50-file refactor across 3 services | Agent hits context limit, hallucinates imports, breaks tests | Speculative execution in forked sandbox, checkpoint per file, validation gates, human review at plan stage |
| Production incident debug at 3AM | Agent reads logs, guesses root cause, suggests random fixes | Graph query traces request flow, correlates with recent deploys, checks related incidents, proposes targeted fix with rollback plan |
| Database migration with zero downtime | Agent writes migration, hopes for best | Analyzes schema + queries + access patterns, generates backward-compatible migration + deployment plan + validation suite + canary strategy, human gates at each stage |
| New feature across FE/BE/API/DB | Agent tries to write everything, loses coherence | Planner agent decomposes, specialist agents execute with shared context, integration agent validates contracts, deployment agent stages rollout |
| Legacy codebase modernization | Agent rewrites file by file, introduces regressions | Analysis agent builds knowledge graph, planning agent creates phased strategy, executor agents work in parallel with shared memory, validation agents run continuously |
The Honest Assessment
Building this takes 6-12 months for a team of 3-5 engineers who know distributed systems, PL tooling, and LLM behavior. It's not a side project. It's not a weekend hack.
But here's the thing: every company deploying agents to production is building pieces of this anyway. They're building context management. They're building sandboxing. They're building evaluation. They're building routing. They're building observability. They're just building them as ad-hoc scripts inside their agent code instead of as a reusable runtime.
The frontier agent OS is the Linux for AI agents. We're in the 1991 phase. Everyone's writing their own kernel. Soon someone will write the one that everyone else builds on.
If you're building agents for production, you have two choices:
- Build the OS — invest 6-12 months, own the runtime, compound the investment
- Wait for the OS — build on frameworks, hit the ceiling, rewrite later
There is no third option. The complexity doesn't disappear. It just moves from "runtime you control" to "framework you fight."
I chose option 1 eighteen months ago. The agents I run today deploy to production, debug incidents, refactor legacy systems, and write features across the stack. They checkpoint. They rollback. They ask humans. They learn from every run.
They don't hallucinate file paths. They don't forget the schema. They don't lose context at 94%.
They just work. Because underneath them is an operating system that makes sure they can.
Appendix: Minimal Viable Agent OS (If You Must Start Smaller)
You can't build the full thing tomorrow. But you can build the critical path:
# mvp_agent_os.py: ~500 lines that changes everything
import asyncio
import copy
import json
import shlex
import time
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any, Dict


@dataclass
class ContextTier:
    name: str
    max_tokens: int
    data: Dict[str, Any] = field(default_factory=dict)
    access_times: Dict[str, float] = field(default_factory=dict)


@dataclass
class SimpleContextManager:
    tiers: Dict[str, ContextTier] = field(default_factory=lambda: {
        "L1": ContextTier("L1", 8000),    # Immediate
        "L2": ContextTier("L2", 32000),   # Working
        "L3": ContextTier("L3", 200000),  # Archival (simulated)
    })

    def promote(self, key: str, value: Any, from_tier: str, to_tier: str):
        # Remove data and LRU bookkeeping together so the two dicts stay in sync
        self.tiers[from_tier].data.pop(key, None)
        self.tiers[from_tier].access_times.pop(key, None)
        self.tiers[to_tier].data[key] = value
        self.tiers[to_tier].access_times[key] = time.time()
        self._enforce_budget(to_tier)

    def _enforce_budget(self, tier_name: str):
        tier = self.tiers[tier_name]
        # Simple LRU eviction
        while tier.data and self._estimate_tokens(tier.data) > tier.max_tokens:
            oldest = min(tier.access_times, key=tier.access_times.get)
            del tier.data[oldest]
            del tier.access_times[oldest]

    def _estimate_tokens(self, data: Dict) -> int:
        # Rough heuristic: ~4 characters per token
        return sum(len(json.dumps(v, default=str)) // 4 for v in data.values())

    def build_context(self, budget: int = 50000) -> str:
        parts = []
        remaining = budget
        for tier_name in ["L1", "L2", "L3"]:
            tier = self.tiers[tier_name]
            tier_content = json.dumps(tier.data, indent=2, default=str)
            tier_tokens = len(tier_content) // 4
            if tier_tokens <= remaining:
                parts.append(f"=== {tier_name} ===\n{tier_content}")
                remaining -= tier_tokens
            else:
                # Truncate L3
                if tier_name == "L3":
                    truncated = tier_content[:remaining * 4]
                    parts.append(f"=== {tier_name} (truncated) ===\n{truncated}")
                break
        return "\n\n".join(parts)


@dataclass
class SimpleSandbox:
    workdir: Path
    checkpoints: Dict[str, Dict] = field(default_factory=dict)

    async def read(self, path: str) -> str:
        return (self.workdir / path).read_text()

    async def write(self, path: str, content: str):
        (self.workdir / path).write_text(content)

    async def run(self, cmd: str, timeout: int = 60) -> dict:
        proc = await asyncio.create_subprocess_shell(
            cmd, cwd=self.workdir,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
        )
        try:
            stdout, stderr = await asyncio.wait_for(proc.communicate(), timeout)
            return {"stdout": stdout.decode(), "stderr": stderr.decode(), "code": proc.returncode}
        except asyncio.TimeoutError:
            proc.kill()
            await proc.wait()  # Reap the killed process
            return {"stdout": "", "stderr": "TIMEOUT", "code": -1}

    def checkpoint(self, name: str):
        # Snapshot text files (simplified); .git and unreadable/binary files are skipped
        files = {}
        for f in self.workdir.rglob("*"):
            rel = f.relative_to(self.workdir)
            if not f.is_file() or ".git" in rel.parts:
                continue
            try:
                files[str(rel)] = f.read_text()
            except (UnicodeDecodeError, OSError):
                continue
        self.checkpoints[name] = {"files": files, "time": time.time()}

    def restore(self, name: str):
        # Rewrites snapshotted files; files created after the checkpoint are left in place
        if name in self.checkpoints:
            for path, content in self.checkpoints[name]["files"].items():
                target = self.workdir / path
                target.parent.mkdir(parents=True, exist_ok=True)
                target.write_text(content)


@dataclass
class SimpleAgentOS:
    context: SimpleContextManager = field(default_factory=SimpleContextManager)
    sandbox: SimpleSandbox = field(default_factory=lambda: SimpleSandbox(Path.cwd()))
    checkpoints: Dict[str, Any] = field(default_factory=dict)
    token_usage: int = 0

    async def run_agent(self, agent_fn, task: str, model_fn, max_turns: int = 20):
        # Initialize context
        self.context.promote("task", task, "L1", "L1")
        turns = 0
        for turn in range(max_turns):
            turns = turn + 1
            # Build context
            context_str = self.context.build_context()
            # Call model
            response = await model_fn(context_str)
            self.token_usage += len(context_str) // 4 + len(response) // 4
            # Parse tool calls (simplified)
            tool_calls = self.parse_tool_calls(response)
            if not tool_calls:
                # Agent done
                break
            # Execute tools
            for call in tool_calls:
                result = await self.execute_tool(call)
                # Promote results to context
                self.context.promote(
                    f"tool:{call['name']}:{turn}",
                    {"args": call["args"], "result": result},
                    "L1", "L1"
                )
                # If file read, promote to L2
                if call["name"] == "read_file":
                    self.context.promote(
                        f"file:{call['args']['path']}",
                        result,
                        "L1", "L2"
                    )
            # Checkpoint every 5 turns
            if turn % 5 == 0:
                self.checkpoint(f"turn_{turn}")
        return {"turns": turns, "tokens": self.token_usage}

    def parse_tool_calls(self, response: str) -> list:
        # Assumed wire format (pick your own): the model replies with JSON like
        # {"tool_calls": [{"name": "...", "args": {...}}]}; any other reply is
        # treated as a final answer with no tool calls
        try:
            parsed = json.loads(response)
        except json.JSONDecodeError:
            return []
        if not isinstance(parsed, dict):
            return []
        return parsed.get("tool_calls", [])

    async def execute_tool(self, call: dict) -> Any:
        name = call["name"]
        args = call["args"]
        if name == "read_file":
            return await self.sandbox.read(args["path"])
        elif name == "write_file":
            await self.sandbox.write(args["path"], args["content"])
            return {"ok": True}
        elif name == "run_command":
            return await self.sandbox.run(args["command"])
        elif name == "search_code":
            # Simplified: grep. Quote model-supplied values before they reach the shell
            pattern = shlex.quote(args["pattern"])
            path = shlex.quote(args.get("path", "."))
            result = await self.sandbox.run(f"rg -n -- {pattern} {path}")
            return result["stdout"]
        else:
            return {"error": f"Unknown tool: {name}"}

    def checkpoint(self, name: str):
        # Snapshot files under the same name so restore() can find them
        self.sandbox.checkpoint(name)
        self.checkpoints[name] = {
            "context": {k: copy.deepcopy(v.data) for k, v in self.context.tiers.items()},
            "tokens": self.token_usage,
            "time": time.time(),
        }

    def restore(self, name: str):
        if name in self.checkpoints:
            cp = self.checkpoints[name]
            for tier_name, data in cp["context"].items():
                tier = self.context.tiers[tier_name]
                tier.data = copy.deepcopy(data)
                # Rebuild LRU bookkeeping to match the restored keys
                tier.access_times = {k: time.time() for k in tier.data}
            self.token_usage = cp["tokens"]
            self.sandbox.restore(name)


# Usage
async def main():
    agent_os = SimpleAgentOS()

    async def my_model(context: str) -> str:
        # Call your model here (Claude, GPT, etc.).
        # call_model is a placeholder you must supply.
        return await call_model(context)

    result = await agent_os.run_agent(
        agent_fn=None,  # Not used in this simple version
        task="Refactor the auth module to use JWT instead of sessions",
        model_fn=my_model,
        max_turns=30
    )
    print(result)


if __name__ == "__main__":
    asyncio.run(main())

# This 500-line MVP gives you:
# 1. Tiered context management with budgets
# 2. Sandbox with checkpoint/restore
# 3. Tool execution with result promotion
# 4. Turn-based agent loop with token tracking
# 5. Foundation to build the real thing on
Start there. But know: the MVP is not the OS. The OS is what you build when the MVP hits the wall—and it will. The wall is usually context management, sandbox fidelity, or evaluation. The OS solves all three.
Related Reading: Context Engineering for Production Agents · Building Reliable Agent Evaluations · Sandbox Security for Code Agents