When you need to process CSV files, writing a simple script is usually enough. However, everything tends to become more complex when you need to run it at scale.
At scale here means tens or hundreds of millions of rows.
Challenges
When you need to process CSV files at scale and repeatedly, you need to take care of the following things:
- Parallel processing. The idea is to split the work across multiple nodes.
- Storage. A big CSV file needs a place to live.
- Output. If you need to store the results in a database, you have to manage things like connections, IOPS, and throughput.
Solution
Using S3
+ Lambda
+ Step Functions
+ MapState
+ DynamoDB
you can solve the challenges mentioned above with a serverless solution where you only pay for exactly what you use.
For this example I used CDK v2 to write the IaC, but you can implement it with any other IaC tool.
Architecture overview
Storage
S3 works really well when you need to store files to process later. It's deeply integrated with other AWS services and it's cost-effective.
CDK code
const bucket = new Bucket(scope, 'Bucket', {
bucketName: 'csv.data-lake.example.com',
encryption: BucketEncryption.S3_MANAGED
});
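To get data into the pipeline, the CSV file just needs to be uploaded to this bucket. Here is a minimal sketch using the AWS SDK for JavaScript v3; the object key items.csv and the local path are hypothetical, not from the original post.
Upload example
import { S3Client, PutObjectCommand } from '@aws-sdk/client-s3'
import { readFile } from 'node:fs/promises'

const s3 = new S3Client({})

// Upload a local CSV file (hypothetical path) to the bucket defined above
await s3.send(
  new PutObjectCommand({
    Bucket: 'csv.data-lake.example.com',
    Key: 'items.csv',
    Body: await readFile('./items.csv'),
  })
)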
Output storage
DynamoDB is a non-relational (NoSQL) database with an on-demand billing mode where you pay per request, so you don't need to take care of connections, IOPS, or throughput.
CDK code
new Table(scope, 'CsvItemTable', {
tableName: `CsvItems`,
partitionKey: {
name: 'pk',
type: AttributeType.STRING,
},
billingMode: BillingMode.PAY_PER_REQUEST
})
Processing
To process the CSV lines I used a Lambda function because it's really simple to set up. Other options could be an ECS task running on Fargate, or EKS.
Lambda definition
CDK code
new lambda.NodejsFunction(scope, 'ProcessCsvItems', {
functionName: 'process-csv-items',
entry: `${__dirname}/handler.ts`,
handler: 'handler',
runtime: Runtime.NODEJS_18_X,
environment: {
CSV_ITEM_TABLE_NAME: csvItemTable.tableName,
},
bundling: {
minify: true,
nodeModules: ['uuid'],
},
})
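One detail the snippet above doesn't show is that the function also needs permission to write to the table. A minimal sketch, assuming the table and function constructs are assigned to the variables csvItemTable and processCsvItemsFn (the second name is mine, not from the post):
CDK code
// Grants the function the IAM permissions it needs for PutItem / BatchWriteItem on the table
csvItemTable.grantWriteData(processCsvItemsFn)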
Lambda code
Here we receive the items as an array and store them in DynamoDB using a batch write.
import { Handler } from 'aws-lambda'
import { DynamoDBClient } from '@aws-sdk/client-dynamodb'
import { DynamoDBDocumentClient, BatchWriteCommand } from '@aws-sdk/lib-dynamodb'
import { v4 as uuid } from 'uuid'

// Table name injected through the environment variable defined in the CDK code above
const CSV_ITEM_TABLE_NAME = process.env.CSV_ITEM_TABLE_NAME!
const docClient = DynamoDBDocumentClient.from(new DynamoDBClient({}))

// Each invocation receives a batch of CSV rows (string fields) under the Items key
type HandlerEvent = { Items: Record<string, string>[] }

export const handler: Handler<HandlerEvent> = async ({ Items }) => {
  const command = new BatchWriteCommand({
    RequestItems: {
      [CSV_ITEM_TABLE_NAME]: Items.map((item) => ({
        PutRequest: {
          Item: {
            pk: uuid(),
            ...item,
          },
        },
      })),
    },
  })
  await docClient.send(command)
}
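One caveat with BatchWriteItem: DynamoDB can return unprocessed items when writes get throttled, so a production handler should retry them. Below is a minimal sketch of such a retry helper; it is my addition, not part of the original handler.
import { DynamoDBDocumentClient, BatchWriteCommand, BatchWriteCommandInput } from '@aws-sdk/lib-dynamodb'

// Sends a batch write and keeps retrying whatever DynamoDB reports back as unprocessed.
// A production version would also add exponential backoff between attempts.
const batchWriteWithRetries = async (client: DynamoDBDocumentClient, input: BatchWriteCommandInput) => {
  let requestItems = input.RequestItems
  while (requestItems && Object.keys(requestItems).length > 0) {
    const { UnprocessedItems } = await client.send(new BatchWriteCommand({ RequestItems: requestItems }))
    requestItems = UnprocessedItems
  }
}
In the handler above you would call await batchWriteWithRetries(docClient, { RequestItems: { ... } }) instead of sending the command directly.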
Orchestrator
Step Functions is an orchestration service where you can create state machines that integrate a wide range of AWS services, like Lambda functions, ECS tasks, and EMR.
This part is a bit challenging to understand because we're not using a higher-level CDK construct; we need to write most of the state definition (Amazon States Language JSON) ourselves.
For this specific case I used the distributed Map state, which allows you to process different kinds of files, like CSV and JSON.
Step Function - State definition
Here you can specify different things, like:
- CSVHeaderLocation -> where the CSV header row is located
- MaxConcurrency -> the maximum number of Lambda invocations running at the same time
- MaxItemsPerBatch -> how many items each Lambda invocation receives (25 here, which matches DynamoDB's limit of 25 items per BatchWriteItem call)
CDK code
new CustomState(scope, 'CsvToDynamoDBState', {
stateJson: {
Type: 'Map',
ItemReader: {
Resource: 'arn:aws:states:::s3:getObject',
ReaderConfig: {
InputType: 'CSV',
CSVHeaderLocation: 'FIRST_ROW',
},
Parameters: {
'Bucket.$': '$.bucket',
'Key.$': '$.key',
},
},
ItemProcessor: {
ProcessorConfig: {
Mode: 'DISTRIBUTED',
ExecutionType: 'STANDARD',
},
StartAt: 'Lambda Invoke',
States: {
'Lambda Invoke': {
Type: 'Task',
Resource: 'arn:aws:states:::lambda:invoke',
OutputPath: '$.Payload',
Parameters: {
'Payload.$': '$',
FunctionName: lambda.functionArn,
},
Retry: [
{
ErrorEquals: [
'Lambda.ServiceException',
'Lambda.AWSLambdaException',
'Lambda.SdkClientException',
'Lambda.TooManyRequestsException',
],
IntervalSeconds: 2,
MaxAttempts: 6,
BackoffRate: 2,
},
],
End: true,
},
},
},
MaxConcurrency: 200,
Label: 'CsvToDynamoDB',
End: true,
ItemBatcher: {
MaxItemsPerBatch: 25,
},
},
})
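The state machine below references a chain variable. Here is a minimal sketch of how the custom state above can be wired into it, assuming it was assigned to a variable named csvToDynamoDBState (the variable name is mine):
CDK code
import { Chain } from 'aws-cdk-lib/aws-stepfunctions'

// The distributed Map state is the only step, so the chain is just that single state
const chain = Chain.start(csvToDynamoDBState)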
Step Function - definition
In this part we define the state machine: its type, its permissions, and its steps.
Here you need to create a role which can:
- Invoke a lambda function
- Get an object from S3
- Start Step Functions executions (the distributed Map starts child workflow executions)
CDK code
new StateMachine(scope, 'CsvToDynamoDBStateMachine', {
stateMachineName: 'parallel-processing-csv-to-dynamodb',
stateMachineType: StateMachineType.STANDARD,
definitionBody: DefinitionBody.fromChainable(chain),
role: new Role(scope, 'StateMachineRole', {
assumedBy: new ServicePrincipal('states.amazonaws.com'),
roleName: 'parallel-processing-csv-to-dynamodb-role',
inlinePolicies: {
LambdaInvokePolicy: new PolicyDocument({
statements: [
new PolicyStatement({
effect: Effect.ALLOW,
actions: ['lambda:InvokeFunction'],
resources: [lambda.functionArn],
}),
],
}),
S3GetObjectPolicy: new PolicyDocument({
statements: [
new PolicyStatement({
effect: Effect.ALLOW,
actions: ['s3:GetObject'],
resources: [bucket.arnForObjects('*')],
}),
],
}),
StepFunctionsStartExecutionPolicy: new PolicyDocument({
statements: [
new PolicyStatement({
effect: Effect.ALLOW,
actions: ['states:StartExecution', 'states:DescribeExecution', 'states:StopExecution'],
resources: ['*'],
}),
],
}),
},
}),
})
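Once everything is deployed, you start the workflow by passing the bucket and key of the CSV file as the execution input; those are the fields the ItemReader Parameters above read ($.bucket and $.key). A minimal sketch using the AWS SDK; the state machine ARN and the object key are placeholders:
import { SFNClient, StartExecutionCommand } from '@aws-sdk/client-sfn'

const sfn = new SFNClient({})

// Kick off the distributed Map over s3://csv.data-lake.example.com/items.csv (placeholder key)
await sfn.send(
  new StartExecutionCommand({
    stateMachineArn: 'arn:aws:states:<region>:<account-id>:stateMachine:parallel-processing-csv-to-dynamodb',
    input: JSON.stringify({ bucket: 'csv.data-lake.example.com', key: 'items.csv' }),
  })
)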
Here is the repository with all the information about how to deploy this to your own AWS account. Link
Any questions or feedback are welcome!