DEV Community

Hungai Amuhinda
Hungai Amuhinda

Posted on • Originally published at hungaikev.in on

Implementing an Order Processing System: Part 2 - Advanced Temporal Workflows

1. Introduction and Goals

Welcome back to our series on implementing a sophisticated order processing system! In our previous post, we laid the foundation for our project, setting up a basic CRUD API, integrating with a Postgres database, and implementing a simple Temporal workflow. Today, we’re diving deeper into the world of Temporal workflows to create a robust, scalable order processing system.

Recap of the Previous Post

In Part 1, we:

  • Set up our project structure
  • Implemented a basic CRUD API using Golang and Gin
  • Integrated with a Postgres database
  • Created a simple Temporal workflow
  • Dockerized our application

Goals for This Post

In this post, we’ll significantly expand our use of Temporal, exploring advanced concepts and implementing complex workflows. By the end of this article, you’ll be able to:

  1. Design and implement multi-step order processing workflows
  2. Handle long-running processes effectively
  3. Implement robust error handling and retry mechanisms
  4. Version workflows for safe updates in production
  5. Implement saga patterns for distributed transactions
  6. Set up monitoring and observability for Temporal workflows

Let’s dive in!

2. Theoretical Background and Concepts

Before we start coding, let’s review some key Temporal concepts that will be crucial for our advanced implementation.

Temporal Workflows and Activities

In Temporal, a Workflow is a durable function that orchestrates long-running business logic. Workflows are fault-tolerant and can survive process and machine failures. They can be thought of as reliable coordination mechanisms for your application’s state transitions.

Activities, on the other hand, are the building blocks of a workflow. They represent a single, well-defined action or task, such as making an API call, writing to a database, or sending an email. Activities can be retried independently of the workflow that invokes them.

Workflow Execution, History, and State Management

When a workflow is executed, Temporal maintains a history of all the events that occur during its lifetime. This history is the source of truth for the workflow’s state. If a workflow worker fails and restarts, it can reconstruct the workflow’s state by replaying this history.

This event-sourcing approach allows Temporal to provide strong consistency guarantees and enables features like workflow versioning and continue-as-new.

Handling Long-Running Processes

Temporal is designed to handle processes that can run for extended periods - from minutes to days or even months. It provides mechanisms like heartbeats for long-running activities and continue-as-new for workflows that generate large histories.

Workflow Versioning

As your system evolves, you may need to update workflow definitions. Temporal provides versioning capabilities that allow you to make non-breaking changes to workflows without affecting running instances.

Saga Pattern for Distributed Transactions

The Saga pattern is a way to manage data consistency across microservices in distributed transaction scenarios. It’s particularly useful when you need to maintain consistency across multiple services without using distributed ACID transactions. Temporal provides an excellent framework for implementing sagas.

Now that we’ve covered these concepts, let’s start implementing our advanced order processing workflow.

3. Implementing Complex Order Processing Workflows

Let’s design a multi-step order processing workflow that includes order validation, payment processing, inventory management, and shipping arrangement. We’ll implement each of these steps as separate activities coordinated by a workflow.

First, let’s define our activities:

// internal/workflow/activities.go

package workflow

import (
    "context"
    "errors"

    "go.temporal.io/sdk/activity"
    "github.com/yourusername/order-processing-system/internal/db"
)

type OrderActivities struct {
    queries *db.Queries
}

func NewOrderActivities(queries *db.Queries) *OrderActivities {
    return &OrderActivities{queries: queries}
}

func (a *OrderActivities) ValidateOrder(ctx context.Context, order db.Order) error {
    // Implement order validation logic
    if order.TotalAmount <= 0 {
        return errors.New("invalid order amount")
    }
    // Add more validation as needed
    return nil
}

func (a *OrderActivities) ProcessPayment(ctx context.Context, order db.Order) error {
    // Implement payment processing logic
    // This could involve calling a payment gateway API
    activity.GetLogger(ctx).Info("Processing payment", "orderId", order.ID, "amount", order.TotalAmount)
    // Simulate payment processing
    // In a real scenario, you'd integrate with a payment gateway here
    return nil
}

