Akash Thakur

Temporal Workflow Orchestration: Building Reliable Agentic AI Systems

Introduction

In the world of distributed systems and agentic AI, managing complex, long-running workflows reliably is a significant challenge. Traditional approaches often struggle with failures, state management, and ensuring that processes complete successfully even when systems crash or network issues occur.

Temporal is an open-source platform that solves these problems by providing durable, reliable workflow orchestration. It abstracts away the complexities of distributed systems, allowing developers to focus on business logic while Temporal handles failures, retries, and state management automatically.

This article explores Temporal's core concepts, workflow patterns, and how it's revolutionizing the development of agentic AI systems.

What is Temporal?

Temporal is a workflow orchestration platform that provides:

  • Durable Execution: Workflows that can run for days, months, or even years without interruption
  • Automatic Failure Recovery: Built-in retry mechanisms and state persistence
  • Time-Travel Debugging: Complete event history for every workflow execution
  • Scalability: Horizontal scaling through distributed workers
  • Language Support: SDKs for Go, Java, Python, TypeScript, .NET, PHP, and Ruby

Unlike traditional task queues or workflow engines, Temporal maintains a complete event history of every workflow execution, enabling it to resume workflows from any point after failures.

Core Concepts

1. Workflows

Workflows are the heart of Temporal. They represent the business logic of your application as code. Workflows are:

  • Deterministic: They must produce the same result when replayed
  • Durable: State is persisted automatically
  • Long-running: Can execute for extended periods
  • Versioned: Support safe deployments and migrations

```python
from temporalio import workflow
from temporalio.common import RetryPolicy
from datetime import timedelta

@workflow.defn
class TradingWorkflow:
    """Example workflow for automated trading analysis."""

    @workflow.run
    async def run(self, symbol: str, strategy: str) -> dict:
        # Workflow logic here
        result = await workflow.execute_activity(
            analyze_market_activity,
            args=[symbol, strategy],
            start_to_close_timeout=timedelta(seconds=180),
            retry_policy=RetryPolicy(
                initial_interval=timedelta(seconds=1),
                maximum_interval=timedelta(seconds=30),
                maximum_attempts=3
            )
        )
        return result
```

2. Activities

Activities are individual units of work within workflows. They:

  • Interact with external systems (APIs, databases, services)
  • Can be retried automatically on failure
  • Support timeouts and cancellation
  • Execute in a separate process from workflows

```python
from temporalio import activity

@activity.defn
async def fetch_market_data_activity(symbol: str, timeframe: str) -> dict:
    """
    Activity that fetches market data from external APIs.
    This can fail and will be automatically retried by Temporal.
    """
    # Import external dependencies inside activity
    import requests

    # Fetch market data from API
    response = requests.get(
        f"https://api.marketdata.com/v1/quote/{symbol}",
        params={"timeframe": timeframe},
        timeout=30
    )
    response.raise_for_status()

    return {
        "symbol": symbol,
        "price_data": response.json(),
        "timestamp": response.headers.get("timestamp")
    }
```

3. Workers

Workers are processes that execute workflow and activity code. They:

  • Poll the Temporal service for tasks
  • Execute workflows and activities
  • Can be scaled horizontally
  • Support multiple task queues

```python
from temporalio.client import Client
from temporalio.worker import Worker

async def run_worker():
    # Connect to Temporal
    client = await Client.connect(
        "temporal-server:7233",
        namespace="default"
    )

    # Create worker
    worker = Worker(
        client,
        task_queue="trading-queue",
        workflows=[TradingWorkflow],
        activities=[fetch_market_data_activity]
    )

    # Run worker (blocks until stopped)
    await worker.run()
```
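
To run the worker and start a workflow against it, a client connects and submits to the same task queue. A minimal sketch; the symbol, strategy, and workflow ID below are illustrative:

```python
import asyncio

async def main():
    client = await Client.connect("temporal-server:7233", namespace="default")

    # Start the workflow on the queue the worker above is polling
    result = await client.execute_workflow(
        TradingWorkflow.run,
        args=["AAPL", "momentum"],   # hypothetical inputs
        id="trading-AAPL-001",       # hypothetical workflow ID
        task_queue="trading-queue",
    )
    print(result)

if __name__ == "__main__":
    asyncio.run(main())
```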

Workflow Patterns

Temporal supports various workflow patterns to handle complex business processes. Each pattern has specific use cases, performance characteristics, and architectural considerations. Let's explore the most important ones in detail:

1. Sequential Execution

Overview: The simplest and most common pattern where activities execute one after another, with each step depending on the previous step's output. This creates a linear pipeline where data flows sequentially through the workflow.

Architecture: (diagram: Sequential Execution)

Characteristics:

  • Execution Time: Sum of all activity execution times (no parallelism)
  • Dependencies: Each activity depends on the previous one
  • Error Handling: Failure in any step stops the workflow
  • State Management: State accumulates as data passes through each step
  • Use When: Steps have strict dependencies, data must be processed in order, or when parallel execution isn't possible

Performance Considerations:

  • Total execution time = sum of all activity durations
  • No parallelism benefits
  • Simple to understand and debug
  • Best for pipelines where each step transforms data for the next

```python
@workflow.defn
class SequentialTradingWorkflow:
    @workflow.run
    async def run(self, trade_request: dict) -> dict:
        # Step 1: Fetch Market Data
        market_data = await workflow.execute_activity(
            fetch_market_data_activity,
            args=[trade_request["symbol"], trade_request["timeframe"]],
            start_to_close_timeout=timedelta(seconds=180)
        )

        # Step 2: Technical Analysis (uses market data)
        trade_request["market_data"] = market_data["price_data"]
        analysis_result = await workflow.execute_activity(
            technical_analysis_activity,
            args=[trade_request],
            start_to_close_timeout=timedelta(seconds=120)
        )

        # Step 3: Risk Assessment (uses analysis result)
        trade_request["analysis"] = analysis_result["signals"]
        risk_result = await workflow.execute_activity(
            risk_assessment_activity,
            args=[trade_request],
            start_to_close_timeout=timedelta(seconds=120)
        )

        # Step 4: Execute Trade (uses all previous results)
        trade_request["risk_score"] = risk_result["risk_score"]
        execution_result = await workflow.execute_activity(
            execute_trade_activity,
            args=[trade_request],
            start_to_close_timeout=timedelta(seconds=120)
        )

        return execution_result
```

Use Case: When each step depends on the previous step's output, like a data processing pipeline, trading workflows where analysis depends on market data, or ETL processes.

Best Practices:

  • Use appropriate timeouts for each activity
  • Implement retry policies for transient failures
  • Pass only necessary data between activities to reduce state size
  • Log intermediate results for debugging
  • Consider breaking into sub-workflows if the sequence becomes too long

2. Parallel Execution

Overview: Execute multiple independent activities concurrently to maximize throughput and minimize total execution time. This pattern is essential for performance optimization when activities don't depend on each other.

Architecture: (diagram: Parallel Execution)

Characteristics:

  • Execution Time: Maximum of all activity execution times (not the sum)
  • Dependencies: Activities are independent and can run simultaneously
  • Error Handling: Can use return_exceptions=True to handle partial failures
  • State Management: Results are collected and aggregated after all complete
  • Use When: Activities are independent, you need to fetch data from multiple sources, or when performance is critical

Performance Considerations:

  • Total execution time ≈ max(activity durations) + aggregation time
  • Significant performance improvement over sequential execution
  • Resource usage scales with number of parallel activities
  • Best for I/O-bound operations (API calls, database queries)

```python
import asyncio

@workflow.defn
class ParallelWorkflow:
    @workflow.run
    async def run(self, data_sources: list) -> dict:
        # Define activity options
        activity_options = {
            "start_to_close_timeout": timedelta(seconds=120),
            "retry_policy": RetryPolicy(maximum_attempts=3)
        }

        # Execute multiple activities in parallel
        futures = []
        for source in data_sources:
            future = workflow.execute_activity(
                fetch_data_activity,
                args=[source],
                **activity_options
            )
            futures.append(future)

        # Wait for all activities to complete
        results = await asyncio.gather(*futures)

        # Aggregate results
        aggregated = await workflow.execute_activity(
            aggregate_data_activity,
            args=[results],
            start_to_close_timeout=timedelta(seconds=60)
        )

        return aggregated
```

Use Case: Fetching data from multiple sources, processing independent tasks, batch operations, multi-exchange trading data collection, or parallel analysis of different market segments.

Real-World Example: In a trading workflow, we could parallelize data collection from multiple exchanges:

```python
# Parallel market data fetching from different exchanges
market_data_futures = [
    workflow.execute_activity(
        fetch_market_data_activity,
        args=[{"symbol": symbol, "exchange": "NYSE"}],
        start_to_close_timeout=timedelta(seconds=120)
    ),
    workflow.execute_activity(
        fetch_market_data_activity,
        args=[{"symbol": symbol, "exchange": "NASDAQ"}],
        start_to_close_timeout=timedelta(seconds=120)
    ),
    workflow.execute_activity(
        fetch_market_data_activity,
        args=[{"symbol": symbol, "exchange": "LSE"}],
        start_to_close_timeout=timedelta(seconds=120)
    )
]

# Wait for all market data activities to complete
market_data_results = await asyncio.gather(*market_data_futures)
```

Best Practices:

  • Use asyncio.gather() with return_exceptions=True for error handling
  • Set appropriate timeouts for each parallel activity
  • Consider rate limiting when calling external APIs in parallel
  • Aggregate results efficiently to avoid large state objects
  • Monitor resource usage when scaling parallel activities

3. Sub-Workflows (Child Workflows)

Overview: Break complex workflows into smaller, reusable sub-workflows (child workflows). This pattern promotes modularity, reusability, and separation of concerns. Child workflows can be independently versioned, monitored, and scaled.

Architecture: (diagram: Sub-Workflows)

Characteristics:

  • Modularity: Each child workflow is a self-contained unit
  • Reusability: Child workflows can be reused across multiple parent workflows
  • Isolation: Child workflow failures don't necessarily fail the parent
  • Versioning: Child workflows can be versioned independently
  • Monitoring: Each child workflow has its own execution history
  • Use When: You need to modularize complex processes, create reusable components, or implement hierarchical workflows

Performance Considerations:

  • Child workflows execute independently and can be distributed across workers
  • Parent workflow waits for child completion (can be made async with signals)
  • Each child workflow maintains its own state and history
  • Useful for breaking down large workflows into manageable pieces

```python
@workflow.defn
class OrderExecutionWorkflow:
    @workflow.run
    async def run(self, order_id: str) -> dict:
        # Execute child workflow for order validation
        validation_result = await workflow.execute_child_workflow(
            OrderValidationWorkflow.run,
            args=[order_id],
            id=f"validation-{order_id}"
        )

        if validation_result["status"] != "valid":
            return {"status": "failed", "reason": "validation_failed"}

        # Execute child workflow for risk check
        risk_result = await workflow.execute_child_workflow(
            RiskCheckWorkflow.run,
            args=[order_id],
            id=f"risk-{order_id}"
        )

        # Execute child workflow for trade execution
        execution_result = await workflow.execute_child_workflow(
            TradeExecutionWorkflow.run,
            args=[order_id],
            id=f"execution-{order_id}"
        )

        return {
            "order_id": order_id,
            "validation": validation_result,
            "risk_check": risk_result,
            "execution": execution_result
        }

@workflow.defn
class OrderValidationWorkflow:
    @workflow.run
    async def run(self, order_id: str) -> dict:
        # Order validation logic
        result = await workflow.execute_activity(
            validate_order_activity,
            args=[order_id],
            start_to_close_timeout=timedelta(seconds=60)
        )
        return result

@workflow.defn
class RiskCheckWorkflow:
    @workflow.run
    async def run(self, order_id: str) -> dict:
        # Risk assessment logic
        result = await workflow.execute_activity(
            assess_risk_activity,
            args=[order_id],
            start_to_close_timeout=timedelta(seconds=60)
        )
        return result
```

Use Case: Modularizing complex processes, creating reusable workflow components, implementing saga patterns, or when you need to orchestrate multiple independent workflows.

Best Practices:

  • Use descriptive workflow IDs for child workflows
  • Handle child workflow failures gracefully
  • Consider using signals for async child workflow communication
  • Version child workflows independently for safe deployments
  • Monitor child workflows separately for better observability

4. Conditional Execution

Overview: Execute different workflow paths based on runtime conditions, state, or external factors. This pattern enables dynamic workflow routing and adaptive behavior.

Architecture: (diagram: Conditional Execution)

Characteristics:

  • Dynamic Routing: Workflow path determined at runtime
  • Conditional Logic: Based on activity results, external state, or user input
  • Branching: Multiple execution paths from a single decision point
  • Convergence: Paths may converge back to common activities
  • Use When: You need adaptive behavior, A/B testing, different processing based on data characteristics, or user-driven workflows

Performance Considerations:

  • Only one path executes, so performance depends on chosen path
  • Condition evaluation adds minimal overhead
  • Useful for avoiding unnecessary work in alternative paths
  • Can optimize by evaluating conditions early

```python
@workflow.defn
class AdaptiveTradingWorkflow:
    @workflow.run
    async def run(self, trade_request: dict) -> dict:
        # Initial market analysis
        market_analysis = await workflow.execute_activity(
            analyze_market_activity,
            args=[trade_request],
            start_to_close_timeout=timedelta(seconds=120)
        )

        # Conditional branching based on market conditions
        if market_analysis["volatility"] == "high":
            # Use conservative strategy
            result = await workflow.execute_activity(
                conservative_trading_activity,
                args=[trade_request, market_analysis],
                start_to_close_timeout=timedelta(seconds=120)
            )
        elif market_analysis["volatility"] == "medium":
            # Use standard strategy
            result = await workflow.execute_activity(
                standard_trading_activity,
                args=[trade_request, market_analysis],
                start_to_close_timeout=timedelta(seconds=120)
            )
        else:
            # Use aggressive strategy
            result = await workflow.execute_activity(
                aggressive_trading_activity,
                args=[trade_request, market_analysis],
                start_to_close_timeout=timedelta(seconds=120)
            )

        return result
```

Use Case: Dynamic workflow routing, A/B testing, adaptive processing based on data characteristics, market condition-based trading strategies, or user preference-driven workflows.

Best Practices:

  • Keep condition evaluation deterministic
  • Document all possible execution paths
  • Handle all branches in error scenarios
  • Use clear, descriptive condition names
  • Consider using enums for condition values

5. Fan-Out/Fan-In Pattern

Overview: Distribute work across multiple activities (fan-out), then collect and aggregate results (fan-in). This pattern is ideal for processing large datasets, batch operations, or map-reduce scenarios.

Architecture: (diagram: Fan-Out/Fan-In)

Characteristics:

  • Fan-Out: Distribute work to multiple parallel activities
  • Fan-In: Collect results from all activities
  • Error Handling: Can handle partial failures gracefully
  • Scalability: Can process large numbers of tasks
  • Use When: Batch processing, map-reduce operations, processing large datasets in chunks, or parallel task execution

Performance Considerations:

  • Execution time ≈ max(task durations) + aggregation time
  • Scales well with number of tasks
  • Can process thousands of tasks in parallel
  • Resource usage scales with task count
  • Best for CPU-bound or I/O-bound batch operations

```python
import asyncio

@workflow.defn
class FanOutFanInWorkflow:
    @workflow.run
    async def run(self, task_list: list) -> dict:
        # Fan-out: Distribute tasks across multiple activities
        futures = []
        for task in task_list:
            future = workflow.execute_activity(
                process_task_activity,
                args=[task],
                start_to_close_timeout=timedelta(seconds=300)
            )
            futures.append(future)

        # Fan-in: Wait for all tasks to complete
        results = await asyncio.gather(*futures, return_exceptions=True)

        # Process results (handle exceptions)
        successful_results = []
        failed_tasks = []

        for i, result in enumerate(results):
            if isinstance(result, Exception):
                failed_tasks.append({"task": task_list[i], "error": str(result)})
            else:
                successful_results.append(result)

        # Aggregate results
        summary = await workflow.execute_activity(
            aggregate_results_activity,
            args=[successful_results, failed_tasks],
            start_to_close_timeout=timedelta(seconds=60)
        )

        return summary
```

Use Case: Batch processing, map-reduce operations, processing large datasets in chunks, processing multiple orders simultaneously, or parallel portfolio analysis.

Best Practices:

  • Use return_exceptions=True to handle partial failures
  • Implement retry logic for transient failures
  • Consider batching tasks to avoid overwhelming workers (see the sketch after this list)
  • Monitor task completion rates
  • Implement graceful degradation for partial failures
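
A minimal batching sketch, assuming the same process_task_activity as above and a batch size you would tune to your worker capacity; each batch completes before the next starts, which bounds how many activities run concurrently:

```python
import asyncio

BATCH_SIZE = 50  # assumption: tune to worker capacity

async def process_in_batches(task_list: list) -> list:
    """Call from inside a workflow: fan out one bounded batch at a time."""
    results = []
    for i in range(0, len(task_list), BATCH_SIZE):
        batch = task_list[i:i + BATCH_SIZE]
        futures = [
            workflow.execute_activity(
                process_task_activity,
                args=[task],
                start_to_close_timeout=timedelta(seconds=300)
            )
            for task in batch
        ]
        # return_exceptions=True keeps one bad task from failing the batch
        results.extend(await asyncio.gather(*futures, return_exceptions=True))
    return results
```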

6. Saga Pattern (Compensating Transactions)

Overview: Execute a series of transactions with compensating actions (rollback operations) if any step fails. This pattern ensures data consistency in distributed systems where traditional ACID transactions aren't possible.

Architecture: (diagram: Saga Pattern)

Characteristics:

  • Forward Path: Execute transactions in sequence
  • Compensation: Each transaction has a compensating action
  • Rollback: On failure, execute compensations in reverse order
  • Consistency: Ensures system remains in consistent state
  • Use When: Distributed transactions, order processing, multi-step processes requiring rollback, or financial operations

Performance Considerations:

  • Execution time includes both forward and potential compensation paths
  • Compensation adds overhead but ensures consistency
  • Critical for financial and transactional systems
  • Can be optimized by minimizing compensation complexity

```python
@workflow.defn
class TradeExecutionSagaWorkflow:
    @workflow.run
    async def run(self, trade_order: dict) -> dict:
        completed_steps = []

        try:
            # Step 1: Reserve capital
            capital_result = await workflow.execute_activity(
                reserve_capital_activity,
                args=[trade_order],
                start_to_close_timeout=timedelta(seconds=60)
            )
            completed_steps.append(("capital", capital_result))

            # Step 2: Place order
            order_result = await workflow.execute_activity(
                place_order_activity,
                args=[trade_order],
                start_to_close_timeout=timedelta(seconds=60)
            )
            completed_steps.append(("order", order_result))

            # Step 3: Confirm execution
            execution_result = await workflow.execute_activity(
                confirm_execution_activity,
                args=[trade_order],
                start_to_close_timeout=timedelta(seconds=60)
            )
            completed_steps.append(("execution", execution_result))

            return {"status": "success", "steps": completed_steps}

        except Exception as e:
            # Compensate: Undo completed steps in reverse order
            for step_name, step_result in reversed(completed_steps):
                try:
                    await workflow.execute_activity(
                        compensate_trade_activity,
                        args=[step_name, step_result],
                        start_to_close_timeout=timedelta(seconds=60)
                    )
                except Exception as comp_error:
                    workflow.logger.error(f"Compensation failed for {step_name}: {comp_error}")

            raise e
```

Use Case: Distributed transactions, order processing, trade execution with rollback, or any multi-step process requiring rollback.

Best Practices:

  • Design compensating actions to be idempotent (see the sketch after this list)
  • Store completed steps in workflow state
  • Execute compensations in reverse order
  • Handle compensation failures gracefully
  • Log all compensation actions for audit trails
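
A sketch of an idempotent compensation activity for the saga above; release_capital, cancel_order, and reverse_execution are hypothetical helpers that must be safe to call more than once:

```python
from temporalio import activity

@activity.defn
async def compensate_trade_activity(step_name: str, step_result: dict) -> None:
    # Each branch must be idempotent: compensating a step that was already
    # compensated (e.g. on a retry) should be a no-op, not an error.
    if step_name == "capital":
        await release_capital(step_result["reservation_id"])  # hypothetical
    elif step_name == "order":
        await cancel_order(step_result["order_id"])           # hypothetical
    elif step_name == "execution":
        await reverse_execution(step_result["execution_id"])  # hypothetical
```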

7. Polling Pattern

Overview: Poll an external system or check a condition repeatedly until a desired state is reached or a timeout occurs. This pattern is useful when waiting for external processes or asynchronous operations to complete.

Architecture: (diagram: Polling Pattern)

Characteristics:

  • Polling Loop: Repeatedly check condition with intervals
  • Timeout: Maximum time to wait before giving up
  • Interval: Time between polling attempts
  • Condition Check: Determines if polling should continue
  • Use When: Waiting for external processes, monitoring long-running tasks, checking resource availability, or waiting for async operations

Performance Considerations:

  • Execution time depends on how quickly condition is met
  • Polling interval affects responsiveness vs. resource usage
  • Use appropriate timeouts to avoid infinite loops
  • Consider exponential backoff for polling intervals
  • Best for waiting on external systems with unknown completion times

```python
import asyncio

@workflow.defn
class PollingWorkflow:
    @workflow.run
    async def run(self, job_id: str) -> dict:
        max_attempts = 60  # Poll for up to 5 minutes (60 * 5 seconds)
        attempt = 0

        while attempt < max_attempts:
            # Check job status
            status = await workflow.execute_activity(
                check_job_status_activity,
                args=[job_id],
                start_to_close_timeout=timedelta(seconds=10)
            )

            if status["state"] == "completed":
                # Fetch results
                results = await workflow.execute_activity(
                    fetch_job_results_activity,
                    args=[job_id],
                    start_to_close_timeout=timedelta(seconds=30)
                )
                return results

            elif status["state"] == "failed":
                raise Exception(f"Job {job_id} failed: {status.get('error')}")

            # Wait before next poll; asyncio.sleep() is a durable timer
            # inside Temporal workflows
            await asyncio.sleep(5)
            attempt += 1

        raise TimeoutError(f"Job {job_id} did not complete within timeout")
```

Use Case: Waiting for external processes, monitoring long-running tasks, checking for resource availability, waiting for trade execution confirmation, or polling for order status.

Best Practices:

  • Use exponential backoff for polling intervals (see the sketch after this list)
  • Set reasonable timeouts to avoid infinite loops
  • Make condition checks efficient (avoid heavy operations)
  • Consider using signals instead of polling when possible
  • Log polling attempts for debugging
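
A minimal exponential-backoff variant of the loop above, written as workflow-body code; the starting interval, cap, and deadline are illustrative:

```python
import asyncio
from datetime import timedelta

backoff = 1.0       # start with a 1-second interval
max_backoff = 60.0  # cap the interval at 1 minute
deadline = workflow.now() + timedelta(minutes=10)

while workflow.now() < deadline:
    status = await workflow.execute_activity(
        check_job_status_activity,
        args=[job_id],
        start_to_close_timeout=timedelta(seconds=10)
    )
    if status["state"] == "completed":
        break

    # Durable timer; the interval doubles after every attempt
    await asyncio.sleep(backoff)
    backoff = min(backoff * 2, max_backoff)
```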

8. Event-Driven Pattern

Overview: Workflows that respond to external events or signals, enabling reactive and interactive workflows. This pattern allows workflows to wait for external input or events before proceeding.

Architecture: (diagram: Event-Driven Pattern)

Characteristics:

  • Reactive: Workflows respond to external events
  • Interactive: Can wait for human input or external systems
  • Flexible: Workflow behavior adapts based on received events
  • Use When: Human-in-the-loop workflows, waiting for external approvals, event-driven systems, or interactive processes

Example:

```python
@workflow.defn
class EventDrivenTradingWorkflow:
    def __init__(self) -> None:
        self.approved: bool | None = None

    @workflow.run
    async def run(self, trade_request: dict) -> dict:
        # Initial analysis
        analysis = await workflow.execute_activity(
            analyze_market_activity,
            args=[trade_request],
            start_to_close_timeout=timedelta(seconds=120)
        )

        # Wait for approval signal (can come from a user or another system);
        # raises a timeout error if no signal arrives in time
        await workflow.wait_condition(
            lambda: self.approved is not None,
            timeout=timedelta(minutes=5)
        )

        if self.approved:
            # Execute trade
            result = await workflow.execute_activity(
                execute_trade_activity,
                args=[trade_request, analysis],
                start_to_close_timeout=timedelta(seconds=120)
            )
            return result
        else:
            return {"status": "rejected", "reason": "not_approved"}

    @workflow.signal
    def approve_trade(self, approved: bool) -> None:
        self.approved = approved
```
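
The approval signal is delivered from outside the workflow, for example from an API handler. A sketch using the Python client; the workflow ID is whatever was used when the workflow was started:

```python
from temporalio.client import Client

async def approve(workflow_id: str) -> None:
    client = await Client.connect("temporal-server:7233")
    handle = client.get_workflow_handle_for(
        EventDrivenTradingWorkflow.run, workflow_id
    )
    # Deliver the approval into the running workflow
    await handle.signal(EventDrivenTradingWorkflow.approve_trade, True)
```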

9. Chained Workflows Pattern

Overview: Execute workflows in a chain where the output of one workflow triggers or provides input to the next. This creates a pipeline of workflows, each handling a specific stage of processing.

Architecture: (diagram: Chained Workflows)

Characteristics:

  • Pipeline: Workflows execute in sequence
  • Data Flow: Output of one workflow becomes input to next
  • Modularity: Each workflow handles a specific stage
  • Use When: Multi-stage processing, data transformation pipelines, or when you need to separate concerns across workflow boundaries
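
A minimal sketch of a three-stage chain, where each stage runs as a child workflow and its output feeds the next; DataIngestionWorkflow, EnrichmentWorkflow, and ReportingWorkflow are hypothetical stage workflows:

```python
@workflow.defn
class ChainedPipelineWorkflow:
    @workflow.run
    async def run(self, raw_input: dict) -> dict:
        parent_id = workflow.info().workflow_id

        # Stage 1: ingestion output becomes enrichment input
        ingested = await workflow.execute_child_workflow(
            DataIngestionWorkflow.run,
            args=[raw_input],
            id=f"ingest-{parent_id}"
        )

        # Stage 2: enrichment output becomes reporting input
        enriched = await workflow.execute_child_workflow(
            EnrichmentWorkflow.run,
            args=[ingested],
            id=f"enrich-{parent_id}"
        )

        # Stage 3: final stage produces the pipeline result
        return await workflow.execute_child_workflow(
            ReportingWorkflow.run,
            args=[enriched],
            id=f"report-{parent_id}"
        )
```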

Pattern Comparison Summary

| Pattern | Execution Time | Complexity | Use Case |
| --- | --- | --- | --- |
| Sequential | Sum of all activities | Low | Dependent steps, pipelines |
| Parallel | Max of activities | Medium | Independent tasks, performance |
| Sub-Workflows | Depends on children | Medium-High | Modularity, reusability |
| Conditional | Single path | Medium | Dynamic routing, adaptation |
| Fan-Out/Fan-In | Max + aggregation | Medium-High | Batch processing, map-reduce |
| Saga | Forward + compensation | High | Transactions, rollback |
| Polling | Variable (until condition) | Low-Medium | External waits, monitoring |
| Event-Driven | Variable (until event) | Medium | Interactive, reactive |
| Chained | Sum of workflows | Medium | Multi-stage pipelines |

Temporal in Agentic AI

Agentic AI systems involve autonomous agents that make decisions and perform tasks over extended periods. Temporal provides an ideal foundation for building these systems.

Why Temporal for Agentic AI?

1. Reliability and Resilience

AI agents operate in unpredictable environments and may encounter:

  • Network failures
  • API rate limits
  • Model inference errors
  • Resource constraints

Temporal's durable execution ensures agent workflows recover automatically:

```python
@workflow.defn
class AIAgentWorkflow:
    @workflow.run
    async def run(self, user_query: str) -> dict:
        # Agent reasoning step
        reasoning = await workflow.execute_activity(
            agent_reasoning_activity,
            args=[user_query],
            start_to_close_timeout=timedelta(seconds=120),
            retry_policy=RetryPolicy(
                initial_interval=timedelta(seconds=2),
                maximum_interval=timedelta(seconds=60),
                maximum_attempts=5
            )
        )

        # Agent action step
        action_result = await workflow.execute_activity(
            agent_action_activity,
            args=[reasoning],
            start_to_close_timeout=timedelta(seconds=120),
            retry_policy=RetryPolicy(maximum_attempts=3)
        )

        return action_result
```

2. Complex Decision-Making Orchestration

AI agents often need to coordinate multiple decision-making processes. Temporal's support for parallel execution and sub-workflows enables agents to:

  • Perform concurrent analysis
  • Delegate subtasks efficiently
  • Make decisions based on aggregated results

```python
import asyncio

@workflow.defn
class MultiAgentOrchestrationWorkflow:
    @workflow.run
    async def run(self, task: dict) -> dict:
        # Parallel agent execution
        agent_futures = [
            workflow.execute_activity(
                research_agent_activity,
                args=[task],
                start_to_close_timeout=timedelta(seconds=120)
            ),
            workflow.execute_activity(
                analysis_agent_activity,
                args=[task],
                start_to_close_timeout=timedelta(seconds=120)
            ),
            workflow.execute_activity(
                planning_agent_activity,
                args=[task],
                start_to_close_timeout=timedelta(seconds=120)
            )
        ]

        # Wait for all agents to complete
        agent_results = await asyncio.gather(*agent_futures)

        # Coordinator agent synthesizes results
        final_result = await workflow.execute_activity(
            coordinator_agent_activity,
            args=[agent_results],
            start_to_close_timeout=timedelta(seconds=120)
        )

        return final_result
```

3. State Management Across Long-Running Processes

AI agents need to maintain context over extended periods. Temporal's event history provides:

  • Complete state persistence
  • Ability to resume from any point
  • Time-travel debugging
  • Audit trails

```python
@workflow.defn
class LongRunningAgentWorkflow:
    def __init__(self):
        self.conversation_history = []
        self.agent_state = {}

    @workflow.query
    def get_conversation_history(self) -> list:
        """Query method to get current conversation state."""
        return self.conversation_history

    @workflow.run
    async def run(self, initial_message: str) -> dict:
        self.conversation_history.append({
            "role": "user",
            "content": initial_message
        })

        # Agent processes message
        response = await workflow.execute_activity(
            agent_process_activity,
            args=[self.conversation_history],
            start_to_close_timeout=timedelta(seconds=120)
        )

        self.conversation_history.append({
            "role": "assistant",
            "content": response["content"]
        })

        # Workflow can continue indefinitely, maintaining state
        return {"status": "completed", "history": self.conversation_history}
```
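
Clients can read this state from outside without interrupting the workflow. A sketch, assuming a connected client and an illustrative workflow ID:

```python
# Query the running workflow's state from a client
handle = client.get_workflow_handle("long-running-agent-001")  # hypothetical ID
history = await handle.query(LongRunningAgentWorkflow.get_conversation_history)
```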

4. Scalability for AI Workloads

As AI systems scale, Temporal's architecture supports:

  • Horizontal scaling through multiple workers
  • Load distribution across task queues
  • Efficient resource utilization

```python
# Multiple workers can process different trading agent types.
# market_data_activities and trading_activities are assumed to be
# lists of activity functions, so they are passed directly.
worker_market_data = Worker(
    client,
    task_queue="market-data-agents-queue",
    workflows=[MarketDataAgentWorkflow],
    activities=market_data_activities
)

worker_trading = Worker(
    client,
    task_queue="trading-agents-queue",
    workflows=[TradingAgentWorkflow],
    activities=trading_activities
)
```
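
Both workers can run in one process, or be deployed separately for independent scaling; a sketch of the single-process case:

```python
import asyncio

# Inside an async main(): run both workers concurrently,
# each polling its own task queue
await asyncio.gather(
    worker_market_data.run(),
    worker_trading.run()
)
```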

Real-World Example: Automated Trading Agent System

Let's examine a real-world implementation that demonstrates Temporal's power in agentic AI for trading:

```python
import json

@workflow.defn
class AutomatedTradingWorkflow:
    """
    Automated Trading Workflow - Orchestrates multiple AI agents
    for comprehensive trading analysis and execution.
    """

    def __init__(self):
        self.progress_events = []
        self.trade_state = {}

    @workflow.query
    def get_progress(self) -> list:
        """Query method for real-time progress tracking."""
        return self.progress_events

    @workflow.query
    def get_trade_state(self) -> dict:
        """Query method for current trade state."""
        return self.trade_state

    @workflow.run
    async def run(
        self,
        symbol: str,
        strategy: str,
        user_id: str = "default-user"
    ) -> str:
        """
        Execute automated trading workflow with 4 sequential AI agents.
        """
        trade_request = {
            "symbol": symbol,
            "strategy": strategy,
            "user_id": user_id
        }
        all_results = []

        # Define retry policy for reliability
        retry_policy = RetryPolicy(
            initial_interval=timedelta(seconds=1),
            maximum_interval=timedelta(seconds=30),
            maximum_attempts=3,
            non_retryable_error_types=["ValueError", "InsufficientFundsError"]
        )

        # Agent 1: Market Data Collection
        self.progress_events.append({
            "event": "node_executing",
            "node": "market_data_collection",
            "type": "data_fetch"
        })

        result1 = await workflow.execute_activity(
            fetch_market_data_activity,
            args=[symbol, "1h"],
            start_to_close_timeout=timedelta(seconds=180),
            retry_policy=retry_policy
        )

        trade_request["market_data"] = result1["price_data"]
        self.trade_state["market_data"] = result1
        all_results.append({"agent": "Market Data Collector", "result": result1})

        # Agent 2: Technical Analysis
        self.progress_events.append({
            "event": "node_executing",
            "node": "technical_analysis",
            "type": "analysis"
        })

        result2 = await workflow.execute_activity(
            technical_analysis_activity,
            args=[trade_request],
            start_to_close_timeout=timedelta(seconds=120),
            retry_policy=retry_policy
        )

        trade_request["signals"] = result2["signals"]
        self.trade_state["analysis"] = result2
        all_results.append({"agent": "Technical Analyst", "result": result2})

        # Agent 3: Risk Assessment
        self.progress_events.append({
            "event": "node_executing",
            "node": "risk_assessment",
            "type": "risk_analysis"
        })

        result3 = await workflow.execute_activity(
            risk_assessment_activity,
            args=[trade_request],
            start_to_close_timeout=timedelta(seconds=120),
            retry_policy=retry_policy
        )

        trade_request["risk_score"] = result3["risk_score"]
        self.trade_state["risk"] = result3
        all_results.append({"agent": "Risk Analyst", "result": result3})

        # Agent 4: Trade Execution (only if risk is acceptable)
        if result3["risk_score"] < 0.7:  # Risk threshold
            self.progress_events.append({
                "event": "node_executing",
                "node": "trade_execution",
                "type": "execution"
            })

            result4 = await workflow.execute_activity(
                execute_trade_activity,
                args=[trade_request],
                start_to_close_timeout=timedelta(seconds=120),
                retry_policy=retry_policy
            )

            self.trade_state["execution"] = result4
            all_results.append({"agent": "Trade Executor", "result": result4})
        else:
            all_results.append({
                "agent": "Trade Executor",
                "result": {"status": "skipped", "reason": "risk_too_high"}
            })

        return json.dumps({
            "summary": "Trading workflow completed",
            "agents": all_results,
            "workflow_id": workflow.info().workflow_id,
            "final_state": self.trade_state
        })
```

Key Features Demonstrated:

  1. Sequential Agent Execution: Each agent uses the previous agent's output
  2. Progress Tracking: Real-time progress events via workflow queries
  3. State Queries: Ability to query current trade state at any time
  4. Automatic Retries: Retry policy ensures reliability
  5. Conditional Execution: Trade execution only if risk is acceptable
  6. Timeout Management: Appropriate timeouts for each activity type
  7. State Persistence: Workflow state maintained across failures

Enhanced Version: Parallel Agent Execution

We can enhance the workflow to use parallel execution for independent tasks:

```python
import asyncio

@workflow.defn
class EnhancedTradingWorkflow:
    @workflow.run
    async def run(self, symbol: str, strategy: str) -> dict:
        # Parallel data collection from multiple exchanges
        market_data_futures = [
            workflow.execute_activity(
                fetch_market_data_activity,
                args=[{"symbol": symbol, "exchange": "NYSE"}],
                start_to_close_timeout=timedelta(seconds=120)
            ),
            workflow.execute_activity(
                fetch_market_data_activity,
                args=[{"symbol": symbol, "exchange": "NASDAQ"}],
                start_to_close_timeout=timedelta(seconds=120)
            ),
            workflow.execute_activity(
                fetch_market_data_activity,
                args=[{"symbol": symbol, "exchange": "LSE"}],
                start_to_close_timeout=timedelta(seconds=120)
            )
        ]

        # Wait for all market data to complete
        market_data_results = await asyncio.gather(*market_data_futures)

        # Parallel analysis on different timeframes
        analysis_futures = [
            workflow.execute_activity(
                technical_analysis_activity,
                args=[{"data": result, "timeframe": "1h"}],
                start_to_close_timeout=timedelta(seconds=120)
            ) for result in market_data_results
        ]

        analysis_results = await asyncio.gather(*analysis_futures)

        # Aggregate signals and execute trade
        aggregated_signals = await workflow.execute_activity(
            aggregate_signals_activity,
            args=[{"analyses": analysis_results}],
            start_to_close_timeout=timedelta(seconds=60)
        )

        # Execute trade based on aggregated signals
        trade_result = await workflow.execute_activity(
            execute_trade_activity,
            args=[{"symbol": symbol, "signals": aggregated_signals}],
            start_to_close_timeout=timedelta(seconds=120)
        )

        return trade_result
```

Advanced Patterns for Agentic AI

1. Agent Decision Trees

Use sub-workflows to create complex decision trees for AI agents:

```python
@workflow.defn
class TradingStrategyRouterWorkflow:
    @workflow.run
    async def run(self, trade_request: dict) -> dict:
        # Initial market analysis
        market_analysis = await workflow.execute_activity(
            analyze_market_activity,
            args=[trade_request],
            start_to_close_timeout=timedelta(seconds=120)
        )

        # Route to appropriate trading strategy based on market conditions
        if market_analysis["market_condition"] == "trending":
            return await workflow.execute_child_workflow(
                TrendFollowingStrategyWorkflow.run,
                args=[trade_request, market_analysis]
            )
        elif market_analysis["market_condition"] == "ranging":
            return await workflow.execute_child_workflow(
                MeanReversionStrategyWorkflow.run,
                args=[trade_request, market_analysis]
            )
        else:
            return await workflow.execute_child_workflow(
                AdaptiveStrategyWorkflow.run,
                args=[trade_request, market_analysis]
            )
```

2. Agent Collaboration

Orchestrate multiple agents working together:

```python
@workflow.defn
class CollaborativeTradingWorkflow:
    @workflow.run
    async def run(self, trade_request: dict) -> dict:
        # Agent 1: Market Data Collection
        market_data = await workflow.execute_activity(
            market_data_agent_activity,
            args=[trade_request],
            start_to_close_timeout=timedelta(seconds=120)
        )

        # Agent 2: Technical Analysis (uses market data)
        technical_analysis = await workflow.execute_activity(
            technical_analysis_agent_activity,
            args=[market_data],
            start_to_close_timeout=timedelta(seconds=120)
        )

        # Agent 3: Risk Assessment (uses both)
        risk_assessment = await workflow.execute_activity(
            risk_assessment_agent_activity,
            args=[market_data, technical_analysis],
            start_to_close_timeout=timedelta(seconds=120)
        )

        # Agent 4: Trade Decision (uses all)
        trade_decision = await workflow.execute_activity(
            trade_decision_agent_activity,
            args=[market_data, technical_analysis, risk_assessment],
            start_to_close_timeout=timedelta(seconds=120)
        )

        return trade_decision
```

3. Adaptive Agent Workflows

Adjust workflow behavior based on intermediate results:

```python
@workflow.defn
class AdaptiveTradingAgentWorkflow:
    @workflow.run
    async def run(self, trade_request: dict) -> dict:
        max_iterations = 5

        for iteration in range(max_iterations):
            # Agent processes trade request
            result = await workflow.execute_activity(
                trading_agent_process_activity,
                args=[trade_request],
                start_to_close_timeout=timedelta(seconds=120)
            )

            # Check if trade is complete
            if result["status"] in ("executed", "rejected"):
                return result

            # Adapt based on result
            if result["needs_more_data"]:
                # Fetch additional market data
                additional_data = await workflow.execute_activity(
                    fetch_additional_data_activity,
                    args=[result["missing_data"]],
                    start_to_close_timeout=timedelta(seconds=120)
                )
                trade_request["additional_data"] = additional_data

            # Adjust strategy based on market feedback
            if result["strategy_adjustment_needed"]:
                trade_request["strategy"] = result["recommended_strategy"]

            # Update trade request for next iteration
            trade_request["iteration"] = iteration + 1
            trade_request["previous_result"] = result

        return {"status": "max_iterations_reached", "final_result": trade_request}
```

Benefits of Temporal for Agentic AI

1. Reliability

  • Automatic retries on failures
  • State persistence across crashes
  • Guaranteed execution completion
  • No data loss

2. Observability

  • Complete execution history
  • Real-time progress tracking
  • Time-travel debugging
  • Built-in monitoring

3. Scalability

  • Horizontal worker scaling
  • Load distribution
  • Efficient resource utilization
  • Support for high-throughput systems

4. Developer Experience

  • Write workflows as code
  • Familiar programming constructs
  • Strong typing support
  • Comprehensive SDKs

5. Production-Ready

  • Battle-tested at scale
  • Used by major companies
  • Active community
  • Regular updates and improvements

Best Practices

1. Activity Design

  • Keep activities focused and single-purpose
  • Import external dependencies inside activities
  • Use appropriate timeouts
  • Handle errors gracefully

2. Workflow Design

  • Keep workflows deterministic
  • Avoid random number generation (use activities)
  • Don't use system time directly (use workflow.now(); see the sketch after this list)
  • Use queries for read-only state access
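
A minimal sketch of these rules in the Python SDK; workflow.now() and workflow.random() are the SDK's deterministic replacements for wall-clock time and the random module:

```python
@workflow.defn
class DeterministicWorkflow:
    @workflow.run
    async def run(self) -> dict:
        # OK inside a workflow: deterministic, replay-safe helpers
        started_at = workflow.now()          # instead of datetime.now()
        jitter = workflow.random().random()  # instead of random.random()

        # Not OK inside a workflow: datetime.now(), random.random(),
        # direct network or file I/O -- push those into activities
        return {"started_at": started_at.isoformat(), "jitter": jitter}
```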

3. Error Handling

  • Define appropriate retry policies
  • Use non-retryable error types for permanent failures
  • Implement compensation logic for sagas
  • Log errors appropriately

4. Performance

  • Use parallel execution when possible
  • Set appropriate timeouts
  • Monitor workflow execution times
  • Optimize activity execution

5. Testing

  • Test workflows in isolation
  • Mock activities for unit testing
  • Use Temporal's test framework (see the sketch after this list)
  • Test failure scenarios
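
A minimal test sketch using the Python SDK's time-skipping environment, reusing TradingWorkflow and fetch_market_data_activity from earlier; inputs and IDs are illustrative:

```python
import uuid
from temporalio.testing import WorkflowEnvironment
from temporalio.worker import Worker

async def test_trading_workflow():
    # Time-skipping environment: durable timers fire instantly in tests
    async with await WorkflowEnvironment.start_time_skipping() as env:
        async with Worker(
            env.client,
            task_queue="test-queue",
            workflows=[TradingWorkflow],
            activities=[fetch_market_data_activity],  # or a mocked version
        ):
            result = await env.client.execute_workflow(
                TradingWorkflow.run,
                args=["AAPL", "momentum"],  # hypothetical inputs
                id=f"test-{uuid.uuid4()}",
                task_queue="test-queue",
            )
            assert result is not None
```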

Conclusion

Temporal provides a robust foundation for building reliable, scalable agentic AI systems. Its durable execution model, comprehensive workflow patterns, and excellent developer experience make it an ideal choice for orchestrating complex AI agent workflows.

Key takeaways:

  • Temporal abstracts complexity: Focus on business logic, not infrastructure
  • Multiple workflow patterns: Sequential, parallel, sub-workflows, and more
  • Perfect for agentic AI: Reliability, state management, and scalability
  • Production-ready: Battle-tested and widely adopted
  • Rich ecosystem: Multiple language SDKs and active community

Whether you're building simple sequential workflows or complex multi-agent systems, Temporal provides the reliability and observability needed for production AI applications.
