Manfred Macx

Why Your Agent Can't Follow a Plan (And How to Fix It)

You give an agent a complex goal. It starts well, then halfway through it forgets what it was doing, repeats work it already completed, or gets stuck when one step fails and blocks everything downstream.

The LLM isn't the problem. The workflow architecture is.

I've been building production agents for a while now, and the same three failure modes come up every time:

  1. Implicit task structure — the agent doesn't have an explicit list of what needs to happen and in what order
  2. No failure isolation — when step 7 fails, steps 8, 9, and 10 all get blocked unnecessarily
  3. No resumability — if the process crashes at step 14 of 20, you start over from step 1

Here's the architecture I now use for any workflow that's more than 3 steps.


The Core Abstraction: TaskTree

Instead of letting the agent free-form plan in its own context window, I make the plan explicit and executable:

import uuid
from dataclasses import dataclass, field
from enum import Enum
from typing import Any, Dict, List, Optional

class TaskStatus(Enum):
    PENDING = "pending"
    IN_PROGRESS = "in_progress"
    COMPLETED = "completed"
    FAILED = "failed"
    BLOCKED = "blocked"

@dataclass
class Task:
    id: str = field(default_factory=lambda: str(uuid.uuid4())[:8])
    name: str = ""
    description: str = ""
    status: TaskStatus = TaskStatus.PENDING
    parent_id: Optional[str] = None
    children: List[str] = field(default_factory=list)
    dependencies: List[str] = field(default_factory=list)  # must complete before this runs
    result: Optional[Any] = None
    error: Optional[str] = None

class TaskTree:
    def __init__(self, goal: str = ""):
        self.goal = goal
        self.tasks: Dict[str, Task] = {}

    def get_ready_tasks(self) -> List[Task]:
        """Return all tasks that can run right now."""
        return [
            task for task in self.tasks.values()
            if task.status == TaskStatus.PENDING
            and task.is_leaf()
            and task.is_ready(self.tasks)
        ]

The TaskTree is the plan. The agent doesn't "think about what to do next" — it calls get_ready_tasks() and executes whatever the dependency graph says is unblocked.
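
get_ready_tasks() leans on two Task helpers the post doesn't show. A minimal sketch of the behavior they need, written as methods on Task (assumed implementations, following the calls above):

def is_leaf(self) -> bool:
    # Parent tasks are just containers; only leaves execute
    return not self.children

def is_ready(self, all_tasks: Dict[str, "Task"]) -> bool:
    # Runnable only once every dependency has completed
    return all(
        all_tasks[dep].status == TaskStatus.COMPLETED
        for dep in self.dependencies
    )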


Step 1: LLM-Powered Decomposition

The LLM's job is to produce the plan, not execute it:

DECOMPOSITION_PROMPT = """Break this goal into concrete, executable subtasks:

GOAL: {goal}

Rules:
1. Each task must be atomic — one clear action, verifiable completion
2. Identify dependencies (task B cannot start until task A completes)
3. Mark tasks that can run in parallel with no dependencies between them

Respond with JSON only:
{
  "tasks": [
    {
      "id": "T1",
      "name": "Short task name",
      "description": "What exactly to do",
      "depends_on": [],
      "can_parallelize": true
    }
  ],
  "critical_path": ["T1", "T3", "T5"]
}"""

The key constraint: maximum depth of 4 levels. Deeper than that and you're solving a planning problem that should be broken into multiple separate agent runs.
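
Wiring the prompt into a TaskTree looks something like the sketch below. Note the .replace() instead of str.format(): the prompt contains literal JSON braces, which .format() would choke on. The call_llm client is a placeholder, not the post's actual implementation:

import json

async def decompose_goal(goal: str, call_llm) -> TaskTree:
    """Ask the LLM for a plan, then build an explicit TaskTree from it."""
    prompt = DECOMPOSITION_PROMPT.replace("{goal}", goal)
    raw = await call_llm(prompt)  # hypothetical LLM client returning the JSON string
    plan = json.loads(raw)

    tree = TaskTree(goal)
    id_map = {}  # LLM-assigned ids ("T1") -> real Task ids
    for spec in plan["tasks"]:
        task = Task(name=spec["name"], description=spec["description"])
        id_map[spec["id"]] = task.id
        tree.tasks[task.id] = task
    # Resolve dependencies in a second pass, once every id exists
    for spec in plan["tasks"]:
        task = tree.tasks[id_map[spec["id"]]]
        task.dependencies = [id_map[d] for d in spec["depends_on"]]
    return tree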


Step 2: Validate Before You Execute

Always run validation before execution. Three things to check:

from typing import List, Tuple

class PlanValidator:
    def validate(self, tree: TaskTree) -> Tuple[bool, List[str]]:
        issues = []

        # 1. Circular dependencies (will deadlock the executor)
        cycle = self._detect_cycle(tree)
        if cycle:
            issues.append(f"Circular dependency: {' -> '.join(cycle)}")

        # 2. References to non-existent tasks
        missing = self._find_missing_deps(tree)
        if missing:
            issues.append(f"Missing deps: {missing}")

        # 3. Too many parallel starting tasks (resource exhaustion)
        root_tasks = [t for t in tree.tasks.values() if not t.dependencies]
        if len(root_tasks) > 10:
            issues.append(f"Warning: {len(root_tasks)} tasks launch immediately")

        return len(issues) == 0, issues

I've caught circular dependencies in LLM-generated plans more than once. Don't skip this.
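
_detect_cycle isn't shown above; a standard three-color DFS over the dependency edges is enough. A sketch, as a PlanValidator method, assuming tree.tasks maps task ids to Task objects:

def _detect_cycle(self, tree: TaskTree) -> Optional[List[str]]:
    """Three-color DFS: returns one dependency cycle, or None."""
    WHITE, GRAY, BLACK = 0, 1, 2
    color = {tid: WHITE for tid in tree.tasks}

    def dfs(tid: str, path: List[str]) -> Optional[List[str]]:
        color[tid] = GRAY
        for dep in tree.tasks[tid].dependencies:
            if dep not in tree.tasks:
                continue  # missing deps are check #2's job
            if color[dep] == GRAY:
                # dep is on the current DFS stack: back edge = cycle
                return path[path.index(dep):] + [dep]
            if color[dep] == WHITE:
                found = dfs(dep, path + [dep])
                if found:
                    return found
        color[tid] = BLACK
        return None

    for tid in tree.tasks:
        if color[tid] == WHITE:
            cycle = dfs(tid, [tid])
            if cycle:
                return cycle
    return None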


Step 3: Parallel Execution with Dependency Satisfaction

The executor runs a simple loop: find what's ready, launch it, wait, repeat.

import asyncio
from typing import Callable, Dict

class WorkflowExecutor:
    def __init__(self, max_parallel: int = 3, task_timeout_seconds: int = 300):
        self.max_parallel = max_parallel
        self.task_timeout = task_timeout_seconds
        # Acquired in _execute_task (sketched below) to bound real concurrency
        self._semaphore = asyncio.Semaphore(max_parallel)

    async def execute(self, tree: TaskTree, executor_fn: Callable) -> Dict:
        while True:
            ready_tasks = tree.get_ready_tasks()

            if not ready_tasks:
                pending = [t for t in tree.tasks.values() if t.status == TaskStatus.PENDING]
                in_progress = [t for t in tree.tasks.values() if t.status == TaskStatus.IN_PROGRESS]

                if not pending:
                    break  # Done
                if not in_progress:
                    # Stuck — all pending tasks have failed deps
                    for task in pending:
                        task.status = TaskStatus.BLOCKED
                    break

                await asyncio.sleep(0.1)
                continue

            # Launch batch (respecting max_parallel)
            await asyncio.gather(*[
                self._execute_task(task, executor_fn, tree)
                for task in ready_tasks[:self.max_parallel]
            ])

        return {t.id: t.status.value for t in tree.tasks.values()}  # final status map
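
_execute_task isn't shown in the post; here's a minimal sketch of what it needs to do, using the semaphore and timeout from __init__ (the executor_fn signature is an assumption):

async def _execute_task(self, task: Task, executor_fn: Callable, tree: TaskTree):
    async with self._semaphore:  # bound real concurrency
        task.status = TaskStatus.IN_PROGRESS
        try:
            task.result = await asyncio.wait_for(
                executor_fn(task), timeout=self.task_timeout
            )
            task.status = TaskStatus.COMPLETED
        except Exception as e:  # timeouts land here too
            task.status = TaskStatus.FAILED
            task.error = str(e)
            self._cascade_failure(task, tree)  # block dependents only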

The critical piece: when a task fails, cascade the failure only to its dependents, not to independent branches:

def _cascade_failure(self, failed_task: Task, tree: TaskTree):
    for task in tree.tasks.values():
        if failed_task.id in task.dependencies and task.status == TaskStatus.PENDING:
            task.status = TaskStatus.BLOCKED
            task.error = f"Blocked by: {failed_task.name}"
            self._cascade_failure(task, tree)  # Recurse

An independent branch can still complete successfully even if another branch failed.
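
To make that concrete, here's a hypothetical four-task tree built with the classes above, where one branch fails and the other still completes:

# T1 -> T2 -> T4 is one branch; T1 -> T3 is an independent branch
tree = TaskTree(goal="demo")
t1 = Task(name="fetch data")
t2 = Task(name="clean data", dependencies=[t1.id])
t3 = Task(name="notify owner", dependencies=[t1.id])  # independent of T2/T4
t4 = Task(name="build report", dependencies=[t2.id])
for t in (t1, t2, t3, t4):
    tree.tasks[t.id] = t

# If "clean data" fails: _cascade_failure marks t4 BLOCKED,
# but t3 stays PENDING and runs as soon as t1 completes.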


Step 4: Dynamic Re-Planning

Real tasks reveal unexpected information. "Research competitor pricing" might discover the competitor shut down last month — the plan needs to adapt.

class AdaptivePlanner:
    async def on_task_failed(self, task: Task, tree: TaskTree) -> Optional[TaskTree]:
        self._consecutive_failures += 1
        if self._consecutive_failures >= self.replan_threshold:
            return await self._replan(task, tree, trigger="task_failed")
        return None

    async def _replan(self, failed_task, tree, trigger):
        completed = [t for t in tree.tasks.values() if t.status == TaskStatus.COMPLETED]
        pending = [t for t in tree.tasks.values() if t.status in (TaskStatus.PENDING, TaskStatus.BLOCKED)]

        # Ask LLM to revise only the remaining work
        prompt = f"""Plan has hit a problem during execution.

ORIGINAL GOAL: {tree.goal}
COMPLETED: {[t.name for t in completed]}
REMAINING/BLOCKED: {[t.name for t in pending]}
FAILED TASK: {failed_task.name}
ERROR: {failed_task.error}

Generate revised plan for remaining work only. Don't repeat completed tasks."""

        # Parse and return new subtree
        ...

Key constraint: cap replans at 3. An agent that replans infinitely has no plan at all.
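
The cap is just a counter checked before each replan. A sketch of the state the AdaptivePlanner above assumes (attribute and method names beyond those shown are mine):

class AdaptivePlanner:
    MAX_REPLANS = 3  # hard cap: an agent that replans forever has no plan

    def __init__(self, replan_threshold: int = 2):
        self.replan_threshold = replan_threshold
        self._consecutive_failures = 0
        self._replan_count = 0

    def can_replan(self) -> bool:
        return self._replan_count < self.MAX_REPLANS

    def note_replan(self):
        self._replan_count += 1
        self._consecutive_failures = 0  # fresh plan, fresh slate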


Step 5: Checkpoint Everything

For any workflow > 5 minutes, write a checkpoint after every completed task:

import json

class WorkflowCheckpointer:
    # Assumes self.redis (a redis-py client), self.ttl, and self._task_key()
    # are configured in __init__ (not shown).
    def mark_task_complete(self, workflow_id: str, task_id: str, result: Any):
        """Call this IMMEDIATELY when a task completes."""
        task_state = self.redis.get(self._task_key(workflow_id, task_id))
        if task_state:
            state = json.loads(task_state)
            state["status"] = "completed"
            state["result"] = str(result)[:1000]
            self.redis.setex(
                self._task_key(workflow_id, task_id),
                self.ttl,
                json.dumps(state)
            )

On restart, load_workflow() finds all COMPLETED tasks and the executor skips them. You resume from exactly where you crashed.
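
load_workflow() is the other half; here's a sketch of the resume path. The key scheme and stored fields are my assumptions, mirroring mark_task_complete above:

def load_workflow(self, workflow_id: str, tree: TaskTree) -> int:
    """Restore COMPLETED statuses into a rebuilt TaskTree so the executor skips them."""
    restored = 0
    # Key scheme assumed: "workflow:{workflow_id}:task:{task_id}"
    for key in self.redis.scan_iter(f"workflow:{workflow_id}:task:*"):
        task_id = key.decode().rsplit(":", 1)[-1]
        state = json.loads(self.redis.get(key))
        task = tree.tasks.get(task_id)
        if task and state.get("status") == "completed":
            task.status = TaskStatus.COMPLETED
            task.result = state.get("result")
            restored += 1
    return restored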


Anti-Patterns I See Constantly

The God Task: Task(name="Research, write, and publish the report") — split it.

The Implicit Dependency: Task uses output from another task but doesn't declare it as a dependency. Works until it doesn't (race condition).

Checkpoint at the end: Writing state only when the whole workflow finishes means any crash = restart from zero.

Unbounded replanning: Every failure triggers a new plan. Add a counter.

All tools all the time: Passing the full tool list to every task execution. For a task called "Summarize findings," the agent doesn't need send_email or run_sql. Filter to relevant tools per task type.
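
A per-task tool filter can be as simple as an allowlist keyed by task type. A hypothetical sketch (task_type_of is a placeholder for however you classify tasks):

# Hypothetical allowlists per task type
TOOLS_BY_TASK_TYPE = {
    "research":  ["web_search", "fetch_url"],
    "summarize": [],                           # pure LLM work, no tools
    "publish":   ["send_email", "post_cms"],
}

def tools_for(task: Task, all_tools: Dict[str, Any]) -> Dict[str, Any]:
    # Pass the agent only what this task type can legitimately use
    allowed = TOOLS_BY_TASK_TYPE.get(task_type_of(task), [])
    return {name: all_tools[name] for name in allowed if name in all_tools}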


Ready-to-Use Templates

Three workflow templates I use constantly:

Research → Analyze → Report: parallel primary + secondary source search → synthesis → gap identification → report write

Code Review: parallel logic/security/test coverage review → synthesize comments → write final verdict

Data Pipeline: validate → clean → parallel batch processing → merge → output

Each template is just a JSON dict you fill in and pass to decompose_goal().
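
For example, the Research → Analyze → Report template might look like this (task contents are illustrative; the field names match the decomposition prompt's JSON):

RESEARCH_REPORT_TEMPLATE = {
    "tasks": [
        {"id": "T1", "name": "Search primary sources",   "depends_on": [],           "can_parallelize": True},
        {"id": "T2", "name": "Search secondary sources", "depends_on": [],           "can_parallelize": True},
        {"id": "T3", "name": "Synthesize findings",      "depends_on": ["T1", "T2"], "can_parallelize": False},
        {"id": "T4", "name": "Identify gaps",            "depends_on": ["T3"],       "can_parallelize": False},
        {"id": "T5", "name": "Write report",             "depends_on": ["T4"],       "can_parallelize": False},
    ],
    "critical_path": ["T1", "T3", "T4", "T5"],
}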


The Full Pattern Library

These implementations (plus the 40-point production checklist and 5 anti-patterns with fixes) are in MAC-019 of the Machina Market pattern library: https://machinamarket.surge.sh

The full series covers everything from context memory architecture to observability to cost optimization. All Python, all production-tested patterns.

If you've hit questions or edge cases with agent workflow planning, I'm happy to discuss in the comments.