func (a *OrderActivities) UpdateInventory(ctx context.Context, order db.Order) error {
    // Implement inventory update logic
    // This could involve updating stock levels in the database
    activity.GetLogger(ctx).Info("Updating inventory", "orderId", order.ID)
    // Simulate inventory update
    // In a real scenario, you'd update your inventory management system here
    return nil
}

func (a *OrderActivities) ArrangeShipping(ctx context.Context, order db.Order) error {
    // Implement shipping arrangement logic
    // This could involve calling a shipping provider's API
    activity.GetLogger(ctx).Info("Arranging shipping", "orderId", order.ID)
    // Simulate shipping arrangement
    // In a real scenario, you'd integrate with a shipping provider here
    return nil
}

Enter fullscreen mode Exit fullscreen mode

Now, let’s implement our complex order processing workflow:

// internal/workflow/order_workflow.go

package workflow

import (
    "time"

    "go.temporal.io/sdk/workflow"
    "github.com/yourusername/order-processing-system/internal/db"
)

func OrderWorkflow(ctx workflow.Context, order db.Order) error {
    logger := workflow.GetLogger(ctx)
    logger.Info("OrderWorkflow started", "OrderID", order.ID)

    // Activity options
    activityOptions := workflow.ActivityOptions{
        StartToCloseTimeout: time.Minute,
        RetryPolicy: &temporal.RetryPolicy{
            InitialInterval: time.Second,
            BackoffCoefficient: 2.0,
            MaximumInterval: time.Minute,
            MaximumAttempts: 5,
        },
    }
    ctx = workflow.WithActivityOptions(ctx, activityOptions)

    // Step 1: Validate Order
    err := workflow.ExecuteActivity(ctx, a.ValidateOrder, order).Get(ctx, nil)
    if err != nil {
        logger.Error("Order validation failed", "OrderID", order.ID, "Error", err)
        return err
    }

    // Step 2: Process Payment
    err = workflow.ExecuteActivity(ctx, a.ProcessPayment, order).Get(ctx, nil)
    if err != nil {
        logger.Error("Payment processing failed", "OrderID", order.ID, "Error", err)
        return err
    }

    // Step 3: Update Inventory
    err = workflow.ExecuteActivity(ctx, a.UpdateInventory, order).Get(ctx, nil)
    if err != nil {
        logger.Error("Inventory update failed", "OrderID", order.ID, "Error", err)
        // In case of inventory update failure, we might need to refund the payment
        // This is where the saga pattern becomes useful, which we'll cover later
        return err
    }

    // Step 4: Arrange Shipping
    err = workflow.ExecuteActivity(ctx, a.ArrangeShipping, order).Get(ctx, nil)
    if err != nil {
        logger.Error("Shipping arrangement failed", "OrderID", order.ID, "Error", err)
        // If shipping fails, we might need to revert inventory and refund payment
        return err
    }

    logger.Info("OrderWorkflow completed successfully", "OrderID", order.ID)
    return nil
}

Enter fullscreen mode Exit fullscreen mode

This workflow coordinates multiple activities, each representing a step in our order processing. Note how we’re using workflow.ExecuteActivity to run each activity, passing the order data as needed.

We’ve also set up activity options with a retry policy. This means if an activity fails (e.g., due to a temporary network issue), Temporal will automatically retry it based on our specified policy.

In the next section, we’ll explore how to handle long-running processes within this workflow structure.

4. Handling Long-Running Processes with Temporal

In real-world scenarios, some of our activities might take a long time to complete. For example, payment processing might need to wait for bank confirmation, or shipping arrangement might depend on external logistics systems. Temporal provides several mechanisms to handle such long-running processes effectively.

Heartbeats for Long-Running Activities

For activities that might run for extended periods, it’s crucial to implement heartbeats. Heartbeats allow an activity to report its progress and let Temporal know that it’s still alive and working. If an activity fails to heartbeat within the expected interval, Temporal can mark it as failed and potentially retry it.

Let’s modify our ArrangeShipping activity to include heartbeats:

