Since AWS Step Functions added support for AWS SDK integration, it has become very powerful for serverless integration of AWS services. Previously, the go-to approach would be a simple Lambda function, but Step Functions involves even less maintenance. However, Step Functions has (mostly deliberate) limitations that we need to work with/around.
Some AWS API actions return paginated responses, meaning you get a chunk of the total response and you have to ask for the next chunk if you want it. So if there are 150 items, you may need to pull them 25 at a time. How do we handle this in Step Functions, where each state only gets to make one call?
The key to handling pagination is to loop based on the presence of a continuation token, usually named NextToken. When a paginated response is returned, the JSON includes this token, which (1) indicates there are additional items to retrieve and (2) must be provided in the subsequent API call to request the next page. Unfortunately, the token name isn't consistent across services; see Ian McKay's complete list of rules for AWS SDK pagination.
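To make the token loop concrete outside of Step Functions, here is a minimal sketch in plain Python. The `paginate` helper and the in-memory `make_fake_api` stand-in are hypothetical names invented for illustration (no AWS call is made), and the fake token is simply the next offset; real tokens are opaque strings.

```python
from typing import Any, Callable, Dict, Iterator, List, Optional, Tuple


def paginate(
    fetch_page: Callable[..., Dict[str, Any]],
    items_key: str,
    token_key: str = "NextToken",  # the name varies between AWS APIs
) -> Iterator[Any]:
    """Yield every item by following the continuation token until it is absent."""
    token: Optional[str] = None
    while True:
        # Only send the token once we have one from a previous response.
        kwargs = {token_key: token} if token else {}
        page = fetch_page(**kwargs)
        yield from page.get(items_key, [])
        token = page.get(token_key)
        if not token:
            break  # no token in the response means this was the last page


def make_fake_api(total: int = 150, page_size: int = 25) -> Tuple[Callable, List]:
    """Hypothetical stand-in for a paginated API such as DescribeWorkspaces."""
    items = [{"WorkspaceId": f"ws-{i:04d}"} for i in range(total)]
    calls: List[Optional[str]] = []

    def describe_workspaces(NextToken: Optional[str] = None) -> Dict[str, Any]:
        calls.append(NextToken)
        start = int(NextToken) if NextToken else 0
        end = start + page_size
        page: Dict[str, Any] = {"Workspaces": items[start:end]}
        if end < total:
            page["NextToken"] = str(end)  # omitted on the final page
        return page

    return describe_workspaces, calls


fake_api, calls = make_fake_api()
workspace_ids = [w["WorkspaceId"] for w in paginate(fake_api, "Workspaces")]
```

With 150 items and a page size of 25, the loop makes six calls: the first without a token, then one per returned token. Passing a different `token_key` covers APIs that use another name for the token.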
Here's a concrete example. Let's say we want to perform an action, like updating to the latest image, on every instance of the Amazon WorkSpaces service, each of which represents a virtual desktop. workspaces:DescribeWorkspaces returns 25 at a time, but we have several hundred.
We call DescribeWorkspaces to get the first set of items and map each one to our desired action, in this case RebuildWorkspaces. Then a Choice state checks if NextToken is present in the task result. If so, we call DescribeWorkspaces again, this time using the NextToken parameter. Critically, this overwrites the task result from the first call, so the next time we reach the Choice state, it can check if there are even more items to retrieve (NextToken is still present). If we've reached the end (NextToken isn't present), we can move on with the workflow.
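For readers who think in Amazon States Language, the loop described above corresponds roughly to the following definition, written here as a Python dict. This is a hand-written sketch rather than actual CDK output: the state names are chosen for illustration, and the resource ARNs omit the partition that a real deployment would fill in.

```python
import json

# Both describe calls write to the same path, so the second overwrites the first.
RESULT = "$.DescribeWorkspacesResult"

definition = {
    "StartAt": "DescribeWorkspaces",
    "States": {
        "DescribeWorkspaces": {
            "Type": "Task",
            "Resource": "arn:aws:states:::aws-sdk:workspaces:describeWorkspaces",
            "ResultPath": RESULT,
            "Next": "RebuildEachWorkspace",
        },
        "RebuildEachWorkspace": {
            "Type": "Map",
            "ItemsPath": f"{RESULT}.Workspaces",
            # A null ResultPath drops the Map result and passes the input
            # through, so NextToken is still visible to the Choice state.
            "ResultPath": None,
            "Iterator": {
                "StartAt": "RebuildWorkspaces",
                "States": {
                    "RebuildWorkspaces": {
                        "Type": "Task",
                        "Resource": "arn:aws:states:::aws-sdk:workspaces:rebuildWorkspaces",
                        "Parameters": {
                            "RebuildWorkspaceRequests": [
                                {"WorkspaceId.$": "$.WorkspaceId"}
                            ]
                        },
                        "End": True,
                    }
                },
            },
            "Next": "ChoiceMoreWorkspaces",
        },
        "ChoiceMoreWorkspaces": {
            "Type": "Choice",
            "Choices": [
                {
                    "Variable": f"{RESULT}.NextToken",
                    "IsPresent": True,
                    "Next": "DescribeMoreWorkspaces",
                }
            ],
            "Default": "Done",
        },
        "DescribeMoreWorkspaces": {
            "Type": "Task",
            "Resource": "arn:aws:states:::aws-sdk:workspaces:describeWorkspaces",
            "Parameters": {"NextToken.$": f"{RESULT}.NextToken"},
            "ResultPath": RESULT,
            "Next": "RebuildEachWorkspace",  # loop back into the Map
        },
        "Done": {"Type": "Succeed"},
    },
}

asl_json = json.dumps(definition, indent=2)
```

The only cycle is DescribeMoreWorkspaces, RebuildEachWorkspace, ChoiceMoreWorkspaces; it ends as soon as a response arrives without NextToken.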
Here's the full definition of this Step Functions state machine in AWS CDK with Python.
import aws_cdk.aws_stepfunctions as sfn
import aws_cdk.aws_stepfunctions_tasks as sfn_tasks

# The code below belongs inside a construct such as a Stack's __init__,
# where `self` is the scope.

# First page: call DescribeWorkspaces without a token.
describe_workspaces = sfn_tasks.CallAwsService(
    self,
    id="DescribeWorkspaces",
    comment="Get workspaces",
    service="workspaces",
    action="describeWorkspaces",
    result_path="$.DescribeWorkspacesResult",
    iam_resources=["*"],
)

# Later pages: pass the NextToken from the previous result and
# overwrite that result at the same path.
describe_more_workspaces = sfn_tasks.CallAwsService(
    self,
    id="DescribeMoreWorkspaces",
    comment="Get workspaces with NextToken",
    service="workspaces",
    action="describeWorkspaces",
    parameters={
        "NextToken": sfn.JsonPath.string_at(
            "$.DescribeWorkspacesResult.NextToken"
        )
    },
    result_path="$.DescribeWorkspacesResult",
    iam_resources=["*"],
)

# The action performed on each workspace in the current page.
rebuild_workspaces = sfn_tasks.CallAwsService(
    self,
    id="RebuildWorkspaces",
    comment="Rebuild workspaces",
    service="workspaces",
    action="rebuildWorkspaces",
    parameters={
        "RebuildWorkspaceRequests": [
            {"WorkspaceId": sfn.JsonPath.string_at("$.WorkspaceId")}
        ]
    },
    result_path="$.RebuildWorkspacesResult",
    iam_resources=["*"],
)

rebuild_each_workspace = sfn.Map(
    self,
    id="RebuildEachWorkspace",
    comment="Rebuild each workspace",
    items_path="$.DescribeWorkspacesResult.Workspaces",
    # Discard the Map result but keep the state input, so the Choice state
    # can still see NextToken. Discarding via output_path would instead
    # pass an empty object downstream.
    result_path=sfn.JsonPath.DISCARD,
)
# Run the rebuild task for each item, as described above.
rebuild_each_workspace.iterator(rebuild_workspaces)

# Describe -> Map -> Choice; the Choice loops back through
# DescribeMoreWorkspaces while NextToken is present.
definition = describe_workspaces.next(rebuild_each_workspace).next(
    sfn.Choice(self, "ChoiceMoreWorkspaces")
    .when(
        sfn.Condition.is_present("$.DescribeWorkspacesResult.NextToken"),
        describe_more_workspaces.next(rebuild_each_workspace),
    )
    .otherwise(sfn.Succeed(self, "Done"))
)

state_machine = sfn.StateMachine(
    self,
    id="WorkSpacesRebuilderStateMachine",
    state_machine_type=sfn.StateMachineType.STANDARD,
    definition=definition,
)
Thanks to Karsten Lang for fixing an error in a previous version
Top comments (2)
The example throws an error that leads to a GitHub issue, which can be solved by adding the following line:
The finished cdk deploy-able example is available here. Thanks for this write-up.
Have you ever stumbled upon this error:
You specified an invalid value for nextToken. You must get the value from the response to a previous call to the API. (Service: Organizations, Status Code: 400, Request ID: 4e0f3bc2-7219-4869-848a-f2b2f6423523)
Related to docs.aws.amazon.com/organizations/...
Do you know the structure of NextToken? How to validate that a given NextToken is valid and so forth?