Quick Reference: Terms You'll Encounter
Technical Acronyms:
- DAG: Directed Acyclic Graph—workflow structure with no circular dependencies
- FSM: Finite State Machine—system with defined states and transitions
- CoT: Chain of Thought—prompting technique for step-by-step reasoning
- ReAct: Reasoning + Acting—pattern combining thinking with tool use
- LLM: Large Language Model—transformer-based text generation system
Workflow & Algorithm Terms:
- State: Current snapshot of all variables in a workflow
- Transition: Movement from one state to another based on conditions
- Topological Sort: Ordering DAG nodes so dependencies come first
- Idempotent: Operation that produces same result if executed multiple times
Introduction: From Single Prompts to Orchestrated Workflows
Imagine you're planning a cross-country road trip. You wouldn't just say "drive to California" and start moving. You'd:
- Decompose: Break it into legs (Chicago → Denver → Las Vegas → LA)
- Plan: Identify gas stops, hotels, attractions
- Execute: Drive each segment, adjusting for traffic and weather
- Track State: Know where you are, how much gas you have, what's completed
Single LLM calls are like asking "how do I get to California?" You get directions, but no execution. Agentic workflows actually make the trip—handling detours, flat tires, and closed roads along the way.
Here's another analogy: Single prompts are functions; agentic workflows are programs. A function computes one thing. A program orchestrates many functions, manages state, handles errors, and produces complex outcomes.
A third way to think about it: Traditional LLM use is a calculator; agentic AI is a spreadsheet. The calculator answers one question. The spreadsheet maintains state, has interdependent cells, and updates automatically when inputs change.
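To make the program analogy concrete, here is a minimal sketch of the loop every workflow in this article runs: decompose the goal, execute each step, and track state along the way. The decompose/execute stand-ins below are toy placeholders for the real classes built in the following sections.
from typing import Any, Dict, List

# Toy stand-ins for the TaskDecomposer and executors built later in this article
def decompose(goal: str) -> List[str]:
    return [f"plan: {goal}", f"execute: {goal}", f"verify: {goal}"]

def execute(task: str, state: Dict[str, Any]) -> str:
    return f"done ({task})"

def run_workflow(goal: str) -> Dict[str, Any]:
    """Decompose a goal, execute each atomic step, and track state throughout."""
    state: Dict[str, Any] = {"goal": goal, "completed": []}
    for task in decompose(goal):            # Decompose: goal -> atomic steps
        state[task] = execute(task, state)  # Execute: one step at a time
        state["completed"].append(task)     # Track state: know what's done
    return state

print(run_workflow("drive to California")["completed"])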
Task Decomposition: Breaking Problems Apart
from typing import List, Dict, Any, Optional
from dataclasses import dataclass, field
from enum import Enum
import json
class TaskStatus(Enum):
PENDING = "pending"
IN_PROGRESS = "in_progress"
COMPLETED = "completed"
FAILED = "failed"
BLOCKED = "blocked"
@dataclass
class Task:
"""Represents a single task in a workflow."""
id: str
description: str
dependencies: List[str] = field(default_factory=list)
status: TaskStatus = TaskStatus.PENDING
result: Optional[Any] = None
error: Optional[str] = None
metadata: Dict[str, Any] = field(default_factory=dict)
@dataclass
class TaskPlan:
"""A decomposed plan with multiple tasks."""
goal: str
tasks: List[Task]
created_at: str = ""
def get_ready_tasks(self) -> List[Task]:
"""Get tasks whose dependencies are all completed."""
completed_ids = {t.id for t in self.tasks if t.status == TaskStatus.COMPLETED}
ready = []
for task in self.tasks:
if task.status == TaskStatus.PENDING:
if all(dep in completed_ids for dep in task.dependencies):
ready.append(task)
return ready
def is_complete(self) -> bool:
"""Check if all tasks are completed."""
return all(t.status == TaskStatus.COMPLETED for t in self.tasks)
def has_failed(self) -> bool:
"""Check if any task has failed."""
return any(t.status == TaskStatus.FAILED for t in self.tasks)
class TaskDecomposer:
"""
Decompose complex goals into executable task plans.
Two approaches:
1. LLM-based: Let the model break down the task
2. Template-based: Use predefined patterns for known task types
"""
DECOMPOSITION_PROMPT = """Break down this goal into specific, executable tasks.
Goal: {goal}
Context: {context}
Rules:
1. Each task should be atomic (one clear action)
2. Identify dependencies between tasks
3. Tasks should be ordered logically
4. Include validation/verification tasks where appropriate
Output JSON format:
{{
"tasks": [
{{"id": "task_1", "description": "...", "dependencies": []}},
{{"id": "task_2", "description": "...", "dependencies": ["task_1"]}}
]
}}
Output only valid JSON."""
def __init__(self, llm_client):
self.llm = llm_client
def decompose(self, goal: str, context: str = "") -> TaskPlan:
"""Decompose a goal into tasks using LLM."""
prompt = self.DECOMPOSITION_PROMPT.format(goal=goal, context=context)
response = self.llm.generate(prompt)
# Parse response
try:
# Handle markdown code blocks
            if "```" in response:
                response = response.split("```")[1]
if response.startswith("json"):
response = response[4:]
data = json.loads(response.strip())
tasks = [
Task(
id=t["id"],
description=t["description"],
dependencies=t.get("dependencies", [])
)
for t in data["tasks"]
]
return TaskPlan(goal=goal, tasks=tasks)
        except (json.JSONDecodeError, KeyError):
# Fallback: single task
return TaskPlan(
goal=goal,
tasks=[Task(id="task_1", description=goal)]
)
def decompose_with_template(
self,
goal: str,
template: str
) -> TaskPlan:
"""Use predefined templates for common task patterns."""
templates = {
"research": [
Task(id="search", description="Search for relevant sources", dependencies=[]),
Task(id="extract", description="Extract key information", dependencies=["search"]),
Task(id="synthesize", description="Synthesize findings", dependencies=["extract"]),
Task(id="validate", description="Validate accuracy", dependencies=["synthesize"])
],
"data_pipeline": [
Task(id="extract", description="Extract data from source", dependencies=[]),
Task(id="validate_input", description="Validate input data", dependencies=["extract"]),
Task(id="transform", description="Transform data", dependencies=["validate_input"]),
Task(id="validate_output", description="Validate output data", dependencies=["transform"]),
Task(id="load", description="Load to destination", dependencies=["validate_output"])
],
"analysis": [
Task(id="gather", description="Gather relevant data", dependencies=[]),
Task(id="clean", description="Clean and prepare data", dependencies=["gather"]),
Task(id="analyze", description="Perform analysis", dependencies=["clean"]),
Task(id="interpret", description="Interpret results", dependencies=["analyze"]),
Task(id="report", description="Generate report", dependencies=["interpret"])
]
}
if template not in templates:
raise ValueError(f"Unknown template: {template}")
# Customize task descriptions with goal
tasks = []
for t in templates[template]:
tasks.append(Task(
id=t.id,
description=f"{t.description} for: {goal}",
dependencies=t.dependencies.copy()
))
return TaskPlan(goal=goal, tasks=tasks)
# Simple LLM client interface
class LLMClient:
"""Provider-agnostic LLM client."""
    def __init__(self, provider: str = "openai", model: Optional[str] = None):
self.provider = provider
self.model = model or self._default_model()
def _default_model(self) -> str:
return {"openai": "gpt-4o-mini", "anthropic": "claude-3-haiku-20240307"}.get(self.provider, "gpt-4o-mini")
def generate(self, prompt: str, temperature: float = 0) -> str:
if self.provider == "openai":
from openai import OpenAI
client = OpenAI()
response = client.chat.completions.create(
model=self.model,
messages=[{"role": "user", "content": prompt}],
temperature=temperature
)
return response.choices[0].message.content
elif self.provider == "anthropic":
import anthropic
client = anthropic.Anthropic()
response = client.messages.create(
model=self.model,
max_tokens=2048,
messages=[{"role": "user", "content": prompt}]
)
return response.content[0].text
raise ValueError(f"Unknown provider: {self.provider}")
# Example usage
if __name__ == "__main__":
# Template-based decomposition (no LLM needed)
decomposer = TaskDecomposer(llm_client=None)
plan = decomposer.decompose_with_template(
goal="Analyze Q3 sales performance",
template="analysis"
)
print(f"Goal: {plan.goal}\n")
print("Tasks:")
for task in plan.tasks:
deps = f" (depends on: {', '.join(task.dependencies)})" if task.dependencies else ""
print(f" [{task.id}] {task.description}{deps}")
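Sample Output:
Goal: Analyze Q3 sales performance
Tasks:
  [gather] Gather relevant data for: Analyze Q3 sales performance
  [clean] Clean and prepare data for: Analyze Q3 sales performance (depends on: gather)
  [analyze] Perform analysis for: Analyze Q3 sales performance (depends on: clean)
  [interpret] Interpret results for: Analyze Q3 sales performance (depends on: analyze)
  [report] Generate report for: Analyze Q3 sales performance (depends on: interpret)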
State Management: Tracking Progress Through Complexity
from typing import Dict, Any, Optional, List
from dataclasses import dataclass, field, asdict
from datetime import datetime
import json
import hashlib
import copy
@dataclass
class WorkflowState:
"""
Immutable state snapshot for a workflow.
Key principles:
1. State is immutable—create new states, don't modify
2. Every state change is tracked
3. State can be serialized for persistence/recovery
"""
workflow_id: str
current_step: str
variables: Dict[str, Any] = field(default_factory=dict)
history: List[Dict] = field(default_factory=list)
created_at: str = ""
updated_at: str = ""
def __post_init__(self):
if not self.created_at:
self.created_at = datetime.utcnow().isoformat()
self.updated_at = datetime.utcnow().isoformat()
def get(self, key: str, default: Any = None) -> Any:
"""Get variable from state."""
return self.variables.get(key, default)
def with_update(self, **updates) -> 'WorkflowState':
"""Create new state with updates (immutable pattern)."""
new_variables = {**self.variables, **updates}
new_history = self.history + [{
"timestamp": datetime.utcnow().isoformat(),
"step": self.current_step,
"updates": list(updates.keys())
}]
return WorkflowState(
workflow_id=self.workflow_id,
current_step=self.current_step,
variables=new_variables,
history=new_history,
created_at=self.created_at
)
def with_step(self, step: str) -> 'WorkflowState':
"""Create new state at different step."""
return WorkflowState(
workflow_id=self.workflow_id,
current_step=step,
variables=copy.deepcopy(self.variables),
history=self.history + [{
"timestamp": datetime.utcnow().isoformat(),
"transition": f"{self.current_step} -> {step}"
}],
created_at=self.created_at
)
def to_json(self) -> str:
"""Serialize state to JSON."""
return json.dumps(asdict(self), indent=2)
@classmethod
def from_json(cls, json_str: str) -> 'WorkflowState':
"""Deserialize state from JSON."""
data = json.loads(json_str)
return cls(**data)
def checksum(self) -> str:
"""Generate checksum for state verification."""
content = json.dumps(self.variables, sort_keys=True)
return hashlib.md5(content.encode()).hexdigest()[:8]
class StateStore:
"""
Persistent state storage with versioning.
Production implementations would use:
- Redis for fast access
- PostgreSQL for durability
- S3 for long-term archival
"""
def __init__(self):
self._store: Dict[str, List[WorkflowState]] = {}
def save(self, state: WorkflowState):
"""Save state (appends to version history)."""
if state.workflow_id not in self._store:
self._store[state.workflow_id] = []
self._store[state.workflow_id].append(state)
def load(self, workflow_id: str) -> Optional[WorkflowState]:
"""Load latest state for workflow."""
if workflow_id not in self._store:
return None
return self._store[workflow_id][-1]
def load_version(self, workflow_id: str, version: int) -> Optional[WorkflowState]:
"""Load specific version of state."""
if workflow_id not in self._store:
return None
if version >= len(self._store[workflow_id]):
return None
return self._store[workflow_id][version]
def get_history(self, workflow_id: str) -> List[WorkflowState]:
"""Get all state versions for workflow."""
return self._store.get(workflow_id, [])
class ConversationMemory:
"""
Manage conversation context for multi-turn agent interactions.
"""
def __init__(self, max_turns: int = 20, max_tokens: int = 4000):
self.max_turns = max_turns
self.max_tokens = max_tokens
self.messages: List[Dict[str, str]] = []
self.summary: Optional[str] = None
def add(self, role: str, content: str):
"""Add message to conversation."""
self.messages.append({"role": role, "content": content})
# Trim if too long
if len(self.messages) > self.max_turns:
self._summarize_and_trim()
def _estimate_tokens(self) -> int:
"""Rough token estimate."""
return sum(len(m["content"]) // 4 for m in self.messages)
def _summarize_and_trim(self):
"""Summarize old messages and keep recent ones."""
# Keep last N messages, summarize the rest
keep = self.max_turns // 2
to_summarize = self.messages[:-keep]
self.messages = self.messages[-keep:]
# Create summary (in production, use LLM)
summary_parts = [f"{m['role']}: {m['content'][:100]}..." for m in to_summarize]
self.summary = "Previous conversation summary:\n" + "\n".join(summary_parts)
def get_context(self) -> List[Dict[str, str]]:
"""Get messages formatted for LLM."""
context = []
if self.summary:
context.append({"role": "system", "content": self.summary})
context.extend(self.messages)
return context
def clear(self):
"""Clear conversation memory."""
self.messages = []
self.summary = None
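A quick usage sketch of the pattern: every update produces a new snapshot, the store keeps every version, and checksums verify a serialization round-trip.
store = StateStore()
state = WorkflowState(workflow_id="demo", current_step="start")
store.save(state)

# Updates never mutate; they return a new snapshot with history appended
state = state.with_update(user="alice", query="Q3 sales")
state = state.with_step("analyze")
store.save(state)

# Any version can be recovered, and a JSON round-trip preserves variables
first_version = store.load_version("demo", 0)
restored = WorkflowState.from_json(state.to_json())
assert restored.checksum() == state.checksum()
print(f"versions: {len(store.get_history('demo'))}, step: {store.load('demo').current_step}")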
DAG-Based Workflow Orchestration
from typing import Dict, List, Callable, Any, Optional, Set
from dataclasses import dataclass, field
from enum import Enum
from collections import deque
import json
import time
import logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class NodeStatus(Enum):
PENDING = "pending"
RUNNING = "running"
COMPLETED = "completed"
FAILED = "failed"
SKIPPED = "skipped"
@dataclass
class DAGNode:
"""A node in the workflow DAG."""
id: str
executor: Callable[[WorkflowState], WorkflowState]
dependencies: List[str] = field(default_factory=list)
status: NodeStatus = NodeStatus.PENDING
retry_count: int = 0
max_retries: int = 3
timeout_seconds: float = 60
result: Optional[Any] = None
error: Optional[str] = None
def can_run(self, completed: Set[str]) -> bool:
"""Check if all dependencies are satisfied."""
return all(dep in completed for dep in self.dependencies)
class WorkflowDAG:
"""
DAG-based workflow orchestrator.
Why DAGs for workflows:
1. Clear dependency visualization
2. Parallel execution of independent nodes
3. Deterministic execution order
4. Easy to reason about and debug
"""
def __init__(self, workflow_id: str):
self.workflow_id = workflow_id
self.nodes: Dict[str, DAGNode] = {}
self.state_store = StateStore()
def add_node(
self,
node_id: str,
executor: Callable[[WorkflowState], WorkflowState],
        dependencies: Optional[List[str]] = None,
**kwargs
) -> 'WorkflowDAG':
"""Add a node to the DAG."""
# Validate dependencies exist
for dep in (dependencies or []):
if dep not in self.nodes:
raise ValueError(f"Dependency '{dep}' not found. Add nodes in dependency order.")
self.nodes[node_id] = DAGNode(
id=node_id,
executor=executor,
dependencies=dependencies or [],
**kwargs
)
return self
def _topological_sort(self) -> List[str]:
"""Sort nodes in execution order."""
in_degree = {node_id: len(node.dependencies) for node_id, node in self.nodes.items()}
queue = deque([node_id for node_id, degree in in_degree.items() if degree == 0])
result = []
while queue:
node_id = queue.popleft()
result.append(node_id)
for other_id, other_node in self.nodes.items():
if node_id in other_node.dependencies:
in_degree[other_id] -= 1
if in_degree[other_id] == 0:
queue.append(other_id)
if len(result) != len(self.nodes):
raise ValueError("DAG contains cycles")
return result
def _execute_node(self, node: DAGNode, state: WorkflowState) -> WorkflowState:
"""Execute a single node with retry logic."""
node.status = NodeStatus.RUNNING
for attempt in range(node.max_retries):
try:
logger.info(f"Executing node '{node.id}' (attempt {attempt + 1})")
start_time = time.time()
new_state = node.executor(state)
elapsed = time.time() - start_time
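                # Note: this is a post-hoc budget check; it cannot pre-empt a
                # running executor, it only flags nodes that ran too long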
if elapsed > node.timeout_seconds:
raise TimeoutError(f"Node exceeded timeout ({elapsed:.1f}s > {node.timeout_seconds}s)")
node.status = NodeStatus.COMPLETED
node.result = new_state.get(f"{node.id}_result")
logger.info(f"Node '{node.id}' completed in {elapsed:.2f}s")
return new_state
except Exception as e:
node.retry_count = attempt + 1
node.error = str(e)
logger.warning(f"Node '{node.id}' failed (attempt {attempt + 1}): {e}")
if attempt + 1 >= node.max_retries:
node.status = NodeStatus.FAILED
raise
time.sleep(2 ** attempt) # Exponential backoff
return state
def run(self, initial_state: Optional[WorkflowState] = None) -> WorkflowState:
"""Execute the DAG."""
state = initial_state or WorkflowState(
workflow_id=self.workflow_id,
current_step="start"
)
execution_order = self._topological_sort()
completed: Set[str] = set()
logger.info(f"Starting workflow '{self.workflow_id}' with {len(execution_order)} nodes")
for node_id in execution_order:
node = self.nodes[node_id]
# Check dependencies
if not node.can_run(completed):
node.status = NodeStatus.SKIPPED
logger.warning(f"Skipping node '{node_id}' due to failed dependencies")
continue
try:
state = state.with_step(node_id)
state = self._execute_node(node, state)
completed.add(node_id)
self.state_store.save(state)
except Exception as e:
logger.error(f"Workflow failed at node '{node_id}': {e}")
state = state.with_update(
workflow_status="failed",
failed_node=node_id,
error=str(e)
)
self.state_store.save(state)
raise
state = state.with_update(workflow_status="completed")
self.state_store.save(state)
logger.info(f"Workflow '{self.workflow_id}' completed successfully")
return state
def visualize(self) -> str:
"""Generate ASCII visualization of DAG."""
lines = [f"Workflow: {self.workflow_id}", "=" * 40]
order = self._topological_sort()
for node_id in order:
node = self.nodes[node_id]
status_icon = {
NodeStatus.PENDING: "○",
NodeStatus.RUNNING: "◐",
NodeStatus.COMPLETED: "●",
NodeStatus.FAILED: "✗",
NodeStatus.SKIPPED: "○"
}.get(node.status, "?")
deps = f" <- [{', '.join(node.dependencies)}]" if node.dependencies else ""
lines.append(f" {status_icon} {node_id}{deps}")
return "\n".join(lines)
# Example: Research workflow
def create_research_workflow(topic: str, llm_client: LLMClient) -> WorkflowDAG:
"""Create a research workflow DAG."""
dag = WorkflowDAG(workflow_id=f"research_{topic[:20]}")
def search_step(state: WorkflowState) -> WorkflowState:
# Simulate search (in production, use search API)
results = [f"Source {i}: Information about {topic}" for i in range(3)]
return state.with_update(search_results=results, search_result=f"Found {len(results)} sources")
def extract_step(state: WorkflowState) -> WorkflowState:
sources = state.get("search_results", [])
extracted = [{"source": s, "key_points": ["point 1", "point 2"]} for s in sources]
return state.with_update(extracted_info=extracted, extract_result="Extracted key points")
def synthesize_step(state: WorkflowState) -> WorkflowState:
extracted = state.get("extracted_info", [])
prompt = f"Synthesize these findings about {topic}: {json.dumps(extracted)}"
# Use LLM for synthesis
try:
synthesis = llm_client.generate(prompt)
        except Exception:
synthesis = f"Synthesis of {len(extracted)} sources on {topic}"
return state.with_update(synthesis=synthesis, synthesize_result="Created synthesis")
def validate_step(state: WorkflowState) -> WorkflowState:
synthesis = state.get("synthesis", "")
# Simple validation (in production, more sophisticated checks)
is_valid = len(synthesis) > 50
return state.with_update(
is_valid=is_valid,
validate_result="Validation passed" if is_valid else "Validation failed"
)
dag.add_node("search", search_step)
dag.add_node("extract", extract_step, dependencies=["search"])
dag.add_node("synthesize", synthesize_step, dependencies=["extract"])
dag.add_node("validate", validate_step, dependencies=["synthesize"])
return dag
if __name__ == "__main__":
# Create and run workflow
llm = LLMClient(provider="openai")
workflow = create_research_workflow("machine learning in healthcare", llm)
print(workflow.visualize())
print()
try:
final_state = workflow.run()
print("\n" + workflow.visualize())
print(f"\nFinal synthesis: {final_state.get('synthesis', '')[:200]}...")
except Exception as e:
print(f"Workflow failed: {e}")
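The docstring lists parallel execution of independent nodes as a DAG benefit, yet run() above is strictly sequential. A minimal layer-parallel variant, assuming node executors are thread-safe and write disjoint state keys, might look like this:
from concurrent.futures import ThreadPoolExecutor
from typing import Set

def run_parallel(dag: WorkflowDAG, state: WorkflowState) -> WorkflowState:
    """Run each dependency level's nodes concurrently, then merge results."""
    remaining = set(dag.nodes)
    completed: Set[str] = set()
    while remaining:
        # All nodes whose dependencies are satisfied form the next level
        level = [n for n in remaining if dag.nodes[n].can_run(completed)]
        if not level:
            raise ValueError("DAG contains cycles or unmet dependencies")
        with ThreadPoolExecutor(max_workers=len(level)) as pool:
            futures = {n: pool.submit(dag.nodes[n].executor, state) for n in level}
            results = {n: f.result() for n, f in futures.items()}
        # Naive merge: union each node's variables into one new state.
        # Concurrent writes to the same key would need a real merge policy.
        merged = {}
        for new_state in results.values():
            merged.update(new_state.variables)
        state = state.with_update(**merged)
        completed.update(level)
        remaining -= set(level)
    return state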
ReAct Pattern: Reasoning and Acting
from typing import List, Dict, Any, Callable, Optional
from dataclasses import dataclass
from enum import Enum
import re
import json
class ActionType(Enum):
SEARCH = "search"
CALCULATE = "calculate"
LOOKUP = "lookup"
FINISH = "finish"
@dataclass
class Thought:
"""Agent's reasoning step."""
content: str
@dataclass
class Action:
"""Agent's action to take."""
type: ActionType
input: str
@dataclass
class Observation:
"""Result of an action."""
content: str
@dataclass
class ReActStep:
"""One iteration of the ReAct loop."""
thought: Thought
action: Action
observation: Optional[Observation] = None
class ReActAgent:
"""
ReAct (Reasoning + Acting) agent implementation.
Pattern:
1. Thought: Reason about what to do
2. Action: Take an action (use a tool)
3. Observation: See the result
4. Repeat until task is complete
Key insight: Interleaving reasoning with actions improves both.
"""
REACT_PROMPT = """You are a helpful assistant that solves problems step by step.
Available actions:
- search[query]: Search for information
- calculate[expression]: Calculate a mathematical expression
- lookup[term]: Look up a specific term or definition
- finish[answer]: Provide the final answer
Always follow this format:
Thought: [your reasoning about what to do next]
Action: [action_name][input]
Question: {question}
{history}
Thought:"""
def __init__(
self,
llm_client: LLMClient,
tools: Dict[str, Callable[[str], str]],
max_steps: int = 10
):
self.llm = llm_client
self.tools = tools
self.max_steps = max_steps
def _parse_response(self, response: str) -> tuple:
"""Parse LLM response into thought and action."""
# Extract thought
thought_match = re.search(r'Thought:\s*(.+?)(?=Action:|$)', response, re.DOTALL)
thought = thought_match.group(1).strip() if thought_match else ""
# Extract action
action_match = re.search(r'Action:\s*(\w+)\[(.+?)\]', response)
if action_match:
action_type = action_match.group(1).lower()
action_input = action_match.group(2)
try:
action = Action(type=ActionType(action_type), input=action_input)
except ValueError:
action = Action(type=ActionType.FINISH, input="Unable to parse action")
else:
action = Action(type=ActionType.FINISH, input=thought)
return Thought(content=thought), action
def _execute_action(self, action: Action) -> Observation:
"""Execute an action using available tools."""
if action.type == ActionType.FINISH:
return Observation(content=action.input)
tool_name = action.type.value
if tool_name in self.tools:
try:
result = self.tools[tool_name](action.input)
return Observation(content=result)
except Exception as e:
return Observation(content=f"Error: {e}")
return Observation(content=f"Unknown action: {action.type}")
def _format_history(self, steps: List[ReActStep]) -> str:
"""Format previous steps for prompt."""
lines = []
for step in steps:
lines.append(f"Thought: {step.thought.content}")
lines.append(f"Action: {step.action.type.value}[{step.action.input}]")
if step.observation:
lines.append(f"Observation: {step.observation.content}")
return "\n".join(lines)
def run(self, question: str) -> tuple:
"""
Run the ReAct loop until completion.
Returns:
answer: Final answer string
steps: List of ReActStep showing reasoning process
"""
steps: List[ReActStep] = []
for i in range(self.max_steps):
# Generate thought and action
history = self._format_history(steps)
prompt = self.REACT_PROMPT.format(question=question, history=history)
response = self.llm.generate(prompt)
thought, action = self._parse_response(response)
# Execute action
observation = self._execute_action(action)
step = ReActStep(thought=thought, action=action, observation=observation)
steps.append(step)
# Check if finished
if action.type == ActionType.FINISH:
return action.input, steps
return "Max steps reached without conclusion", steps
# Default tools
def create_default_tools() -> Dict[str, Callable[[str], str]]:
"""Create default tool implementations."""
def search(query: str) -> str:
# Placeholder—in production, use actual search API
return f"Search results for '{query}': Found relevant information about the topic."
def calculate(expression: str) -> str:
try:
# Safe evaluation of mathematical expressions
allowed = set('0123456789+-*/.() ')
if not all(c in allowed for c in expression):
return "Invalid expression"
result = eval(expression)
return str(result)
        except Exception:
return "Calculation error"
def lookup(term: str) -> str:
# Placeholder—in production, use knowledge base
definitions = {
"rag": "Retrieval-Augmented Generation: A technique that enhances LLM responses with retrieved context.",
"embedding": "A vector representation of text that captures semantic meaning.",
"transformer": "A neural network architecture based on self-attention mechanisms."
}
return definitions.get(term.lower(), f"No definition found for '{term}'")
return {
"search": search,
"calculate": calculate,
"lookup": lookup
}
if __name__ == "__main__":
try:
llm = LLMClient(provider="openai")
tools = create_default_tools()
agent = ReActAgent(llm_client=llm, tools=tools, max_steps=5)
question = "What is RAG and how does it improve LLM responses?"
print(f"Question: {question}\n")
print("=" * 50)
answer, steps = agent.run(question)
for i, step in enumerate(steps):
print(f"\nStep {i + 1}:")
print(f" Thought: {step.thought.content}")
print(f" Action: {step.action.type.value}[{step.action.input}]")
if step.observation:
print(f" Observation: {step.observation.content[:100]}...")
print(f"\nFinal Answer: {answer}")
    except ImportError:
        print("Install dependencies: pip install openai")
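Because ReActAgent only ever calls self.llm.generate(), any object with that method will do. A scripted stand-in makes the loop deterministic for tests and demos without API keys (ScriptedLLMClient is a hypothetical helper, not part of the framework above):
class ScriptedLLMClient:
    """Replays canned responses; a deterministic drop-in for LLMClient."""
    def __init__(self, responses: List[str]):
        self.responses = list(responses)
        self.calls = 0

    def generate(self, prompt: str, temperature: float = 0) -> str:
        response = self.responses[min(self.calls, len(self.responses) - 1)]
        self.calls += 1
        return response

scripted = ScriptedLLMClient([
    "Thought: I should look up RAG first.\nAction: lookup[rag]",
    "Thought: I have the definition, so I can answer.\n"
    "Action: finish[RAG retrieves relevant context before generation.]",
])
agent = ReActAgent(llm_client=scripted, tools=create_default_tools(), max_steps=3)
answer, steps = agent.run("What is RAG?")
print(answer)  # RAG retrieves relevant context before generation.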
Production Agent Framework
from typing import Dict, List, Any, Optional, Callable
from dataclasses import dataclass, field
from datetime import datetime
from enum import Enum
import uuid
import json
import logging
logger = logging.getLogger(__name__)
class AgentStatus(Enum):
IDLE = "idle"
THINKING = "thinking"
ACTING = "acting"
WAITING = "waiting"
COMPLETED = "completed"
FAILED = "failed"
@dataclass
class AgentConfig:
"""Configuration for an agent."""
max_steps: int = 20
max_retries: int = 3
timeout_seconds: float = 300
temperature: float = 0.1
enable_memory: bool = True
enable_planning: bool = True
@dataclass
class AgentTrace:
"""Detailed trace of agent execution."""
trace_id: str
steps: List[Dict[str, Any]] = field(default_factory=list)
total_tokens: int = 0
total_cost: float = 0
start_time: str = ""
end_time: str = ""
def add_step(self, step_type: str, content: Dict[str, Any], tokens: int = 0):
self.steps.append({
"timestamp": datetime.utcnow().isoformat(),
"type": step_type,
"content": content,
"tokens": tokens
})
self.total_tokens += tokens
def to_dict(self) -> Dict:
return {
"trace_id": self.trace_id,
"steps": self.steps,
"total_tokens": self.total_tokens,
"duration_seconds": self._calculate_duration()
}
def _calculate_duration(self) -> float:
if not self.start_time or not self.end_time:
return 0
start = datetime.fromisoformat(self.start_time)
end = datetime.fromisoformat(self.end_time)
return (end - start).total_seconds()
class ProductionAgent:
"""
Production-ready agent with:
- Planning and task decomposition
- Tool use with error handling
- State management and memory
- Comprehensive tracing
- Cost tracking
"""
PLANNING_PROMPT = """Analyze this task and create an execution plan.
Task: {task}
Available Tools: {tools}
Create a step-by-step plan. Output JSON:
{{"steps": ["step 1 description", "step 2 description", ...]}}"""
EXECUTION_PROMPT = """Execute this step of the plan.
Overall Task: {task}
Current Step: {step}
Previous Results: {previous_results}
Available Tools: {tools}
Decide what action to take. Output JSON:
{{"thought": "your reasoning", "action": "tool_name", "input": "tool input"}}
If the step is complete, use action "complete" with the result as input."""
def __init__(
self,
llm_client: LLMClient,
tools: Dict[str, Callable],
config: AgentConfig = None
):
self.llm = llm_client
self.tools = tools
self.config = config or AgentConfig()
self.memory = ConversationMemory() if self.config.enable_memory else None
self.status = AgentStatus.IDLE
def _plan(self, task: str, trace: AgentTrace) -> List[str]:
"""Create execution plan for task."""
if not self.config.enable_planning:
return [task]
prompt = self.PLANNING_PROMPT.format(
task=task,
tools=list(self.tools.keys())
)
response = self.llm.generate(prompt, temperature=0)
trace.add_step("planning", {"prompt": prompt[:200], "response": response[:200]})
try:
            if "```" in response:
                response = response.split("```")[1].replace("json", "").strip()
data = json.loads(response)
return data.get("steps", [task])
        except Exception:
return [task]
def _execute_step(
self,
task: str,
step: str,
previous_results: List[str],
trace: AgentTrace
) -> str:
"""Execute a single step of the plan."""
prompt = self.EXECUTION_PROMPT.format(
task=task,
step=step,
previous_results=previous_results[-3:] if previous_results else "None",
tools=list(self.tools.keys())
)
response = self.llm.generate(prompt, temperature=self.config.temperature)
try:
            if "```" in response:
                response = response.split("```")[1].replace("json", "").strip()
data = json.loads(response)
thought = data.get("thought", "")
action = data.get("action", "complete")
action_input = data.get("input", "")
trace.add_step("execution", {
"step": step,
"thought": thought,
"action": action,
"input": action_input[:100]
})
if action == "complete":
return action_input
if action in self.tools:
result = self.tools[action](action_input)
trace.add_step("tool_result", {"tool": action, "result": str(result)[:200]})
return result
return f"Unknown tool: {action}"
except Exception as e:
trace.add_step("error", {"error": str(e)})
return f"Error executing step: {e}"
def run(self, task: str) -> tuple:
"""
Run agent on a task.
Returns:
result: Final result string
trace: AgentTrace with execution details
"""
trace = AgentTrace(
trace_id=str(uuid.uuid4())[:8],
start_time=datetime.utcnow().isoformat()
)
self.status = AgentStatus.THINKING
try:
# Planning phase
steps = self._plan(task, trace)
logger.info(f"Created plan with {len(steps)} steps")
# Execution phase
results = []
self.status = AgentStatus.ACTING
for i, step in enumerate(steps):
if i >= self.config.max_steps:
break
result = self._execute_step(task, step, results, trace)
results.append(result)
if self.memory:
self.memory.add("assistant", f"Step {i+1}: {result[:200]}")
self.status = AgentStatus.COMPLETED
trace.end_time = datetime.utcnow().isoformat()
final_result = results[-1] if results else "No results"
return final_result, trace
except Exception as e:
self.status = AgentStatus.FAILED
trace.add_step("failure", {"error": str(e)})
trace.end_time = datetime.utcnow().isoformat()
raise
if __name__ == "__main__":
try:
llm = LLMClient(provider="openai")
tools = create_default_tools()
agent = ProductionAgent(
llm_client=llm,
tools=tools,
config=AgentConfig(max_steps=10, enable_planning=True)
)
task = "Research what RAG is and calculate the potential cost savings of implementing it for 10000 queries per day"
result, trace = agent.run(task)
print(f"Result: {result}")
print(f"\nExecution trace ({len(trace.steps)} steps):")
for step in trace.steps:
print(f" [{step['type']}] {json.dumps(step['content'])[:80]}...")
except ImportError:
print("Install: pip install openai")
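One loose end worth closing: AgentTrace carries a total_cost field that nothing above populates (add_step likewise receives tokens=0 everywhere). A minimal sketch of wiring it up, assuming flat per-token pricing; the rates below are placeholders, not current provider prices:
# Placeholder flat rates in USD per 1M tokens; check your provider's pricing
PRICE_PER_M_TOKENS = {"gpt-4o-mini": 0.60, "claude-3-haiku-20240307": 1.25}

def record_cost(trace: AgentTrace, model: str, tokens: int) -> None:
    """Accumulate an approximate dollar cost onto the trace."""
    rate = PRICE_PER_M_TOKENS.get(model, 1.00)
    trace.total_cost += tokens * rate / 1_000_000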
Data Engineer's ROI Lens: The Business Impact
from typing import Dict

def analyze_agentic_roi(
daily_complex_tasks: int,
avg_manual_time_minutes: float = 30,
avg_agent_time_minutes: float = 2,
llm_cost_per_task: float = 0.05,
human_hourly_rate: float = 75,
error_rate_manual: float = 0.10,
error_rate_agent: float = 0.05,
error_cost: float = 100
) -> Dict:
"""Calculate ROI of agentic workflows vs manual processes."""
monthly_tasks = daily_complex_tasks * 22 # Working days
# Manual process costs
manual_time_hours = monthly_tasks * avg_manual_time_minutes / 60
manual_labor_cost = manual_time_hours * human_hourly_rate
manual_error_cost = monthly_tasks * error_rate_manual * error_cost
manual_total = manual_labor_cost + manual_error_cost
# Agent process costs
agent_time_hours = monthly_tasks * avg_agent_time_minutes / 60
agent_labor_cost = agent_time_hours * human_hourly_rate # Oversight time
agent_llm_cost = monthly_tasks * llm_cost_per_task
agent_error_cost = monthly_tasks * error_rate_agent * error_cost
agent_total = agent_labor_cost + agent_llm_cost + agent_error_cost
# Savings
monthly_savings = manual_total - agent_total
time_saved_hours = manual_time_hours - agent_time_hours
# Setup costs
setup_hours = 40
setup_cost = setup_hours * 150 # Senior engineer rate
return {
"monthly_tasks": monthly_tasks,
"manual_monthly_cost": round(manual_total, 2),
"agent_monthly_cost": round(agent_total, 2),
"monthly_savings": round(monthly_savings, 2),
"annual_savings": round(monthly_savings * 12, 2),
"time_saved_hours_monthly": round(time_saved_hours, 1),
"setup_cost": setup_cost,
"payback_months": round(setup_cost / monthly_savings, 1) if monthly_savings > 0 else float('inf'),
"roi_first_year": round((monthly_savings * 12 - setup_cost) / setup_cost * 100, 1)
}
if __name__ == "__main__":
roi = analyze_agentic_roi(
daily_complex_tasks=20,
avg_manual_time_minutes=30,
avg_agent_time_minutes=3
)
print("=== Agentic Workflow ROI Analysis ===")
print(f"Monthly tasks: {roi['monthly_tasks']}")
print(f"\nManual process: ${roi['manual_monthly_cost']:,.0f}/month")
print(f"Agent process: ${roi['agent_monthly_cost']:,.0f}/month")
print(f"Monthly savings: ${roi['monthly_savings']:,.0f}")
print(f"\nTime saved: {roi['time_saved_hours_monthly']} hours/month")
print(f"Setup cost: ${roi['setup_cost']:,}")
print(f"Payback: {roi['payback_months']} months")
print(f"First year ROI: {roi['roi_first_year']}%")
Sample Output:
=== Agentic Workflow ROI Analysis ===
Monthly tasks: 440
Manual process: $20,900/month
Agent process: $3,872/month
Monthly savings: $17,028
Time saved: 198.0 hours/month
Setup cost: $6,000
Payback: 0.4 months
First year ROI: 3305.6%
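These defaults are optimistic, so stress-test the model with your own numbers. Even at a tenth of the task volume, the setup cost pays back within a few months:
# Sensitivity check: 2 complex tasks/day instead of 20 (other defaults kept)
roi_small = analyze_agentic_roi(daily_complex_tasks=2)
print(roi_small["payback_months"])  # -> 3.4 months (vs 0.4 above)
print(roi_small["roi_first_year"])  # -> 251.6% first-year ROI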
Key Takeaways
- Decompose first: Break complex tasks into atomic steps before execution
- State is everything: Immutable state with history enables debugging, recovery, and observability
- DAGs for orchestration: Clear dependencies, deterministic execution, easy parallelization
- ReAct for reasoning: Interleaving thought and action improves both accuracy and explainability
- Plan and execute separately: A distinct planning phase keeps execution from getting lost in complexity
- The ROI is massive: Cutting 30-minute manual tasks down to 3-minute agent runs delivers 10x+ returns
Start simple: one task, one DAG, one agent. Add complexity only when you've validated the pattern works for your domain.
Next in this series: Human-in-the-Loop Systems—building confidence thresholds, review queues, and feedback pipelines that keep humans appropriately in control.