func (a *OrderActivities) ArrangeShipping(ctx context.Context, order db.Order) error {
    logger := activity.GetLogger(ctx)
    logger.Info("Arranging shipping", "orderId", order.ID)

    // Simulate a long-running process
    for i := 0; i < 10; i++ {
        // Simulate work
        time.Sleep(time.Second)

        // Record heartbeat
        activity.RecordHeartbeat(ctx, i)

        // Check if we need to cancel
        if activity.GetInfo(ctx).Attempt > 1 {
            logger.Info("Cancelling shipping arrangement due to retry", "orderId", order.ID)
            return nil
        }
    }

    logger.Info("Shipping arranged", "orderId", order.ID)
    return nil
}

Enter fullscreen mode Exit fullscreen mode

In this example, we’re simulating a long-running process with a loop. We record a heartbeat in each iteration, allowing Temporal to track the activity’s progress.

Using Continue-As-New for Very Long-Running Workflows

For workflows that run for very long periods or accumulate a large history, Temporal provides the “continue-as-new” feature. This allows you to complete the current workflow execution and immediately start a new execution with the same workflow ID, carrying over any necessary state.

Here’s an example of how we might use continue-as-new in a long-running order tracking workflow:

func LongRunningOrderTrackingWorkflow(ctx workflow.Context, orderID string) error {
    logger := workflow.GetLogger(ctx)

    // Set up a timer for how long we want this workflow execution to run
    timerFired := workflow.NewTimer(ctx, 24*time.Hour)

    // Set up a selector to wait for either the timer to fire or the order to be delivered
    selector := workflow.NewSelector(ctx)

    var orderDelivered bool
    selector.AddFuture(timerFired, func(f workflow.Future) {
        // Timer fired, we'll continue-as-new
        logger.Info("24 hours passed, continuing as new", "orderID", orderID)
        workflow.NewContinueAsNewError(ctx, LongRunningOrderTrackingWorkflow, orderID)
    })

    selector.AddReceive(workflow.GetSignalChannel(ctx, "orderDelivered"), func(c workflow.ReceiveChannel, more bool) {
        c.Receive(ctx, &orderDelivered)
        logger.Info("Order delivered signal received", "orderID", orderID)
    })

    selector.Select(ctx)

    if orderDelivered {
        logger.Info("Order tracking completed, order delivered", "orderID", orderID)
        return nil
    }

    // If we reach here, it means we're continuing as new
    return workflow.NewContinueAsNewError(ctx, LongRunningOrderTrackingWorkflow, orderID)
}

Enter fullscreen mode Exit fullscreen mode

In this example, we set up a workflow that tracks an order for delivery. It runs for 24 hours before using continue-as-new to start a fresh execution. This prevents the workflow history from growing too large over extended periods.

By leveraging these techniques, we can handle long-running processes effectively in our order processing system, ensuring reliability and scalability even for operations that take extended periods to complete.

In the next section, we’ll dive into implementing robust retry logic and error handling in our workflows and activities.

5. Implementing Retry Logic and Error Handling

Robust error handling and retry mechanisms are crucial for building resilient systems, especially in distributed environments. Temporal provides powerful built-in retry mechanisms, but it’s important to understand how to use them effectively and when to implement custom retry logic.

Configuring Retry Policies for Activities

Temporal allows you to configure retry policies at both the workflow and activity level. Let’s update our workflow to include a more sophisticated retry policy:

func OrderWorkflow(ctx workflow.Context, order db.Order) error {
    logger := workflow.GetLogger(ctx)
    logger.Info("OrderWorkflow started", "OrderID", order.ID)

    // Define a retry policy
    retryPolicy := &temporal.RetryPolicy{
        InitialInterval: time.Second,
        BackoffCoefficient: 2.0,
        MaximumInterval: time.Minute,
        MaximumAttempts: 5,
        NonRetryableErrorTypes: []string{"InvalidOrderError"},
    }

    // Activity options with retry policy
    activityOptions := workflow.ActivityOptions{
        StartToCloseTimeout: time.Minute,
        RetryPolicy: retryPolicy,
    }
    ctx = workflow.WithActivityOptions(ctx, activityOptions)

    // Execute activities with retry policy
    err := workflow.ExecuteActivity(ctx, a.ValidateOrder, order).Get(ctx, nil)
    if err != nil {
        return handleOrderError(ctx, "ValidateOrder", err, order)
    }

    // ... (other activities)

    return nil
}

Enter fullscreen mode Exit fullscreen mode

