Step Functions distributed map is a powerful feature that helps build highly parallel serverless data processing workflows. It integrates natively with S3, enabling you to process millions of objects efficiently.
This feature relies on the "Distributed" mode of the Map state to process a list of S3 objects in a bucket in parallel.
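For illustration, here is a minimal ASL sketch of a distributed map state that reads a bucket directly with ListObjectsV2; the bucket name, prefix, and inner state are placeholders, not code from this article's repository:

```json
"ProcessObjects": {
  "Type": "Map",
  "ItemReader": {
    "Resource": "arn:aws:states:::s3:listObjectsV2",
    "Parameters": {
      "Bucket": "my-source-bucket",
      "Prefix": "data/"
    }
  },
  "ItemProcessor": {
    "ProcessorConfig": {
      "Mode": "DISTRIBUTED",
      "ExecutionType": "STANDARD"
    },
    "StartAt": "ProcessItem",
    "States": {
      "ProcessItem": { "Type": "Pass", "End": true }
    }
  },
  "End": true
}
```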
However, at the time of writing, the ItemReader step of the Map state does not support S3 buckets that are in other accounts or regions.
In this article, we will see how to work around this limitation. Several solutions are possible:
Using S3 bucket replication: We can replicate the source S3 bucket and sync it with a bucket in the target account where we want to run the distributed map job.
Another solution is to start the workflow with an initial step that synchronously lists the keys of the objects in the source bucket and writes this list to an intermediate bucket in the target account. This file is then configured as the data source for the distributed map.
Alternatively, a third solution, similar to the second one, involves configuring S3 inventory on the source bucket and using it to get the list of keys.
In this article we will focus on the second solution.
Solution overview
🔍 Here are the relevant parts:
Both Lambda functions, "List objects in source bucket" and "Process objects", require cross-account access to the S3 bucket in the source account.
The "List objects in source bucket" Lambda function uses S3 ListObjectsV2 to get the list of keys in the source bucket. It writes that list in JSON format to the "Object keys inventory bucket" in the target account. The map state is configured in "Distributed" mode and uses the JSON file containing the list as its source.
The distributed map state's iterations run in parallel. Each iteration creates a child workflow execution that invokes the "Process objects" Lambda function with a batch of keys.
TL;DR
You will find the complete source code here 👇
GitHub - ziedbentahar/stepfunctions-distributed-map-cross-account-s3-access
In this example, I will use Node.js, TypeScript, and CDK for IaC.
Let’s see the code
1- "List objects in source bucket" and "Process objects" Lambda functions
The "List objects in source bucket" Lambda function requires two parameters: a prefix, used to list only the keys starting with it, and the name of the output file that will contain the list of keys. These parameters are supplied by the state machine.
The getKeysFromBucketByPrefix function calls ListObjectsV2. It iterates through all objects in the bucket that start with the given prefix. The loop continues until there are no more continuation tokens, indicating that all keys have been retrieved. The function then returns the list of keys as an array, which the writeKeysAsJSONIntoBucket function writes to the "Object keys inventory bucket".
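A minimal sketch of this function could look like the following, using the AWS SDK v3. The environment variable names and the handler's event field names are assumptions for illustration, not the repository's exact code:

```typescript
import {
  S3Client,
  ListObjectsV2Command,
  PutObjectCommand,
} from "@aws-sdk/client-s3";

const s3 = new S3Client({});

// Assumed environment variables, set by the CDK stack
const sourceBucket = process.env.SOURCE_BUCKET!;
const inventoryBucket = process.env.INVENTORY_BUCKET!;

// Page through the source bucket until no continuation token remains
const getKeysFromBucketByPrefix = async (prefix: string): Promise<string[]> => {
  const keys: string[] = [];
  let continuationToken: string | undefined;
  do {
    const page = await s3.send(
      new ListObjectsV2Command({
        Bucket: sourceBucket,
        Prefix: prefix,
        ContinuationToken: continuationToken,
      })
    );
    keys.push(...(page.Contents ?? []).flatMap((o) => (o.Key ? [o.Key] : [])));
    continuationToken = page.NextContinuationToken;
  } while (continuationToken);
  return keys;
};

// Write the key list as a JSON array into the inventory bucket
const writeKeysAsJSONIntoBucket = async (keys: string[], fileName: string) =>
  s3.send(
    new PutObjectCommand({
      Bucket: inventoryBucket,
      Key: fileName,
      Body: JSON.stringify(keys),
      ContentType: "application/json",
    })
  );

// Handler invoked by the state machine with a prefix and an output file name
export const handler = async (event: {
  prefix: string;
  outputFileName: string;
}) => {
  const keys = await getKeysFromBucketByPrefix(event.prefix);
  await writeKeysAsJSONIntoBucket(keys, event.outputFileName);
};
```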
The "Process objects" Lambda function will be invoked by the workflow's map executions with a batch of item keys as input. The batch size is configurable on the distributed map state; batching items improves performance and reduces cost for large datasets.
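As a sketch, the processing function might look like this. It assumes the batch arrives under the Items key, which is the input shape the distributed map produces when an ItemBatcher is configured, and that each item is an object key string:

```typescript
import { S3Client, GetObjectCommand } from "@aws-sdk/client-s3";

const s3 = new S3Client({});

// Assumed environment variable pointing at the cross-account source bucket
const sourceBucket = process.env.SOURCE_BUCKET!;

// With an ItemBatcher configured, each child execution receives its
// batch of items under the "Items" key
type ProcessObjectsEvent = { Items: string[] };

export const handler = async (event: ProcessObjectsEvent) => {
  for (const key of event.Items) {
    const object = await s3.send(
      new GetObjectCommand({ Bucket: sourceBucket, Key: key })
    );
    const body = await object.Body?.transformToString();
    // ... actual processing of the object content goes here
    console.log(`processed ${key}: ${body?.length ?? 0} chars`);
  }
};
```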
We will need to update the source account's bucket policy to allow the two Lambda function roles in the target account to perform ListObjectsV2 and GetObject operations, respectively:
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Principal": {
        "AWS": "arn:aws:iam::<your-account-id>:role/<list-bucket-lambda-role-name>"
      },
      "Action": "s3:ListBucket",
      "Resource": "arn:aws:s3:::<source-bucket-name>"
    },
    {
      "Effect": "Allow",
      "Principal": {
        "AWS": "arn:aws:iam::<your-account-id>:role/<process-object-lambda-role-name>"
      },
      "Action": "s3:GetObject",
      "Resource": "arn:aws:s3:::<source-bucket-name>/*"
    }
  ]
}
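Note that the bucket policy alone is not enough: cross-account S3 access also requires matching allows in the Lambda roles' identity policies in the target account. A minimal CDK sketch, where listObjectsFunction and processObjectsFunction are hypothetical names for the two Lambda constructs defined in the stack:

```typescript
import { PolicyStatement } from "aws-cdk-lib/aws-iam";

// listObjectsFunction and processObjectsFunction are the two Lambda
// constructs defined elsewhere in the stack (hypothetical names);
// <source-bucket-name> stays a placeholder, as in the bucket policy above
listObjectsFunction.addToRolePolicy(
  new PolicyStatement({
    actions: ["s3:ListBucket"],
    resources: ["arn:aws:s3:::<source-bucket-name>"],
  })
);

processObjectsFunction.addToRolePolicy(
  new PolicyStatement({
    actions: ["s3:GetObject"],
    resources: ["arn:aws:s3:::<source-bucket-name>/*"],
  })
);
```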
⚠️ Important: Using a Lambda function to list objects from an S3 bucket might not be the most cost-effective solution when dealing with tens of millions of items. It is also important to keep in mind the Lambda function's 15-minute execution time limit. It is worth exploring alternatives such as running the list operation as an ECS task or, as mentioned in the previous section, configuring and relying on S3 inventory on the source bucket.
You can find the complete CDK definition of these two Lambda functions at this link.
2- State machine definition
Alright, let's have a look at the workflow definition:
Here, we use the state machine's execution name, provided by the context object as $$.Execution.Name, as the name of the file that stores the list of keys from the source bucket. We also pass the state machine's input property $.prefix to the "List objects in source bucket" Lambda function.
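In CDK, this wiring might look like the following sketch; the payload field names and the listObjectsFunction variable are assumptions carried over from the earlier Lambda sketch:

```typescript
import { LambdaInvoke } from "aws-cdk-lib/aws-stepfunctions-tasks";
import { TaskInput, JsonPath } from "aws-cdk-lib/aws-stepfunctions";

// Inside the stack: invoke the listing Lambda with the execution name
// as the output file name and the state machine input's prefix
const listObjectsTask = new LambdaInvoke(this, "List objects in source bucket", {
  lambdaFunction: listObjectsFunction,
  payload: TaskInput.fromObject({
    prefix: JsonPath.stringAt("$.prefix"),
    outputFileName: JsonPath.stringAt("$$.Execution.Name"),
  }),
  resultPath: JsonPath.DISCARD,
});
```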
At the time of writing, CDK does not provide a native Distributed Map state construct. We will use CustomState, where we pass the ASL JSON definition:
We configure the ItemProcessor in Distributed mode.
We set the ItemReader to the JSON file in the list S3 bucket, using $$.Execution.Name as the key of the JSON file to read from the bucket, as in the sketch below.
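Here is a sketch of such a CustomState definition. It assumes the processObjectsFunction and inventoryBucket constructs from the earlier sketches and is not the repository's exact code:

```typescript
import { CustomState } from "aws-cdk-lib/aws-stepfunctions";

// Distributed map reading the key list JSON file written by the first step
const distributedMap = new CustomState(this, "Process objects in source bucket", {
  stateJson: {
    Type: "Map",
    ItemReader: {
      // Read the item list from a JSON file instead of listing a bucket
      Resource: "arn:aws:states:::s3:getObject",
      ReaderConfig: { InputType: "JSON" },
      Parameters: {
        Bucket: inventoryBucket.bucketName,
        "Key.$": "$$.Execution.Name",
      },
    },
    ItemProcessor: {
      ProcessorConfig: {
        Mode: "DISTRIBUTED",
        ExecutionType: "STANDARD",
      },
      StartAt: "Process objects",
      States: {
        "Process objects": {
          Type: "Task",
          Resource: "arn:aws:states:::lambda:invoke",
          OutputPath: "$.Payload",
          Parameters: {
            FunctionName: processObjectsFunction.functionArn,
            "Payload.$": "$",
          },
          End: true,
        },
      },
    },
    End: true,
  },
});
```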
☝️ Depending on your use case, you may want to configure the maximum number of concurrent executions as well as the maximum number of items per batch. This will have an impact on the overall execution time of the process.
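Both knobs sit at the top level of the Map state's ASL, next to ItemReader and ItemProcessor; the values here are purely illustrative:

```json
"MaxConcurrency": 100,
"ItemBatcher": {
  "MaxItemsPerBatch": 50
}
```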
You can find the full state machine definition here.
Once you execute the state machine, you can monitor the item processing status on the Map Run page:
Wrapping up
Step Functions distributed map is a valuable feature to include in your toolkit. In this article, we've seen how to use it with S3 buckets that are not in the same account as the state machine. Hopefully, AWS will address this limitation!
You can find a complete sample application repository here:
GitHub - ziedbentahar/stepfunctions-distributed-map-cross-account-s3-access
Further reading
Using Map state in Distributed mode to orchestrate large-scale parallel workloads