
Michael Wu for AWS Community Builders


Parallel task error handling in Step Functions

The Problem

When you run concurrent tasks in Step Functions, either as a Parallel state or a Map state, you might want to wait until all tasks complete, regardless of whether they succeed or fail. By default, a Map or Parallel state waits for all iterations or branches to succeed before ending or proceeding to the next state; however, it is terminated as soon as a single iteration or branch fails, and all other in-flight executions are stopped.

While this behaviour might echo the fail-fast philosophy, there are scenarios in which you might want to hold off terminating the overarching state when something fails. For example:

  • Given 3 branches of workload I want to run in parallel, I want a converged response at the end containing the response from each branch, be it a success or a failure.
  • Given 5 items in an array that I want to run through a set of steps concurrently, I want to know which ones succeed and which ones fail (and why).

The Solution

The solution is to handle the possible exceptions and use a Pass state to propagate the result.

Parallel state

To demonstrate the problem in a parallel state, let's look at what happens if a branch fails.

[Image: execution graph showing the quick fail branch failed, the parallel state cancelled, and long success stopped]

In this example, you will see I have two branches of workload: long success represents a long-running, eventually successful workload, while quick fail represents a workload that fails before long success completes. The screenshot shows how the parallel state (and the entire state machine) has been cancelled because the quick fail branch failed, while long success has been stopped too (greyed out). This is in line with what the AWS documentation says:

If any branch fails, because of an unhandled error or by transitioning to a Fail state, the entire Parallel state is considered to have failed and all its branches are stopped. If the error is not handled by the Parallel state itself, Step Functions stops the execution with an error.
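The two branch functions themselves aren't shown in this post, but for completeness, here is a minimal Python sketch of what they could look like; the sleep duration and return value are illustrative assumptions, and each handler is deployed as its own Lambda function.

long-success:

import time

def lambda_handler(event, context):
    # simulate a long-running workload that eventually succeeds
    time.sleep(30)
    return "long success has completed"

quick-fail:

def lambda_handler(event, context):
    # fail straight away, well before the long success branch finishes
    raise Exception("quick fail function has failed")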

Now let's see what happens if we handle whatever error is thrown from quick fail and use a Pass state to return the result for this branch.

[Image: execution graph showing quick fail in a Caught Error status, handle failure executed, and long success still running]

You can see that the quick fail step becomes amber, which means it's in a Caught Error status, and it's followed by handle failure, a Pass state that simply returns an output and ends the branch execution.

The big difference here is that while the quick fail branch has completed, the overall parallel state is still in-flight, as indicated by the blue colour, because the long success branch is still running. Eventually, the long success branch will complete and, together with the output from handle failure, both execution results will be aggregated into the Final State.

[Image: execution graph showing both branches complete and their results aggregated into Final State]

The definition of the state machine with error handling would look something like this

{
  "StartAt": "Parallel",
  "States": {
    "Parallel": {
      "Type": "Parallel",
      "Next": "Final State",
      "Branches": [
        {
          "StartAt": "long success",
          "States": {
            "long success": {
              "Type": "Task",
              "Resource": "arn:aws:lambda:ap-southeast-2:123456789098:function:long-success",
              "End": true
            }
          }
        },
        {
          "StartAt": "quick fail",
          "States": {
            "quick fail": {
              "Type": "Task",
              "Resource": "arn:aws:lambda:ap-southeast-2:123456789098:function:quick-fail",
              "Catch": [
                {
                  "ErrorEquals": [
                    "Exception"
                  ],
                  "Next": "handle failure",
                  "ResultPath": "$.error"
                }
              ],
              "End": true
            },
            "handle failure": {
              "Type": "Pass",
              "End": true
            }
          }
        }
      ]
    },
    "Final State": {
      "Type": "Pass",
      "End": true
    }
  }
}

Given the snippet above, you would expect to see an array output from Final State with two elements in it, something like this

[
  <long-success-response>,
  {
    <input-json-blob>,
    "error": {
      "Error": "Exception",
      "Cause": "{\"errorMessage\": \"quick fail function has failed\", \"errorType\": \"Exception\", \"stackTrace\": [\"  File \\\"/var/task/lambda_function.py\\\", line 6, in lambda_handler\\n    raise Exception(\\\"quick fail function has failed\\\")\\n\"]}"
    }
  }
]

From here, you can process the response from each individual branch without failing the entire state machine.
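For example, if Final State were a Task instead of a Pass, its Lambda could inspect each element of the array for the error key injected by Catch. The following is only a sketch of one way to consume that output, not part of the original state machine:

def lambda_handler(event, context):
    # event is the array emitted by the Parallel state:
    # one element per branch, in the order the branches are defined
    summary = []
    for branch_output in event:
        if isinstance(branch_output, dict) and "error" in branch_output:
            # this branch failed and was caught; Error/Cause sit under "error"
            summary.append({"status": "failed", "cause": branch_output["error"]["Cause"]})
        else:
            summary.append({"status": "succeeded", "output": branch_output})
    return summary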

Map state

You can achieve the same result with a Map state if your parallelism is dynamic.

A Map state definition using this approach would look something like this

{
  "StartAt": "Map",
  "States": {
    "Map": {
      "Type": "Map",
      "ItemsPath": "$.array",
      "ResultPath": "$.array",
      "Next": "Final State",
      "Iterator": {
        "StartAt": "mapper",
        "States": {
          "mapper": {
            "Type": "Task",
            "Resource": "arn:aws:lambda:ap-southeast-2:123456789098:function:sfn-mapper",
            "Catch": [
              {
                "ErrorEquals": [
                  "Exception"
                ],
                "Next": "handle mapper error",
                "ResultPath": "$.error"
              }
            ],
            "ResultPath": "$.data",
            "End": true
          },
          "handle mapper error": {
            "Type": "Pass",
            "End": true
          }
        }
      }
    },
    "Final State": {
      "Type": "Pass",
      "End": true
    }
  }
}

In this example, I use "ResultPath": "$.data" and "ResultPath": "$.error" to record the success and failure outputs under the data and error keys respectively, making them easy to distinguish. The mapper state runs a Lambda function that either returns a success response or throws an error, depending on the execution input. The input would look something like this

{
  "array": [
    {
      "wait": 5,
      "throw": true,
      "name": "one"
    },
    {
      "wait": 10,
      "throw": false,
      "name": "two"
    }
  ]
}

The Lambda function sfn-mapper looks like this (using Python as an example)

import json
import time

def lambda_handler(event, context):
    time.sleep(int(event["wait"]))
    if bool(event["throw"]):
        raise Exception(f"{event['name']} has thrown an error")

    return f"{event['name']} has succeeded after {event['wait']} seconds"
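To sanity-check the mapper outside Step Functions, you can invoke the handler locally with the two items from the sample input (assuming the module is named lambda_function.py, as in the stack traces shown here; this is just a local test, not part of the state machine):

from lambda_function import lambda_handler

# item "two" succeeds after sleeping
print(lambda_handler({"wait": 10, "throw": False, "name": "two"}, None))

# item "one" raises, which is what Catch handles in the state machine
try:
    lambda_handler({"wait": 5, "throw": True, "name": "one"}, None)
except Exception as exc:
    print(exc)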

When you run such a state machine, the end state will look similar to this

[Image: execution graph showing the Map state and the overall execution succeeding despite a failed iteration]

Since we are handling exceptions on each iteration, the entire state machine succeeds. If you check the response from Final State, you will see something like this, based on the input example above.

{
  "array": [
    {
      "wait": 5,
      "throw": true,
      "name": "one",
      "error": {
        "Error": "Exception",
        "Cause": "{\"errorMessage\": \"one has thrown an error\", \"errorType\": \"Exception\", \"stackTrace\": [\"  File \\\"/var/task/lambda_function.py\\\", line 7, in lambda_handler\\n    raise Exception(f\\\"{event['name']} has thrown an error\\\")\\n\"]}"
      }
    },
    {
      "wait": 10,
      "throw": false,
      "name": "two",
      "data": "two has succeeded after 10 seconds"
    }
  ]
}

Notice the error and data keys on their corresponding iterations.
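If a downstream step needs to act on these results, separating successes from failures is simply a matter of checking which key each item carries. A minimal sketch of such a handler (not part of the original example) could look like this:

def lambda_handler(event, context):
    # event is the Map output: each item in "array" carries either "data" or "error"
    succeeded = [item for item in event["array"] if "data" in item]
    failed = [item for item in event["array"] if "error" in item]
    return {
        "succeeded": [item["name"] for item in succeeded],
        "failed": [{"name": item["name"], "cause": item["error"]["Cause"]} for item in failed],
    }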

I hope you've enjoyed this walk-through. Error handling in a Parallel or Map state can be really useful when you want greater control over how to process the individual responses within these states.

Oldest comments (1)

jcoffeyNYL

Good article. I will apply the approach to an SQS, Lambda trigger, Step Functions, DynamoDB POC where I want to process messages in batches, allow the good ones to save to the db and place the bad ones on a dead letter queue.