In this example, we’ve defined a retry policy that starts with a 1-second interval, doubles the interval with each retry (up to a maximum of 1 minute), and allows up to 5 attempts. We’ve also specified that errors of type “InvalidOrderError” should not be retried.

Implementing Custom Retry Logic

While Temporal’s built-in retry mechanisms are powerful, sometimes you need custom retry logic. Here’s an example of implementing custom retry logic for a payment processing activity:

func (a *OrderActivities) ProcessPaymentWithCustomRetry(ctx context.Context, order db.Order) error {
    logger := activity.GetLogger(ctx)
    var err error
    for attempt := 1; attempt <= 3; attempt++ {
        err = a.processPayment(ctx, order)
        if err == nil {
            return nil
        }

        if _, ok := err.(*PaymentDeclinedError); ok {
            // Payment was declined, no point in retrying
            return err
        }

        logger.Info("Payment processing failed, retrying", "attempt", attempt, "error", err)
        time.Sleep(time.Duration(attempt) * time.Second)
    }
    return err
}

func (a *OrderActivities) processPayment(ctx context.Context, order db.Order) error {
    // Actual payment processing logic here
    // ...
}

Enter fullscreen mode Exit fullscreen mode

In this example, we implement a custom retry mechanism that attempts the payment processing up to 3 times, with an increasing delay between attempts. It also handles a specific error type (PaymentDeclinedError) differently, not retrying in that case.

Handling and Propagating Errors

Proper error handling is crucial for maintaining the integrity of our workflow. Let’s implement a helper function to handle errors in our workflow:

func handleOrderError(ctx workflow.Context, activityName string, err error, order db.Order) error {
    logger := workflow.GetLogger(ctx)
    logger.Error("Activity failed", "activity", activityName, "orderID", order.ID, "error", err)

    // Depending on the activity and error type, we might want to compensate
    switch activityName {
    case "ProcessPayment":
        // If payment processing failed, we might need to cancel the order
        _ = workflow.ExecuteActivity(ctx, CancelOrder, order).Get(ctx, nil)
    case "UpdateInventory":
        // If inventory update failed after payment, we might need to refund
        _ = workflow.ExecuteActivity(ctx, RefundPayment, order).Get(ctx, nil)
    }

    // Create a customer-facing error message
    return workflow.NewCustomError("OrderProcessingFailed", "Failed to process order due to: "+err.Error())
}

Enter fullscreen mode Exit fullscreen mode

This helper function logs the error, performs any necessary compensating actions, and returns a custom error that can be safely returned to the customer.

6. Versioning Workflows for Safe Updates

As your system evolves, you’ll need to update your workflow definitions. Temporal provides versioning capabilities that allow you to make changes to workflows without affecting running instances.

Implementing Versioned Workflows

Here’s an example of how to implement versioning in our order processing workflow:

func OrderWorkflow(ctx workflow.Context, order db.Order) error {
    logger := workflow.GetLogger(ctx)
    logger.Info("OrderWorkflow started", "OrderID", order.ID)

    // Use GetVersion to handle workflow versioning
    v := workflow.GetVersion(ctx, "OrderWorkflow.PaymentProcessing", workflow.DefaultVersion, 1)

    if v == workflow.DefaultVersion {
        // Old version: process payment before updating inventory
        err := workflow.ExecuteActivity(ctx, a.ProcessPayment, order).Get(ctx, nil)
        if err != nil {
            return handleOrderError(ctx, "ProcessPayment", err, order)
        }

        err = workflow.ExecuteActivity(ctx, a.UpdateInventory, order).Get(ctx, nil)
        if err != nil {
            return handleOrderError(ctx, "UpdateInventory", err, order)
        }
    } else {
        // New version: update inventory before processing payment
        err := workflow.ExecuteActivity(ctx, a.UpdateInventory, order).Get(ctx, nil)
        if err != nil {
            return handleOrderError(ctx, "UpdateInventory", err, order)
        }

        err = workflow.ExecuteActivity(ctx, a.ProcessPayment, order).Get(ctx, nil)
        if err != nil {
            return handleOrderError(ctx, "ProcessPayment", err, order)
        }
    }

    // ... rest of the workflow

    return nil
}

Enter fullscreen mode Exit fullscreen mode

