DEV Community

Cover image for How to Build a Frontier Agent OS
Sensei
Sensei

Posted on

How to Build a Frontier Agent OS

The terminalhung at 94% for the third time that afternoon. Claude Code had been "thinking" for four minutes—spinning up a subprocess, spawning a subprocess, spawning a subprocess—each one inheriting a context window bloated with 140,000 tokens of hallucinated file paths, hallucinated imports, and the ghost of a requirements.txt that hadn't existed since Tuesday. I killed it. Again. Switched to Codex. Same repo. Same prompt. Codex spent three minutes writing a 400-line migration script for a table that didn't exist, hallucinating a created_at column with a default that would've violated a NOT NULL constraint on a 40-million-row production table. Killed it. Switched to OpenCode. Same repo. Same prompt. OpenCode spent six minutes writing a test suite for a function that didn't exist, mocking a dependency that had been deleted in the refactor I'd asked it to do.

Three agents. Three different failure modes. Same root cause: none of them knew what the codebase actually looked like right now.

That's the frontier agent OS problem. Not "which model is smartest." Not "which framework has the prettiest dashboard." The frontier agent OS problem is: how do you build a runtime that maintains a coherent, accurate, queryable model of a living codebase—across 200k tokens of context, 50+ tool calls, 47 file edits, and three agent handoffs—so that the next agent action is grounded in reality, not hallucination?

I've spent the last 18 months building agent orchestration runtimes. Not demos. Production systems that deploy code to production, that manage Kubernetes clusters, that refactor 200k-line monorepos, that debug production incidents at 3 AM. The gap between "Claude Code works great on this 500-line repo" and "Claude Code deploys a migration to production without dropping a column" is not a model quality problem. It's an operating system problem.

Here's what a frontier agent OS actually looks like when you build it for production.


The Architecture Nobody Talks About

Everyone builds "agent frameworks." LangGraph, LangChain, AutoGen, CrewAI, AutoGPT, BabyAGI, AutoGPT-Next, AutoGPT-Next-Next. They're all frameworks. Frameworks are libraries. Libraries don't manage state across 50 tool calls. Libraries don't survive agent crashes. Libraries don't checkpoint context windows. Libraries don't enforce invariants.

An agent OS is a runtime. It has a kernel. It has a scheduler. It has a memory manager. It has a filesystem abstraction. It has a device driver layer for tools. It has a scheduler that decides which agent runs next, with what context, with what tools, with what timeout, with what rollback capability.

Here's the architecture that actually works in production:

┌─────────────────────────────────────────────────────────────────┐
│                        AGENT OS KERNEL                          │
├─────────────────────────────────────────────────────────────────┤
│  ┌─────────────┐  ┌─────────────┐  ┌─────────────┐             │
│  │  SCHEDULER  │  │  MEMORY MGR │  │  CONTEXT MGR│             │
│  │  (policy)   │  │  (vector +  │  │  (window +  │             │
│  │             │  │   graph +   │  │   checkpoint)│            │
│  │             │  │   kv)       │  │             │             │
│  └──────┬──────┘  └──────┬──────┘  └──────┬──────┘             │
│         │                │                │                    │
│         └────────────────┼────────────────┘                    │
│                          ▼                                     │
│              ┌───────────────────────┐                         │
│              │   TOOL RUNTIME        │                         │
│              │  (sandbox + drivers)  │                         │
│              └───────────┬───────────┘                         │
│                          │                                      │
│         ┌────────────────┼────────────────┐                    │
│         ▼                ▼                ▼                    │
│  ┌─────────────┐  ┌─────────────┐  ┌─────────────┐            │
│  │  FILESYSTEM │  │   SHELL     │  │   LSP/LSP   │            │
│  │   DRIVER    │  │   DRIVER    │  │   DRIVER    │            │
│  └─────────────┘  └─────────────┘  └─────────────┘            │
└─────────────────────────────────────────────────────────────────┘
Enter fullscreen mode Exit fullscreen mode

The kernel doesn't call models. The kernel manages the runtime in which agents execute. The model is just another device driver—a compute device that takes context and returns tool calls. The kernel decides which model, which context, which tools, what timeout, what rollback point.


The Context Manager: The Actual Kernel

This is where every framework fails. They treat context as "stuff we stuff into the prompt." It's not. Context is memory hierarchy.

