You give an agent a complex goal. It starts well, then halfway through it forgets what it was doing, repeats work it already completed, or gets stuck when one step fails and blocks everything downstream.
The LLM isn't the problem. The workflow architecture is.
I've been building production agents for a while now, and the same three failure modes come up every time:
- Implicit task structure — the agent doesn't have an explicit list of what needs to happen and in what order
- No failure isolation — when step 7 fails, steps 8, 9, and 10 all get blocked unnecessarily
- No resumability — if the process crashes at step 14 of 20, you start over from step 1
Here's the architecture I now use for any workflow that's more than 3 steps.
The Core Abstraction: TaskTree
Instead of letting the agent free-form plan in its own context window, I make the plan explicit and executable:
```python
@dataclass
class Task:
    id: str = field(default_factory=lambda: str(uuid.uuid4())[:8])
    name: str = ""
    description: str = ""
    status: TaskStatus = TaskStatus.PENDING
    parent_id: Optional[str] = None
    children: List[str] = field(default_factory=list)
    dependencies: List[str] = field(default_factory=list)  # must complete before this runs
    result: Optional[Any] = None
    error: Optional[str] = None
```
```python
class TaskTree:
    def get_ready_tasks(self) -> List[Task]:
        """Return all tasks that can run right now."""
        return [
            task for task in self.tasks.values()
            if task.status == TaskStatus.PENDING
            and task.is_leaf()
            and task.is_ready(self.tasks)
        ]
```
The TaskTree is the plan. The agent doesn't "think about what to do next" — it calls get_ready_tasks() and executes whatever the dependency graph says is unblocked.
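The two helpers `get_ready_tasks()` relies on aren't shown above. Here's a sketch of plausible implementations, using a pared-down `Task` with only the fields involved and a `TaskStatus` enum assumed to have the states used throughout this post:

```python
from dataclasses import dataclass, field
from enum import Enum
from typing import Dict, List

class TaskStatus(Enum):
    PENDING = "pending"
    IN_PROGRESS = "in_progress"
    COMPLETED = "completed"
    FAILED = "failed"
    BLOCKED = "blocked"

@dataclass
class Task:
    id: str
    status: TaskStatus = TaskStatus.PENDING
    children: List[str] = field(default_factory=list)
    dependencies: List[str] = field(default_factory=list)

    def is_leaf(self) -> bool:
        # Only leaves execute; parents are containers for subtasks
        return not self.children

    def is_ready(self, tasks: Dict[str, "Task"]) -> bool:
        # Ready when every declared dependency has completed
        return all(
            tasks[dep].status == TaskStatus.COMPLETED
            for dep in self.dependencies
        )
```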
Step 1: LLM-Powered Decomposition
The LLM's job is to produce the plan, not execute it:
```python
DECOMPOSITION_PROMPT = """Break this goal into concrete, executable subtasks:

GOAL: {goal}

Rules:
1. Each task must be atomic — one clear action, verifiable completion
2. Identify dependencies (task B cannot start until task A completes)
3. Mark tasks that can run in parallel with no dependencies between them

Respond with JSON only:
{
  "tasks": [
    {
      "id": "T1",
      "name": "Short task name",
      "description": "What exactly to do",
      "depends_on": [],
      "can_parallelize": true
    }
  ],
  "critical_path": ["T1", "T3", "T5"]
}"""
```
The key constraint: maximum depth of 4 levels. Deeper than that and you're solving a planning problem that should be broken into multiple separate agent runs.
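One way to enforce that cap mechanically, if you measure depth as the longest dependency chain in the flat `depends_on` format above (an interpretation on my part, a sketch rather than the canonical check):

```python
from typing import Dict, List

def longest_chain(deps: Dict[str, List[str]]) -> int:
    """Length of the longest dependency chain in an {id: depends_on} map."""
    memo: Dict[str, int] = {}

    def depth(task_id: str) -> int:
        if task_id not in memo:
            # A task's depth is one more than its deepest dependency
            memo[task_id] = 1 + max(
                (depth(d) for d in deps.get(task_id, [])), default=0
            )
        return memo[task_id]

    return max((depth(t) for t in deps), default=0)
```

Reject the plan (or re-prompt) when `longest_chain(...)` exceeds your limit.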
Step 2: Validate Before You Execute
Always run validation before execution. Three things to check:
```python
class PlanValidator:
    def validate(self, tree: TaskTree) -> Tuple[bool, List[str]]:
        issues = []

        # 1. Circular dependencies (will deadlock the executor)
        cycle = self._detect_cycle(tree)
        if cycle:
            issues.append(f"Circular dependency: {' -> '.join(cycle)}")

        # 2. References to non-existent tasks
        missing = self._find_missing_deps(tree)
        if missing:
            issues.append(f"Missing deps: {missing}")

        # 3. Too many parallel starting tasks (resource exhaustion)
        root_tasks = [t for t in tree.tasks.values() if not t.dependencies]
        if len(root_tasks) > 10:
            issues.append(f"Warning: {len(root_tasks)} tasks launch immediately")

        return len(issues) == 0, issues
```
I have caught circular dependencies in LLM-generated plans more than once. Don't skip this.
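The cycle check itself is a standard depth-first search with a recursion stack. A sketch of what `_detect_cycle` could look like over an `{id: [dependency_ids]}` map (my illustration, not necessarily the exact implementation):

```python
from typing import Dict, List, Optional

def detect_cycle(deps: Dict[str, List[str]]) -> Optional[List[str]]:
    """Return one dependency cycle as a list of ids, or None if acyclic."""
    WHITE, GRAY, BLACK = 0, 1, 2  # unvisited / on stack / fully explored
    color = {t: WHITE for t in deps}
    stack: List[str] = []

    def visit(node: str) -> Optional[List[str]]:
        color[node] = GRAY
        stack.append(node)
        for dep in deps.get(node, []):
            if color.get(dep) == GRAY:
                # Back edge found: slice the cycle out of the stack
                return stack[stack.index(dep):] + [dep]
            if color.get(dep) == WHITE:
                cycle = visit(dep)
                if cycle:
                    return cycle
        stack.pop()
        color[node] = BLACK
        return None

    for task_id in deps:
        if color[task_id] == WHITE:
            cycle = visit(task_id)
            if cycle:
                return cycle
    return None
```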
Step 3: Parallel Execution with Dependency Satisfaction
The executor runs a simple loop: find what's ready, launch it, wait, repeat.
```python
class WorkflowExecutor:
    def __init__(self, max_parallel: int = 3, task_timeout_seconds: int = 300):
        self.max_parallel = max_parallel
        self.task_timeout = task_timeout_seconds
        self._semaphore = asyncio.Semaphore(max_parallel)

    async def execute(self, tree: TaskTree, executor_fn: Callable) -> Dict:
        while True:
            ready_tasks = tree.get_ready_tasks()

            if not ready_tasks:
                pending = [t for t in tree.tasks.values() if t.status == TaskStatus.PENDING]
                in_progress = [t for t in tree.tasks.values() if t.status == TaskStatus.IN_PROGRESS]
                if not pending:
                    break  # Done
                if not in_progress:
                    # Stuck — all pending tasks have failed deps
                    for task in pending:
                        task.status = TaskStatus.BLOCKED
                    break
                await asyncio.sleep(0.1)
                continue

            # Launch batch (respecting max_parallel)
            await asyncio.gather(*[
                self._execute_task(task, executor_fn, tree)
                for task in ready_tasks[:self.max_parallel]
            ])
```
The critical piece: when a task fails, cascade the failure only to its dependents, not to independent branches:
```python
def _cascade_failure(self, failed_task: Task, tree: TaskTree):
    for task in tree.tasks.values():
        if failed_task.id in task.dependencies and task.status == TaskStatus.PENDING:
            task.status = TaskStatus.BLOCKED
            task.error = f"Blocked by: {failed_task.name}"
            self._cascade_failure(task, tree)  # Recurse
```
An independent branch can still complete successfully even if another branch failed.
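A quick standalone demo of that selectivity, using plain dicts and string statuses for brevity:

```python
def cascade_failure(failed_id: str, tasks: dict) -> None:
    """Block only the transitive dependents of the failed task.

    `tasks` maps id -> {"dependencies": [...], "status": str}.
    """
    for tid, t in tasks.items():
        if failed_id in t["dependencies"] and t["status"] == "pending":
            t["status"] = "blocked"
            cascade_failure(tid, tasks)

# Two branches: fetch -> parse -> report, plus an independent "backup" task
tasks = {
    "fetch": {"dependencies": [], "status": "failed"},
    "parse": {"dependencies": ["fetch"], "status": "pending"},
    "report": {"dependencies": ["parse"], "status": "pending"},
    "backup": {"dependencies": [], "status": "pending"},
}
cascade_failure("fetch", tasks)
# parse and report are now blocked; backup is still pending
```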
Step 4: Dynamic Re-Planning
Real tasks reveal unexpected information. "Research competitor pricing" might discover the competitor shut down last month — the plan needs to adapt.
```python
class AdaptivePlanner:
    async def on_task_failed(self, task: Task, tree: TaskTree) -> Optional[TaskTree]:
        self._consecutive_failures += 1
        if self._consecutive_failures >= self.replan_threshold:
            return await self._replan(task, tree, trigger="task_failed")
        return None

    async def _replan(self, failed_task, tree, trigger):
        completed = [t for t in tree.tasks.values() if t.status == TaskStatus.COMPLETED]
        pending = [t for t in tree.tasks.values()
                   if t.status in (TaskStatus.PENDING, TaskStatus.BLOCKED)]

        # Ask the LLM to revise only the remaining work
        prompt = f"""Plan has hit a problem during execution.

ORIGINAL GOAL: {tree.goal}
COMPLETED: {[t.name for t in completed]}
REMAINING/BLOCKED: {[t.name for t in pending]}
FAILURE: {failed_task.name} — {failed_task.error}

Generate a revised plan for the remaining work only. Don't repeat completed tasks."""
        # Parse and return new subtree
        ...
```
Key constraint: cap replans at 3. An agent that replans infinitely has no plan at all.
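The cap is just a counter guarding the replan call. A minimal sketch of one way to wire it in (the class name and shape are mine):

```python
class ReplanBudget:
    """Hard cap on replans so the agent can't loop forever."""

    def __init__(self, max_replans: int = 3):
        self.max_replans = max_replans
        self.used = 0

    def allow(self) -> bool:
        """Consume one replan if any remain; otherwise refuse."""
        if self.used >= self.max_replans:
            return False
        self.used += 1
        return True
```

When `allow()` returns `False`, fail the workflow loudly instead of generating plan number four.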
Step 5: Checkpoint Everything
For any workflow > 5 minutes, write a checkpoint after every completed task:
```python
class WorkflowCheckpointer:
    def mark_task_complete(self, workflow_id: str, task_id: str, result: Any):
        """Call this IMMEDIATELY when a task completes."""
        task_state = self.redis.get(self._task_key(workflow_id, task_id))
        if task_state:
            state = json.loads(task_state)
            state["status"] = "completed"
            state["result"] = str(result)[:1000]
            self.redis.setex(
                self._task_key(workflow_id, task_id),
                self.ttl,
                json.dumps(state),
            )
```
On restart, load_workflow() finds all COMPLETED tasks and the executor skips them. You resume from exactly where you crashed.
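The resume side is symmetric: read back every task key for the workflow and collect the completed ids so the executor can skip them. A sketch with a plain dict standing in for Redis (the function name and key layout are my assumptions):

```python
import json
from typing import Dict, Set

def load_completed(store: Dict[str, str], workflow_id: str) -> Set[str]:
    """Return the ids of tasks checkpointed as completed for this workflow."""
    prefix = f"workflow:{workflow_id}:task:"
    completed: Set[str] = set()
    for key, raw in store.items():
        if key.startswith(prefix):
            state = json.loads(raw)
            if state.get("status") == "completed":
                completed.add(key[len(prefix):])  # strip prefix -> task id
    return completed
```

Mark each returned id `COMPLETED` in the rebuilt TaskTree before the executor loop starts, and `get_ready_tasks()` never offers that work again.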
Anti-Patterns I See Constantly
The God Task: Task(name="Research, write, and publish the report") — split it.
The Implicit Dependency: Task uses output from another task but doesn't declare it as a dependency. Works until it doesn't (race condition).
Checkpoint at the end: Writing state only when the whole workflow finishes means any crash = restart from zero.
Unbounded replanning: Every failure triggers a new plan. Add a counter.
All tools all the time: Passing the full tool list to every task execution. For a task called "Summarize findings," the agent doesn't need send_email or run_sql. Filter to relevant tools per task type.
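That last fix can be as simple as a mapping from task type to allowed tool names. A sketch (the type names and tool names here are purely illustrative):

```python
from typing import Dict, List

# Illustrative allow-list; tailor the types and tools to your agent
TOOLS_BY_TASK_TYPE: Dict[str, List[str]] = {
    "research": ["web_search", "fetch_url"],
    "summarize": ["read_document"],
    "publish": ["send_email"],
}

def tools_for(task_type: str, all_tools: Dict[str, object]) -> Dict[str, object]:
    """Pass each task only the tools its type is allowed to use."""
    allowed = set(TOOLS_BY_TASK_TYPE.get(task_type, []))
    return {name: fn for name, fn in all_tools.items() if name in allowed}
```

Smaller tool lists also mean fewer tokens per call and fewer ways for the model to wander off-task.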
Ready-to-Use Templates
Three workflow templates I use constantly:
Research → Analyze → Report: parallel primary + secondary source search → synthesis → gap identification → report write
Code Review: parallel logic/security/test coverage review → synthesize comments → write final verdict
Data Pipeline: validate → clean → parallel batch processing → merge → output
Each template is just a JSON dict you fill in and pass to decompose_goal().
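To make the shape concrete, here's what a research-style template dict could look like, mirroring the decomposition JSON format from Step 1 (field names follow that format; the task breakdown is illustrative, not the library's actual template):

```python
# Illustrative only; the real templates live in the pattern library
RESEARCH_TEMPLATE = {
    "goal": "Research {topic} and produce a report",
    "tasks": [
        {"id": "T1", "name": "Search primary sources", "depends_on": []},
        {"id": "T2", "name": "Search secondary sources", "depends_on": []},
        {"id": "T3", "name": "Synthesize findings", "depends_on": ["T1", "T2"]},
        {"id": "T4", "name": "Identify gaps", "depends_on": ["T3"]},
        {"id": "T5", "name": "Write report", "depends_on": ["T4"]},
    ],
}
```

T1 and T2 have no dependencies, so the executor runs them in parallel; everything after T3 is sequential.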
The Full Pattern Library
These implementations (plus the 40-point production checklist and 5 anti-patterns with fixes) are in MAC-019 of the Machina Market pattern library: https://machinamarket.surge.sh
The full series covers everything from context memory architecture to observability to cost optimization. All Python, all production-tested patterns.
Questions or edge cases you've hit with agent workflow planning — happy to discuss in the comments.