In this example, we’ve used workflow.GetVersion to introduce a change in the order of operations. The new version updates inventory before processing payment, while the old version does the opposite. This allows us to gradually roll out the change without affecting running workflow instances.

Strategies for Updating Workflows in Production

When updating workflows in a production environment, consider the following strategies:

  1. Incremental Changes : Make small, incremental changes rather than large overhauls. This makes it easier to manage versions and roll back if needed.

  2. Compatibility Periods : Maintain compatibility with older versions for a certain period to allow running workflows to complete.

  3. Feature Flags : Use feature flags in conjunction with workflow versions to control the rollout of new features.

  4. Monitoring and Alerting : Set up monitoring and alerting for workflow versions to track the progress of updates and quickly identify any issues.

  5. Rollback Plan : Always have a plan to roll back to the previous version if issues are detected with the new version.

By following these strategies and leveraging Temporal’s versioning capabilities, you can safely evolve your workflows over time without disrupting ongoing operations.

In the next section, we’ll explore how to implement the Saga pattern for managing distributed transactions in our order processing system.

7. Implementing Saga Patterns for Distributed Transactions

The Saga pattern is a way to manage data consistency across microservices in distributed transaction scenarios. It’s particularly useful in our order processing system where we need to coordinate actions across multiple services (e.g., inventory, payment, shipping) and provide a mechanism for compensating actions if any step fails.

Designing a Saga for Our Order Processing System

Let’s design a saga for our order processing system that includes the following steps:

  1. Reserve Inventory
  2. Process Payment
  3. Update Inventory
  4. Arrange Shipping

If any of these steps fail, we need to execute compensating actions for the steps that have already completed.

Here’s how we can implement this saga using Temporal:

func OrderSaga(ctx workflow.Context, order db.Order) error {
    logger := workflow.GetLogger(ctx)
    logger.Info("OrderSaga started", "OrderID", order.ID)

    // Saga compensations
    var compensations []func(context.Context) error

    // Step 1: Reserve Inventory
    err := workflow.ExecuteActivity(ctx, a.ReserveInventory, order).Get(ctx, nil)
    if err != nil {
        return fmt.Errorf("failed to reserve inventory: %w", err)
    }
    compensations = append(compensations, func(ctx context.Context) error {
        return a.ReleaseInventoryReservation(ctx, order)
    })

    // Step 2: Process Payment
    err = workflow.ExecuteActivity(ctx, a.ProcessPayment, order).Get(ctx, nil)
    if err != nil {
        return compensate(ctx, compensations, fmt.Errorf("failed to process payment: %w", err))
    }
    compensations = append(compensations, func(ctx context.Context) error {
        return a.RefundPayment(ctx, order)
    })

    // Step 3: Update Inventory
    err = workflow.ExecuteActivity(ctx, a.UpdateInventory, order).Get(ctx, nil)
    if err != nil {
        return compensate(ctx, compensations, fmt.Errorf("failed to update inventory: %w", err))
    }
    // No compensation needed for this step, as we've already updated the inventory

    // Step 4: Arrange Shipping
    err = workflow.ExecuteActivity(ctx, a.ArrangeShipping, order).Get(ctx, nil)
    if err != nil {
        return compensate(ctx, compensations, fmt.Errorf("failed to arrange shipping: %w", err))
    }

    logger.Info("OrderSaga completed successfully", "OrderID", order.ID)
    return nil
}

func compensate(ctx workflow.Context, compensations []func(context.Context) error, err error) error {
    logger := workflow.GetLogger(ctx)
    logger.Error("Saga failed, executing compensations", "error", err)

    for i := len(compensations) - 1; i >= 0; i-- {
        compensationErr := workflow.ExecuteActivity(ctx, compensations[i]).Get(ctx, nil)
        if compensationErr != nil {
            logger.Error("Compensation failed", "error", compensationErr)
            // In a real-world scenario, you might want to implement more sophisticated
            // error handling for failed compensations, such as retrying or alerting
        }
    }

    return err
}

Enter fullscreen mode Exit fullscreen mode

In this implementation, we execute each step of the order process as an activity. After each successful step, we add a compensating action to a slice. If any step fails, we call the compensate function, which executes all the compensating actions in reverse order.

This approach ensures that we maintain data consistency across our distributed system, even in the face of failures.

