DEV Community

Vinicius Fagundes

Multi-Step Reasoning and Agentic Workflows: Building AI That Plans and Executes

Quick Reference: Terms You'll Encounter

Technical Acronyms:

  • DAG: Directed Acyclic Graph—workflow structure with no circular dependencies
  • FSM: Finite State Machine—system with defined states and transitions
  • CoT: Chain of Thought—prompting technique for step-by-step reasoning
  • ReAct: Reasoning + Acting—pattern combining thinking with tool use
  • LLM: Large Language Model—transformer-based text generation system

Statistical & Mathematical Terms:

  • State: Current snapshot of all variables in a workflow
  • Transition: Movement from one state to another based on conditions
  • Topological Sort: Ordering DAG nodes so dependencies come first
  • Idempotent: Operation that produces same result if executed multiple times
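To make "Topological Sort" concrete, here is a minimal sketch using Python's standard-library `graphlib` module (Python 3.9+). The task names and the `execution_order` helper are illustrative assumptions, not part of the workflow classes introduced later.

```python
from graphlib import TopologicalSorter, CycleError

# Hypothetical task graph: each key maps to the set of tasks it depends on.
dependencies = {
    "extract": set(),
    "validate_input": {"extract"},
    "transform": {"validate_input"},
    "load": {"transform"},
}

def execution_order(graph: dict[str, set[str]]) -> list[str]:
    """Return task ids ordered so every dependency precedes its dependents."""
    return list(TopologicalSorter(graph).static_order())

order = execution_order(dependencies)
print(order)  # ['extract', 'validate_input', 'transform', 'load']

# A circular dependency is not a DAG, so no valid order exists.
try:
    execution_order({"a": {"b"}, "b": {"a"}})
except CycleError as exc:
    print(f"cycle detected: {exc.args[1]}")
```

Because this graph is a single chain, only one valid order exists. Graphs with independent branches can have several valid orders.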

Introduction: From Single Prompts to Orchestrated Workflows

Imagine you're planning a cross-country road trip. You wouldn't just say "drive to California" and start moving. You'd:

  1. Decompose: Break it into legs (Chicago → Denver → Las Vegas → LA)
  2. Plan: Identify gas stops, hotels, attractions
  3. Execute: Drive each segment, adjusting for traffic and weather
  4. Track State: Know where you are, how much gas you have, what's completed

Single LLM calls are like asking "how do I get to California?" You get directions, but no execution. Agentic workflows actually make the trip—handling detours, flat tires, and closed roads along the way.

Here's another analogy: Single prompts are functions; agentic workflows are programs. A function computes one thing. A program orchestrates many functions, manages state, handles errors, and produces complex outcomes.

A third way to think about it: Traditional LLM use is a calculator; agentic AI is a spreadsheet. The calculator answers one question. The spreadsheet maintains state, has interdependent cells, and updates automatically when inputs change.


Task Decomposition: Breaking Problems Apart

from typing import List, Dict, Any, Optional
from dataclasses import dataclass, field
from enum import Enum
import json

class TaskStatus(Enum):
    PENDING = "pending"
    IN_PROGRESS = "in_progress"
    COMPLETED = "completed"
    FAILED = "failed"
    BLOCKED = "blocked"

@dataclass
class Task:
    """Represents a single task in a workflow."""
    id: str
    description: str
    dependencies: List[str] = field(default_factory=list)
    status: TaskStatus = TaskStatus.PENDING
    result: Optional[Any] = None
    error: Optional[str] = None
    metadata: Dict[str, Any] = field(default_factory=dict)

@dataclass
class TaskPlan:
    """A decomposed plan with multiple tasks."""
    goal: str
    tasks: List[Task]
    created_at: str = ""

    def get_ready_tasks(self) -> List[Task]:
        """Get tasks whose dependencies are all completed."""
        completed_ids = {t.id for t in self.tasks if t.status == TaskStatus.COMPLETED}
        ready = []
        for task in self.tasks:
            if task.status == TaskStatus.PENDING:
                if all(dep in completed_ids for dep in task.dependencies):
                    ready.append(task)
        return ready

    def is_complete(self) -> bool:
        """Check if all tasks are completed."""
        return all(t.status == TaskStatus.COMPLETED for t in self.tasks)

    def has_failed(self) -> bool:
        """Check if any task has failed."""
        return any(t.status == TaskStatus.FAILED for t in self.tasks)


class TaskDecomposer:
    """
    Decompose complex goals into executable task plans.

    Two approaches:
    1. LLM-based: Let the model break down the task
    2. Template-based: Use predefined patterns for known task types
    """

    DECOMPOSITION_PROMPT = """Break down this goal into specific, executable tasks.

Goal: {goal}

Context: {context}

Rules:
1. Each task should be atomic (one clear action)
2. Identify dependencies between tasks
3. Tasks should be ordered logically
4. Include validation/verification tasks where appropriate

Output JSON format:
{{
    "tasks": [
        {{"id": "task_1", "description": "...", "dependencies": []}},
        {{"id": "task_2", "description": "...", "dependencies": ["task_1"]}}
    ]
}}

Output only valid JSON."""

    def __init__(self, llm_client):
        self.llm = llm_client

    def decompose(self, goal: str, context: str = "") -> TaskPlan:
        """Decompose a goal into tasks using LLM."""
        prompt = self.DECOMPOSITION_PROMPT.format(goal=goal, context=context)

        response = self.llm.generate(prompt)

        # Parse response
        try:
            # Handle markdown code blocks (fence built from single backticks)
            fence = "`" * 3
            if fence in response:
                response = response.split(fence)[1]
                if response.startswith("json"):
                    response = response[4:]

            data = json.loads(response.strip())

            tasks = [
                Task(
                    id=t["id"],
                    description=t["description"],
                    dependencies=t.get("dependencies", [])
                )
                for t in data["tasks"]
            ]

            return TaskPlan(goal=goal, tasks=tasks)

        except (json.JSONDecodeError, KeyError) as e:
            # Fallback: single task
            return TaskPlan(
                goal=goal,
                tasks=[Task(id="task_1", description=goal)]
            )

    def decompose_with_template(
        self, 
        goal: str, 
        template: str
    ) -> TaskPlan:
        """Use predefined templates for common task patterns."""

        templates = {
            "research": [
                Task(id="search", description="Search for relevant sources", dependencies=[]),
                Task(id="extract", description="Extract key information", dependencies=["search"]),
                Task(id="synthesize", description="Synthesize findings", dependencies=["extract"]),
                Task(id="validate", description="Validate accuracy", dependencies=["synthesize"])
            ],
            "data_pipeline": [
                Task(id="extract", description="Extract data from source", dependencies=[]),
                Task(id="validate_input", description="Validate input data", dependencies=["extract"]),
                Task(id="transform", description="Transform data", dependencies=["validate_input"]),
                Task(id="validate_output", description="Validate output data", dependencies=["transform"]),
                Task(id="load", description="Load to destination", dependencies=["validate_output"])
            ],
            "analysis": [
                Task(id="gather", description="Gather relevant data", dependencies=[]),
                Task(id="clean", description="Clean and prepare data", dependencies=["gather"]),
                Task(id="analyze", description="Perform analysis", dependencies=["clean"]),
                Task(id="interpret", description="Interpret results", dependencies=["analyze"]),
                Task(id="report", description="Generate report", dependencies=["interpret"])
            ]
        }

        if template not in templates:
            raise ValueError(f"Unknown template: {template}")

        # Customize task descriptions with goal
        tasks = []
        for t in templates[template]:
            tasks.append(Task(
                id=t.id,
                description=f"{t.description} for: {goal}",
                dependencies=t.dependencies.copy()
            ))

        return TaskPlan(goal=goal, tasks=tasks)


# Simple LLM client interface
class LLMClient:
    """Provider-agnostic LLM client."""

    def __init__(self, provider: str = "openai", model: str = None):
        self.provider = provider
        self.model = model or self._default_model()

    def _default_model(self) -> str:
        return {"openai": "gpt-4o-mini", "anthropic": "claude-3-haiku-20240307"}.get(self.provider, "gpt-4o-mini")

    def generate(self, prompt: str, temperature: float = 0) -> str:
        if self.provider == "openai":
            from openai import OpenAI
            client = OpenAI()
            response = client.chat.completions.create(
                model=self.model,
                messages=[{"role": "user", "content": prompt}],
                temperature=temperature
            )
            return response.choices[0].message.content

        elif self.provider == "anthropic":
            import anthropic
            client = anthropic.Anthropic()
            response = client.messages.create(
                model=self.model,
                max_tokens=2048,
                temperature=temperature,
                messages=[{"role": "user", "content": prompt}]
            )
            return response.content[0].text

        raise ValueError(f"Unknown provider: {self.provider}")


# Example usage
if __name__ == "__main__":
    # Template-based decomposition (no LLM needed)
    decomposer = TaskDecomposer(llm_client=None)

    plan = decomposer.decompose_with_template(
        goal="Analyze Q3 sales performance",
        template="analysis"
    )

    print(f"Goal: {plan.goal}\n")
    print("Tasks:")
    for task in plan.tasks:
        deps = f" (depends on: {', '.join(task.dependencies)})" if task.dependencies else ""
        print(f"  [{task.id}] {task.description}{deps}")
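The LLM-based path accepts whatever task list the model returns, so a plan can reference tasks that do not exist or depend on itself in a loop. One possible safeguard is a validation pass before execution. The sketch below assumes the same `{"id", "description", "dependencies"}` dictionaries requested by the decomposition prompt; `validate_plan` is a hypothetical helper, not part of `TaskDecomposer`.

```python
from typing import Dict, List

def validate_plan(tasks: List[Dict]) -> List[str]:
    """Return a list of problems found in a decomposed plan (empty if none)."""
    problems: List[str] = []
    ids = [t["id"] for t in tasks]

    # Duplicate ids make dependency references ambiguous.
    for task_id in sorted(set(ids)):
        if ids.count(task_id) > 1:
            problems.append(f"duplicate id: {task_id}")

    # Every dependency must name a task that exists in the plan.
    known = set(ids)
    for t in tasks:
        for dep in t.get("dependencies", []):
            if dep not in known:
                problems.append(f"{t['id']} depends on unknown task {dep}")

    # Cycle check: repeatedly remove tasks whose dependencies are all resolved.
    remaining = {t["id"]: set(t.get("dependencies", [])) & known for t in tasks}
    resolved: set = set()
    while True:
        ready = [tid for tid, deps in remaining.items() if deps <= resolved]
        if not ready:
            break
        for tid in ready:
            resolved.add(tid)
            del remaining[tid]
    if remaining:
        problems.append(f"circular dependency among: {sorted(remaining)}")

    return problems


good = [
    {"id": "task_1", "description": "a", "dependencies": []},
    {"id": "task_2", "description": "b", "dependencies": ["task_1"]},
]
bad = [
    {"id": "task_1", "description": "a", "dependencies": ["task_2"]},
    {"id": "task_2", "description": "b", "dependencies": ["task_1", "task_9"]},
]

print(validate_plan(good))  # []
for problem in validate_plan(bad):
    print(problem)
```

When problems are found, one option is to return them to the model with a request for a corrected plan rather than falling straight back to a single task.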

State Management: Tracking Progress Through Complexity

from typing import Dict, Any, Optional, List
from dataclasses import dataclass, field, asdict
from datetime import datetime
import json
import hashlib
import copy

@dataclass
class WorkflowState:
    """
    Immutable state snapshot for a workflow.

    Key principles:
    1. State is immutable—create new states, don't modify
    2. Every state change is tracked
    3. State can be serialized for persistence/recovery
    """
    workflow_id: str
    current_step: str
    variables: Dict[str, Any] = field(default_factory=dict)
    history: List[Dict] = field(default_factory=list)
    created_at: str = ""
    updated_at: str = ""

    def __post_init__(self):
        if not self.created_at:
            self.created_at = datetime.utcnow().isoformat()
        self.updated_at = datetime.utcnow().isoformat()

    def get(self, key: str, default: Any = None) -> Any:
        """Get variable from state."""
        return self.variables.get(key, default)

    def with_update(self, **updates) -> 'WorkflowState':
        """Create new state with updates (immutable pattern)."""
        new_variables = {**self.variables, **updates}
        new_history = self.history + [{
            "timestamp": datetime.utcnow().isoformat(),
            "step": self.current_step,
            "updates": list(updates.keys())
        }]

        return WorkflowState(
            workflow_id=self.workflow_id,
            current_step=self.current_step,
            variables=new_variables,
            history=new_history,
            created_at=self.created_at
        )

    def with_step(self, step: str) -> 'WorkflowState':
        """Create new state at different step."""
        return WorkflowState(
            workflow_id=self.workflow_id,
            current_step=step,
            variables=copy.deepcopy(self.variables),
            history=self.history + [{
                "timestamp": datetime.utcnow().isoformat(),
                "transition": f"{self.current_step} -> {step}"
            }],
            created_at=self.created_at
        )

    def to_json(self) -> str:
        """Serialize state to JSON."""
        return json.dumps(asdict(self), indent=2)

    @classmethod
    def from_json(cls, json_str: str) -> 'WorkflowState':
        """Deserialize state from JSON."""
        data = json.loads(json_str)
        return cls(**data)

    def checksum(self) -> str:
        """Generate checksum for state verification."""
        content = json.dumps(self.variables, sort_keys=True)
        return hashlib.md5(content.encode()).hexdigest()[:8]


class StateStore:
    """
    Persistent state storage with versioning.

    Production implementations would use:
    - Redis for fast access
    - PostgreSQL for durability
    - S3 for long-term archival
    """

    def __init__(self):
        self._store: Dict[str, List[WorkflowState]] = {}

    def save(self, state: WorkflowState):
        """Save state (appends to version history)."""
        if state.workflow_id not in self._store:
            self._store[state.workflow_id] = []
        self._store[state.workflow_id].append(state)

    def load(self, workflow_id: str) -> Optional[WorkflowState]:
        """Load latest state for workflow."""
        if workflow_id not in self._store:
            return None
        return self._store[workflow_id][-1]

    def load_version(self, workflow_id: str, version: int) -> Optional[WorkflowState]:
        """Load specific version of state."""
        if workflow_id not in self._store:
            return None
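        # Reject out-of-range versions, including negatives (which would
        # otherwise index from the end of the list)
        if not 0 <= version < len(self._store[workflow_id]):
            return None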
        return self._store[workflow_id][version]

    def get_history(self, workflow_id: str) -> List[WorkflowState]:
        """Get all state versions for workflow."""
        return self._store.get(workflow_id, [])


class ConversationMemory:
    """
    Manage conversation context for multi-turn agent interactions.
    """

    def __init__(self, max_turns: int = 20, max_tokens: int = 4000):
        self.max_turns = max_turns
        self.max_tokens = max_tokens
        self.messages: List[Dict[str, str]] = []
        self.summary: Optional[str] = None

    def add(self, role: str, content: str):
        """Add message to conversation."""
        self.messages.append({"role": role, "content": content})

        # Trim if too long
        if len(self.messages) > self.max_turns:
            self._summarize_and_trim()

    def _estimate_tokens(self) -> int:
        """Rough token estimate."""
        return sum(len(m["content"]) // 4 for m in self.messages)

    def _summarize_and_trim(self):
        """Summarize old messages and keep recent ones."""
        # Keep last N messages, summarize the rest
        keep = self.max_turns // 2
        to_summarize = self.messages[:-keep]
        self.messages = self.messages[-keep:]

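        # Create summary (in production, use LLM). Extend any earlier summary
        # so context from previous trims is not overwritten.
        summary_parts = [f"{m['role']}: {m['content'][:100]}..." for m in to_summarize]
        header = self.summary or "Previous conversation summary:"
        self.summary = header + "\n" + "\n".join(summary_parts)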

    def get_context(self) -> List[Dict[str, str]]:
        """Get messages formatted for LLM."""
        context = []
        if self.summary:
            context.append({"role": "system", "content": self.summary})
        context.extend(self.messages)
        return context

    def clear(self):
        """Clear conversation memory."""
        self.messages = []
        self.summary = None
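The `WorkflowState` docstring treats immutability as a principle, but a plain dataclass does not enforce it: callers can still reassign fields or mutate `variables` in place. The sketch below shows one way to enforce it with `frozen=True` and a read-only mapping view. `FrozenState` is a hypothetical, simplified stand-in, not a drop-in replacement for the class above.

```python
from dataclasses import dataclass, field, replace, FrozenInstanceError
from types import MappingProxyType
from typing import Any, Mapping

@dataclass(frozen=True)
class FrozenState:
    """Hypothetical, simplified state whose fields cannot be reassigned."""
    workflow_id: str
    current_step: str
    variables: Mapping[str, Any] = field(default_factory=dict)

    def __post_init__(self):
        # Wrap a private copy in a read-only view so callers cannot mutate it.
        # object.__setattr__ is needed because the dataclass is frozen.
        object.__setattr__(self, "variables", MappingProxyType(dict(self.variables)))

    def with_update(self, **updates: Any) -> "FrozenState":
        """Return a new state; the original is left untouched."""
        return replace(self, variables={**self.variables, **updates})


s1 = FrozenState(workflow_id="wf-1", current_step="start")
s2 = s1.with_update(rows=42)

print(dict(s1.variables))  # {}
print(dict(s2.variables))  # {'rows': 42}

try:
    s1.current_step = "hacked"
except FrozenInstanceError:
    print("reassignment blocked")

try:
    s2.variables["rows"] = 0
except TypeError:
    print("in-place mutation blocked")
```

The read-only view protects only the top level of the mapping. Nested lists or dicts stored as values can still be mutated, so values that must stay fixed need to be copied or stored as tuples.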

DAG-Based Workflow Orchestration

from typing import Dict, List, Callable, Any, Optional, Set
from dataclasses import dataclass, field
from enum import Enum
from collections import deque
import json  # used by synthesize_step in the research workflow example
import time
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

class NodeStatus(Enum):
    PENDING = "pending"
    RUNNING = "running"
    COMPLETED = "completed"
    FAILED = "failed"
    SKIPPED = "skipped"

@dataclass
class DAGNode:
    """A node in the workflow DAG."""
    id: str
    executor: Callable[[WorkflowState], WorkflowState]
    dependencies: List[str] = field(default_factory=list)
    status: NodeStatus = NodeStatus.PENDING
    retry_count: int = 0
    max_retries: int = 3
    timeout_seconds: float = 60
    result: Optional[Any] = None
    error: Optional[str] = None

    def can_run(self, completed: Set[str]) -> bool:
        """Check if all dependencies are satisfied."""
        return all(dep in completed for dep in self.dependencies)

class WorkflowDAG:
    """
    DAG-based workflow orchestrator.

    Why DAGs for workflows:
    1. Clear dependency visualization
    2. Parallel execution of independent nodes
    3. Deterministic execution order
    4. Easy to reason about and debug
    """

    def __init__(self, workflow_id: str):
        self.workflow_id = workflow_id
        self.nodes: Dict[str, DAGNode] = {}
        self.state_store = StateStore()

    def add_node(
        self,
        node_id: str,
        executor: Callable[[WorkflowState], WorkflowState],
        dependencies: List[str] = None,
        **kwargs
    ) -> 'WorkflowDAG':
        """Add a node to the DAG."""
        # Validate dependencies exist
        for dep in (dependencies or []):
            if dep not in self.nodes:
                raise ValueError(f"Dependency '{dep}' not found. Add nodes in dependency order.")

        self.nodes[node_id] = DAGNode(
            id=node_id,
            executor=executor,
            dependencies=dependencies or [],
            **kwargs
        )
        return self

    def _topological_sort(self) -> List[str]:
        """Sort nodes in execution order."""
        in_degree = {node_id: len(node.dependencies) for node_id, node in self.nodes.items()}
        queue = deque([node_id for node_id, degree in in_degree.items() if degree == 0])
        result = []

        while queue:
            node_id = queue.popleft()
            result.append(node_id)

            for other_id, other_node in self.nodes.items():
                if node_id in other_node.dependencies:
                    in_degree[other_id] -= 1
                    if in_degree[other_id] == 0:
                        queue.append(other_id)

        if len(result) != len(self.nodes):
            raise ValueError("DAG contains cycles")

        return result

    def _execute_node(self, node: DAGNode, state: WorkflowState) -> WorkflowState:
        """Execute a single node with retry logic."""
        node.status = NodeStatus.RUNNING

        for attempt in range(node.max_retries):
            try:
                logger.info(f"Executing node '{node.id}' (attempt {attempt + 1})")
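                start_time = time.monotonic()

                new_state = node.executor(state)

                # The timeout is checked after the call returns: it flags slow
                # nodes but cannot interrupt an executor that hangs
                elapsed = time.monotonic() - start_time
                if elapsed > node.timeout_seconds:
                    raise TimeoutError(f"Node exceeded timeout ({elapsed:.1f}s > {node.timeout_seconds}s)")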

                node.status = NodeStatus.COMPLETED
                node.result = new_state.get(f"{node.id}_result")
                logger.info(f"Node '{node.id}' completed in {elapsed:.2f}s")

                return new_state

            except Exception as e:
                node.retry_count = attempt + 1
                node.error = str(e)
                logger.warning(f"Node '{node.id}' failed (attempt {attempt + 1}): {e}")

                if attempt + 1 >= node.max_retries:
                    node.status = NodeStatus.FAILED
                    raise

                time.sleep(2 ** attempt)  # Exponential backoff

        return state

    def run(self, initial_state: Optional[WorkflowState] = None) -> WorkflowState:
        """Execute the DAG."""
        state = initial_state or WorkflowState(
            workflow_id=self.workflow_id,
            current_step="start"
        )

        execution_order = self._topological_sort()
        completed: Set[str] = set()

        logger.info(f"Starting workflow '{self.workflow_id}' with {len(execution_order)} nodes")

        for node_id in execution_order:
            node = self.nodes[node_id]

            # Check dependencies
            if not node.can_run(completed):
                node.status = NodeStatus.SKIPPED
                logger.warning(f"Skipping node '{node_id}' due to failed dependencies")
                continue

            try:
                state = state.with_step(node_id)
                state = self._execute_node(node, state)
                completed.add(node_id)
                self.state_store.save(state)

            except Exception as e:
                logger.error(f"Workflow failed at node '{node_id}': {e}")
                state = state.with_update(
                    workflow_status="failed",
                    failed_node=node_id,
                    error=str(e)
                )
                self.state_store.save(state)
                raise

        state = state.with_update(workflow_status="completed")
        self.state_store.save(state)
        logger.info(f"Workflow '{self.workflow_id}' completed successfully")

        return state

    def visualize(self) -> str:
        """Generate ASCII visualization of DAG."""
        lines = [f"Workflow: {self.workflow_id}", "=" * 40]

        order = self._topological_sort()
        for node_id in order:
            node = self.nodes[node_id]
            # ASCII markers for each status
            status_icon = {
                NodeStatus.PENDING: "[ ]",
                NodeStatus.RUNNING: "[~]",
                NodeStatus.COMPLETED: "[x]",
                NodeStatus.FAILED: "[!]",
                NodeStatus.SKIPPED: "[-]"
            }.get(node.status, "?")

            deps = f" <- [{', '.join(node.dependencies)}]" if node.dependencies else ""
            lines.append(f"  {status_icon} {node_id}{deps}")

        return "\n".join(lines)


# Example: Research workflow
def create_research_workflow(topic: str, llm_client: LLMClient) -> WorkflowDAG:
    """Create a research workflow DAG."""

    dag = WorkflowDAG(workflow_id=f"research_{topic[:20]}")

    def search_step(state: WorkflowState) -> WorkflowState:
        # Simulate search (in production, use search API)
        results = [f"Source {i}: Information about {topic}" for i in range(3)]
        return state.with_update(search_results=results, search_result=f"Found {len(results)} sources")

    def extract_step(state: WorkflowState) -> WorkflowState:
        sources = state.get("search_results", [])
        extracted = [{"source": s, "key_points": ["point 1", "point 2"]} for s in sources]
        return state.with_update(extracted_info=extracted, extract_result="Extracted key points")

    def synthesize_step(state: WorkflowState) -> WorkflowState:
        extracted = state.get("extracted_info", [])
        prompt = f"Synthesize these findings about {topic}: {json.dumps(extracted)}"

        # Use LLM for synthesis
        try:
            synthesis = llm_client.generate(prompt)
        except Exception:  # fall back to a placeholder if the LLM call fails
            synthesis = f"Synthesis of {len(extracted)} sources on {topic}"

        return state.with_update(synthesis=synthesis, synthesize_result="Created synthesis")

    def validate_step(state: WorkflowState) -> WorkflowState:
        synthesis = state.get("synthesis", "")
        # Simple validation (in production, more sophisticated checks)
        is_valid = len(synthesis) > 50
        return state.with_update(
            is_valid=is_valid,
            validate_result="Validation passed" if is_valid else "Validation failed"
        )

    dag.add_node("search", search_step)
    dag.add_node("extract", extract_step, dependencies=["search"])
    dag.add_node("synthesize", synthesize_step, dependencies=["extract"])
    dag.add_node("validate", validate_step, dependencies=["synthesize"])

    return dag


if __name__ == "__main__":
    # Create and run workflow
    llm = LLMClient(provider="openai")

    workflow = create_research_workflow("machine learning in healthcare", llm)

    print(workflow.visualize())
    print()

    try:
        final_state = workflow.run()
        print("\n" + workflow.visualize())
        print(f"\nFinal synthesis: {final_state.get('synthesis', '')[:200]}...")
    except Exception as e:
        print(f"Workflow failed: {e}")
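The `WorkflowDAG` docstring lists parallel execution of independent nodes as a benefit, but `run()` above executes nodes one at a time. The sketch below shows one way to group nodes into levels and run each level on a thread pool. `execution_levels` and `run_levels` are hypothetical helpers, and plain callables that take and return dictionaries stand in for the `WorkflowState` executors.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, Dict, List

def execution_levels(deps: Dict[str, List[str]]) -> List[List[str]]:
    """Group node ids into levels; nodes in one level do not depend on each other."""
    remaining = {node: set(d) for node, d in deps.items()}
    done: set = set()
    levels: List[List[str]] = []
    while remaining:
        ready = sorted(n for n, d in remaining.items() if d <= done)
        if not ready:
            raise ValueError("DAG contains cycles or unknown dependencies")
        levels.append(ready)
        done.update(ready)
        for n in ready:
            del remaining[n]
    return levels

def run_levels(
    deps: Dict[str, List[str]],
    executors: Dict[str, Callable[[Dict], Dict]],
    max_workers: int = 4,
) -> Dict:
    """Run each level concurrently; merge results only after the level finishes."""
    results: Dict = {}
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        for level in execution_levels(deps):
            snapshot = dict(results)  # every node in a level sees the same inputs
            futures = {n: pool.submit(executors[n], snapshot) for n in level}
            for n, fut in futures.items():
                results.update(fut.result())  # re-raises if the node failed
    return results


deps = {"search_web": [], "search_docs": [], "merge": ["search_web", "search_docs"]}
executors = {
    "search_web": lambda s: {"web": ["w1", "w2"]},
    "search_docs": lambda s: {"docs": ["d1"]},
    "merge": lambda s: {"merged": s["web"] + s["docs"]},
}

print(execution_levels(deps))                 # [['search_docs', 'search_web'], ['merge']]
print(run_levels(deps, executors)["merged"])  # ['w1', 'w2', 'd1']
```

Threads suit nodes that spend most of their time waiting on network calls such as LLM or search requests. Merging results only between levels keeps nodes within a level from seeing each other's partial output.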

ReAct Pattern: Reasoning and Acting

from typing import List, Dict, Any, Callable, Optional
from dataclasses import dataclass
from enum import Enum
import re
import json

class ActionType(Enum):
    SEARCH = "search"
    CALCULATE = "calculate"
    LOOKUP = "lookup"
    FINISH = "finish"

@dataclass
class Thought:
    """Agent's reasoning step."""
    content: str

@dataclass  
class Action:
    """Agent's action to take."""
    type: ActionType
    input: str

@dataclass
class Observation:
    """Result of an action."""
    content: str

@dataclass
class ReActStep:
    """One iteration of the ReAct loop."""
    thought: Thought
    action: Action
    observation: Optional[Observation] = None

class ReActAgent:
    """
    ReAct (Reasoning + Acting) agent implementation.

    Pattern:
    1. Thought: Reason about what to do
    2. Action: Take an action (use a tool)
    3. Observation: See the result
    4. Repeat until task is complete

    Key insight: Interleaving reasoning with actions improves both.
    """

    REACT_PROMPT = """You are a helpful assistant that solves problems step by step.

Available actions:
- search[query]: Search for information
- calculate[expression]: Calculate a mathematical expression  
- lookup[term]: Look up a specific term or definition
- finish[answer]: Provide the final answer

Always follow this format:
Thought: [your reasoning about what to do next]
Action: [action_name][input]

Question: {question}

{history}

Thought:"""

    def __init__(
        self,
        llm_client: LLMClient,
        tools: Dict[str, Callable[[str], str]],
        max_steps: int = 10
    ):
        self.llm = llm_client
        self.tools = tools
        self.max_steps = max_steps

    def _parse_response(self, response: str) -> tuple:
        """Parse LLM response into thought and action."""
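        # Extract thought. REACT_PROMPT already ends with "Thought:", so the
        # model's reply usually omits that label; treat it as optional.
        thought_match = re.match(r'\s*(?:Thought:)?\s*(.*?)(?=Action:|\Z)', response, re.DOTALL)
        thought = thought_match.group(1).strip() if thought_match else ""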

        # Extract action
        action_match = re.search(r'Action:\s*(\w+)\[(.+?)\]', response)
        if action_match:
            action_type = action_match.group(1).lower()
            action_input = action_match.group(2)

            try:
                action = Action(type=ActionType(action_type), input=action_input)
            except ValueError:
                action = Action(type=ActionType.FINISH, input="Unable to parse action")
        else:
            action = Action(type=ActionType.FINISH, input=thought)

        return Thought(content=thought), action

    def _execute_action(self, action: Action) -> Observation:
        """Execute an action using available tools."""
        if action.type == ActionType.FINISH:
            return Observation(content=action.input)

        tool_name = action.type.value
        if tool_name in self.tools:
            try:
                result = self.tools[tool_name](action.input)
                return Observation(content=result)
            except Exception as e:
                return Observation(content=f"Error: {e}")

        return Observation(content=f"Unknown action: {action.type}")

    def _format_history(self, steps: List[ReActStep]) -> str:
        """Format previous steps for prompt."""
        lines = []
        for step in steps:
            lines.append(f"Thought: {step.thought.content}")
            lines.append(f"Action: {step.action.type.value}[{step.action.input}]")
            if step.observation:
                lines.append(f"Observation: {step.observation.content}")
        return "\n".join(lines)

    def run(self, question: str) -> tuple:
        """
        Run the ReAct loop until completion.

        Returns:
            answer: Final answer string
            steps: List of ReActStep showing reasoning process
        """
        steps: List[ReActStep] = []

        for i in range(self.max_steps):
            # Generate thought and action
            history = self._format_history(steps)
            prompt = self.REACT_PROMPT.format(question=question, history=history)

            response = self.llm.generate(prompt)
            thought, action = self._parse_response(response)

            # Execute action
            observation = self._execute_action(action)

            step = ReActStep(thought=thought, action=action, observation=observation)
            steps.append(step)

            # Check if finished
            if action.type == ActionType.FINISH:
                return action.input, steps

        return "Max steps reached without conclusion", steps


# Default tools
def create_default_tools() -> Dict[str, Callable[[str], str]]:
    """Create default tool implementations."""

    def search(query: str) -> str:
        # Placeholder—in production, use actual search API
        return f"Search results for '{query}': Found relevant information about the topic."

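    def calculate(expression: str) -> str:
        try:
            # Restrict input to arithmetic characters before evaluating.
            # This whitelist is a guard, not a sandbox: prefer a real
            # expression parser in production.
            allowed = set('0123456789+-*/.() ')
            if not all(c in allowed for c in expression):
                return "Invalid expression"
            if "**" in expression:
                # Exponentiation can produce huge numbers and stall the agent
                return "Invalid expression"
            result = eval(expression, {"__builtins__": {}}, {})
            return str(result)
        except Exception:
            return "Calculation error"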

    def lookup(term: str) -> str:
        # Placeholder—in production, use knowledge base
        definitions = {
            "rag": "Retrieval-Augmented Generation: A technique that enhances LLM responses with retrieved context.",
            "embedding": "A vector representation of text that captures semantic meaning.",
            "transformer": "A neural network architecture based on self-attention mechanisms."
        }
        return definitions.get(term.lower(), f"No definition found for '{term}'")

    return {
        "search": search,
        "calculate": calculate,
        "lookup": lookup
    }


if __name__ == "__main__":
    try:
        llm = LLMClient(provider="openai")
        tools = create_default_tools()

        agent = ReActAgent(llm_client=llm, tools=tools, max_steps=5)

        question = "What is RAG and how does it improve LLM responses?"

        print(f"Question: {question}\n")
        print("=" * 50)

        answer, steps = agent.run(question)

        for i, step in enumerate(steps):
            print(f"\nStep {i + 1}:")
            print(f"  Thought: {step.thought.content}")
            print(f"  Action: {step.action.type.value}[{step.action.input}]")
            if step.observation:
                print(f"  Observation: {step.observation.content[:100]}...")

        print(f"\nFinal Answer: {answer}")

    except ImportError:
        print("Install dependencies: pip install openai")
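The `calculate` tool above filters characters and then calls `eval`. An alternative that avoids `eval` entirely is to parse the expression with the standard-library `ast` module and evaluate only an explicit set of operators. The sketch below is one way to do that. `safe_calculate` is a hypothetical replacement, and limiting it to `+`, `-`, `*`, `/` and parentheses is an assumption about what the agent needs.

```python
import ast
import operator

# Only these operators are permitted; anything else is rejected.
_BINARY_OPS = {
    ast.Add: operator.add,
    ast.Sub: operator.sub,
    ast.Mult: operator.mul,
    ast.Div: operator.truediv,
}
_UNARY_OPS = {ast.UAdd: operator.pos, ast.USub: operator.neg}

def safe_calculate(expression: str) -> str:
    """Evaluate +, -, *, / and parentheses by walking the parsed syntax tree."""

    def evaluate(node: ast.AST):
        # Numeric literals only (bool, str, and complex values are rejected).
        if isinstance(node, ast.Constant) and type(node.value) in (int, float):
            return node.value
        if isinstance(node, ast.BinOp) and type(node.op) in _BINARY_OPS:
            return _BINARY_OPS[type(node.op)](evaluate(node.left), evaluate(node.right))
        if isinstance(node, ast.UnaryOp) and type(node.op) in _UNARY_OPS:
            return _UNARY_OPS[type(node.op)](evaluate(node.operand))
        raise ValueError(f"unsupported syntax: {type(node).__name__}")

    try:
        tree = ast.parse(expression.strip(), mode="eval")
        return str(evaluate(tree.body))
    except (SyntaxError, ValueError, ZeroDivisionError, OverflowError):
        return "Calculation error"


print(safe_calculate("(2 + 3) * 4"))       # 20
print(safe_calculate("10 / 4"))            # 2.5
print(safe_calculate("2 ** 100000"))       # Calculation error
print(safe_calculate("__import__('os')"))  # Calculation error
```

Since it takes a string and returns a string, it could be registered under the `"calculate"` key in the tools dictionary in place of the original function.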

Production Agent Framework

from typing import Dict, List, Any, Optional, Callable
from dataclasses import dataclass, field
from datetime import datetime
from enum import Enum
import uuid
import json
import logging

logger = logging.getLogger(__name__)

class AgentStatus(Enum):
    IDLE = "idle"
    THINKING = "thinking"
    ACTING = "acting"
    WAITING = "waiting"
    COMPLETED = "completed"
    FAILED = "failed"

@dataclass
class AgentConfig:
    """Configuration for an agent."""
    max_steps: int = 20
    max_retries: int = 3
    timeout_seconds: float = 300
    temperature: float = 0.1
    enable_memory: bool = True
    enable_planning: bool = True

@dataclass
class AgentTrace:
    """Detailed trace of agent execution."""
    trace_id: str
    steps: List[Dict[str, Any]] = field(default_factory=list)
    total_tokens: int = 0
    total_cost: float = 0
    start_time: str = ""
    end_time: str = ""

    def add_step(self, step_type: str, content: Dict[str, Any], tokens: int = 0):
        self.steps.append({
            "timestamp": datetime.utcnow().isoformat(),
            "type": step_type,
            "content": content,
            "tokens": tokens
        })
        self.total_tokens += tokens

    def to_dict(self) -> Dict:
        return {
            "trace_id": self.trace_id,
            "steps": self.steps,
            "total_tokens": self.total_tokens,
            "duration_seconds": self._calculate_duration()
        }

    def _calculate_duration(self) -> float:
        if not self.start_time or not self.end_time:
            return 0
        start = datetime.fromisoformat(self.start_time)
        end = datetime.fromisoformat(self.end_time)
        return (end - start).total_seconds()

class ProductionAgent:
    """
    Production-ready agent with:
    - Planning and task decomposition
    - Tool use with error handling
    - State management and memory
    - Comprehensive tracing
    - Cost tracking
    """

    PLANNING_PROMPT = """Analyze this task and create an execution plan.

Task: {task}
Available Tools: {tools}

Create a step-by-step plan. Output JSON:
{{"steps": ["step 1 description", "step 2 description", ...]}}"""

    EXECUTION_PROMPT = """Execute this step of the plan.

Overall Task: {task}
Current Step: {step}
Previous Results: {previous_results}
Available Tools: {tools}

Decide what action to take. Output JSON:
{{"thought": "your reasoning", "action": "tool_name", "input": "tool input"}}

If the step is complete, use action "complete" with the result as input."""

    def __init__(
        self,
        llm_client: LLMClient,
        tools: Dict[str, Callable],
        config: Optional[AgentConfig] = None
    ):
        self.llm = llm_client
        self.tools = tools
        self.config = config or AgentConfig()
        self.memory = ConversationMemory() if self.config.enable_memory else None
        self.status = AgentStatus.IDLE

    def _plan(self, task: str, trace: AgentTrace) -> List[str]:
        """Create execution plan for task."""
        if not self.config.enable_planning:
            return [task]

        prompt = self.PLANNING_PROMPT.format(
            task=task,
            tools=list(self.tools.keys())
        )

        response = self.llm.generate(prompt, temperature=0)
        trace.add_step("planning", {"prompt": prompt[:200], "response": response[:200]})

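        try:
            # Strip a Markdown code fence if the model wrapped its JSON in one.
            # removeprefix (Python 3.9+) drops only a leading "json" language tag.
            if "```" in response:
                response = response.split("```")[1].removeprefix("json").strip()
            data = json.loads(response)
            return data.get("steps", [task])
        except Exception:
            # Unparseable plan: fall back to treating the task as a single step
            return [task]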

    def _execute_step(
        self,
        task: str,
        step: str,
        previous_results: List[str],
        trace: AgentTrace
    ) -> str:
        """Execute a single step of the plan."""
        prompt = self.EXECUTION_PROMPT.format(
            task=task,
            step=step,
            previous_results=previous_results[-3:] if previous_results else "None",
            tools=list(self.tools.keys())
        )

        response = self.llm.generate(prompt, temperature=self.config.temperature)

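        try:
            # Strip a Markdown code fence if the model wrapped its JSON in one.
            # removeprefix (Python 3.9+) drops only a leading "json" language tag.
            if "```" in response:
                response = response.split("```")[1].removeprefix("json").strip()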
            data = json.loads(response)

            thought = data.get("thought", "")
            action = data.get("action", "complete")
            action_input = data.get("input", "")

            trace.add_step("execution", {
                "step": step,
                "thought": thought,
                "action": action,
                # The model may return a non-string input (number, dict), so coerce before slicing
                "input": str(action_input)[:100]
            })

            if action == "complete":
                return str(action_input)

            if action in self.tools:
                # Coerce to str so callers can slice and log the result safely
                result = str(self.tools[action](action_input))
                trace.add_step("tool_result", {"tool": action, "result": result[:200]})
                return result

            return f"Unknown tool: {action}"

        except Exception as e:
            trace.add_step("error", {"error": str(e)})
            return f"Error executing step: {e}"

    def run(self, task: str) -> tuple:
        """
        Run agent on a task.

        Returns:
            result: Final result string
            trace: AgentTrace with execution details
        """
        trace = AgentTrace(
            trace_id=str(uuid.uuid4())[:8],
            start_time=datetime.utcnow().isoformat()
        )

        self.status = AgentStatus.THINKING

        try:
            # Planning phase
            steps = self._plan(task, trace)
            logger.info(f"Created plan with {len(steps)} steps")

            # Execution phase
            results = []
            self.status = AgentStatus.ACTING

            for i, step in enumerate(steps):
                if i >= self.config.max_steps:
                    break

                result = self._execute_step(task, step, results, trace)
                results.append(result)

                if self.memory:
                    self.memory.add("assistant", f"Step {i+1}: {result[:200]}")

            self.status = AgentStatus.COMPLETED
            trace.end_time = datetime.utcnow().isoformat()

            final_result = results[-1] if results else "No results"
            return final_result, trace

        except Exception as e:
            self.status = AgentStatus.FAILED
            trace.add_step("failure", {"error": str(e)})
            trace.end_time = datetime.utcnow().isoformat()
            raise


if __name__ == "__main__":
    try:
        llm = LLMClient(provider="openai")
        tools = create_default_tools()

        agent = ProductionAgent(
            llm_client=llm,
            tools=tools,
            config=AgentConfig(max_steps=10, enable_planning=True)
        )

        task = "Research what RAG is and calculate the potential cost savings of implementing it for 10000 queries per day"

        result, trace = agent.run(task)

        print(f"Result: {result}")
        print(f"\nExecution trace ({len(trace.steps)} steps):")
        for step in trace.steps:
            print(f"  [{step['type']}] {json.dumps(step['content'])[:80]}...")

    except ImportError:
        print("Install: pip install openai")
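`AgentConfig` defines `max_retries`, but `_execute_step` as written calls each tool exactly once. Below is a minimal sketch of how that setting could be honored. The helper name `call_with_retries`, the `base_delay_seconds` parameter, and the choice of exponential backoff are all assumptions for illustration, not part of the framework above.

```python
import time
from typing import Any, Callable, Optional


def call_with_retries(
    tool: Callable[[str], Any],
    tool_input: str,
    max_retries: int = 3,
    base_delay_seconds: float = 0.5,
    sleep: Callable[[float], None] = time.sleep,
) -> str:
    """Call a tool up to max_retries times, backing off exponentially.

    Hypothetical helper: returns the tool result as a string, or an
    error string once every attempt has failed.
    """
    last_error: Optional[Exception] = None
    for attempt in range(max_retries):
        try:
            return str(tool(tool_input))
        except Exception as exc:
            last_error = exc
            # Wait 0.5s, 1s, 2s, ... but not after the final attempt
            if attempt < max_retries - 1:
                sleep(base_delay_seconds * (2 ** attempt))
    return f"Tool failed after {max_retries} attempts: {last_error}"


if __name__ == "__main__":
    calls = {"count": 0}

    def flaky_search(query: str) -> str:
        # Fails twice, then succeeds, to simulate a transient outage
        calls["count"] += 1
        if calls["count"] < 3:
            raise ConnectionError("temporary network issue")
        return f"results for {query}"

    # Pass a no-op sleep so the demo finishes instantly
    print(call_with_retries(flaky_search, "RAG", sleep=lambda s: None))
    # prints: results for RAG
```

Inside `_execute_step`, the direct tool call could then become `call_with_retries(self.tools[action], action_input, self.config.max_retries)`. Retrying is only safe for idempotent tools; a tool that sends an email or writes a record would need its own deduplication before being wrapped this way.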

Data Engineer's ROI Lens: The Business Impact

def analyze_agentic_roi(
    daily_complex_tasks: int,
    avg_manual_time_minutes: float = 30,
    avg_agent_time_minutes: float = 2,
    llm_cost_per_task: float = 0.05,
    human_hourly_rate: float = 75,
    error_rate_manual: float = 0.10,
    error_rate_agent: float = 0.05,
    error_cost: float = 100
) -> Dict:
    """Calculate ROI of agentic workflows vs manual processes."""

    monthly_tasks = daily_complex_tasks * 22  # Working days

    # Manual process costs
    manual_time_hours = monthly_tasks * avg_manual_time_minutes / 60
    manual_labor_cost = manual_time_hours * human_hourly_rate
    manual_error_cost = monthly_tasks * error_rate_manual * error_cost
    manual_total = manual_labor_cost + manual_error_cost

    # Agent process costs
    agent_time_hours = monthly_tasks * avg_agent_time_minutes / 60
    agent_labor_cost = agent_time_hours * human_hourly_rate  # Oversight time
    agent_llm_cost = monthly_tasks * llm_cost_per_task
    agent_error_cost = monthly_tasks * error_rate_agent * error_cost
    agent_total = agent_labor_cost + agent_llm_cost + agent_error_cost

    # Savings
    monthly_savings = manual_total - agent_total
    time_saved_hours = manual_time_hours - agent_time_hours

    # Setup costs
    setup_hours = 40
    setup_cost = setup_hours * 150  # Senior engineer rate

    return {
        "monthly_tasks": monthly_tasks,
        "manual_monthly_cost": round(manual_total, 2),
        "agent_monthly_cost": round(agent_total, 2),
        "monthly_savings": round(monthly_savings, 2),
        "annual_savings": round(monthly_savings * 12, 2),
        "time_saved_hours_monthly": round(time_saved_hours, 1),
        "setup_cost": setup_cost,
        "payback_months": round(setup_cost / monthly_savings, 1) if monthly_savings > 0 else float('inf'),
        "roi_first_year": round((monthly_savings * 12 - setup_cost) / setup_cost * 100, 1)
    }


if __name__ == "__main__":
    roi = analyze_agentic_roi(
        daily_complex_tasks=20,
        avg_manual_time_minutes=30,
        avg_agent_time_minutes=3
    )

    print("=== Agentic Workflow ROI Analysis ===")
    print(f"Monthly tasks: {roi['monthly_tasks']}")
    print(f"\nManual process: ${roi['manual_monthly_cost']:,.0f}/month")
    print(f"Agent process:  ${roi['agent_monthly_cost']:,.0f}/month")
    print(f"Monthly savings: ${roi['monthly_savings']:,.0f}")
    print(f"\nTime saved: {roi['time_saved_hours_monthly']} hours/month")
    print(f"Setup cost: ${roi['setup_cost']:,}")
    print(f"Payback: {roi['payback_months']} months")
    print(f"First year ROI: {roi['roi_first_year']}%")

Sample Output:

=== Agentic Workflow ROI Analysis ===
Monthly tasks: 440

Manual process: $20,900/month
Agent process:  $3,872/month
Monthly savings: $17,028

Time saved: 198.0 hours/month
Setup cost: $6,000
Payback: 0.4 months
First year ROI: 3305.6%
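The result is sensitive to how much human oversight each agent run really needs. The sketch below reuses the same cost formula as `analyze_agentic_roi`, reduced to per-task terms, to sweep oversight time and solve for the break-even point. The function names `monthly_savings` and `break_even_agent_minutes` are hypothetical, and the default rates are simply copied from the function above rather than measured.

```python
def monthly_savings(
    daily_tasks: int,
    manual_minutes: float,
    agent_minutes: float,
    llm_cost_per_task: float = 0.05,
    hourly_rate: float = 75,
    error_rate_manual: float = 0.10,
    error_rate_agent: float = 0.05,
    error_cost: float = 100,
    working_days: int = 22,
) -> float:
    """Manual cost minus agent cost per month, same model as analyze_agentic_roi."""
    tasks = daily_tasks * working_days
    manual_per_task = manual_minutes / 60 * hourly_rate + error_rate_manual * error_cost
    agent_per_task = (
        agent_minutes / 60 * hourly_rate
        + llm_cost_per_task
        + error_rate_agent * error_cost
    )
    return tasks * (manual_per_task - agent_per_task)


def break_even_agent_minutes(
    manual_minutes: float,
    llm_cost_per_task: float = 0.05,
    hourly_rate: float = 75,
    error_rate_manual: float = 0.10,
    error_rate_agent: float = 0.05,
    error_cost: float = 100,
) -> float:
    """Oversight minutes per task at which monthly savings fall to zero."""
    manual_per_task = manual_minutes / 60 * hourly_rate + error_rate_manual * error_cost
    agent_fixed_per_task = llm_cost_per_task + error_rate_agent * error_cost
    # Solve manual_per_task = minutes / 60 * hourly_rate + agent_fixed_per_task
    return (manual_per_task - agent_fixed_per_task) / hourly_rate * 60


if __name__ == "__main__":
    for minutes in (3, 10, 20, 30):
        saved = monthly_savings(20, 30, minutes)
        print(f"{minutes:>2} min oversight -> ${saved:,.0f}/month saved")
    print(f"Break-even oversight: {break_even_agent_minutes(30):.1f} min/task")
```

Because every term scales linearly with task count, the break-even point does not depend on volume. Volume only changes how quickly the fixed setup cost is recovered.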

Key Takeaways

  1. Decompose first: Break complex tasks into atomic steps before execution

  2. State is everything: Immutable state with history enables debugging, recovery, and observability

  3. DAGs for orchestration: Clear dependencies, deterministic execution, easy parallelization

  4. ReAct for reasoning: Interleaving thought and action improves both accuracy and explainability

  5. Plan and execute separately: A distinct planning phase followed by an execution phase keeps the agent from getting lost in complexity

  6. The ROI can be substantial: Under the model's assumptions, cutting a 30-minute manual task to 3 minutes of agent oversight pays back the setup cost in under a month

Start simple: one task, one DAG, one agent. Add complexity only when you've validated the pattern works for your domain.


Next in this series: Human-in-the-Loop Systems—building confidence thresholds, review queues, and feedback pipelines that keep humans appropriately in control.
