TL;DR: While building AgentFlow, an open source orchestration engine for AI agents, I discovered fundamental differences from container orchestration. Kubernetes assumes deterministic workloads; agents are non-deterministic reasoning systems. This post explores the architectural challenges I identified and the design decisions I made to address them.
Note: AgentFlow is a personal side project built to explore agent orchestration challenges. The observations and technical decisions in this post reflect my individual learning and experimentation, and do not represent the views, products, or architecture of my employer. All code examples are from the open source AgentFlow project.
Introduction: The orchestration illusion
When I started building AgentFlow, the pitch was simple: "Kubernetes for AI agents." The analogy made sense: both systems schedule workloads, manage resources, and handle failures. A month into the initial version, I learned why the comparison falls apart.
Kubernetes assumes your workload is a deterministic function: same input → same output. Containers crash cleanly. Resource needs are predictable. State is either ephemeral or in a database.
Agents break every assumption:
- Non-deterministic execution: Same prompt generates different responses (temperature, model updates, context window variations)
- Ambiguous failures: Agent produces output, but is it correct?
- Distributed state: Reasoning context, tool outputs, external API mutations, LLM chat history
- Dynamic resource needs: Token quotas, model availability, cost constraints, latency requirements
- Recursive decomposition: Agent spawns sub-agents at runtime based on task complexity
This post explores why these differences make agent orchestration an order of magnitude harder, with concrete examples from production systems.
1. The non-determinism problem: When retry isn't idempotent
Kubernetes assumption
# Pod fails → K8s restarts it
# Same image + same config = same behavior
apiVersion: v1
kind: Pod
spec:
containers:
- name: auth-service
image: auth:v1.2.3
restartPolicy: Always
Restart is safe because containers are deterministic. Same inputs → same outputs.
Agent reality
# Agent task: "Refactor database query for performance"
response = llm.generate(
prompt="Optimize this SQL query for performance:\n{query}",
temperature=0.7 # Non-zero temperature = non-deterministic
)
# Attempt 1: Adds index on user_id
# Attempt 2: Rewrites as JOIN instead of subquery
# Attempt 3: Suggests denormalization
# Which is "correct"? All could work. Or none.
Implication: Retry logic is ambiguous:
- Should we retry with same prompt? (might get worse output)
- Different temperature? (changes behavior profile)
- Different model? (GPT-5 vs Sonnet-4.5 - different reasoning styles)
- Add few-shot examples from previous attempts? (context pollution)
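In AgentFlow I ended up treating these choices as an explicit escalation policy rather than a blind restart loop. Below is a minimal sketch of the idea, not the project's exact API; the temperatures, attempt counts, and the "fallback-model" name are placeholders.

```rust
/// Minimal sketch of an escalating retry policy for non-deterministic agent tasks.
/// Each attempt changes *how* we retry instead of blindly re-running the same call.
#[derive(Debug)]
enum RetryStep {
    SamePrompt { temperature: f32 },       // cheapest: just run again
    LowerTemperature { temperature: f32 }, // reduce variance on the second try
    SwitchModel { model: &'static str },   // different reasoning style
    EscalateToHuman,                       // stop burning tokens
}

fn retry_plan(attempt: u32) -> RetryStep {
    match attempt {
        0 => RetryStep::SamePrompt { temperature: 0.7 },
        1 => RetryStep::LowerTemperature { temperature: 0.2 },
        2 => RetryStep::SwitchModel { model: "fallback-model" },
        _ => RetryStep::EscalateToHuman,
    }
}

fn main() {
    for attempt in 0..4 {
        println!("attempt {attempt}: {:?}", retry_plan(attempt));
    }
}
```

The point is that each retry changes the strategy, because repeating an identical call just re-rolls the dice.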
Real production failure
I had an agent that wrote Terraform configurations. On retry after timeout:
- First attempt: Created 90% of infrastructure
- Retry: Generated different resource names
- Result: Duplicate infrastructure, half-configured state
The K8s equivalent would be a pod that creates new database tables with a different schema on every restart.
Our solution: Semantic checkpointing
struct AgentCheckpoint {
task_id: Uuid,
reasoning_trace: Vec<ReasoningStep>, // What agent decided and why
tool_outputs: HashMap<String, ToolResult>, // External state mutations
partial_results: Vec<Artifact>, // Code, configs, etc.
context_hash: String, // Hash of prompt + context for replay detection
}
// On failure, we:
// 1. Load checkpoint
// 2. Replay tool outputs (don't re-execute)
// 3. Resume with explicit "continue from step N" prompt
// 4. Compare context_hash to detect if inputs changed
Key insight: Checkpoints must capture intent and reasoning, not just state. When resuming, agent needs to understand what it was trying to do, not just what it did.
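Concretely, resuming is mostly prompt construction plus an input check. Here is a minimal sketch under simplified assumptions: the structs are cut-down stand-ins for the checkpoint above, and std's `DefaultHasher` stands in for whatever hashing you actually use for `context_hash`.

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

// Simplified stand-ins for the checkpoint fields shown above.
struct ReasoningStep { thought: String, action: String }
struct AgentCheckpoint {
    reasoning_trace: Vec<ReasoningStep>,
    context_hash: String,
}

// Hash of prompt + context; DefaultHasher is used here purely for illustration.
fn hash_context(prompt: &str, context: &str) -> String {
    let mut h = DefaultHasher::new();
    prompt.hash(&mut h);
    context.hash(&mut h);
    format!("{:x}", h.finish())
}

/// Build the "continue from step N" prompt instead of re-running from scratch.
fn resume_prompt(cp: &AgentCheckpoint, prompt: &str, context: &str) -> Result<String, String> {
    // If the inputs changed since the checkpoint was taken, a blind resume could be wrong.
    if hash_context(prompt, context) != cp.context_hash {
        return Err("context changed since checkpoint; task needs re-planning".to_string());
    }
    let done: Vec<String> = cp
        .reasoning_trace
        .iter()
        .enumerate()
        .map(|(i, s)| format!("Step {}: {} -> {}", i + 1, s.thought, s.action))
        .collect();
    Ok(format!(
        "You already completed the following steps (do NOT repeat them):\n{}\nContinue from step {}.",
        done.join("\n"),
        cp.reasoning_trace.len() + 1
    ))
}
```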
2. Failure detection: When "success" is ambiguous
Kubernetes failure modes
livenessProbe:
httpGet:
path: /healthz
port: 8080
failureThreshold: 3
readinessProbe:
tcpSocket:
port: 8080
Binary outcome: process alive/dead, port open/closed, HTTP 200/500.
Agent failure modes
# Agent task: "Write unit tests for auth module"
output = agent.execute(task)
# Agent returns HTTP 200 with:
"""
def test_login():
user = User("test")
assert user.login() == True # Useless test
"""
# Questions:
# - Is this a "failure"? Code is syntactically valid
# - Test doesn't actually validate auth logic
# - How do we detect this programmatically?
Failure categories I've encountered:
- Syntactic failure: Invalid code, malformed JSON (easy to detect)
- Semantic failure: Valid code that doesn't solve the task (hard)
- Partial completion: 70% correct, 30% missing (do I retry the whole task?)
- Hallucinated success: Agent claims completion but didn't execute tools
- Silent degradation: Output works but is suboptimal (performance, security)
Detection strategies
Approach 1: Programmatic validation
enum ValidationResult {
Pass,
Fail(String),
Uncertain, // Can't determine programmatically
}
impl Validator {
fn validate_code_output(&self, output: &str) -> ValidationResult {
// 1. Syntax check (AST parsing)
if let Err(e) = parse_syntax(output) {
return ValidationResult::Fail(format!("Syntax error: {}", e));
}
// 2. Execute tests
if let Err(e) = run_tests(output) {
return ValidationResult::Fail(format!("Tests failed: {}", e));
}
// 3. Static analysis
let issues = run_linters(output);
if issues.critical > 0 {
return ValidationResult::Fail(format!("Critical issues: {:?}", issues));
}
// 4. Semantic validation - HARD
// How do we know if tests actually validate auth logic?
ValidationResult::Uncertain
}
}
Approach 2: Evaluation agents (agent-as-judge)
# Separate agent evaluates output quality
evaluation_prompt = f"""
Task: {original_task}
Output: {agent_output}
Does this output successfully complete the task?
Score 0-10 and explain your reasoning.
"""
score = evaluator_agent.generate(evaluation_prompt)
if score < 7:
    ...  # retry or escalate to a human
Problem: The evaluation agent can also hallucinate. I found a roughly 15% false-positive rate (bad output marked as good).
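What helped me most was refusing to let either signal decide on its own: programmatic checks can veto, the judge can only confirm, and disagreement escalates to a human. A minimal sketch of that combination logic follows; the thresholds are arbitrary and the `Verdict` type is hypothetical.

```rust
enum ValidationResult { Pass, Fail(String), Uncertain }

enum Verdict { Accept, Retry(String), Escalate(String) }

/// Combine programmatic validation with an LLM judge score (0-10).
/// Neither signal is trusted alone; disagreement goes to a human.
fn combine(validation: ValidationResult, judge_score: f32) -> Verdict {
    match validation {
        // Hard programmatic failures are never overridden by the judge.
        ValidationResult::Fail(reason) => Verdict::Retry(reason),
        // Both signals positive: accept.
        ValidationResult::Pass if judge_score >= 7.0 => Verdict::Accept,
        // Checks pass but the judge is unhappy: a human decides.
        ValidationResult::Pass => Verdict::Escalate(
            format!("judge scored {judge_score:.1} despite passing checks")),
        // Nothing verifiable programmatically: only accept on a very confident judge.
        ValidationResult::Uncertain if judge_score >= 9.0 => Verdict::Accept,
        ValidationResult::Uncertain => Verdict::Escalate(
            "cannot verify programmatically and judge is not confident".to_string()),
    }
}
```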
Approach 3: Outcome-based validation
// Don't validate the code, validate if it works end-to-end
async fn validate_deployment(task_id: &str) -> Result<(), ValidationError> {
// Agent wrote deployment config
// Actually deploy to staging
deploy_to_staging(task_id).await?;
// Run integration tests
let health = check_service_health().await?;
// Monitor for 5 minutes
let metrics = observe_metrics(Duration::from_secs(300)).await?;
if metrics.error_rate > 0.01 {
rollback_deployment(task_id).await?;
return Err(ValidationError::HighErrorRate);
}
Ok(())
}
This is what I use in production for infrastructure agents: validate by outcome, not output.
3. Resource scheduling: Beyond CPU and memory
Kubernetes Resource Model
resources:
requests:
cpu: 500m
memory: 256Mi
limits:
cpu: 1000m
memory: 512Mi
Scheduler assigns pods to nodes based on available CPU/memory. Simple, measurable, predictable.
Agent Resource Model
struct AgentResourceRequirements {
    // Traditional resources
    cpu_cores: f32,
    memory_gb: f32,
    // LLM-specific resources
    token_quota: TokenQuota,
    // Model requirements
    model_constraints: ModelConstraints,
    // Tool access
    required_tools: Vec<Tool>, // GitHub, AWS, database access
    // Quality requirements
    min_quality_score: f32, // Some tasks need high-quality models
}

struct TokenQuota {
    input_tokens_per_minute: u32,
    output_tokens_per_minute: u32,
    provider: Provider, // OpenAI, Anthropic, etc.
}

struct ModelConstraints {
    min_context_window: u32, // Need 100k tokens for a large codebase
    required_capabilities: Vec<Capability>, // FunctionCalling, Vision, etc.
    max_cost_per_request: f32, // Budget constraint
    max_latency_p95: Duration, // SLA requirement
}
Scheduling complexity:
- Token quotas are rate-limited, not capacity-limited
  - K8s: Node has 8 CPU cores, can run 8 single-core pods
  - Agents: Provider has 100k TPM (tokens per minute), but token usage varies wildly
    - Task A: 500 tokens (simple question)
    - Task B: 50k tokens (large codebase analysis)
  - Can't predict how many concurrent tasks are feasible
- Model availability changes dynamically
// Morning: GPT-5 available, low latency
// Afternoon: GPT-5 rate limited (org-wide spike)
// Fallback to Claude? Different reasoning style might break downstream tasks
// Evening: GPT-5 back but model updated (gpt-5-0613 → gpt-5-1106)
// Output format slightly different, breaks parsing logic
- Cost optimization vs. quality tradeoff
// K8s: Use cheapest instance type that meets CPU/memory needs
// Agents: Complex multi-objective optimization
fn schedule_task(task: &Task) -> SchedulingDecision {
let models = available_models();
// Pareto frontier: cost vs. quality vs. latency
let candidates = models.iter()
.filter(|m| m.can_handle(&task.requirements))
.map(|m| {
let cost = estimate_cost(m, task);
let quality = predict_quality(m, task); // ML model trained on past tasks
let latency = m.avg_latency + queue_time(m);
(m, cost, quality, latency)
})
.collect();
// Which to pick?
// - Cheapest might fail task (quality too low)
// - Best might blow budget
// - Fastest might not be available
optimize_by_policy(candidates, &task.priority)
}
Our Scheduler Implementation
pub struct AgentScheduler {
// Track real-time model availability
model_health: Arc<RwLock<HashMap<Model, HealthStatus>>>,
// Token quota tracking per provider
quota_manager: QuotaManager,
// Historical task→model performance
performance_db: PerformanceDB,
// Cost tracking
budget_tracker: BudgetTracker,
}
impl AgentScheduler {
pub async fn schedule(&self, task: Task) -> Result<Assignment, ScheduleError> {
// 1. Filter models by hard constraints
let viable_models = self.get_viable_models(&task).await?;
if viable_models.is_empty() {
return Err(ScheduleError::NoViableModel);
}
// 2. Check token quotas
let available = self.quota_manager
.get_available_quota(&viable_models)
.await?;
// 3. Predict task resource needs based on similar past tasks
let estimated_tokens = self.performance_db
.predict_token_usage(&task)
.await?;
// 4. Score each model
let mut scored: Vec<(Model, f32)> = viable_models.iter()
.filter_map(|model| {
let quota = available.get(model)?;
if quota.remaining < estimated_tokens {
return None; // Insufficient quota
}
let cost_score = 1.0 - (model.cost_per_token / MAX_COST);
let quality_score = self.performance_db
.get_quality_score(model, &task.task_type);
let latency_score = 1.0 - (model.avg_latency.as_secs_f32() / MAX_LATENCY);
// Weighted combination based on task priority
let score = match task.priority {
Priority::Cost => cost_score * 0.7 + quality_score * 0.3,
Priority::Quality => quality_score * 0.7 + cost_score * 0.2 + latency_score * 0.1,
Priority::Latency => latency_score * 0.6 + quality_score * 0.3 + cost_score * 0.1,
};
Some((*model, score))
})
.collect();
scored.sort_by(|a, b| b.1.partial_cmp(&a.1).unwrap());
// 5. Reserve quota and assign
let (selected_model, _) = scored.first().ok_or(ScheduleError::AllModelsExhausted)?;
self.quota_manager.reserve(*selected_model, estimated_tokens).await?;
Ok(Assignment {
model: *selected_model,
estimated_cost: selected_model.cost_per_token * estimated_tokens as f32,
fallback: scored.get(1).map(|(m, _)| *m),
})
}
}
Key differences from K8s scheduler:
- Predictive (token usage) vs. declarative (CPU request)
- Multi-objective optimization vs. bin packing
- Real-time quota consumption tracking (see the sketch after this list)
- Model health and version tracking
- Cost-awareness is first-class concern
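To make the quota side concrete: a provider budget is a rate, not a capacity, so reservations have to age out of a sliding window instead of being freed when a pod terminates. Here is a minimal, std-only sketch of that idea; AgentFlow's `QuotaManager` is more involved, and the one-minute window and token counts here are illustrative.

```rust
use std::collections::VecDeque;
use std::time::{Duration, Instant};

/// Sliding-window token budget for one provider (e.g. 100k tokens per minute).
struct TokenWindow {
    limit_per_minute: u64,
    // (reservation time, estimated tokens) for calls inside the current window
    recent: VecDeque<(Instant, u64)>,
}

impl TokenWindow {
    fn new(limit_per_minute: u64) -> Self {
        Self { limit_per_minute, recent: VecDeque::new() }
    }

    fn used_in_window(&mut self, now: Instant) -> u64 {
        // Reservations older than a minute age out: the "capacity" comes back by itself.
        self.recent
            .retain(|&(t, _)| now.duration_since(t) <= Duration::from_secs(60));
        self.recent.iter().map(|&(_, tokens)| tokens).sum()
    }

    /// Try to reserve an *estimated* token count. Real usage is only known after the
    /// LLM call returns, so the caller reconciles the reservation afterwards.
    fn try_reserve(&mut self, estimated_tokens: u64) -> bool {
        let now = Instant::now();
        if self.used_in_window(now) + estimated_tokens > self.limit_per_minute {
            return false; // fall back to another model or queue the task
        }
        self.recent.push_back((now, estimated_tokens));
        true
    }
}
```

Nothing is released explicitly; capacity only comes back as the window slides, which is exactly what makes TPM limits behave differently from CPU requests.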
4. State Management: The Distributed Reasoning Problem
Kubernetes State Model
┌─────────────────┐
│ Pod (stateless) │
└─────────────────┘
│
├─> ConfigMap (immutable config)
├─> Secret (credentials)
└─> PersistentVolume (durable state)
# State is externalized, pod is disposable
Agent State Model
┌──────────────────────────────────────────┐
│ Agent Task State │
├──────────────────────────────────────────┤
│ 1. LLM Context Window (ephemeral) │
│ - Conversation history │
│ - Retrieved documents │
│ - Previous reasoning steps │
│ - Max 200k tokens, then lost │
├──────────────────────────────────────────┤
│ 2. Tool Execution Side Effects (durable) │
│ - Files created in GitHub │
│ - Database records modified │
│ - Cloud resources provisioned │
│ - Slack messages sent │
├──────────────────────────────────────────┤
│ 3. Reasoning State (semi-structured) │
│ - Current subtask in decomposition │
│ - Hypotheses being explored │
│ - Confidence scores │
│ - Retry attempts │
├──────────────────────────────────────────┤
│ 4. Inter-Agent State (distributed) │
│ - Results from sub-agents │
│ - Merge conflicts │
│ - Dependency resolution status │
└──────────────────────────────────────────┘
Problem: State is scattered across:
- LLM provider's context (can't access directly)
- External systems (GitHub, AWS, etc.)
- Your orchestrator's DB
- Other agents' context windows
Example: Mid-Task Failure Recovery
// Task: "Implement feature X across 3 microservices"
Agent decomposes into:
├─ Service A: Add API endpoint
│ ├─ Write code ✓ (committed to GitHub)
│ ├─ Write tests ✓ (committed to GitHub)
│ └─ Update docs ✓ (committed to GitHub)
├─ Service B: Update client library
│ ├─ Write code ✓ (committed to GitHub)
│ ├─ Write tests ✗ (TIMEOUT - agent crashed)
│ └─ Update docs ✗ (not started)
└─ Service C: Deploy configuration
└─ Not started
// Recovery questions:
// 1. Which commits were part of this task? (need git SHA tracking)
// 2. What was Service B agent trying to do when it crashed?
// 3. Can we resume Service B without re-reading entire codebase?
// 4. Do we rollback Service A changes? Or continue forward?
// 5. If we retry, how do we prevent Service B from duplicating Service A's work?
Our Solution: Task DAG with Explicit Dependencies
#[derive(Serialize, Deserialize)]
struct TaskGraph {
root: TaskId,
nodes: HashMap<TaskId, TaskNode>,
edges: Vec<Dependency>,
}
#[derive(Serialize, Deserialize)]
struct TaskNode {
id: TaskId,
description: String,
assigned_agent: AgentId,
status: TaskStatus,
// CRITICAL: Capture side effects
side_effects: Vec<SideEffect>,
// CRITICAL: Capture reasoning
reasoning_trace: Vec<ReasoningStep>,
// Checkpoint for recovery
checkpoint: Option<Checkpoint>,
}
#[derive(Serialize, Deserialize)]
enum SideEffect {
GitCommit { repo: String, sha: String, branch: String },
FileModified { path: String, hash: String },
APICall { endpoint: String, method: String, response_status: u16 },
DatabaseMutation { table: String, operation: DbOperation },
CloudResource { provider: String, resource_id: String, action: String },
}
#[derive(Serialize, Deserialize)]
struct Dependency {
from: TaskId,
to: TaskId,
dependency_type: DependencyType,
}
#[derive(Serialize, Deserialize)]
enum DependencyType {
Sequential, // B must start after A completes
DataDependency(String), // B needs output from A
ConflictingResources, // A and B can't run concurrently (e.g., both modify same file)
}
// Recovery logic
impl TaskGraph {
async fn recover_from_failure(&mut self, failed_task: TaskId) -> RecoveryPlan {
// 1. Find all completed upstream dependencies
let completed_deps = self.get_completed_dependencies(failed_task);
// 2. Check if any side effects need rollback
let downstream_affected = self.get_downstream_tasks(failed_task);
// 3. Determine resume strategy
if let Some(checkpoint) = self.nodes[&failed_task].checkpoint {
// Can resume from checkpoint
return RecoveryPlan::Resume {
from_checkpoint: checkpoint,
replay_side_effects: false, // Already done
};
} else if self.can_rollback_side_effects(failed_task) {
// Clean rollback possible
return RecoveryPlan::Rollback {
tasks_to_rollback: vec![failed_task],
side_effects_to_revert: self.get_side_effects(failed_task),
};
} else {
// Partial state exists, need human decision
return RecoveryPlan::RequiresHuman {
reason: "Side effects cannot be automatically rolled back",
affected_systems: self.get_affected_systems(failed_task),
};
}
}
}
Key insight: Kubernetes can restart pods because containers don't mutate external state (ideally). Agents inherently mutate state across multiple systems, so recovery requires explicitly tracking and potentially reverting side effects.
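One pattern that falls out of this: record the compensating action together with the side effect, at the moment it happens, so rollback becomes a data lookup instead of another round of agent reasoning. A rough sketch follows; the types loosely mirror the `SideEffect` enum above, but the ledger API and names are illustrative, not AgentFlow's actual implementation.

```rust
/// A recorded side effect plus the compensating action captured *when it happened*,
/// so recovery does not depend on the agent re-deriving how to undo its own work.
struct LedgerEntry {
    task_id: String,
    description: String,
    // None means "cannot be automatically reverted" -> forces human review on rollback.
    compensation: Option<Compensation>,
}

enum Compensation {
    GitRevert { repo: String, sha: String },
    DeleteCloudResource { provider: String, resource_id: String },
    RestoreFile { path: String, previous_hash: String },
}

struct SideEffectLedger {
    entries: Vec<LedgerEntry>,
}

impl SideEffectLedger {
    fn record(&mut self, entry: LedgerEntry) {
        self.entries.push(entry);
    }

    /// Plan a rollback for a failed task: newest effects are reverted first.
    /// If any entry has no compensation, the whole plan needs a human decision.
    fn rollback_plan(&self, task_id: &str) -> Result<Vec<&Compensation>, Vec<&str>> {
        let entries: Vec<&LedgerEntry> = self.entries.iter()
            .filter(|e| e.task_id == task_id)
            .collect();
        let blockers: Vec<&str> = entries.iter()
            .filter(|e| e.compensation.is_none())
            .map(|e| e.description.as_str())
            .collect();
        if !blockers.is_empty() {
            return Err(blockers); // matches RecoveryPlan::RequiresHuman above
        }
        Ok(entries.iter().rev().filter_map(|e| e.compensation.as_ref()).collect())
    }
}
```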
5. Observability: Debugging Reasoning, Not Just Execution
Kubernetes Observability
# Standard signals
kubectl logs pod-name # STDOUT/STDERR
kubectl top pod # CPU/Memory usage
kubectl describe pod # Events, status
# Metrics (RED method)
- Rate: Requests per second
- Errors: Error rate
- Duration: Latency percentiles
Debugging: Look at logs, find error, fix code, redeploy.
Agent Observability Requirements
struct AgentTrace {
// Traditional metrics
duration_ms: u64,
tokens_used: TokenUsage,
cost_usd: f32,
model: String,
// Reasoning trace - CRITICAL FOR DEBUGGING
reasoning_steps: Vec<ReasoningStep>,
// Tool interaction
tool_calls: Vec<ToolCall>,
// Quality metrics
output_quality_score: Option<f32>,
validation_result: ValidationResult,
// Context
input_hash: String, // Detect if same input produces different output
parent_trace_id: Option<TraceId>, // Link to spawning agent
}
#[derive(Serialize, Deserialize)]
struct ReasoningStep {
step_number: u32,
thought: String, // What agent was thinking
action: AgentAction, // What it decided to do
observation: String, // What happened after action
confidence: f32, // How sure the agent was (if using Chain-of-Thought)
}
Example debugging scenario:
# Bug report: "Agent deployed wrong configuration to prod"
# Traditional debugging:
kubectl logs agent-pod-xyz
# Shows: "Deployment successful"
# Useless - I know it deployed, I need to know WHY it chose that config
# Agent debugging:
SELECT reasoning_trace FROM agent_traces WHERE task_id = 'xyz';
# Returns:
{
"step_1": {
"thought": "User requested deployment to production",
"action": "fetch_current_config(environment='prod')",
"observation": "Current config uses instance type m5.large"
},
"step_2": {
"thought": "To optimize cost, I'll downgrade to t3.medium",
"action": "generate_terraform_config(instance_type='t3.medium')",
"confidence": 0.85,
"observation": "Generated new config"
},
"step_3": {
"thought": "Config looks good, applying to prod",
"action": "terraform_apply(environment='prod')",
"observation": "Applied successfully"
}
}
# Found the bug: Agent decided to "optimize cost" autonomously
# Solution: Add constraint that any cost-saving change needs approval
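That "needs approval" constraint is easiest to enforce as a policy gate that inspects the agent's planned action before the tool call runs. The sketch below shows the shape of such a check; the `PlannedAction` fields and the keyword-based rule are deliberately crude, hypothetical simplifications of a real policy engine.

```rust
/// A planned action extracted from the agent's reasoning step, *before* it runs.
struct PlannedAction {
    tool: String,          // e.g. "terraform_apply"
    environment: String,   // e.g. "prod"
    stated_intent: String, // the agent's own "thought" for this step
}

enum PolicyDecision { Allow, RequireApproval(String) }

/// Hypothetical guardrail: autonomous cost optimizations against production
/// require a human sign-off, regardless of the agent's confidence.
fn check_policy(action: &PlannedAction) -> PolicyDecision {
    let intent = action.stated_intent.to_lowercase();
    let cost_motivated = intent.contains("cost") || intent.contains("downgrade");
    if action.environment == "prod" && cost_motivated {
        return PolicyDecision::RequireApproval(format!(
            "cost-motivated change to prod via {} needs human approval",
            action.tool
        ));
    }
    PolicyDecision::Allow
}
```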
Observability I built:
pub struct ObservabilityPipeline {
// Distributed tracing (similar to OpenTelemetry)
tracer: AgentTracer,
// Metrics
metrics: MetricsCollector,
// Reasoning storage
reasoning_db: ReasoningDatabase,
// Quality evaluation (async)
evaluator: QualityEvaluator,
}
impl ObservabilityPipeline {
pub async fn trace_agent_execution(
&self,
task: &Task,
agent: &Agent,
) -> Result<AgentTrace, Error> {
let span = self.tracer.start_span("agent_execution");
// Wrap LLM calls to capture reasoning
let traced_llm = TracedLLM::new(agent.llm.clone(), span.clone());
// Execute task with traced LLM
let result = agent.execute_with_llm(task, traced_llm).await?;
// Extract reasoning from LLM responses
let reasoning = self.extract_reasoning(&result)?;
// Calculate metrics
let tokens = result.token_usage;
let cost = self.calculate_cost(&tokens, &agent.model);
// Async quality evaluation (don't block response)
let eval_task = self.evaluator.evaluate_async(task, &result);
tokio::spawn(eval_task);
let trace = AgentTrace {
trace_id: span.trace_id(),
task_id: task.id,
duration_ms: span.duration_ms(),
tokens_used: tokens,
cost_usd: cost,
model: agent.model.to_string(),
reasoning_steps: reasoning,
tool_calls: result.tool_calls,
output: result.output,
validation_result: result.validation,
};
// Store for later analysis
self.reasoning_db.store(&trace).await?;
Ok(trace)
}
// Query interface for debugging
pub async fn debug_task(&self, task_id: &str) -> DebugReport {
let traces = self.reasoning_db.get_traces_for_task(task_id).await;
DebugReport {
total_cost: traces.iter().map(|t| t.cost_usd).sum(),
total_tokens: traces.iter().map(|t| t.tokens_used.total()).sum(),
reasoning_tree: self.build_reasoning_tree(&traces),
tool_calls: traces.iter().flat_map(|t| &t.tool_calls).collect(),
quality_scores: traces.iter().filter_map(|t| t.output_quality_score).collect(),
failure_points: traces.iter().filter(|t| t.validation_result.is_fail()).collect(),
}
}
}
Dashboard I actually use in production:
Agent Task: "Refactor auth module"
├─ Total Cost: $2.34
├─ Total Tokens: 87,432
├─ Duration: 4m 23s
├─ Quality Score: 8.2/10
├─ Agent Decisions:
│ ├─ Step 1: Analyzed codebase (Claude Sonnet, $0.45, 32k tokens)
│ │ └─ Reasoning: "Identified 3 performance bottlenecks"
│ ├─ Step 2: Generated refactor plan (GPT-5, $1.20, 45k tokens)
│ │ └─ Reasoning: "Will parallelize token validation and cache user roles"
│ ├─ Step 3: Wrote tests (Claude Haiku, $0.12, 8k tokens)
│ │ └─ Reasoning: "Using cheaper model for straightforward task"
│ └─ Step 4: Applied changes (Claude Sonnet, $0.57, 12k tokens)
│ └─ Reasoning: "Need context awareness for merge conflicts"
└─ Validation: Tests passed, performance improved 3x
Critical observability that Kubernetes doesn't need:
- Why did the agent make this decision? (reasoning trace)
- What would it have done differently with a different model? (A/B testing agents)
- Is the output quality degrading over time? (model drift detection; see the sketch after this list)
- Which subtasks were expensive vs. valuable? (ROI per reasoning step)
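For the drift question specifically, the `input_hash` field above is what makes it tractable: group traces by input hash and compare recent quality scores against older ones for the same hash. A minimal sketch with made-up thresholds and window sizes:

```rust
use std::collections::HashMap;

/// One historical data point per trace: the same input_hash should behave similarly
/// over time, so a falling quality trend for the same hash suggests model drift.
struct QualityPoint { input_hash: String, quality_score: f32 }

/// Group quality scores by input hash and flag hashes whose recent scores dropped
/// noticeably below their earlier average. Thresholds here are illustrative.
fn detect_drift(points: &[QualityPoint]) -> Vec<&str> {
    let mut by_hash: HashMap<&str, Vec<f32>> = HashMap::new();
    for p in points {
        by_hash.entry(p.input_hash.as_str()).or_default().push(p.quality_score);
    }
    let mut drifting = Vec::new();
    for (hash, scores) in &by_hash {
        if scores.len() < 6 {
            continue; // not enough history to say anything
        }
        let (old, new) = scores.split_at(scores.len() / 2);
        let mean = |xs: &[f32]| xs.iter().sum::<f32>() / xs.len() as f32;
        if mean(new) < mean(old) - 1.0 {
            drifting.push(*hash); // dropped by more than a point on the 0-10 scale
        }
    }
    drifting
}
```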
6. Dynamic Task Decomposition: The Recursive Scheduling Problem
Kubernetes: Static Workload Definition
# You declare the full workload upfront
apiVersion: apps/v1
kind: Deployment
spec:
replicas: 3 # Known at deploy time
template:
spec:
containers:
- name: worker
image: worker:v1
Scheduler knows exactly how many pods to create.
Agent: Runtime Task Decomposition
# Agent doesn't know how many subtasks until it analyzes the problem
task = "Migrate our monolith to microservices"
# Agent reasoning:
# 1. First, analyze codebase to identify service boundaries
initial_analysis = agent.analyze(codebase)
# 2. Based on analysis, dynamically spawn decomposition
# Agent decides: "I found 7 service boundaries"
subtasks = [
"Extract user service",
"Extract auth service",
"Extract payment service",
# ... 4 more services
"Update API gateway routing",
"Update deployment pipelines"
]
# 3. Some subtasks spawn further subtasks at runtime
for subtask in subtasks:
sub_agent = spawn_agent(subtask)
result = sub_agent.execute()
if result.requires_further_breakdown:
# Recursive decomposition - didn't know this upfront
even_more_subtasks = sub_agent.decompose_further()
for sub_sub_task in even_more_subtasks:
spawn_agent(sub_sub_task)
Orchestration challenges:
- Resource reservation is impossible
  - Kubernetes: Reserve N pods worth of CPU/memory upfront
  - Agents: Don't know how many sub-agents until runtime
  - Can't predict total cost or token usage
- Circular dependencies detected at runtime
Task: "Implement feature X"
├─ Agent A: "Need to update schema"
│ └─ Spawns Agent B: "Design new schema"
│ └─ Agent B: "Need to know data access patterns"
│ └─ Spawns Agent C: "Analyze current queries"
│ └─ Agent C: "Need to understand schema" ← CIRCULAR!
│ └─ Tries to spawn Agent B again...
- Conflicting subtask outputs
Task: "Optimize database performance"
├─ Agent A: "Add index on user_id"
├─ Agent B: "Denormalize user table" ← Conflicts with A's approach
└─ Agent C: "Move to NoSQL" ← Conflicts with both A and B
# All three agents working in parallel, don't know others' decisions
# Merge agent needs to reconcile contradictory approaches
Our Solution: Hierarchical Task Graphs with Constraints
pub struct HierarchicalTaskGraph {
root: TaskId,
nodes: HashMap<TaskId, TaskNode>,
constraints: Vec<Constraint>,
}
#[derive(Clone)]
enum Constraint {
// Resource constraints
MaxConcurrentTasks(usize),
MaxTotalCost(f32),
MaxTreeDepth(usize), // Prevent infinite recursion
// Logical constraints
MutualExclusion(Vec<TaskId>), // Can't run simultaneously
RequiredSequence(Vec<TaskId>), // Must run in order
DeduplicationKey(String), // Prevent duplicate tasks
// Quality constraints
RequireHumanApproval(Predicate), // High-risk tasks need approval
}
pub struct TaskNode {
id: TaskId,
parent: Option<TaskId>,
children: Vec<TaskId>,
depth: usize, // Track recursion depth
// Runtime decomposition tracking
decomposed: bool,
decomposition_reasoning: Option<String>,
}
impl HierarchicalTaskGraph {
// Agent requests to spawn subtask
pub async fn request_subtask_spawn(
&mut self,
parent: TaskId,
subtask_desc: String,
) -> Result<TaskId, SpawnError> {
// 1. Check depth constraint (prevent infinite recursion)
let parent_depth = self.nodes[&parent].depth;
if parent_depth >= self.constraints.max_depth() {
return Err(SpawnError::MaxDepthExceeded);
}
// 2. Deduplication check
let dedup_key = self.compute_task_hash(&subtask_desc);
if self.task_exists_with_key(&dedup_key) {
return Err(SpawnError::DuplicateTask);
}
// 3. Check resource constraints
let concurrent = self.count_running_tasks();
if concurrent >= self.constraints.max_concurrent() {
return Err(SpawnError::ResourceExhausted);
}
let estimated_cost = self.estimate_subtask_cost(&subtask_desc);
let total_cost = self.total_cost_so_far() + estimated_cost;
if total_cost >= self.constraints.max_cost() {
return Err(SpawnError::BudgetExceeded);
}
// 4. Check for circular dependencies
if self.would_create_cycle(parent, &subtask_desc) {
return Err(SpawnError::CircularDependency);
}
// 5. Create subtask
let task_id = self.create_task(TaskNode {
parent: Some(parent),
depth: parent_depth + 1,
description: subtask_desc,
..Default::default()
});
// 6. Check approval constraints
if self.requires_approval(&task_id) {
self.mark_awaiting_approval(task_id);
}
Ok(task_id)
}
// Detect circular dependencies
fn would_create_cycle(&self, parent: TaskId, new_task_desc: &str) -> bool {
// Check if new task's description matches any ancestor
let mut current = Some(parent);
while let Some(node_id) = current {
let node = &self.nodes[&node_id];
// Semantic similarity check (not just exact match)
if self.task_similarity(&node.description, new_task_desc) > 0.85 {
return true; // Likely circular
}
current = node.parent;
}
false
}
}
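Most of the weight in that cycle check is carried by `task_similarity`. In practice you want something genuinely semantic there (for example, embedding similarity); as a crude stand-in that at least shows the contract (a score in [0, 1] that the caller thresholds), here is a simple lexical-overlap version:

```rust
use std::collections::HashSet;

/// Crude stand-in for a semantic similarity check: Jaccard overlap of lowercased
/// words. A real implementation would compare embeddings, but the contract is the
/// same: return a score in [0, 1] that the cycle detector can threshold.
fn task_similarity(a: &str, b: &str) -> f32 {
    let words = |s: &str| -> HashSet<String> {
        s.to_lowercase()
            .split(|c: char| !c.is_alphanumeric())
            .filter(|w| !w.is_empty())
            .map(|w| w.to_string())
            .collect()
    };
    let (wa, wb) = (words(a), words(b));
    if wa.is_empty() || wb.is_empty() {
        return 0.0;
    }
    let intersection = wa.intersection(&wb).count() as f32;
    let union = wa.union(&wb).count() as f32;
    intersection / union
}

fn main() {
    let a = "Design new schema for user data";
    let b = "Need to understand the schema for user data";
    println!("similarity = {:.2}", task_similarity(a, b));
}
```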
Example: Preventing Runaway Task Explosion
// Without constraints:
Task: "Build a web scraper"
├─ Agent A: "First, build HTTP client"
│ └─ Agent B: "Implement connection pooling"
│ └─ Agent C: "Optimize socket management"
│ └─ Agent D: "Implement custom TCP stack" ← WAY too deep
│ └─ Agent E: "Write network driver" ← INSANE
// With constraints:
constraints = [
MaxTreeDepth(3), // Max 3 levels of decomposition
RequireHumanApproval(|task| task.estimated_cost > 10.0),
MaxTotalCost(50.0),
];
// Now:
Task: "Build a web scraper"
├─ Agent A: "First, build HTTP client"
│ └─ Agent B: "Implement connection pooling"
│ └─ Agent C: "Actually, just use reqwest crate" ← Depth limit hit, chose pragmatic solution
└─ Agent D: "Write parser for target site"
7. The Merge Problem: Reconciling Parallel Agent Outputs
Kubernetes: No Merge Problem
Pods don't modify each other's state. Service A and Service B are independent.
Agents: Constant Merge Conflicts
// Task: "Refactor codebase for performance"
// Decomposed into 3 parallel agents:
Agent A output:
```diff
// auth.rs
- fn validate_token(token: &str) -> bool {
-     expensive_crypto_check(token)
- }
+ fn validate_token(token: &str) -> bool {
+     CACHE.get_or_insert(token, || expensive_crypto_check(token))
+ }
```
Agent B output:
```diff
// auth.rs
- fn validate_token(token: &str) -> bool {
-     expensive_crypto_check(token)
- }
+ async fn validate_token(token: &str) -> Result<bool> {
+     expensive_crypto_check_async(token).await
+ } // Made it async for better concurrency
```
Agent C output:
```diff
// auth.rs
- fn validate_token(token: &str) -> bool {
-     expensive_crypto_check(token)
- }
+ fn validate_token(token: &str) -> bool {
+     // Refactored crypto library
+     new_fast_crypto::check(token)
+ }
```
**All three modified the same function with incompatible changes.**
**Merge strategies I've tried:**
**1. Sequential execution (kills parallelism)**
```rust
// Simple but slow
let result_a = agent_a.execute().await;
let result_b = agent_b.execute_with_context(result_a).await;
let result_c = agent_c.execute_with_context(result_b).await;
```
**2. LLM-based merge agent (works surprisingly well)**
````rust
struct MergeAgent {
llm: LLM,
}
impl MergeAgent {
async fn merge_outputs(
&self,
original: &str,
outputs: Vec<AgentOutput>, // element type assumed; the generic parameter was lost in formatting
) -> Result<String, MergeError> {
let prompt = format!(
r#"
Original code:
```
{original}
```
Three agents proposed these changes:
Agent A (caching optimization):
```diff
{diff_a}
```
Agent B (async conversion):
```diff
{diff_b}
```
Agent C (library upgrade):
```diff
{diff_c}
```
These changes conflict. Merge them into a single implementation that:
1. Preserves all optimizations where possible
2. Maintains semantic correctness
3. Resolves conflicts by choosing the best approach
Explain your reasoning for conflict resolution.
"#,
original = original,
diff_a = outputs[0].diff,
diff_b = outputs[1].diff,
diff_c = outputs[2].diff,
);
let response = self.llm.generate(prompt).await?;
// Extract merged code and reasoning
let merged = self.extract_code(&response)?;
let reasoning = self.extract_reasoning(&response)?;
// Validate merge
if !self.validate_merge(&merged).await? {
return Err(MergeError::InvalidMerge);
}
Ok(merged)
}
}
````
**3. Conflict detection + human escalation (production approach)**
```rust
impl MergeOrchestrator {
async fn merge_with_conflict_detection(
&self,
outputs: Vec<AgentOutput>, // element type assumed; the generic parameter was lost in formatting
) -> MergeResult {
// 1. Detect conflicts
let conflicts = self.detect_conflicts(&outputs).await;
if conflicts.is_empty() {
// No conflicts - simple merge
return self.simple_merge(outputs).await;
}
// 2. Categorize conflicts
let categorized = self.categorize_conflicts(conflicts);
for conflict in categorized {
match conflict.severity {
Severity::Trivial => {
// E.g., formatting differences - auto-resolve
self.auto_resolve(conflict).await;
}
Severity::Semantic => {
// E.g., different algorithms - try LLM merge
match self.llm_merge(conflict).await {
Ok(merged) => continue,
Err(_) => {
// LLM couldn't resolve - escalate
self.request_human_resolution(conflict).await;
}
}
}
Severity::Critical => {
// E.g., contradictory business logic - always escalate
self.request_human_resolution(conflict).await;
}
}
}
MergeResult::PartialMerge {
merged: self.get_merged_outputs(),
pending_conflicts: self.get_pending_conflicts(),
}
}
}
```
**Real production example:**
Task: "Optimize our API gateway"
Agent A: "Removed rate limiting (it's causing latency)"
Agent B: "Tightened rate limiting (we're getting DoS attacks)"
Agent C: "Moved rate limiting to edge CDN (best of both worlds)"
Merge conflict: A and B are contradictory
Resolution: Human reviewed, chose C's approach
Lesson: Some conflicts need domain expertise to resolve
---
## Conclusion: Why this matters for infrastructure engineers
If you're building agent systems in production, you can't treat them like stateless services. The orchestration challenges are fundamentally different:
1. **Non-determinism requires semantic checkpointing**, not just process restart
2. **Failure detection needs outcome validation**, not just exit codes
3. **Resource scheduling is multi-objective optimization**, not bin packing
4. **State is distributed across LLM context and external systems**, requiring explicit side-effect tracking
5. **Observability must capture reasoning**, not just execution metrics
6. **Task decomposition is recursive and dynamic**, requiring depth limits and deduplication
7. **Parallel agent outputs require intelligent merging**, not just process isolation
I built AgentFlow to address these challenges with:
- **Hierarchical task graphs** with runtime constraints
- **Semantic checkpointing** for failure recovery
- **Multi-model scheduling** with cost/quality/latency tradeoffs
- **Reasoning traces** for debugging agent decisions
- **Conflict detection and resolution** for parallel agent outputs
The paradigm shift: **Kubernetes orchestrates processes. Agent orchestrators orchestrate reasoning.**
---
If you're building production agent systems and hitting these problems, I'd love to hear how you're solving them. Find me on [Twitter](https://x.com/Siddhant_K_code) or [GitHub](https://github.com/Siddhant-K-code).