┌────────────────────────────────────────────────────────────────┐
│                    CONTEXT HIERARCHY                           │
├────────────────────────────────────────────────────────────────┤
│                                                                │
│  L1: IMMEDIATE CONTEXT (8k-32k tokens)                        │
│  ├─ Current task spec                                         │
│  ├─ Active file handles (open files, cursor positions)        │
│  ├─ Recent tool results (last 5-10 calls)                     │
│  ├─ Active error state (stack traces, test failures)          │
│  └─ Scratchpad (agent's working memory)                       │
│                                                                │
│  L2: WORKING CONTEXT (32k-128k tokens)                        │
│  ├─ Relevant file contents (AST-summarized + key sections)    │
│  ├─ Symbol index (definitions, references, call graphs)       │
│  ├─ Recent git history (commits, diffs, blame for touched)    │
│  ├─ Test results (passing/failing, coverage gaps)             │
│  └─ Dependency graph (imports, exports, circular deps)        │
│                                                                │
│  L3: ARCHIVAL CONTEXT (vector + graph DB, unbounded)          │
│  ├─ Full codebase embeddings (semantic search)                │
│  ├─ Knowledge graph (entities: files, functions, tables,     │
│  │   services, configs, infra, docs, decisions, people)      │
│  ├─ Historical episodes (past tasks, decisions, outcomes)     │
│  ├─ Documentation, ADRs, RFCs, runbooks                       │
│  └─ Production telemetry (traces, logs, metrics, profiles)   │
│                                                                │
│  L4: PERSISTENT STATE (durable, transactional)                │
│  ├─ Checkpoints (full context snapshots at decision points)   │
│  ├─ Agent state machines (paused/resumed workflows)           │
│  ├─ Tool execution logs (for replay/debugging)                │
│  └─ Evaluation traces (for RLHF/RLHF)                         │
│                                                                │
└────────────────────────────────────────────────────────────────┘
Enter fullscreen mode Exit fullscreen mode

The context manager's job: promote, demote, evict, summarize, and fuse across these tiers on every scheduler tick. This is not "stuff tokens into context window." This is virtual memory management for LLMs.

The Context Manager Kernel Loop

# Pseudocode for the actual context manager loop
class ContextManager:
    def __init__(self, kernel):
        self.kernel = kernel
        self.l1 = LRUCache(max_tokens=32000)
        self.l2 = LRUCache(max_tokens=128000)
        self.l3 = VectorGraphDB()
        self.l4 = CheckpointStore()

    def prepare_context(self, agent_id: str, task: Task) -> ContextBundle:
        # 1. Load checkpoint if resuming
        checkpoint = self.l4.load_latest(agent_id)
        if checkpoint:
            self.restore(checkpoint)

        # 2. Promote L3 → L2 based on task relevance
        relevant = self.l3.query(task.query, k=50)
        self.promote_to_l2(relevant)

        # 3. Promote L2 → L1 based on recency + relevance
        active = self.select_for_l1(task)
        self.promote_to_l1(active)

        # 4. Inject runtime state (open files, errors, test results)
        runtime_state = self.kernel.runtime.get_state(agent_id)
        self.inject_l1(runtime_state)

        # 5. Compress if over budget
        if self.l1.tokens > 32000:
            self.compress_l1()

        # 6. Build context bundle with provenance
        return ContextBundle(
            l1=self.l1.serialize(),
            l2=self.l2.serialize(),
            provenance=self.track_provenance(),
            token_budget=self.calculate_budget()
        )

    def on_tool_result(self, agent_id: str, tool: Tool, result: ToolResult):
        # Immediate promotion to L1
        self.l1.put(f"tool:{tool.name}:result", result.summary())

        # If file read, promote to L2 with AST summary
        if tool.name == "read_file":
            ast_summary = self.ast_summarize(result.content)
            self.l2.put(f"file:{tool.args.path}:ast", ast_summary)

        # If error, pin to L1 until resolved
        if result.is_error:
            self.l1.pin(f"error:{tool.name}", result.error)

Enter fullscreen mode Exit fullscreen mode

This is virtual memory. L1 = registers. L2 = L1 cache. L3 = RAM. L4 = disk. The context manager is the MMU.


The Memory Manager: Vector + Graph + KV, Not Just RAG

RAG is not memory. RAG is "similarity search over chunks." That works for "find me the doc about auth." It fails for "find me all the places where user_id flows from the API gateway to the billing service." That's a graph query, not a similarity query.

The memory manager maintains three coordinated stores:

class MemoryManager:
    def __init__(self):
        self.vector = VectorStore(embedding_model="text-embedding-3-large")
        self.graph = KnowledgeGraph()
        self.kv = KVStore()

    def ingest_codebase(self, repo_path: Path):
        # 1. Parse AST for every file
        for file in repo_path.rglob("*"):
            if file.suffix in SUPPORTED_EXTENSIONS:
                ast = self.parse_ast(file)

                # 2. Extract entities to graph
                for node in ast.nodes:
                    self.graph.upsert(Entity(
                        id=f"{file.path}:{node.name}",
                        type=node.type,  # function, class, table, config, etc.
                        file=file.path,
                        span=node.span,
                        signature=node.signature,
                        docstring=node.docstring,
                    ))

                # 3. Extract relationships
                for edge in ast.edges:
                    self.graph.upsert(Relation(
                        src=f"{file.path}:{edge.src}",
                        dst=f"{file.path}:{edge.dst}",
                        type=edge.type,  # calls, imports, inherits, references
                    ))

                # 4. Embed semantic chunks for vector search
                for chunk in self.chunk_semantically(ast):
                    self.vector.upsert(VectorRecord(
                        id=f"{file.path}:{chunk.id}",
                        text=chunk.text,
                        embedding=self.embed(chunk.text),
                        metadata={
                            "file": file.path,
                            "symbols": chunk.symbols,
                            "layer": self.classify_layer(file.path),
                        }
                    ))

        # 5. Cross-reference with infra/config/docs
        self.ingest_infra()
        self.ingest_docs()
        self.ingest_git_history()

    def query(self, task: Task) -> MemoryResult:
        # Hybrid query: vector + graph + kv
        vector_results = self.vector.search(task.query, k=20)
        graph_results = self.graph.query(task.query_graph())
        kv_results = self.kv.lookup(task.key_lookups())

        # Fuse with reciprocal rank fusion
        return self.fuse(vector_results, graph_results, kv_results)
Enter fullscreen mode Exit fullscreen mode

The knowledge graph entities aren't just code symbols. They include:

Entity Types:
├── Code: function, class, method, variable, type, interface
├── Data: table, column, index, migration, schema
├── Infra: service, deployment, configmap, secret, ingress, cronjob
├── Config: env_var, feature_flag, yaml, json, toml
├── Docs: adr, rfc, runbook, readme, changelog
├── People: owner, reviewer, oncall, author
├── Decision: adr_id, rationale, alternatives, status
└── Incident: incident_id, root_cause, resolution, action_items
Enter fullscreen mode Exit fullscreen mode

The graph edges capture data flow, control flow, dependency, ownership, temporal relationships:

function:auth:validate_token  ──calls──►  function:db:get_user
function:api:create_user      ──writes──►  table:users
table:users                   ──owned_by──►  team:backend
service:billing               ──depends_on──►  service:payments
adr:042                       ──decided_by──►  person:sarah
incident:INC-2024-0842        ──caused_by──►  migration:2024_08_15_add_idx
Enter fullscreen mode Exit fullscreen mode

When an agent asks "where does user_id flow from the gateway to billing?", the graph query is:

MATCH path = (src:Entity {name: "api_gateway"})-[:CALLS*1..5]->(dst:Entity {name: "billing"})
WHERE ANY(n IN nodes(path) WHERE n.name CONTAINS "user_id")
RETURN path
Enter fullscreen mode Exit fullscreen mode

RAG returns "the auth docs mention user_id." The graph returns the actual data flow path. The agent OS needs both.


The Tool Runtime: Sandbox + Drivers, Not "Tools"

Frameworks treat tools as functions. read_file(path) -> str. run_command(cmd) -> str. This is wrong. Tools are devices. They have state. They have side effects. They need drivers. They need sandboxes. They need resource limits. They need observability.

class ToolRuntime:
    def __init__(self, kernel):
        self.kernel = kernel
        self.sandbox = SandboxManager()
        self.drivers = {}
        self.registry = ToolRegistry()

    def register_driver(self, name: str, driver: ToolDriver):
        self.drivers[name] = driver
        self.registry.register(ToolSpec(
            name=name,
            driver=name,
            schema=driver.schema,
            capabilities=driver.capabilities,
            resource_limits=driver.limits,
        ))

    async def execute(self, agent_id: str, tool_call: ToolCall) -> ToolResult:
        driver = self.drivers[tool_call.tool]

        # 1. Check capabilities against agent policy
        if not self.kernel.policy.check(agent_id, driver.capabilities):
            return ToolResult.error("Capability denied by policy")

        # 2. Prepare sandbox (filesystem, network, process namespace)
        sandbox = await self.sandbox.prepare(agent_id, driver.requirements)

        # 3. Execute with timeout, resource limits, observability
        with self.kernel.observability.trace(f"tool:{tool_call.tool}") as span:
            span.set_attribute("agent_id", agent_id)
            span.set_attribute("tool_args", tool_call.args)

            try:
                result = await asyncio.wait_for(
                    driver.execute(sandbox, tool_call.args),
                    timeout=driver.limits.timeout_seconds
                )

                # 4. Capture side effects (file changes, process spawns, network)
                side_effects = await sandbox.capture_effects()

                # 5. Update context manager with results
                self.kernel.context.on_tool_result(agent_id, tool_call, result)

                # 6. Update memory manager with new knowledge
                self.kernel.memory.ingest_tool_result(tool_call, result, side_effects)

                return ToolResult.ok(result, side_effects)

            except asyncio.TimeoutError:
                await sandbox.terminate()
                return ToolResult.error("Tool timeout")
            except Exception as e:
                await sandbox.capture_state_for_debugging()
                return ToolResult.error(str(e))
Enter fullscreen mode Exit fullscreen mode

The filesystem driver doesn't just read_file. It maintains open file handles, cursor positions, watch notifications, diff tracking:

class FilesystemDriver(ToolDriver):
    def __init__(self, sandbox):
        self.sandbox = sandbox
        self.open_files = {}  # agent_id -> {path: FileHandle}
        self.watchers = {}    # path -> [agent_id]

    async def read_file(self, agent_id: str, path: str, range: Range = None) -> ToolResult:
        # Track open file for context promotion
        handle = self.open_files.setdefault(agent_id, {})[path] = FileHandle(
            path=path,
            content=await self.sandbox.read(path),
            cursor=0,
            version=await self.sandbox.get_version(path)
        )

        if range:
            content = handle.content[range.start:range.end]
            handle.cursor = range.end
        else:
            content = handle.content

        # Notify context manager: this file is now "active"
        self.kernel.context.promote_file(agent_id, path, handle.version)

        return ToolResult.ok(content, metadata={
            "path": path,
            "version": handle.version,
            "size": len(content),
            "mime": self.detect_mime(path)
        })

    async def write_file(self, agent_id: str, path: str, content: str, mode: WriteMode) -> ToolResult:
        # Create checkpoint before write
        checkpoint = await self.sandbox.checkpoint(path)

        # Write with atomic replace
        await self.sandbox.write(path, content, mode)

        # Notify watchers
        for watcher_id in self.watchers.get(path, []):
            self.kernel.context.notify_file_changed(watcher_id, path)

        # Update open handle
        if agent_id in self.open_files and path in self.open_files[agent_id]:
            self.open_files[agent_id][path].content = content
            self.open_files[agent_id][path].version += 1

        return ToolResult.ok(None, metadata={
            "path": path,
            "bytes_written": len(content),
            "checkpoint_id": checkpoint.id
        })
Enter fullscreen mode Exit fullscreen mode

The shell driver maintains persistent PTY sessions, not fire-and-forget commands. It tracks working directory, environment, process tree, output history:

class ShellDriver(ToolDriver):
    def __init__(self, sandbox):
        self.sessions = {}  # agent_id -> PTYSession

    async def execute(self, agent_id: str, command: str, timeout: int = 300) -> ToolResult:
        session = self.sessions.setdefault(agent_id, PTYSession(
            cwd=self.sandbox.workdir,
            env=self.sandbox.env,
            history=[]
        ))

        # Inject agent context into shell env
        session.env["AGENT_ID"] = agent_id
        session.env["AGENT_TASK"] = self.kernel.context.get_task(agent_id).summary()

        process = await session.spawn(command, timeout=timeout)

        # Stream output to context manager in real-time
        async for chunk in process.stdout:
            self.kernel.context.append_stdout(agent_id, chunk)
        async for chunk in process.stderr:
            self.kernel.context.append_stderr(agent_id, chunk)

        result = await process.wait()

        # Capture final state
        session.cwd = await session.get_cwd()
        session.env = await session.get_env()

        return ToolResult.ok({
            "stdout": result.stdout,
            "stderr": result.stderr,
            "exit_code": result.exit_code,
            "cwd": session.cwd,
            "duration_ms": result.duration_ms
        })
Enter fullscreen mode Exit fullscreen mode

The LSP driver is the secret weapon. It doesn't just "go to definition." It maintains a live language server per language per workspace, giving agents semantic navigation, not grep:

class LSPDriver(ToolDriver):
    def __init__(self, sandbox):
        self.servers = {}  # language -> LSPServer
        self.sandbox = sandbox

    async def ensure_server(self, language: str) -> LSPServer:
        if language not in self.servers:
            self.servers[language] = await LSPServer.start(
                language=language,
                workspace=self.sandbox.workdir,
                capabilities=[
                    "textDocument/definition",
                    "textDocument/references",
                    "textDocument/hover",
                    "textDocument/typeDefinition",
                    "textDocument/implementation",
                    "textDocument/documentSymbol",
                    "workspace/symbol",
                    "workspace/executeCommand",
                    "textDocument/codeAction",
                    "textDocument/rename",
                    "textDocument/semanticTokens/full",
                ]
            )
        return self.servers[language]

    async def find_references(self, agent_id: str, symbol: SymbolRef) -> ToolResult:
        server = await self.ensure_server(symbol.language)
        refs = await server.text_document_references(
            uri=symbol.file_uri,
            position=symbol.position,
            include_declaration=True
        )

        # Enrich with context
        enriched = []
        for ref in refs:
            content = await self.sandbox.read_range(ref.uri, ref.range)
            enriched.append(Reference(
                file=ref.uri,
                range=ref.range,
                context=content,
                kind=self.classify_reference(ref, symbol)
            ))

        # Promote all referenced files to L2 context
        for ref in enriched:
            self.kernel.context.promote_file(agent_id, ref.file)

        return ToolResult.ok(enriched)
Enter fullscreen mode Exit fullscreen mode

The Scheduler: Policy, Not Heuristics

Frameworks use heuristics. "If error, retry." "If task done, next task." An agent OS uses policy. Policy is code. Policy is versioned. Policy is testable. Policy is auditable.

class SchedulerPolicy:
    def __init__(self, kernel):
        self.kernel = kernel

    def decide_next(self, agent_id: str) -> SchedulingDecision:
        agent = self.kernel.agents[agent_id]
        task = agent.current_task
        context = self.kernel.context.get_state(agent_id)

        # 1. Check for hard stops
        if context.token_usage > self.kernel.config.max_tokens * 0.9:
            return SchedulingDecision.CHECKPOINT_AND_PAUSE

        if context.error_count > self.kernel.config.max_errors:
            return SchedulingDecision.ESCALATE_TO_HUMAN

        if context.wall_time > self.kernel.config.max_wall_time:
            return SchedulingDecision.CHECKPOINT_AND_PAUSE

        # 2. Check for completion signals
        if task.is_complete(context):
            return SchedulingDecision.COMPLETE_TASK

        # 3. Check for handoff conditions
        if task.requires_specialist(context):
            specialist = self.kernel.registry.find_specialist(task.required_capability)
            return SchedulingDecision.HANDOFF(specialist)

        # 4. Check for tool necessity
        if context.needs_external_info():
            return SchedulingDecision.USE_TOOL(context.suggested_tool)

        # 5. Check for context refresh
        if context.staleness > self.kernel.config.max_staleness:
            return SchedulingDecision.REFRESH_CONTEXT

        # 6. Default: continue with current model
        return SchedulingDecision.CONTINUE(model=self.select_model(task, context))

    def select_model(self, task: Task, context: Context) -> ModelSpec:
        # Policy: use cheapest model that meets capability requirements
        required = task.required_capabilities
        candidates = self.kernel.model_registry.filter(capabilities=required)

        # Prefer specialized models for specialized tasks
        if task.type == "code_generation":
            return candidates.best_for("coding")
        elif task.type == "reasoning":
            return candidates.best_for("reasoning")
        elif task.type == "tool_use":
            return candidates.best_for("function_calling")
        else:
            return candidates.cheapest()
Enter fullscreen mode Exit fullscreen mode

The scheduler doesn't just pick "next action." It decides which agent runs, with which model, with which tools, with which context, with which timeout, with which checkpoint policy. This is an OS scheduler. It manages processes (agents), not threads (tool calls).


Checkpointing: The Save/Resume That Actually Works

This is where every framework fails. They serialize "messages." Messages are not state. State is: open file handles, shell sessions, LSP connections, context manager tiers, memory manager indexes, tool execution history, error state, retry counters, human feedback, policy decisions.

class CheckpointManager:
    def __init__(self, kernel):
        self.kernel = kernel
        self.store = CheckpointStore()

    async def checkpoint(self, agent_id: str, reason: CheckpointReason) -> Checkpoint:
        agent = self.kernel.agents[agent_id]

        # 1. Checkpoint context manager (all 4 tiers)
        context_state = await self.kernel.context.checkpoint(agent_id)

        # 2. Checkpoint tool runtime (sandbox state)
        sandbox_state = await self.kernel.tools.sandbox.checkpoint(agent_id)

        # 3. Checkpoint agent state machine
        agent_state = agent.checkpoint()

        # 4. Checkpoint memory manager indexes
        memory_state = await self.kernel.memory.checkpoint(agent_id)

        # 5. Checkpoint scheduler state
        scheduler_state = self.kernel.scheduler.checkpoint(agent_id)

        checkpoint = Checkpoint(
            id=uuid4(),
            agent_id=agent_id,
            task_id=agent.current_task.id,
            reason=reason,
            timestamp=datetime.utcnow(),
            context=context_state,
            sandbox=sandbox_state,
            agent=agent_state,
            memory=memory_state,
            scheduler=scheduler_state,
            provenance=self.build_provenance(agent_id),
        )

        # 6. Store atomically
        await self.store.write(checkpoint)

        # 7. Notify observability
        self.kernel.observability.checkpoint_created(checkpoint)

        return checkpoint

    async def resume(self, checkpoint_id: str) -> Agent:
        checkpoint = await self.store.read(checkpoint_id)

        # 1. Restore sandbox (filesystem, processes, network)
        await self.kernel.tools.sandbox.restore(checkpoint.sandbox)

        # 2. Restore context manager tiers
        await self.kernel.context.restore(checkpoint.context)

        # 3. Restore memory indexes
        await self.kernel.memory.restore(checkpoint.memory)

        # 4. Restore agent state
        agent = await self.kernel.agents.restore(checkpoint.agent)

        # 5. Restore scheduler state
        self.kernel.scheduler.restore(checkpoint.scheduler)

        # 6. Verify consistency
        await self.verify_consistency(agent, checkpoint)

        return agent
Enter fullscreen mode Exit fullscreen mode

Checkpointing is not optional. It's not "save for later." It's transactional boundary. Every tool call that mutates state creates a checkpoint. Every model call that exceeds 50k tokens creates a checkpoint. Every error creates a checkpoint. Every human intervention creates a checkpoint.

This is how you get replay debugging. Not "look at the logs." Replay the exact agent execution from any checkpoint. Same model. Same context. Same tools. Same sandbox. Deterministic replay.


The Model Router: Not "Claude vs GPT," "Right Tool for Right Job"

Frontier agent OS doesn't "use Claude Code." It routes to models based on capability requirements, cost budgets, latency SLAs, and policy constraints.

class ModelRouter:
    def __init__(self, kernel):
        self.kernel = kernel
        self.registry = ModelRegistry()
        self.policies = RoutingPolicies()

    def route(self, request: ModelRequest) -> ModelHandle:
        # 1. Filter by capabilities
        candidates = self.registry.filter(
            capabilities=request.required_capabilities,
            max_latency=request.latency_budget,
            max_cost=request.cost_budget,
        )

        # 2. Apply routing policy
        policy = self.policies.get_policy_for(request.task_type)
        selected = policy.select(candidates, request.context)

        # 3. Reserve capacity
        handle = selected.reserve(request.estimated_tokens)

        # 4. Wrap with observability
        return ObservedModelHandle(handle, self.kernel.observability)

    def policy_for(self, task_type: TaskType) -> RoutingPolicy:
        return {
            TaskType.CODE_GENERATION: CodingPolicy(),
            TaskType.REASONING: ReasoningPolicy(),
            TaskType.TOOL_USE: FunctionCallingPolicy(),
            TaskType.CONTEXT_RICH: LargeContextPolicy(),
            TaskType.LOW_LATENCY: SpeedPolicy(),
            TaskType.HIGH_RELIABILITY: ConsensusPolicy(),
        }[task_type]


class CodingPolicy(RoutingPolicy):
    def select(self, candidates: List[Model], context: Context) -> Model:
        # Prefer models with strong coding benchmarks
        scored = []
        for m in candidates:
            score = (
                m.benchmark_score("swe_bench") * 0.4 +
                m.benchmark_score("human_eval") * 0.3 +
                m.benchmark_score("mbpp") * 0.2 +
                (1 / m.cost_per_1k_tokens) * 0.1
            )
            scored.append((score, m))

        return max(scored, key=lambda x: x[0])[1]


class ConsensusPolicy(RoutingPolicy):
    """For high-reliability tasks: run multiple models, vote."""
    def select(self, candidates: List[Model], context: Context) -> Model:
        # Return a composite model that runs 3 models and aggregates
        return ConsensusModel(
            models=candidates[:3],
            aggregator=MajorityVoteAggregator(),
            timeout=context.latency_budget
        )
Enter fullscreen mode Exit fullscreen mode

The router is policy-driven, observable, and auditable. Every routing decision is logged with: candidates considered, policy applied, scores, selected model, estimated cost, actual cost, latency, outcome.


Evaluation: The Product Is the Eval Harness

This is the part nobody builds. They build the agent. They don't build the evaluation harness that tells you if the agent is getting better or worse.

The agent OS is the eval harness. Every agent run produces an evaluation trace. Every checkpoint is an eval artifact. Every human intervention is a labeled example.

class EvaluationHarness:
    def __init__(self, kernel):
        self.kernel = kernel
        self.store = EvalStore()
        self.judges = JudgeRegistry()

    async def evaluate_task(self, task: Task, agent_run: AgentRun) -> EvalResult:
        # 1. Collect all artifacts
        artifacts = self.collect_artifacts(agent_run)

        # 2. Run automated judges
        auto_results = await self.run_automated_judges(task, artifacts)

        # 3. Queue for human review if needed
        if auto_results.needs_human_review:
            human_review = await self.queue_human_review(task, artifacts)
            auto_results.human_labels = human_review

        # 4. Compute metrics
        metrics = self.compute_metrics(task, auto_results)

        # 5. Store for training/analysis
        eval_result = EvalResult(
            task_id=task.id,
            run_id=agent_run.id,
            timestamp=datetime.utcnow(),
            metrics=metrics,
            auto_judgments=auto_results,
            artifacts=artifacts,
        )

        await self.store.write(eval_result)

        # 6. Feed back to memory manager for few-shot learning
        if metrics.success:
            await self.kernel.memory.add_positive_example(task, artifacts)
        else:
            await self.kernel.memory.add_negative_example(task, artifacts)

        return eval_result

    async def run_automated_judges(self, task: Task, artifacts: Artifacts) -> JudgeResults:
        judges = self.judges.for_task_type(task.type)
        results = {}

        for judge in judges:
            with self.kernel.observability.trace(f"judge:{judge.name}") as span:
                result = await judge.evaluate(task, artifacts)
                results[judge.name] = result
                span.set_attribute("score", result.score)

        return JudgeResults(results)
Enter fullscreen mode Exit fullscreen mode

Automated judges for code tasks:

class CodeCorrectnessJudge(Judge):
    async def evaluate(self, task: Task, artifacts: Artifacts) -> Judgment:
        # 1. Run tests in sandbox
        test_result = await self.run_tests(artifacts.sandbox)

        # 2. Static analysis
        lint_result = await self.run_linter(artifacts.changed_files)

        # 3. Type checking
        type_result = await self.run_type_checker(artifacts.changed_files)

        # 4. Security scan
        sec_result = await self.run_security_scan(artifacts.changed_files)

        # 5. Semantic diff vs requirements
        semantic_result = await self.semantic_diff(task.requirements, artifacts)

        score = (
            test_result.pass_rate * 0.4 +
            (1 - lint_result.error_rate) * 0.15 +
            (1 - type_result.error_rate) * 0.15 +
            (1 - sec_result.finding_rate) * 0.1 +
            semantic_result.similarity * 0.2
        )

        return Judgment(
            score=score,
            passed=score > 0.8,
            details={
                "tests": test_result,
                "lint": lint_result,
                "types": type_result,
                "security": sec_result,
                "semantic": semantic_result,
            }
        )


class ProductionReadinessJudge(Judge):
    async def evaluate(self, task: Task, artifacts: Artifacts) -> Judgment:
        checks = await asyncio.gather(
            self.check_migrations(artifacts),
            self.check_config_changes(artifacts),
            self.check_feature_flags(artifacts),
            self.check_observability(artifacts),
            self.check_rollback_plan(artifacts),
            self.check_load_test(artifacts),
        )

        return Judgment(
            score=sum(c.score for c in checks) / len(checks),
            passed=all(c.passed for c in checks),
            details={c.name: c for c in checks}
        )
Enter fullscreen mode Exit fullscreen mode

The eval harness produces training data for the router, few-shot examples for the memory manager, regression detection for the scheduler policy, reward signals for RLHF.


The Agent OS API: What Agents Actually See

Agents don't talk to the kernel directly. They talk to an agent runtime API that abstracts the OS:

class AgentRuntime:
    """The interface every agent sees. Hides the OS complexity."""

    def __init__(self, kernel, agent_id: str):
        self.kernel = kernel
        self.agent_id = agent_id
        self.context = kernel.context.view(agent_id)
        self.tools = kernel.tools.view(agent_id)
        self.memory = kernel.memory.view(agent_id)
        self.state = kernel.scheduler.state(agent_id)

    # Context operations
    def get_context(self, budget: TokenBudget = None) -> ContextBundle:
        return self.kernel.context.prepare(self.agent_id, budget)

    def focus_on(self, target: FocusTarget):
        """Tell context manager: this is what matters now."""
        self.kernel.context.set_focus(self.agent_id, target)

    def remember(self, key: str, value: Any, tier: MemoryTier = MemoryTier.L2):
        self.kernel.memory.store(self.agent_id, key, value, tier)

    def recall(self, query: str, k: int = 10) -> List[MemoryItem]:
        return self.kernel.memory.query(self.agent_id, query, k)

    # Tool operations
    async def call(self, tool: str, **args) -> ToolResult:
        return await self.kernel.tools.execute(self.agent_id, tool, args)

    async def read_file(self, path: str, range: Range = None) -> ToolResult:
        return await self.call("read_file", path=path, range=range)

    async def write_file(self, path: str, content: str, mode: WriteMode = "replace") -> ToolResult:
        return await self.call("write_file", path=path, content=content, mode=mode)

    async def run_command(self, cmd: str, timeout: int = 300) -> ToolResult:
        return await self.call("shell", command=cmd, timeout=timeout)

    async def find_references(self, symbol: SymbolRef) -> ToolResult:
        return await self.call("lsp_references", symbol=symbol)

    async def search_code(self, query: str, filters: SearchFilters = None) -> ToolResult:
        return await self.call("code_search", query=query, filters=filters)

    # Control flow
    def checkpoint(self, reason: str = "manual") -> Checkpoint:
        return self.kernel.checkpoint(self.agent_id, reason)

    def handoff(self, specialist: str, task: Task) -> HandoffResult:
        return self.kernel.scheduler.handoff(self.agent_id, specialist, task)

    def request_human(self, question: str, context: dict) -> HumanResponse:
        return self.kernel.scheduler.request_human(self.agent_id, question, context)

    def complete(self, result: TaskResult) -> CompletionResult:
        return self.kernel.scheduler.complete(self.agent_id, result)
Enter fullscreen mode Exit fullscreen mode

An agent implementation becomes remarkably simple:

class RefactoringAgent:
    def __init__(self, runtime: AgentRuntime):
        self.rt = runtime

    async def run(self, task: RefactoringTask) -> TaskResult:
        # 1. Load context
        self.rt.focus_on(FocusTarget(files=task.affected_files))

        # 2. Explore codebase
        references = await self.rt.find_references(task.target_symbol)
        self.rt.remember("references", references)

        # 3. Understand impact
        impact = await self.analyze_impact(references)

        # 4. Plan refactoring
        plan = await self.create_plan(impact)

        # 5. Execute with checkpoints
        for step in plan.steps:
            checkpoint = self.rt.checkpoint(f"before_step_{step.id}")

            try:
                await self.execute_step(step)
            except Exception as e:
                # Rollback and try alternative
                await self.rt.rollback(checkpoint)
                await self.try_alternative(step)

        # 6. Verify
        await self.rt.run_command("pytest -xvs")
        await self.rt.run_command("mypy .")

        # 7. Complete
        return self.rt.complete(TaskResult(
            success=True,
            changes=plan.changes,
            tests_passed=True,
        ))
Enter fullscreen mode Exit fullscreen mode

Production Patterns That Actually Work

Pattern 1: Context Budgeting as First-Class Concern

Every agent run has a token budget. The context manager enforces it. The scheduler respects it. The model router optimizes for it.

class TokenBudget:
    def __init__(self, total: int, reserves: dict):
        self.total = total
        self.reserves = reserves  # {"system": 2000, "tools": 8000, "output": 4000}
        self.allocated = 0

    @property
    def available_for_context(self) -> int:
        return self.total - sum(self.reserves.values()) - self.allocated

    def allocate(self, purpose: str, tokens: int) -> bool:
        if self.allocated + tokens <= self.available_for_context:
            self.allocated += tokens
            return True
        return False
Enter fullscreen mode Exit fullscreen mode

The context manager never exceeds budget. It compresses, summarizes, evicts, promotes—whatever it takes. The agent never sees "context too long" errors. It just sees less context.

Pattern 2: Speculative Execution with Rollback

For expensive operations (large refactors, migrations), the agent OS runs speculative execution:

async def speculative_execute(self, agent_id: str, plan: Plan) -> SpeculativeResult:
    # 1. Checkpoint current state
    checkpoint = await self.checkpoint(agent_id, "pre_speculative")

    # 2. Fork sandbox
    forked_sandbox = await self.sandbox.fork(checkpoint.sandbox)

    # 3. Execute in forked sandbox with relaxed limits
    forked_agent = await self.spawn_agent(
        sandbox=forked_sandbox,
        policy=SpeculativePolicy(max_tokens=500000, max_time=1800)
    )

    result = await forked_agent.run(plan)

    # 4. Evaluate result
    eval_result = await self.eval_harness.evaluate(plan.task, result)

    if eval_result.passed:
        # 5. Merge sandbox changes back
        await self.sandbox.merge(forked_sandbox, checkpoint.sandbox)
        return SpeculativeResult.success(result)
    else:
        # 6. Discard fork, learn from failure
        await self.memory.store_failure(plan, eval_result)
        return SpeculativeResult.failed(eval_result)
Enter fullscreen mode Exit fullscreen mode

This is how you get agents to do big, risky refactors without blowing up production. They rehearse in a fork. Only successful rehearsals merge.

Pattern 3: Human-in-the-Loop as First-Class Scheduler Primitive

Humans aren't "fallback." They're policy decision points.

class HumanInTheLoop:
    def __init__(self, kernel):
        self.kernel = kernel
        self.queue = HumanReviewQueue()

    async def request_decision(self, agent_id: str, decision: DecisionRequest) -> Decision:
        # 1. Checkpoint agent
        checkpoint = await self.kernel.checkpoint(agent_id, "human_review")

        # 2. Prepare context for human
        context = await self.prepare_human_context(agent_id, decision)

        # 3. Queue for review (Slack, GitHub, PagerDuty, custom UI)
        review_id = await self.queue.submit(HumanReview(
            agent_id=agent_id,
            checkpoint_id=checkpoint.id,
            decision=decision,
            context=context,
            timeout=decision.timeout,
        ))

        # 4. Wait for response (with agent paused)
        response = await self.queue.wait(review_id)

        # 5. Resume agent with decision
        await self.kernel.resume_with_decision(agent_id, checkpoint, response)

        return response
Enter fullscreen mode Exit fullscreen mode

Decision types:

  • ApprovePlan(plan) — "Is this refactoring plan correct?"
  • ChooseOption(options) — "Which migration strategy?"
  • ProvideContext(question) — "What's the business rule for this edge case?"
  • AuthorizeAction(action) — "Deploy this to production?"
  • LabelOutcome(outcome) — "Was this fix correct?"

Every human decision becomes training data. Every decision is logged with context. The system learns when to ask, what to ask, and how to present context.

Pattern 4: Multi-Agent Workflows as State Machines, Not Chains

LangGraph et al. model workflows as graphs. Production agent OS models them as hierarchical state machines with checkpoints at every transition.

class MigrationWorkflow(StateMachine):
    states = [
        "analyzing",
        "planning", 
        "reviewing_plan",
        "executing_migration",
        "validating",
        "canarying",
        "promoting",
        "completed",
        "rolled_back",
        "failed",
    ]

    transitions = {
        "analyzing": ["planning", "failed"],
        "planning": ["reviewing_plan", "failed"],
        "reviewing_plan": ["executing_migration", "planning", "failed"],
        "executing_migration": ["validating", "rolled_back", "failed"],
        "validating": ["canarying", "rolled_back", "failed"],
        "canarying": ["promoting", "rolled_back", "failed"],
        "promoting": ["completed", "rolled_back", "failed"],
    }

    def on_enter_analyzing(self, context):
        # Spawn codebase analysis agent
        agent = self.spawn_agent("codebase_analyst", context)
        return agent.run(AnalyzeMigrationTask(...))

    def on_enter_planning(self, context):
        # Spawn planning agent with analysis results
        agent = self.spawn_agent("migration_planner", context)
        return agent.run(CreateMigrationPlanTask(...))

    def on_enter_reviewing_plan(self, context):
        # Human reviews plan
        return self.request_human(PlanReviewDecision(...))

    def on_enter_executing_migration(self, context):
        # Spawn execution agent with speculative execution
        agent = self.spawn_agent("migration_executor", context)
        return agent.run_speculative(ExecuteMigrationTask(...))

    def on_enter_validating(self, context):
        # Run validation suite
        return self.run_validation(context)

    def on_enter_canarying(self, context):
        # Deploy to 1% traffic
        return self.deploy_canary(context)

    def on_enter_promoting(self, context):
        # Human approves full rollout
        return self.request_human(PromotionDecision(...))
Enter fullscreen mode Exit fullscreen mode

Each state transition creates a checkpoint. Failure at any state → automatic rollback to last good checkpoint. Human review at critical states. Full observability at every step.


The Observability Stack: You Can't Debug What You Can't See

Agent OS observability isn't "logs." It's distributed tracing across model calls, tool calls, context changes, memory operations, scheduler decisions, and human interactions.

class AgentObservability:
    def __init__(self):
        self.tracer = Tracer("agent-os")
        self.metrics = MetricsCollector()
        self.logger = StructuredLogger()

    @contextmanager
    def trace_agent_run(self, agent_id: str, task: Task):
        with self.tracer.start_span(f"agent.run.{task.type}") as span:
            span.set_attribute("agent.id", agent_id)
            span.set_attribute("task.id", task.id)
            span.set_attribute("task.type", task.type)

            start_time = time.time()
            token_start = self.get_token_usage(agent_id)

            try:
                yield span

                span.set_attribute("status", "success")
                span.set_attribute("duration_ms", (time.time() - start_time) * 1000)
                span.set_attribute("tokens_used", self.get_token_usage(agent_id) - token_start)

            except Exception as e:
                span.set_attribute("status", "error")
                span.set_attribute("error.type", type(e).__name__)
                span.set_attribute("error.message", str(e))
                raise

    @contextmanager
    def trace_model_call(self, agent_id: str, model: ModelSpec, purpose: str):
        with self.tracer.start_span(f"model.call.{purpose}") as span:
            span.set_attribute("agent.id", agent_id)
            span.set_attribute("model.name", model.name)
            span.set_attribute("model.provider", model.provider)
            span.set_attribute("purpose", purpose)

            start = time.time()
            yield span

            span.set_attribute("latency_ms", (time.time() - start) * 1000)
            span.set_attribute("input_tokens", self.last_input_tokens)
            span.set_attribute("output_tokens", self.last_output_tokens)
            span.set_attribute("cost_usd", self.last_cost)

    @contextmanager
    def trace_tool_call(self, agent_id: str, tool: str, args: dict):
        with self.tracer.start_span(f"tool.call.{tool}") as span:
            span.set_attribute("agent.id", agent_id)
            span.set_attribute("tool.name", tool)
            span.set_attribute("tool.args", json.dumps(args, default=str))

            start = time.time()
            try:
                yield span
                span.set_attribute("status", "success")
            except Exception as e:
                span.set_attribute("status", "error")
                span.set_attribute("error", str(e))
                raise
            finally:
                span.set_attribute("duration_ms", (time.time() - start) * 1000)

    def trace_context_change(self, agent_id: str, change: ContextChange):
        self.metrics.increment("context.changes", tags={
            "agent": agent_id,
            "tier": change.tier,
            "operation": change.operation,
        })

        if change.tokens_delta > 1000:
            self.logger.warning("Large context change", 
                agent_id=agent_id, 
                delta=change.tokens_delta,
                operation=change.operation)

    def trace_scheduler_decision(self, agent_id: str, decision: SchedulingDecision):
        self.metrics.increment("scheduler.decisions", tags={
            "agent": agent_id,
            "decision": decision.type,
            "model": decision.model.name if decision.model else "none",
        })
Enter fullscreen mode Exit fullscreen mode

Dashboards that matter:

  • Token efficiency: tokens per successful task completion
  • Tool success rate: by tool, by agent type, by task type
  • Context churn: how often L1/L2/L3 are refreshed
  • Checkpoint frequency: per task type, per agent type
  • Human intervention rate: when, why, outcome
  • Model routing accuracy: predicted vs actual cost/latency/quality
  • Speculative execution success rate: how often forks merge
  • Rollback frequency: and why

Building It: The Actual Stack

You don't build this from scratch. You compose it from primitives that exist:

Layer Technology Choices Why
Sandbox gVisor / Firecracker / Kata Containers / nsjail Hardware-virtualized isolation, fast snapshots
Filesystem OverlayFS + FUSE + inotify Copy-on-write, instant checkpoints, change notifications
Shell asyncssh + asyncio PTY Persistent sessions, stream output, process control
LSP pyright, typescript-language-server, gopls, rust-analyzer, clangd Semantic code intelligence per language
Vector DB Qdrant / Weaviate / Pinecone / pgvector Hybrid search, filtering, scalable
Graph DB Kuzu / FalkorDB / Neo4j / Kuzu (embedded) Property graph, Cypher, embedded option
KV Store Redis / Dragonfly / Valkey / etcd Low-latitude, pub/sub for cache invalidation
Checkpoint Store S3 / GCS / MinIO + SQLite index Immutable, versioned, queryable
Task Queue Temporal / Hatchet / Celery + Redis Durable execution, retries, scheduling
Model Gateway LiteLLM / Portkey / Helicone / custom Routing, fallbacks, observability, budgets
Observability OpenTelemetry + Jaeger/Tempo + Prometheus/Grafana Vendor-neutral, distributed tracing
Policy Engine OPA / Cedar / custom Rust Decidable, auditable, testable policies

The kernel itself is ~3,000 lines of Rust or Go. The drivers are ~500 lines each. The scheduler policy is ~1,000 lines of Python (for iteration speed). The context manager is ~2,000 lines. The memory manager is ~3,000 lines. The eval harness is ~2,000 lines.

Total: ~15,000 lines of core OS code. The rest is drivers, policies, judges, agents.


The Hard Parts Nobody Tells You

1. Context Compression Is Lossy Compression

You will lose information. The question is what you can afford to lose.

Compression Strategy by Tier:
┌─────────────┬─────────────────────────────────────────────────────────┐
│ L1 → L2     │ Keep: exact tool results, error messages, open files    │
│             │ Summarize: model reasoning, verbose tool output         │
│             │ Drop:   duplicate context, old scratchpad               │
├─────────────┼─────────────────────────────────────────────────────────┤
│ L2 → L3     │ Keep:   AST summaries, symbol defs, key config          │
│             │ Embed:  file contents, doc sections, conversation       │
│             │ Graph:  relationships, data flow, ownership             │
│             │ Drop:   verbatim source (retrievable from git)          │
├─────────────┼─────────────────────────────────────────────────────────┤
│ L3 → L4     │ Keep:   checkpoints, decisions, outcomes, labels        │
│             │ Archive: full context bundles for replay                │
│             │ Index:  for retrieval                                   │
└─────────────┴─────────────────────────────────────────────────────────┘
Enter fullscreen mode Exit fullscreen mode

The compression algorithm is the product. Get it wrong and agents hallucinate. Get it right and they seem omniscient.

2. The Sandbox Is the Attack Surface

Every tool is a potential escape. The filesystem driver, shell driver, network driver—they all need capability-based security, not ACLs.

// Capability-based sandbox policy
struct SandboxPolicy {
    // Filesystem capabilities
    fs_read: CapabilitySet<PathPattern>,
    fs_write: CapabilitySet<PathPattern>,
    fs_exec: CapabilitySet<PathPattern>,

    // Network capabilities
    net_egress: CapabilitySet<CidrBlock>,
    net_ingress: CapabilitySet<PortRange>,

    // Process capabilities
    proc_spawn: CapabilitySet<CommandPattern>,
    proc_ptrace: bool,

    // Resource limits
    max_memory_mb: u64,
    max_cpu_seconds: u64,
    max_pids: u32,
    max_open_files: u32,

    // Seccomp profile
    seccomp_profile: SeccompProfile,
}

// Each agent gets a policy derived from its role
fn policy_for_agent(role: AgentRole) -> SandboxPolicy {
    match role {
        AgentRole::CodeAnalyzer => SandboxPolicy {
            fs_read: CapabilitySet::allow_all(),
            fs_write: CapabilitySet::deny_all(),
            net_egress: CapabilitySet::deny_all(),
            proc_spawn: CapabilitySet::allow(["git", "grep", "rg", "ast-grep"]),
            max_memory_mb: 2048,
            seccomp_profile: SeccompProfile::ReadOnly,
        },
        AgentRole::Refactorer => SandboxPolicy {
            fs_read: CapabilitySet::allow_all(),
            fs_write: CapabilitySet::allow(["**/*.rs", "**/*.py", "**/*.ts", "**/*.go"]),
            net_egress: CapabilitySet::deny_all(),
            proc_spawn: CapabilitySet::allow(["cargo", "pytest", "mypy", "go", "npm"]),
            max_memory_mb: 8192,
            seccomp_profile: SeccompProfile::BuildTools,
        },
        AgentRole::Deployer => SandboxPolicy {
            fs_read: CapabilitySet::allow(["**/*.yaml", "**/*.yml", "**/Dockerfile*"]),
            fs_write: CapabilitySet::deny_all(),
            net_egress: CapabilitySet::allow(["kubernetes.default.svc", "registry.internal"]),
            proc_spawn: CapabilitySet::allow(["kubectl", "helm", "docker", "skaffold"]),
            max_memory_mb: 4096,
            seccomp_profile: SeccompProfile::DeployTools,
        },
    }
}
Enter fullscreen mode Exit fullscreen mode

No agent runs as root. No agent has unrestricted network. No agent spawns arbitrary processes. The sandbox is the security boundary.

3. Model Non-Determinism Breaks Replay

You checkpoint everything. You replay. The model gives a different answer. Replay breaks.

Solutions:

  • Temperature = 0 for all production model calls (deterministic sampling)
  • Seed fixation where providers support it (OpenAI seed param, Anthropic doesn't yet)
  • Logprobs capture — store top-k logprobs at each step, verify replay matches
  • Consensus verification — run 3 models, require 2/3 agreement for critical decisions
  • Deterministic tool schemas — no optional fields, strict validation, no "creative" tool use
class DeterministicModelWrapper:
    def __init__(self, model: ModelHandle):
        self.model = model

    async def complete(self, request: CompletionRequest) -> CompletionResponse:
        # Force deterministic parameters
        request = request.model_copy(update={
            "temperature": 0.0,
            "top_p": 1.0,
            "seed": request.seed or self.generate_seed(request),
        })

        response = await self.model.complete(request)

        # Capture logprobs for replay verification
        if request.logprobs:
            self.store_logprobs(request.trace_id, response.logprobs)

        return response

    def verify_replay(self, trace_id: str, replay_response: CompletionResponse) -> bool:
        original_logprobs = self.get_logprobs(trace_id)
        replay_logprobs = replay_response.logprobs

        # Verify top token matches at each position
        for orig, replay in zip(original_logprobs, replay_logprobs):
            if orig.top_token != replay.top_token:
                return False
            if abs(orig.top_logprob - replay.top_logprob) > 0.01:
                return False

        return True
Enter fullscreen mode Exit fullscreen mode

4. The Context Manager Is a Real-Time System

It has hard deadlines. The model call must receive context within the latency budget. Context promotion/demotion/compression cannot block the scheduler.

// Rust context manager for real-time guarantees
pub struct ContextManager {
    l1: Arc<Mutex<LruCache<ContextKey, ContextEntry>>>,
    l2: Arc<Mutex<LruCache<ContextKey, ContextEntry>>>,
    l3: Arc<VectorGraphDb>,
    l4: Arc<CheckpointStore>,

    // Background workers
    promotion_tx: mpsc::Sender<PromotionTask>,
    compression_tx: mpsc::Sender<CompressionTask>,
    checkpoint_tx: mpsc::Sender<CheckpointTask>,
}

impl ContextManager {
    // MUST complete in < 5ms
    pub fn prepare_context(&self, req: ContextRequest) -> ContextBundle {
        let mut l1 = self.l1.lock().unwrap();
        let mut l2 = self.l2.lock().unwrap();

        // Fast path: L1 only
        if req.budget < L1_BUDGET {
            return ContextBundle::from_l1(l1.get_relevant(req.query));
        }

        // Medium path: L1 + promoted L2
        let l2_items = self.promote_l2_to_l1(&mut l1, &mut l2, req);

        // Slow path: async L3 promotion (fire and forget)
        if req.budget > L2_BUDGET {
            self.promotion_tx.try_send(PromotionTask {
                agent_id: req.agent_id,
                query: req.query,
                target_tier: Tier::L2,
            }).ok(); // Drop if busy - non-blocking
        }

        ContextBundle::from_tiers(l1, l2, req.budget)
    }
}
Enter fullscreen mode Exit fullscreen mode

The context manager never blocks on I/O. L3 promotion happens async. Compression happens async. Checkpointing happens async. The synchronous path is L1 + L2 only, bounded by token budget.


What This Actually Buys You

An agent OS built this way doesn't just "work better." It enables workflows that are impossible with frameworks:

Workflow Framework Approach Agent OS Approach
50-file refactor across 3 services Agent hits context limit, hallucinates imports, breaks tests Speculative execution in forked sandbox, checkpoint per file, validation gates, human review at plan stage
Production incident debug at 3AM Agent reads logs, guesses root cause, suggests random fixes Graph query traces request flow, correlates with recent deploys, checks related incidents, proposes targeted fix with rollback plan
Database migration with zero downtime Agent writes migration, hopes for best Analyzes schema + queries + access patterns, generates backward-compatible migration + deployment plan + validation suite + canary strategy, human gates at each stage
New feature across FE/BE/API/DB Agent tries to write everything, loses coherence Planner agent decomposes, specialist agents execute with shared context, integration agent validates contracts, deployment agent stages rollout
Legacy codebase modernization Agent rewrites file by file, introduces regressions Analysis agent builds knowledge graph, planning agent creates phased strategy, executor agents work in parallel with shared memory, validation agents run continuously

The Honest Assessment

Building this takes 6-12 months for a team of 3-5 engineers who know distributed systems, PL tooling, and LLM behavior. It's not a side project. It's not a weekend hack.

But here's the thing: every company deploying agents to production is building pieces of this anyway. They're building context management. They're building sandboxing. They're building evaluation. They're building routing. They're building observability. They're just building them as ad-hoc scripts inside their agent code instead of as a reusable runtime.

The frontier agent OS is the Linux for AI agents. We're in the 1991 phase. Everyone's writing their own kernel. Soon someone will write the one that everyone else builds on.

If you're building agents for production, you have two choices:

  1. Build the OS — invest 6-12 months, own the runtime, compound the investment
  2. Wait for the OS — build on frameworks, hit the ceiling, rewrite later

There is no third option. The complexity doesn't disappear. It just moves from "runtime you control" to "framework you fight."

I chose option 1 eighteen months ago. The agents I run today deploy to production, debug incidents, refactor legacy systems, and write features across the stack. They checkpoint. They rollback. They ask humans. They learn from every run.

They don't hallucinate file paths. They don't forget the schema. They don't lose context at 94%.

They just work. Because underneath them is an operating system that makes sure they can.


Appendix: Minimal Viable Agent OS (If You Must Start Smaller)

You can't build the full thing tomorrow. But you can build the critical path:

# mvp_agent_os.py — ~500 lines that changes everything
import asyncio
from dataclasses import dataclass, field
from typing import Dict, List, Any, Optional
from pathlib import Path
import json
import hashlib
import time

@dataclass
class ContextTier:
    name: str
    max_tokens: int
    data: Dict[str, Any] = field(default_factory=dict)
    access_times: Dict[str, float] = field(default_factory=dict)

@dataclass
class SimpleContextManager:
    tiers: Dict[str, ContextTier] = field(default_factory=lambda: {
        "L1": ContextTier("L1", 8000),      # Immediate
        "L2": ContextTier("L2", 32000),     # Working
        "L3": ContextTier("L3", 200000),    # Archival (simulated)
    })

    def promote(self, key: str, value: Any, from_tier: str, to_tier: str):
        if key in self.tiers[from_tier].data:
            del self.tiers[from_tier].data[key]
        self.tiers[to_tier].data[key] = value
        self.tiers[to_tier].access_times[key] = time.time()
        self._enforce_budget(to_tier)

    def _enforce_budget(self, tier_name: str):
        tier = self.tiers[tier_name]
        # Simple LRU eviction
        while self._estimate_tokens(tier.data) > tier.max_tokens:
            oldest = min(tier.access_times, key=tier.access_times.get)
            del tier.data[oldest]
            del tier.access_times[oldest]

    def _estimate_tokens(self, data: Dict) -> int:
        return sum(len(json.dumps(v)) // 4 for v in data.values())

    def build_context(self, budget: int = 50000) -> str:
        parts = []
        remaining = budget

        for tier_name in ["L1", "L2", "L3"]:
            tier = self.tiers[tier_name]
            tier_content = json.dumps(tier.data, indent=2)
            tier_tokens = len(tier_content) // 4

            if tier_tokens <= remaining:
                parts.append(f"=== {tier_name} ===\n{tier_content}")
                remaining -= tier_tokens
            else:
                # Truncate L3
                if tier_name == "L3":
                    truncated = tier_content[:remaining * 4]
                    parts.append(f"=== {tier_name} (truncated) ===\n{truncated}")
                break

        return "\n\n".join(parts)

@dataclass
class SimpleSandbox:
    workdir: Path
    checkpoints: Dict[str, Dict] = field(default_factory=dict)

    async def read(self, path: str) -> str:
        return (self.workdir / path).read_text()

    async def write(self, path: str, content: str):
        (self.workdir / path).write_text(content)

    async def run(self, cmd: str, timeout: int = 60) -> dict:
        proc = await asyncio.create_subprocess_shell(
            cmd, cwd=self.workdir,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE
        )
        try:
            stdout, stderr = await asyncio.wait_for(proc.communicate(), timeout)
            return {"stdout": stdout.decode(), "stderr": stderr.decode(), "code": proc.returncode}
        except asyncio.TimeoutError:
            proc.kill()
            return {"stdout": "", "stderr": "TIMEOUT", "code": -1}

    def checkpoint(self, name: str):
        # Snapshot key files (simplified)
        self.checkpoints[name] = {
            "files": {str(f.relative_to(self.workdir)): f.read_text() 
                     for f in self.workdir.rglob("*") if f.is_file()},
            "time": time.time()
        }

    def restore(self, name: str):
        if name in self.checkpoints:
            for path, content in self.checkpoints[name]["files"].items():
                (self.workdir / path).write_text(content)

@dataclass
class SimpleAgentOS:
    context: SimpleContextManager = field(default_factory=SimpleContextManager)
    sandbox: SimpleSandbox = field(default_factory=lambda: SimpleSandbox(Path.cwd()))
    checkpoints: Dict[str, Any] = field(default_factory=dict)
    token_usage: int = 0

    async def run_agent(self, agent_fn, task: str, model_fn, max_turns: int = 20):
        # Initialize context
        self.context.promote("task", task, "L1", "L1")

        for turn in range(max_turns):
            # Build context
            context_str = self.context.build_context()

            # Call model
            response = await model_fn(context_str)
            self.token_usage += len(context_str) // 4 + len(response) // 4

            # Parse tool calls (simplified)
            tool_calls = self.parse_tool_calls(response)

            if not tool_calls:
                # Agent done
                break

            # Execute tools
            for call in tool_calls:
                result = await self.execute_tool(call)

                # Promote results to context
                self.context.promote(
                    f"tool:{call['name']}:{turn}", 
                    {"args": call["args"], "result": result},
                    "L1", "L1"
                )

                # If file read, promote to L2
                if call["name"] == "read_file":
                    self.context.promote(
                        f"file:{call['args']['path']}",
                        result,
                        "L1", "L2"
                    )

            # Checkpoint every 5 turns
            if turn % 5 == 0:
                self.checkpoint(f"turn_{turn}")

        return {"turns": turn, "tokens": self.token_usage}

    async def execute_tool(self, call: dict) -> Any:
        name = call["name"]
        args = call["args"]

        if name == "read_file":
            return await self.sandbox.read(args["path"])
        elif name == "write_file":
            await self.sandbox.write(args["path"], args["content"])
            return {"ok": True}
        elif name == "run_command":
            return await self.sandbox.run(args["command"])
        elif name == "search_code":
            # Simplified: grep
            result = await self.sandbox.run(f"rg -n {args['pattern']} {args.get('path', '.')}")
            return result["stdout"]
        else:
            return {"error": f"Unknown tool: {name}"}

    def checkpoint(self, name: str):
        self.checkpoints[name] = {
            "context": {k: v.data.copy() for k, v in self.context.tiers.items()},
            "sandbox": self.sandbox.checkpoints.copy(),
            "tokens": self.token_usage,
            "time": time.time(),
        }
        self.sandbox.checkpoint(name)

    def restore(self, name: str):
        if name in self.checkpoints:
            cp = self.checkpoints[name]
            for tier_name, data in cp["context"].items():
                self.context.tiers[tier_name].data = data
            self.sandbox.checkpoints = cp["sandbox"]
            self.token_usage = cp["tokens"]
            self.sandbox.restore(name)

# Usage
async def main():
    os = SimpleAgentOS()

    async def my_model(context: str) -> str:
        # Call your model here (Claude, GPT, etc.)
        return await call_model(context)

    result = await os.run_agent(
        agent_fn=None,  # Not used in this simple version
        task="Refactor the auth module to use JWT instead of sessions",
        model_fn=my_model,
        max_turns=30
    )
    print(result)

# This 500-line MVP gives you:
# 1. Tiered context management with budgets
# 2. Sandbox with checkpoint/restore
# 3. Tool execution with result promotion
# 4. Turn-based agent loop with token tracking
# 5. Foundation to build the real thing on
Enter fullscreen mode Exit fullscreen mode

Start there. But know: the MVP is not the OS. The OS is what you build when the MVP hits the wall—and it will. The wall is usually context management, sandbox fidelity, or evaluation. The OS solves all three.


Related Reading: Context Engineering for Production Agents · Building Reliable Agent Evaluations · Sandbox Security for Code Agents

Top comments (0)