8. Monitoring and Observability for Temporal Workflows

Effective monitoring and observability are crucial for operating Temporal workflows in production. Let’s explore how to implement comprehensive monitoring for our order processing system.

Implementing Custom Metrics

Temporal provides built-in metrics, but we can also implement custom metrics for our specific use cases. Here’s an example of how to add custom metrics to our workflow:

func OrderWorkflow(ctx workflow.Context, order db.Order) error {
    logger := workflow.GetLogger(ctx)
    logger.Info("OrderWorkflow started", "OrderID", order.ID)

    // Define metric
    orderProcessingTime := workflow.NewTimer(ctx, 0)
    defer func() {
        duration := orderProcessingTime.ElapsedTime()
        workflow.GetMetricsHandler(ctx).Timer("order_processing_time").Record(duration)
    }()

    // ... rest of the workflow implementation

    return nil
}

Enter fullscreen mode Exit fullscreen mode

In this example, we’re recording the total time taken to process an order.

Integrating with Prometheus

To integrate with Prometheus, we need to expose our metrics. Here’s how we can set up a Prometheus endpoint in our main application:

package main

import (
    "net/http"

    "github.com/prometheus/client_golang/prometheus/promhttp"
    "go.temporal.io/sdk/client"
    "go.temporal.io/sdk/worker"
)

func main() {
    // ... Temporal client setup

    // Create a worker
    w := worker.New(c, "order-processing-task-queue", worker.Options{})

    // Register workflows and activities
    w.RegisterWorkflow(OrderWorkflow)
    w.RegisterActivity(a.ValidateOrder)
    // ... register other activities

    // Start the worker
    go func() {
        err := w.Run(worker.InterruptCh())
        if err != nil {
            logger.Fatal("Unable to start worker", err)
        }
    }()

    // Expose Prometheus metrics
    http.Handle("/metrics", promhttp.Handler())
    go func() {
        err := http.ListenAndServe(":2112", nil)
        if err != nil {
            logger.Fatal("Unable to start metrics server", err)
        }
    }()

    // ... rest of your application
}

Enter fullscreen mode Exit fullscreen mode

This sets up a /metrics endpoint that Prometheus can scrape to collect our custom metrics along with the built-in Temporal metrics.

Implementing Structured Logging

Structured logging can greatly improve the observability of our system. Let’s update our workflow to use structured logging:

func OrderWorkflow(ctx workflow.Context, order db.Order) error {
    logger := workflow.GetLogger(ctx)
    logger.Info("OrderWorkflow started",
        "OrderID", order.ID,
        "CustomerID", order.CustomerID,
        "TotalAmount", order.TotalAmount,
    )

    // ... workflow implementation

    logger.Info("OrderWorkflow completed",
        "OrderID", order.ID,
        "Duration", workflow.Now(ctx).Sub(workflow.GetInfo(ctx).WorkflowStartTime),
    )

    return nil
}

Enter fullscreen mode Exit fullscreen mode

This approach makes it easier to search and analyze logs, especially when aggregating logs from multiple services.

Setting Up Distributed Tracing

Distributed tracing can provide valuable insights into the flow of requests through our system. While Temporal doesn’t natively support distributed tracing, we can implement it in our activities:

import (
    "go.opentelemetry.io/otel"
    "go.opentelemetry.io/otel/trace"
)

func (a *OrderActivities) ProcessPayment(ctx context.Context, order db.Order) error {
    _, span := otel.Tracer("order-processing").Start(ctx, "ProcessPayment")
    defer span.End()

    span.SetAttributes(
        attribute.Int64("order.id", order.ID),
        attribute.Float64("order.amount", order.TotalAmount),
    )

    // ... payment processing logic

    return nil
}

Enter fullscreen mode Exit fullscreen mode

By implementing distributed tracing, we can track the entire lifecycle of an order across multiple services and activities.

9. Testing and Validation

Thorough testing is crucial for ensuring the reliability of our Temporal workflows. Let’s explore some strategies for testing our order processing system.

Unit Testing Workflows

Temporal provides a testing framework that allows us to unit test workflows. Here’s an example of how to test our OrderWorkflow:

func TestOrderWorkflow(t *testing.T) {
    testSuite := &testsuite.WorkflowTestSuite{}
    env := testSuite.NewTestWorkflowEnvironment()

    // Mock activities
    env.OnActivity(a.ValidateOrder, mock.Anything, mock.Anything).Return(nil)
    env.OnActivity(a.ProcessPayment, mock.Anything, mock.Anything).Return(nil)
    env.OnActivity(a.UpdateInventory, mock.Anything, mock.Anything).Return(nil)
    env.OnActivity(a.ArrangeShipping, mock.Anything, mock.Anything).Return(nil)

    // Execute workflow
    env.ExecuteWorkflow(OrderWorkflow, db.Order{ID: 1, CustomerID: 100, TotalAmount: 99.99})

    require.True(t, env.IsWorkflowCompleted())
    require.NoError(t, env.GetWorkflowError())
}

Enter fullscreen mode Exit fullscreen mode

This test sets up a test environment, mocks the activities, and verifies that the workflow completes successfully.

Testing Saga Compensations

It’s important to test that our saga compensations work correctly. Here’s an example test:

func TestOrderSagaCompensation(t *testing.T) {
    testSuite := &testsuite.WorkflowTestSuite{}
    env := testSuite.NewTestWorkflowEnvironment()

    // Mock activities
    env.OnActivity(a.ReserveInventory, mock.Anything, mock.Anything).Return(nil)
    env.OnActivity(a.ProcessPayment, mock.Anything, mock.Anything).Return(errors.New("payment failed"))
    env.OnActivity(a.ReleaseInventoryReservation, mock.Anything, mock.Anything).Return(nil)

    // Execute workflow
    env.ExecuteWorkflow(OrderSaga, db.Order{ID: 1, CustomerID: 100, TotalAmount: 99.99})

    require.True(t, env.IsWorkflowCompleted())
    require.Error(t, env.GetWorkflowError())

    // Verify that compensation was called
    env.AssertExpectations(t)
}

Enter fullscreen mode Exit fullscreen mode

This test verifies that when the payment processing fails, the inventory reservation is released as part of the compensation.

10. Challenges and Considerations

As we implement and operate our advanced order processing system, there are several challenges and considerations to keep in mind:

  1. Workflow Complexity : As workflows grow more complex, they can become difficult to understand and maintain. Regular refactoring and good documentation are crucial.

  2. Testing Long-Running Workflows : Testing workflows that may run for days or weeks can be challenging. Consider implementing mechanisms to speed up time in your tests.

  3. Handling External Dependencies : External services may fail or become unavailable. Implement circuit breakers and fallback mechanisms to handle these scenarios.

  4. Monitoring and Alerting : Set up comprehensive monitoring and alerting to quickly identify and respond to issues in your workflows.

  5. Data Consistency : Ensure that your saga implementations maintain data consistency across services, even in the face of failures.

  6. Performance Tuning : As your system scales, you may need to tune Temporal’s performance settings, such as the number of workflow and activity workers.

  7. Workflow Versioning : Carefully manage workflow versions to ensure smooth updates without breaking running instances.

11. Next Steps and Preview of Part 3

In this post, we’ve delved deep into advanced Temporal workflow concepts, implementing complex order processing logic, saga patterns, and robust error handling. We’ve also covered monitoring, observability, and testing strategies for our workflows.

In the next part of our series, we’ll focus on advanced database operations with sqlc. We’ll cover:

  1. Implementing complex database queries and transactions
  2. Optimizing database performance
  3. Implementing batch operations
  4. Handling database migrations in a production environment
  5. Implementing database sharding for scalability
  6. Ensuring data consistency in a distributed system

Stay tuned as we continue to build out our sophisticated order processing system!


Need Help?

Are you facing challenging problems, or need an external perspective on a new idea or project? I can help! Whether you're looking to build a technology proof of concept before making a larger investment, or you need guidance on difficult issues, I'm here to assist.

Services Offered:

  • Problem-Solving: Tackling complex issues with innovative solutions.
  • Consultation: Providing expert advice and fresh viewpoints on your projects.
  • Proof of Concept: Developing preliminary models to test and validate your ideas.

If you're interested in working with me, please reach out via email at hungaikevin@gmail.com.

Let's turn your challenges into opportunities!

Top comments (0)