Introduction
In the world of distributed systems and agentic AI, managing complex, long-running workflows reliably is a significant challenge. Traditional approaches often struggle with failures, state management, and ensuring that processes complete successfully even when systems crash or network issues occur.
Temporal is an open-source platform that solves these problems by providing durable, reliable workflow orchestration. It abstracts away the complexities of distributed systems, allowing developers to focus on business logic while Temporal handles failures, retries, and state management automatically.
This article explores Temporal's core concepts, its workflow patterns, and how it supports building agentic AI systems.
What is Temporal?
Temporal is a workflow orchestration platform that provides:
- Durable Execution: Workflows that can run for days, months, or even years, surviving process restarts and infrastructure failures along the way
- Automatic Failure Recovery: Built-in retry mechanisms and state persistence
- Time-Travel Debugging: Complete event history for every workflow execution
- Scalability: Horizontal scaling through distributed workers
- Language Support: SDKs for Go, Java, Python, TypeScript, .NET, PHP, and Ruby
Unlike traditional task queues or workflow engines, Temporal maintains a complete event history of every workflow execution, enabling it to resume workflows from any point after failures.
Core Concepts
1. Workflows
Workflows are the heart of Temporal. They represent the business logic of your application as code. Workflows are:
- Deterministic: They must produce the same result when replayed
- Durable: State is persisted automatically
- Long-running: Can execute for extended periods
- Versioned: Support safe deployments and migrations
from temporalio import workflow
from temporalio.common import RetryPolicy
from datetime import timedelta

@workflow.defn
class TradingWorkflow:
    """Example workflow for automated trading analysis."""

    @workflow.run
    async def run(self, symbol: str, strategy: str) -> dict:
        # Workflow logic here
        result = await workflow.execute_activity(
            analyze_market_activity,
            args=[symbol, strategy],
            start_to_close_timeout=timedelta(seconds=180),
            retry_policy=RetryPolicy(
                initial_interval=timedelta(seconds=1),
                maximum_interval=timedelta(seconds=30),
                maximum_attempts=3
            )
        )
        return result
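The Versioned bullet above deserves a quick illustration. Below is a minimal sketch of how the run method could branch safely with the Python SDK's patching helper; the patch ID and the extended_analysis_activity are hypothetical, not part of the original example.

    @workflow.run
    async def run(self, symbol: str, strategy: str) -> dict:
        # workflow.patched() records a marker the first time the new code runs,
        # so replays of executions started before the deploy keep the old branch
        if workflow.patched("use-extended-analysis"):
            activity_fn = extended_analysis_activity   # hypothetical new activity
        else:
            activity_fn = analyze_market_activity      # original behavior
        return await workflow.execute_activity(
            activity_fn,
            args=[symbol, strategy],
            start_to_close_timeout=timedelta(seconds=180)
        )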
2. Activities
Activities are individual units of work within workflows. They:
- Interact with external systems (APIs, databases, services)
- Can be retried automatically on failure
- Support timeouts and cancellation
- Execute in a separate process from workflows
from temporalio import activity

@activity.defn
async def fetch_market_data_activity(symbol: str, timeframe: str) -> dict:
    """
    Activity that fetches market data from external APIs.
    This can fail and will be automatically retried by Temporal.
    """
    # Import external dependencies inside the activity
    import requests

    # Fetch market data from API
    response = requests.get(
        f"https://api.marketdata.com/v1/quote/{symbol}",
        params={"timeframe": timeframe},
        timeout=30
    )
    response.raise_for_status()
    return {
        "symbol": symbol,
        "price_data": response.json(),
        "timestamp": response.headers.get("timestamp")
    }
3. Workers
Workers are processes that execute workflow and activity code. They:
- Poll the Temporal service for tasks
- Execute workflows and activities
- Can be scaled horizontally
- Support multiple task queues
import asyncio

from temporalio.client import Client
from temporalio.worker import Worker

async def run_worker():
    # Connect to Temporal
    client = await Client.connect(
        "temporal-server:7233",
        namespace="default"
    )
    # Create worker
    worker = Worker(
        client,
        task_queue="trading-queue",
        workflows=[TradingWorkflow],
        activities=[fetch_market_data_activity]
    )
    # Run worker (blocks until stopped)
    await worker.run()

if __name__ == "__main__":
    asyncio.run(run_worker())
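For completeness, here is a minimal sketch of how a client could start the workflow once a worker is polling the queue. The connection address and task queue mirror the snippet above; the workflow ID and arguments are assumptions for illustration.

from temporalio.client import Client

async def start_trading_workflow():
    client = await Client.connect("temporal-server:7233", namespace="default")
    # execute_workflow starts the workflow and waits for its result
    result = await client.execute_workflow(
        TradingWorkflow.run,
        args=["AAPL", "momentum"],      # hypothetical symbol and strategy
        id="trading-AAPL-momentum",     # business-meaningful workflow ID
        task_queue="trading-queue",
    )
    print(result)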
Workflow Patterns
Temporal supports various workflow patterns to handle complex business processes. Each pattern has specific use cases, performance characteristics, and architectural considerations. Let's explore the most important ones in detail:
1. Sequential Execution
Overview: The simplest and most common pattern where activities execute one after another, with each step depending on the previous step's output. This creates a linear pipeline where data flows sequentially through the workflow.
Characteristics:
- Execution Time: Sum of all activity execution times (no parallelism)
- Dependencies: Each activity depends on the previous one
- Error Handling: Failure in any step stops the workflow
- State Management: State accumulates as data passes through each step
- Use When: Steps have strict dependencies, data must be processed in order, or when parallel execution isn't possible
Performance Considerations:
- Total execution time = sum of all activity durations
- No parallelism benefits
- Simple to understand and debug
- Best for pipelines where each step transforms data for the next
@workflow.defn
class SequentialTradingWorkflow:
    @workflow.run
    async def run(self, trade_request: dict) -> dict:
        # Step 1: Fetch Market Data
        market_data = await workflow.execute_activity(
            fetch_market_data_activity,
            args=[trade_request["symbol"], trade_request["timeframe"]],
            start_to_close_timeout=timedelta(seconds=180)
        )
        # Step 2: Technical Analysis (uses market data)
        trade_request["market_data"] = market_data["price_data"]
        analysis_result = await workflow.execute_activity(
            technical_analysis_activity,
            args=[trade_request],
            start_to_close_timeout=timedelta(seconds=120)
        )
        # Step 3: Risk Assessment (uses analysis result)
        trade_request["analysis"] = analysis_result["signals"]
        risk_result = await workflow.execute_activity(
            risk_assessment_activity,
            args=[trade_request],
            start_to_close_timeout=timedelta(seconds=120)
        )
        # Step 4: Execute Trade (uses all previous results)
        trade_request["risk_score"] = risk_result["risk_score"]
        execution_result = await workflow.execute_activity(
            execute_trade_activity,
            args=[trade_request],
            start_to_close_timeout=timedelta(seconds=120)
        )
        return execution_result
Use Case: When each step depends on the previous step's output, like a data processing pipeline, trading workflows where analysis depends on market data, or ETL processes.
Best Practices:
- Use appropriate timeouts for each activity
- Implement retry policies for transient failures
- Pass only necessary data between activities to reduce state size
- Log intermediate results for debugging
- Consider breaking into sub-workflows if the sequence becomes too long
2. Parallel Execution
Overview: Execute multiple independent activities concurrently to maximize throughput and minimize total execution time. This pattern is essential for performance optimization when activities don't depend on each other.
Characteristics:
- Execution Time: Maximum of all activity execution times (not the sum)
- Dependencies: Activities are independent and can run simultaneously
- Error Handling: Can use return_exceptions=True to handle partial failures
- State Management: Results are collected and aggregated after all complete
- Use When: Activities are independent, you need to fetch data from multiple sources, or when performance is critical
Performance Considerations:
- Total execution time ≈ max(activity durations) + aggregation time
- Significant performance improvement over sequential execution
- Resource usage scales with number of parallel activities
- Best for I/O-bound operations (API calls, database queries)
import asyncio

@workflow.defn
class ParallelWorkflow:
    @workflow.run
    async def run(self, data_sources: list) -> dict:
        # Define shared activity options
        activity_options = {
            "start_to_close_timeout": timedelta(seconds=120),
            "retry_policy": RetryPolicy(maximum_attempts=3)
        }
        # Start multiple activities in parallel (no await yet)
        futures = []
        for source in data_sources:
            future = workflow.execute_activity(
                fetch_data_activity,
                args=[source],
                **activity_options
            )
            futures.append(future)
        # Wait for all activities to complete
        results = await asyncio.gather(*futures)
        # Aggregate results
        aggregated = await workflow.execute_activity(
            aggregate_data_activity,
            args=[results],
            start_to_close_timeout=timedelta(seconds=60)
        )
        return aggregated
Use Case: Fetching data from multiple sources, processing independent tasks, batch operations, multi-exchange trading data collection, or parallel analysis of different market segments.
Real-World Example: In a trading workflow, we could parallelize data collection from multiple exchanges:
# Parallel market data fetching from different exchanges
market_data_futures = [
    workflow.execute_activity(
        fetch_market_data_activity,
        args=[{"symbol": symbol, "exchange": "NYSE"}],
        start_to_close_timeout=timedelta(seconds=120)
    ),
    workflow.execute_activity(
        fetch_market_data_activity,
        args=[{"symbol": symbol, "exchange": "NASDAQ"}],
        start_to_close_timeout=timedelta(seconds=120)
    ),
    workflow.execute_activity(
        fetch_market_data_activity,
        args=[{"symbol": symbol, "exchange": "LSE"}],
        start_to_close_timeout=timedelta(seconds=120)
    )
]
# Wait for all market data activities to complete
market_data_results = await asyncio.gather(*market_data_futures)
Best Practices:
- Use asyncio.gather() with return_exceptions=True for error handling
- Set appropriate timeouts for each parallel activity
- Consider rate limiting when calling external APIs in parallel
- Aggregate results efficiently to avoid large state objects
- Monitor resource usage when scaling parallel activities
3. Sub-Workflows (Child Workflows)
Overview: Break complex workflows into smaller, reusable sub-workflows (child workflows). This pattern promotes modularity, reusability, and separation of concerns. Child workflows can be independently versioned, monitored, and scaled.
Characteristics:
- Modularity: Each child workflow is a self-contained unit
- Reusability: Child workflows can be reused across multiple parent workflows
- Isolation: Child workflow failures don't necessarily fail the parent
- Versioning: Child workflows can be versioned independently
- Monitoring: Each child workflow has its own execution history
- Use When: You need to modularize complex processes, create reusable components, or implement hierarchical workflows
Performance Considerations:
- Child workflows execute independently and can be distributed across workers
- Parent workflow waits for child completion (can be made async with signals)
- Each child workflow maintains its own state and history
- Useful for breaking down large workflows into manageable pieces
@workflow.defn
class OrderExecutionWorkflow:
    @workflow.run
    async def run(self, order_id: str) -> dict:
        # Execute child workflow for order validation
        validation_result = await workflow.execute_child_workflow(
            OrderValidationWorkflow.run,
            args=[order_id],
            id=f"validation-{order_id}"
        )
        if validation_result["status"] != "valid":
            return {"status": "failed", "reason": "validation_failed"}
        # Execute child workflow for risk check
        risk_result = await workflow.execute_child_workflow(
            RiskCheckWorkflow.run,
            args=[order_id],
            id=f"risk-{order_id}"
        )
        # Execute child workflow for trade execution
        execution_result = await workflow.execute_child_workflow(
            TradeExecutionWorkflow.run,
            args=[order_id],
            id=f"execution-{order_id}"
        )
        return {
            "order_id": order_id,
            "validation": validation_result,
            "risk_check": risk_result,
            "execution": execution_result
        }


@workflow.defn
class OrderValidationWorkflow:
    @workflow.run
    async def run(self, order_id: str) -> dict:
        # Order validation logic
        result = await workflow.execute_activity(
            validate_order_activity,
            args=[order_id],
            start_to_close_timeout=timedelta(seconds=60)
        )
        return result


@workflow.defn
class RiskCheckWorkflow:
    @workflow.run
    async def run(self, order_id: str) -> dict:
        # Risk assessment logic
        result = await workflow.execute_activity(
            assess_risk_activity,
            args=[order_id],
            start_to_close_timeout=timedelta(seconds=60)
        )
        return result
Use Case: Modularizing complex processes, creating reusable workflow components, implementing saga patterns, or when you need to orchestrate multiple independent workflows.
Best Practices:
- Use descriptive workflow IDs for child workflows
- Handle child workflow failures gracefully
- Consider using signals for async child workflow communication
- Version child workflows independently for safe deployments
- Monitor child workflows separately for better observability
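One bullet above suggests signals for asynchronous child workflow communication. Here is a minimal sketch of that idea: the parent starts children without blocking on their results and is notified through a signal. The workflow names, the child_completed signal, and the argument shapes are hypothetical.

@workflow.defn
class AsyncParentWorkflow:
    def __init__(self):
        self.completed_children: list = []

    @workflow.signal
    def child_completed(self, order_id: str):
        self.completed_children.append(order_id)

    @workflow.run
    async def run(self, order_ids: list) -> dict:
        # Awaiting start_child_workflow only waits for the child to start
        for order_id in order_ids:
            await workflow.start_child_workflow(
                NotifyingValidationWorkflow.run,
                args=[order_id, workflow.info().workflow_id],
                id=f"validation-{order_id}"
            )
        # Block until every child has reported back via the signal
        await workflow.wait_condition(
            lambda: len(self.completed_children) == len(order_ids)
        )
        return {"completed": self.completed_children}


@workflow.defn
class NotifyingValidationWorkflow:
    @workflow.run
    async def run(self, order_id: str, parent_id: str) -> None:
        await workflow.execute_activity(
            validate_order_activity,
            args=[order_id],
            start_to_close_timeout=timedelta(seconds=60)
        )
        # Notify the parent instead of making it block on our result
        parent = workflow.get_external_workflow_handle(parent_id)
        await parent.signal(AsyncParentWorkflow.child_completed, order_id)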
4. Conditional Execution
Overview: Execute different workflow paths based on runtime conditions, state, or external factors. This pattern enables dynamic workflow routing and adaptive behavior.
Characteristics:
- Dynamic Routing: Workflow path determined at runtime
- Conditional Logic: Based on activity results, external state, or user input
- Branching: Multiple execution paths from a single decision point
- Convergence: Paths may converge back to common activities
- Use When: You need adaptive behavior, A/B testing, different processing based on data characteristics, or user-driven workflows
Performance Considerations:
- Only one path executes, so performance depends on chosen path
- Condition evaluation adds minimal overhead
- Useful for avoiding unnecessary work in alternative paths
- Can optimize by evaluating conditions early
@workflow.defn
class AdaptiveTradingWorkflow:
    @workflow.run
    async def run(self, trade_request: dict) -> dict:
        # Initial market analysis
        market_analysis = await workflow.execute_activity(
            analyze_market_activity,
            args=[trade_request],
            start_to_close_timeout=timedelta(seconds=120)
        )
        # Conditional branching based on market conditions
        if market_analysis["volatility"] == "high":
            # Use conservative strategy
            result = await workflow.execute_activity(
                conservative_trading_activity,
                args=[trade_request, market_analysis],
                start_to_close_timeout=timedelta(seconds=120)
            )
        elif market_analysis["volatility"] == "medium":
            # Use standard strategy
            result = await workflow.execute_activity(
                standard_trading_activity,
                args=[trade_request, market_analysis],
                start_to_close_timeout=timedelta(seconds=120)
            )
        else:
            # Use aggressive strategy
            result = await workflow.execute_activity(
                aggressive_trading_activity,
                args=[trade_request, market_analysis],
                start_to_close_timeout=timedelta(seconds=120)
            )
        return result
Use Case: Dynamic workflow routing, A/B testing, adaptive processing based on data characteristics, market condition-based trading strategies, or user preference-driven workflows.
Best Practices:
- Keep condition evaluation deterministic
- Document all possible execution paths
- Handle all branches in error scenarios
- Use clear, descriptive condition names
- Consider using enums for condition values
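The last bullet is easy to demonstrate. A small sketch, assuming a hypothetical Volatility enum, that keeps the branching readable and deterministic:

from enum import Enum

class Volatility(str, Enum):
    HIGH = "high"
    MEDIUM = "medium"
    LOW = "low"

# Inside the workflow, compare against enum members rather than raw strings
volatility = Volatility(market_analysis["volatility"])
if volatility is Volatility.HIGH:
    strategy_activity = conservative_trading_activity
elif volatility is Volatility.MEDIUM:
    strategy_activity = standard_trading_activity
else:
    strategy_activity = aggressive_trading_activity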
5. Fan-Out/Fan-In Pattern
Overview: Distribute work across multiple activities (fan-out), then collect and aggregate results (fan-in). This pattern is ideal for processing large datasets, batch operations, or map-reduce scenarios.
Characteristics:
- Fan-Out: Distribute work to multiple parallel activities
- Fan-In: Collect results from all activities
- Error Handling: Can handle partial failures gracefully
- Scalability: Can process large numbers of tasks
- Use When: Batch processing, map-reduce operations, processing large datasets in chunks, or parallel task execution
Performance Considerations:
- Execution time ≈ max(task durations) + aggregation time
- Scales well with number of tasks
- Can process thousands of tasks in parallel
- Resource usage scales with task count
- Best for CPU-bound or I/O-bound batch operations
@workflow.defn
class FanOutFanInWorkflow:
    @workflow.run
    async def run(self, task_list: list) -> dict:
        # Fan-out: Distribute tasks across multiple activities
        futures = []
        for task in task_list:
            future = workflow.execute_activity(
                process_task_activity,
                args=[task],
                start_to_close_timeout=timedelta(seconds=300)
            )
            futures.append(future)
        # Fan-in: Wait for all tasks to complete
        results = await asyncio.gather(*futures, return_exceptions=True)
        # Process results (handle exceptions)
        successful_results = []
        failed_tasks = []
        for i, result in enumerate(results):
            if isinstance(result, Exception):
                failed_tasks.append({"task": task_list[i], "error": str(result)})
            else:
                successful_results.append(result)
        # Aggregate results
        summary = await workflow.execute_activity(
            aggregate_results_activity,
            args=[successful_results, failed_tasks],
            start_to_close_timeout=timedelta(seconds=60)
        )
        return summary
Use Case: Batch processing, map-reduce operations, processing large datasets in chunks, processing multiple orders simultaneously, or parallel portfolio analysis.
Best Practices:
- Use return_exceptions=True to handle partial failures
- Implement retry logic for transient failures
- Consider batching tasks to avoid overwhelming workers
- Monitor task completion rates
- Implement graceful degradation for partial failures
6. Saga Pattern (Compensating Transactions)
Overview: Execute a series of transactions with compensating actions (rollback operations) if any step fails. This pattern ensures data consistency in distributed systems where traditional ACID transactions aren't possible.
Characteristics:
- Forward Path: Execute transactions in sequence
- Compensation: Each transaction has a compensating action
- Rollback: On failure, execute compensations in reverse order
- Consistency: Ensures system remains in consistent state
- Use When: Distributed transactions, order processing, multi-step processes requiring rollback, or financial operations
Performance Considerations:
- Execution time includes both forward and potential compensation paths
- Compensation adds overhead but ensures consistency
- Critical for financial and transactional systems
- Can be optimized by minimizing compensation complexity
@workflow.defn
class TradeExecutionSagaWorkflow:
    @workflow.run
    async def run(self, trade_order: dict) -> dict:
        completed_steps = []
        try:
            # Step 1: Reserve capital
            capital_result = await workflow.execute_activity(
                reserve_capital_activity,
                args=[trade_order],
                start_to_close_timeout=timedelta(seconds=60)
            )
            completed_steps.append(("capital", capital_result))
            # Step 2: Place order
            order_result = await workflow.execute_activity(
                place_order_activity,
                args=[trade_order],
                start_to_close_timeout=timedelta(seconds=60)
            )
            completed_steps.append(("order", order_result))
            # Step 3: Confirm execution
            execution_result = await workflow.execute_activity(
                confirm_execution_activity,
                args=[trade_order],
                start_to_close_timeout=timedelta(seconds=60)
            )
            completed_steps.append(("execution", execution_result))
            return {"status": "success", "steps": completed_steps}
        except Exception as e:
            # Compensate: Undo completed steps in reverse order
            for step_name, step_result in reversed(completed_steps):
                try:
                    await workflow.execute_activity(
                        compensate_trade_activity,
                        args=[step_name, step_result],
                        start_to_close_timeout=timedelta(seconds=60)
                    )
                except Exception as comp_error:
                    workflow.logger.error(f"Compensation failed for {step_name}: {comp_error}")
            raise e
Use Case: Distributed transactions, order processing, trade execution with rollback, or any multi-step process requiring rollback.
Best Practices:
- Design compensating actions to be idempotent
- Store completed steps in workflow state
- Execute compensations in reverse order
- Handle compensation failures gracefully
- Log all compensation actions for audit trails
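A minimal sketch of the first bullet, idempotent compensation: the compensation activity checks whether a step was already reversed before acting, so a retried compensation does not release capital twice. The helper functions (was_already_compensated, release_capital, cancel_order, mark_compensated) are hypothetical placeholders for your own persistence layer.

@activity.defn
async def compensate_trade_activity(step_name: str, step_result: dict) -> dict:
    # Idempotency guard: a retried compensation must be a no-op
    if await was_already_compensated(step_name, step_result["id"]):
        return {"status": "already_compensated", "step": step_name}

    if step_name == "capital":
        await release_capital(step_result["reservation_id"])
    elif step_name == "order":
        await cancel_order(step_result["order_id"])

    await mark_compensated(step_name, step_result["id"])
    return {"status": "compensated", "step": step_name}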
7. Polling Pattern
Overview: Poll an external system or check a condition repeatedly until a desired state is reached or a timeout occurs. This pattern is useful when waiting for external processes or asynchronous operations to complete.
Characteristics:
- Polling Loop: Repeatedly check condition with intervals
- Timeout: Maximum time to wait before giving up
- Interval: Time between polling attempts
- Condition Check: Determines if polling should continue
- Use When: Waiting for external processes, monitoring long-running tasks, checking resource availability, or waiting for async operations
Performance Considerations:
- Execution time depends on how quickly condition is met
- Polling interval affects responsiveness vs. resource usage
- Use appropriate timeouts to avoid infinite loops
- Consider exponential backoff for polling intervals
- Best for waiting on external systems with unknown completion times
@workflow.defn
class PollingWorkflow:
    @workflow.run
    async def run(self, job_id: str) -> dict:
        max_attempts = 60  # Poll for up to 5 minutes (60 * 5 seconds)
        attempt = 0
        while attempt < max_attempts:
            # Check job status
            status = await workflow.execute_activity(
                check_job_status_activity,
                args=[job_id],
                start_to_close_timeout=timedelta(seconds=10)
            )
            if status["state"] == "completed":
                # Fetch results
                results = await workflow.execute_activity(
                    fetch_job_results_activity,
                    args=[job_id],
                    start_to_close_timeout=timedelta(seconds=60)
                )
                return results
            elif status["state"] == "failed":
                raise Exception(f"Job {job_id} failed: {status.get('error')}")
            # Wait before next poll (asyncio.sleep is a durable timer inside workflows)
            await asyncio.sleep(5)
            attempt += 1
        raise TimeoutError(f"Job {job_id} did not complete within timeout")
Use Case: Waiting for external processes, monitoring long-running tasks, checking for resource availability, waiting for trade execution confirmation, or polling for order status.
Best Practices:
- Use exponential backoff for polling intervals
- Set reasonable timeouts to avoid infinite loops
- Make condition checks efficient (avoid heavy operations)
- Consider using signals instead of polling when possible
- Log polling attempts for debugging
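A sketch combining the first two bullets: polling with exponential backoff and a hard deadline. It reuses the check_job_status_activity from above; the interval and deadline values are arbitrary assumptions.

@workflow.defn
class BackoffPollingWorkflow:
    @workflow.run
    async def run(self, job_id: str) -> dict:
        interval = 2          # seconds between polls, doubled after each attempt
        max_interval = 60
        deadline = workflow.now() + timedelta(minutes=30)

        while workflow.now() < deadline:
            status = await workflow.execute_activity(
                check_job_status_activity,
                args=[job_id],
                start_to_close_timeout=timedelta(seconds=10)
            )
            if status["state"] == "completed":
                return status
            # Durable timer; the sleep survives worker restarts
            await asyncio.sleep(interval)
            interval = min(interval * 2, max_interval)

        raise TimeoutError(f"Job {job_id} did not complete before the deadline")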
8. Event-Driven Pattern
Overview: Workflows that respond to external events or signals, enabling reactive and interactive workflows. This pattern allows workflows to wait for external input or events before proceeding.
Characteristics:
- Reactive: Workflows respond to external events
- Interactive: Can wait for human input or external systems
- Flexible: Workflow behavior adapts based on received events
- Use When: Human-in-the-loop workflows, waiting for external approvals, event-driven systems, or interactive processes
Example:
@workflow.defn
class EventDrivenTradingWorkflow:
    def __init__(self):
        # Set by the approve_trade signal handler
        self.approved = None

    @workflow.run
    async def run(self, trade_request: dict) -> dict:
        # Initial analysis
        analysis = await workflow.execute_activity(
            analyze_market_activity,
            args=[trade_request],
            start_to_close_timeout=timedelta(seconds=120)
        )
        # Wait for approval signal (can come from user or system);
        # raises asyncio.TimeoutError if no signal arrives in time
        await workflow.wait_condition(
            lambda: self.approved is not None,
            timeout=timedelta(minutes=5)
        )
        if self.approved:
            # Execute trade
            result = await workflow.execute_activity(
                execute_trade_activity,
                args=[trade_request, analysis],
                start_to_close_timeout=timedelta(seconds=120)
            )
            return result
        else:
            return {"status": "rejected", "reason": "not_approved"}

    @workflow.signal
    def approve_trade(self, approved: bool):
        self.approved = approved
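To make the pattern concrete, here is a minimal sketch of how a client could start this workflow and later deliver the approval signal. The workflow ID, task queue, and trade request are assumptions for illustration.

from temporalio.client import Client

async def approve_pending_trade():
    client = await Client.connect("temporal-server:7233", namespace="default")
    # Start the workflow; it pauses at wait_condition until signalled
    handle = await client.start_workflow(
        EventDrivenTradingWorkflow.run,
        {"symbol": "AAPL", "quantity": 10},   # hypothetical trade request
        id="trade-approval-AAPL",
        task_queue="trading-queue",
    )
    # Later, a user or another system approves the trade
    await handle.signal(EventDrivenTradingWorkflow.approve_trade, True)
    # Wait for the workflow to finish and read its result
    return await handle.result()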
9. Chained Workflows Pattern
Overview: Execute workflows in a chain where the output of one workflow triggers or provides input to the next. This creates a pipeline of workflows, each handling a specific stage of processing.
Characteristics:
- Pipeline: Workflows execute in sequence
- Data Flow: Output of one workflow becomes input to next
- Modularity: Each workflow handles a specific stage
- Use When: Multi-stage processing, data transformation pipelines, or when you need to separate concerns across workflow boundaries
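Since this pattern has no example above, here is a minimal sketch of a client-driven chain in which each stage is its own workflow and the output of one becomes the input of the next. The stage workflow names are hypothetical.

async def run_trading_pipeline(client, symbol: str) -> dict:
    # Stage 1: ingest raw market data
    raw_data = await client.execute_workflow(
        DataIngestionWorkflow.run,
        symbol,
        id=f"ingest-{symbol}",
        task_queue="trading-queue",
    )
    # Stage 2: analyze the ingested data
    analysis = await client.execute_workflow(
        AnalysisWorkflow.run,
        raw_data,
        id=f"analysis-{symbol}",
        task_queue="trading-queue",
    )
    # Stage 3: act on the analysis
    return await client.execute_workflow(
        ExecutionWorkflow.run,
        analysis,
        id=f"execution-{symbol}",
        task_queue="trading-queue",
    )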
Pattern Comparison Summary
| Pattern | Execution Time | Complexity | Use Case |
|---|---|---|---|
| Sequential | Sum of all activities | Low | Dependent steps, pipelines |
| Parallel | Max of activities | Medium | Independent tasks, performance |
| Sub-Workflows | Depends on children | Medium-High | Modularity, reusability |
| Conditional | Single path | Medium | Dynamic routing, adaptation |
| Fan-Out/Fan-In | Max + aggregation | Medium-High | Batch processing, map-reduce |
| Saga | Forward + compensation | High | Transactions, rollback |
| Polling | Variable (until condition) | Low-Medium | External waits, monitoring |
| Event-Driven | Variable (until event) | Medium | Interactive, reactive |
| Chained | Sum of workflows | Medium | Multi-stage pipelines |
Temporal in Agentic AI
Agentic AI systems involve autonomous agents that make decisions and perform tasks over extended periods. Temporal provides an ideal foundation for building these systems.
Why Temporal for Agentic AI?
1. Reliability and Resilience
AI agents operate in unpredictable environments and may encounter:
- Network failures
- API rate limits
- Model inference errors
- Resource constraints
Temporal's durable execution ensures agent workflows recover automatically:
@workflow.defn
class AIAgentWorkflow:
    @workflow.run
    async def run(self, user_query: str) -> dict:
        # Agent reasoning step
        reasoning = await workflow.execute_activity(
            agent_reasoning_activity,
            args=[user_query],
            start_to_close_timeout=timedelta(seconds=120),
            retry_policy=RetryPolicy(
                initial_interval=timedelta(seconds=2),
                maximum_interval=timedelta(seconds=60),
                maximum_attempts=5
            )
        )
        # Agent action step
        action_result = await workflow.execute_activity(
            agent_action_activity,
            args=[reasoning],
            start_to_close_timeout=timedelta(seconds=120),
            retry_policy=RetryPolicy(maximum_attempts=3)
        )
        return action_result
2. Complex Decision-Making Orchestration
AI agents often need to coordinate multiple decision-making processes. Temporal's support for parallel execution and sub-workflows enables agents to:
- Perform concurrent analysis
- Delegate subtasks efficiently
- Make decisions based on aggregated results
@workflow.defn
class MultiAgentOrchestrationWorkflow:
    @workflow.run
    async def run(self, task: dict) -> dict:
        # Parallel agent execution
        agent_futures = [
            workflow.execute_activity(
                research_agent_activity,
                args=[task],
                start_to_close_timeout=timedelta(seconds=120)
            ),
            workflow.execute_activity(
                analysis_agent_activity,
                args=[task],
                start_to_close_timeout=timedelta(seconds=120)
            ),
            workflow.execute_activity(
                planning_agent_activity,
                args=[task],
                start_to_close_timeout=timedelta(seconds=120)
            )
        ]
        # Wait for all agents to complete
        agent_results = await asyncio.gather(*agent_futures)
        # Coordinator agent synthesizes results
        final_result = await workflow.execute_activity(
            coordinator_agent_activity,
            args=[agent_results],
            start_to_close_timeout=timedelta(seconds=120)
        )
        return final_result
3. State Management Across Long-Running Processes
AI agents need to maintain context over extended periods. Temporal's event history provides:
- Complete state persistence
- Ability to resume from any point
- Time-travel debugging
- Audit trails
@workflow.defn
class LongRunningAgentWorkflow:
    def __init__(self):
        self.conversation_history = []
        self.agent_state = {}

    @workflow.query
    def get_conversation_history(self) -> list:
        """Query method to get current conversation state."""
        return self.conversation_history

    @workflow.run
    async def run(self, initial_message: str) -> dict:
        self.conversation_history.append({
            "role": "user",
            "content": initial_message
        })
        # Agent processes message
        response = await workflow.execute_activity(
            agent_process_activity,
            args=[self.conversation_history],
            start_to_close_timeout=timedelta(seconds=120)
        )
        self.conversation_history.append({
            "role": "assistant",
            "content": response["content"]
        })
        # Workflow can continue indefinitely, maintaining state
        return {"status": "completed", "history": self.conversation_history}
4. Scalability for AI Workloads
As AI systems scale, Temporal's architecture supports:
- Horizontal scaling through multiple workers
- Load distribution across task queues
- Efficient resource utilization
# Multiple workers can process different trading agent types
worker_market_data = Worker(
    client,
    task_queue="market-data-agents-queue",
    workflows=[MarketDataAgentWorkflow],
    activities=market_data_activities  # list of market-data activity functions
)
worker_trading = Worker(
    client,
    task_queue="trading-agents-queue",
    workflows=[TradingAgentWorkflow],
    activities=trading_activities  # list of trading activity functions
)
Real-World Example: Automated Trading Agent System
Let's examine a real-world implementation that demonstrates Temporal's power in agentic AI for trading:
import json

@workflow.defn
class AutomatedTradingWorkflow:
    """
    Automated Trading Workflow - Orchestrates multiple AI agents
    for comprehensive trading analysis and execution.
    """

    def __init__(self):
        self.progress_events = []
        self.trade_state = {}

    @workflow.query
    def get_progress(self) -> list:
        """Query method for real-time progress tracking."""
        return self.progress_events

    @workflow.query
    def get_trade_state(self) -> dict:
        """Query method for current trade state."""
        return self.trade_state

    @workflow.run
    async def run(
        self,
        symbol: str,
        strategy: str,
        user_id: str = "default-user"
    ) -> str:
        """
        Execute automated trading workflow with 4 sequential AI agents.
        """
        trade_request = {
            "symbol": symbol,
            "strategy": strategy,
            "user_id": user_id
        }
        all_results = []
        # Define retry policy for reliability
        retry_policy = RetryPolicy(
            initial_interval=timedelta(seconds=1),
            maximum_interval=timedelta(seconds=30),
            maximum_attempts=3,
            non_retryable_error_types=["ValueError", "InsufficientFundsError"]
        )
        # Agent 1: Market Data Collection
        self.progress_events.append({
            "event": "node_executing",
            "node": "market_data_collection",
            "type": "data_fetch"
        })
        result1 = await workflow.execute_activity(
            fetch_market_data_activity,
            args=[symbol, "1h"],
            start_to_close_timeout=timedelta(seconds=180),
            retry_policy=retry_policy
        )
        trade_request["market_data"] = result1["price_data"]
        self.trade_state["market_data"] = result1
        all_results.append({"agent": "Market Data Collector", "result": result1})
        # Agent 2: Technical Analysis
        self.progress_events.append({
            "event": "node_executing",
            "node": "technical_analysis",
            "type": "analysis"
        })
        result2 = await workflow.execute_activity(
            technical_analysis_activity,
            args=[trade_request],
            start_to_close_timeout=timedelta(seconds=120),
            retry_policy=retry_policy
        )
        trade_request["signals"] = result2["signals"]
        self.trade_state["analysis"] = result2
        all_results.append({"agent": "Technical Analyst", "result": result2})
        # Agent 3: Risk Assessment
        self.progress_events.append({
            "event": "node_executing",
            "node": "risk_assessment",
            "type": "risk_analysis"
        })
        result3 = await workflow.execute_activity(
            risk_assessment_activity,
            args=[trade_request],
            start_to_close_timeout=timedelta(seconds=120),
            retry_policy=retry_policy
        )
        trade_request["risk_score"] = result3["risk_score"]
        self.trade_state["risk"] = result3
        all_results.append({"agent": "Risk Analyst", "result": result3})
        # Agent 4: Trade Execution (only if risk is acceptable)
        if result3["risk_score"] < 0.7:  # Risk threshold
            self.progress_events.append({
                "event": "node_executing",
                "node": "trade_execution",
                "type": "execution"
            })
            result4 = await workflow.execute_activity(
                execute_trade_activity,
                args=[trade_request],
                start_to_close_timeout=timedelta(seconds=120),
                retry_policy=retry_policy
            )
            self.trade_state["execution"] = result4
            all_results.append({"agent": "Trade Executor", "result": result4})
        else:
            all_results.append({
                "agent": "Trade Executor",
                "result": {"status": "skipped", "reason": "risk_too_high"}
            })
        return json.dumps({
            "summary": "Trading workflow completed",
            "agents": all_results,
            "workflow_id": workflow.info().workflow_id,
            "final_state": self.trade_state
        })
Key Features Demonstrated:
- Sequential Agent Execution: Each agent uses the previous agent's output
- Progress Tracking: Real-time progress events via workflow queries
- State Queries: Ability to query current trade state at any time
- Automatic Retries: Retry policy ensures reliability
- Conditional Execution: Trade execution only if risk is acceptable
- Timeout Management: Appropriate timeouts for each activity type
- State Persistence: Workflow state maintained across failures
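The progress and state queries above are consumed from outside the workflow. A short sketch, assuming a connected Client as in the worker example and a hypothetical workflow ID automated-trading-AAPL:

handle = client.get_workflow_handle("automated-trading-AAPL")

# Read-only queries do not modify workflow state or history
progress_events = await handle.query(AutomatedTradingWorkflow.get_progress)
trade_state = await handle.query(AutomatedTradingWorkflow.get_trade_state)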
Enhanced Version: Parallel Agent Execution
We can enhance the workflow to use parallel execution for independent tasks:
@workflow.defn
class EnhancedTradingWorkflow:
    @workflow.run
    async def run(self, symbol: str, strategy: str) -> dict:
        # Parallel data collection from multiple exchanges
        market_data_futures = [
            workflow.execute_activity(
                fetch_market_data_activity,
                args=[{"symbol": symbol, "exchange": "NYSE"}],
                start_to_close_timeout=timedelta(seconds=120)
            ),
            workflow.execute_activity(
                fetch_market_data_activity,
                args=[{"symbol": symbol, "exchange": "NASDAQ"}],
                start_to_close_timeout=timedelta(seconds=120)
            ),
            workflow.execute_activity(
                fetch_market_data_activity,
                args=[{"symbol": symbol, "exchange": "LSE"}],
                start_to_close_timeout=timedelta(seconds=120)
            )
        ]
        # Wait for all market data to complete
        market_data_results = await asyncio.gather(*market_data_futures)
        # Parallel analysis on different timeframes
        analysis_futures = [
            workflow.execute_activity(
                technical_analysis_activity,
                args=[{"data": result, "timeframe": "1h"}],
                start_to_close_timeout=timedelta(seconds=120)
            ) for result in market_data_results
        ]
        analysis_results = await asyncio.gather(*analysis_futures)
        # Aggregate signals and execute trade
        aggregated_signals = await workflow.execute_activity(
            aggregate_signals_activity,
            args=[{"analyses": analysis_results}],
            start_to_close_timeout=timedelta(seconds=60)
        )
        # Execute trade based on aggregated signals
        trade_result = await workflow.execute_activity(
            execute_trade_activity,
            args=[{"symbol": symbol, "signals": aggregated_signals}],
            start_to_close_timeout=timedelta(seconds=120)
        )
        return trade_result
Advanced Patterns for Agentic AI
1. Agent Decision Trees
Use sub-workflows to create complex decision trees for AI agents:
@workflow.defn
class TradingStrategyRouterWorkflow:
    @workflow.run
    async def run(self, trade_request: dict) -> dict:
        # Initial market analysis
        market_analysis = await workflow.execute_activity(
            analyze_market_activity,
            args=[trade_request],
            start_to_close_timeout=timedelta(seconds=120)
        )
        # Route to appropriate trading strategy based on market conditions
        if market_analysis["market_condition"] == "trending":
            return await workflow.execute_child_workflow(
                TrendFollowingStrategyWorkflow.run,
                args=[trade_request, market_analysis]
            )
        elif market_analysis["market_condition"] == "ranging":
            return await workflow.execute_child_workflow(
                MeanReversionStrategyWorkflow.run,
                args=[trade_request, market_analysis]
            )
        else:
            return await workflow.execute_child_workflow(
                AdaptiveStrategyWorkflow.run,
                args=[trade_request, market_analysis]
            )
2. Agent Collaboration
Orchestrate multiple agents working together:
@workflow.defn
class CollaborativeTradingWorkflow:
    @workflow.run
    async def run(self, trade_request: dict) -> dict:
        # Agent 1: Market Data Collection
        market_data = await workflow.execute_activity(
            market_data_agent_activity,
            args=[trade_request],
            start_to_close_timeout=timedelta(seconds=120)
        )
        # Agent 2: Technical Analysis (uses market data)
        technical_analysis = await workflow.execute_activity(
            technical_analysis_agent_activity,
            args=[market_data],
            start_to_close_timeout=timedelta(seconds=120)
        )
        # Agent 3: Risk Assessment (uses both)
        risk_assessment = await workflow.execute_activity(
            risk_assessment_agent_activity,
            args=[market_data, technical_analysis],
            start_to_close_timeout=timedelta(seconds=120)
        )
        # Agent 4: Trade Decision (uses all)
        trade_decision = await workflow.execute_activity(
            trade_decision_agent_activity,
            args=[market_data, technical_analysis, risk_assessment],
            start_to_close_timeout=timedelta(seconds=120)
        )
        return trade_decision
3. Adaptive Agent Workflows
Adjust workflow behavior based on intermediate results:
@workflow.defn
class AdaptiveTradingAgentWorkflow:
    @workflow.run
    async def run(self, trade_request: dict) -> dict:
        max_iterations = 5
        for iteration in range(max_iterations):
            # Agent processes trade request
            result = await workflow.execute_activity(
                trading_agent_process_activity,
                args=[trade_request],
                start_to_close_timeout=timedelta(seconds=120)
            )
            # Check if trade is complete
            if result["status"] == "executed" or result["status"] == "rejected":
                return result
            # Adapt based on result
            if result["needs_more_data"]:
                # Fetch additional market data
                additional_data = await workflow.execute_activity(
                    fetch_additional_data_activity,
                    args=[result["missing_data"]],
                    start_to_close_timeout=timedelta(seconds=120)
                )
                trade_request["additional_data"] = additional_data
            # Adjust strategy based on market feedback
            if result["strategy_adjustment_needed"]:
                trade_request["strategy"] = result["recommended_strategy"]
            # Update trade request for next iteration
            trade_request["iteration"] = iteration + 1
            trade_request["previous_result"] = result
        return {"status": "max_iterations_reached", "final_result": trade_request}
Benefits of Temporal for Agentic AI
1. Reliability
- Automatic retries on failures
- State persistence across crashes
- Guaranteed execution completion
- No data loss
2. Observability
- Complete execution history
- Real-time progress tracking
- Time-travel debugging
- Built-in monitoring
3. Scalability
- Horizontal worker scaling
- Load distribution
- Efficient resource utilization
- Support for high-throughput systems
4. Developer Experience
- Write workflows as code
- Familiar programming constructs
- Strong typing support
- Comprehensive SDKs
5. Production-Ready
- Battle-tested at scale
- Used by major companies
- Active community
- Regular updates and improvements
Best Practices
1. Activity Design
- Keep activities focused and single-purpose
- Import external dependencies inside activities
- Use appropriate timeouts
- Handle errors gracefully
2. Workflow Design
- Keep workflows deterministic
- Avoid random number generation (use activities)
- Don't use system time directly (use workflow.now())
- Use queries for read-only state access
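A small sketch contrasting deterministic and non-deterministic choices inside a workflow; pick_delay_activity is a hypothetical activity that encapsulates the non-deterministic work:

@workflow.defn
class DeterministicWorkflow:
    @workflow.run
    async def run(self) -> dict:
        started_at = workflow.now()               # replay-safe, unlike datetime.now()
        jitter = workflow.random().uniform(0, 5)  # deterministic RNG seeded by Temporal

        # Anything truly non-deterministic (I/O, UUIDs, wall-clock reads) goes in an activity
        delay = await workflow.execute_activity(
            pick_delay_activity,
            start_to_close_timeout=timedelta(seconds=10)
        )
        await asyncio.sleep(delay + jitter)       # durable timer
        return {"started_at": started_at.isoformat()}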
3. Error Handling
- Define appropriate retry policies
- Use non-retryable error types for permanent failures
- Implement compensation logic for sagas
- Log errors appropriately
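A sketch of the second bullet: marking a permanent failure as non-retryable from inside an activity so the retry policy gives up immediately. The error type name matches the non_retryable_error_types entry used earlier in the article; the capital check itself is a made-up condition.

from temporalio import activity
from temporalio.exceptions import ApplicationError

@activity.defn
async def execute_trade_activity(trade_request: dict) -> dict:
    if trade_request.get("available_capital", 0) < trade_request.get("cost", 0):
        # Permanent failure: retrying will not help, so fail the activity for good
        raise ApplicationError(
            "Insufficient funds for trade",
            type="InsufficientFundsError",
            non_retryable=True,
        )
    # ... place the order ...
    return {"status": "executed"}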
4. Performance
- Use parallel execution when possible
- Set appropriate timeouts
- Monitor workflow execution times
- Optimize activity execution
5. Testing
- Test workflows in isolation
- Mock activities for unit testing
- Use Temporal's test framework
- Test failure scenarios
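A minimal sketch of the last two bullets using the Python SDK's test environment: the real analyze_market_activity is replaced by a mock registered under the same name, and the time-skipping environment fast-forwards durable timers. The mock return values and IDs are made up for the test.

from temporalio import activity
from temporalio.testing import WorkflowEnvironment
from temporalio.worker import Worker

@activity.defn(name="analyze_market_activity")
async def analyze_market_mock(symbol: str, strategy: str) -> dict:
    return {"symbol": symbol, "signal": "buy", "confidence": 0.9}

async def test_trading_workflow():
    async with await WorkflowEnvironment.start_time_skipping() as env:
        async with Worker(
            env.client,
            task_queue="test-queue",
            workflows=[TradingWorkflow],
            activities=[analyze_market_mock],
        ):
            result = await env.client.execute_workflow(
                TradingWorkflow.run,
                args=["AAPL", "momentum"],
                id="test-trading-workflow",
                task_queue="test-queue",
            )
            assert result["signal"] == "buy"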
Conclusion
Temporal provides a robust foundation for building reliable, scalable agentic AI systems. Its durable execution model, comprehensive workflow patterns, and excellent developer experience make it an ideal choice for orchestrating complex AI agent workflows.
Key takeaways:
- Temporal abstracts complexity: Focus on business logic, not infrastructure
- Multiple workflow patterns: Sequential, parallel, sub-workflows, and more
- Perfect for agentic AI: Reliability, state management, and scalability
- Production-ready: Battle-tested and widely adopted
- Rich ecosystem: Multiple language SDKs and active community
Whether you're building simple sequential workflows or complex multi-agent systems, Temporal provides the reliability and observability needed for production AI applications.