Sergio Kaz

How to process CSV files at scale using Step Functions and Lambda?

When you need to process CSV files, a simple script is usually enough. However, everything tends to get more complex when you need to run it at scale.

At scale here means tens or hundreds of millions of rows.

Challenges

When you need to process CSV files at scale and repeatedly, you need to take care of the following:

  • Parallel processing. The idea is to split the work across multiple nodes.
  • Storage. With a big CSV file, you need a place to store it.
  • Output. If you need to store the output in a database, you have to take care of things like connections, IOPS and throughput.

Solution

Using S3 + Lambda + Step Functions + Map State + DynamoDB, you can solve the challenges above with a serverless solution where you pay only for what you actually use.

For this example I used CDK v2 to write the IaC; however, you can implement it with any other IaC tool.

Architecture overview

[Architecture diagram]

Storage

S3 works really well when you need to store files to process later. It's deeply integrated with other AWS services and it's cost-effective.

CDK code

import { Bucket, BucketEncryption } from 'aws-cdk-lib/aws-s3';

// S3 bucket where the CSV files to process are uploaded
const bucket = new Bucket(scope, 'Bucket', {
    bucketName: 'csv.data-lake.example.com',
    encryption: BucketEncryption.S3_MANAGED
});

GitHub Link

Storage output

DynamoDB is a non-relational database with a serverless, pay-per-request billing mode, so you don't need to take care of DB connections, IOPS or throughput.

CDK code

import { AttributeType, BillingMode, Table } from 'aws-cdk-lib/aws-dynamodb';

// DynamoDB table where the processed CSV rows are stored
const csvItemTable = new Table(scope, 'CsvItemTable', {
  tableName: `CsvItems`,
  partitionKey: {
    name: 'pk',
    type: AttributeType.STRING,
  },
  billingMode: BillingMode.PAY_PER_REQUEST,
})

GitHub Link

Processing

To process the CSV lines, I used a Lambda function because it's really simple to set up. Other options could be an ECS task on Fargate, or EKS.

Lambda definition

CDK code

import * as lambda from 'aws-cdk-lib/aws-lambda-nodejs';
import { Runtime } from 'aws-cdk-lib/aws-lambda';

// Lambda function that writes each batch of CSV rows into DynamoDB
new lambda.NodejsFunction(scope, 'ProcessCsvItems', {
    functionName: 'process-csv-items',
    entry: `${__dirname}/handler.ts`,
    handler: 'handler',
    runtime: Runtime.NODEJS_18_X,
    environment: {
      CSV_ITEM_TABLE_NAME: csvItemTable.tableName,
    },
    bundling: {
      minify: true,
      nodeModules: ['uuid'],
    },
  })

GitHub Link
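One thing the definition above doesn't show is that the function also needs write access to the table. A minimal sketch, assuming the table construct from the previous section (csvItemTable) and the NodejsFunction held in a variable (processCsvItemsFunction is an illustrative name, not from the repository):

// Grants dynamodb:BatchWriteItem, PutItem, etc. on the table to the function's role
csvItemTable.grantWriteData(processCsvItemsFunction);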

Lambda code

Here we receive the items as an array and store them in DynamoDB using a batch write.

import { Handler } from 'aws-lambda'
import { DynamoDBClient } from '@aws-sdk/client-dynamodb'
import { BatchWriteCommand, DynamoDBDocumentClient } from '@aws-sdk/lib-dynamodb'
import { v4 as uuid } from 'uuid'

// Each invocation receives one batch of CSV rows from the Distributed Map state
type HandlerEvent = { Items: Record<string, unknown>[] }

const CSV_ITEM_TABLE_NAME = process.env.CSV_ITEM_TABLE_NAME!
const docClient = DynamoDBDocumentClient.from(new DynamoDBClient({}))

export const handler: Handler<HandlerEvent> = async ({ Items }) => {
  // Write the whole batch (up to 25 items) in a single BatchWriteItem call
  const command = new BatchWriteCommand({
    RequestItems: {
      [CSV_ITEM_TABLE_NAME]: Items.map((item) => ({
        PutRequest: {
          Item: {
            pk: uuid(),
            ...item,
          },
        },
      })),
    },
  })
  await docClient.send(command)
}

GitHub Link
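One caveat: BatchWriteItem may leave some items unprocessed when DynamoDB throttles part of a batch. The handler above doesn't cover that case; a minimal sketch of how the send call could be extended (this is an illustration, not the repository's implementation):

// BatchWriteItem may return UnprocessedItems under throttling; resend them
const { UnprocessedItems } = await docClient.send(command)

if (UnprocessedItems && Object.keys(UnprocessedItems).length > 0) {
  // In practice you would loop with backoff, or throw and let the Map state's Retry handle it
  await docClient.send(new BatchWriteCommand({ RequestItems: UnprocessedItems }))
}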

Orchestrator

Step Functions is an orchestrator where you can create state machines that integrate a wide range of AWS services, like Lambda functions, ECS tasks and EMR.

This part is a bit challenging to understand because we're not using a higher-level CDK construct; we have to write most of the state definition ourselves as raw Amazon States Language JSON.

For this specific case I used the Distributed Map state, which can read and iterate over different kinds of input files stored in S3, like CSV and JSON.

Step Function - State definition

Here you can specify different things like:

  • CSVHeaderLocation -> where the CSV headers are located
  • MaxConcurrency -> the maximum number of Lambdas running at the same time
  • MaxItemsPerBatch -> the number of items that our Lambda function receives per invocation (see the example payload after this list)
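With CSVHeaderLocation set to FIRST_ROW, each CSV row becomes an object keyed by the header row, and the batcher groups rows under Items. An illustrative payload for one invocation (the id and name columns are made-up headers, not from the repository's data):

// What a single batched Lambda invocation receives (illustrative column names)
const exampleEvent = {
  Items: [
    { id: '1', name: 'first row' },
    { id: '2', name: 'second row' },
    // ...up to MaxItemsPerBatch (25) rows per invocation
  ],
}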

CDK code

import { CustomState } from 'aws-cdk-lib/aws-stepfunctions';

// Distributed Map state written as raw Amazon States Language via CustomState
const csvToDynamoDBState = new CustomState(scope, 'CsvToDynamoDBState', {
    stateJson: {
      Type: 'Map',
      ItemReader: {
        Resource: 'arn:aws:states:::s3:getObject',
        ReaderConfig: {
          InputType: 'CSV',
          CSVHeaderLocation: 'FIRST_ROW',
        },
        Parameters: {
          'Bucket.$': '$.bucket',
          'Key.$': '$.key',
        },
      },
      ItemProcessor: {
        ProcessorConfig: {
          Mode: 'DISTRIBUTED',
          ExecutionType: 'STANDARD',
        },
        StartAt: 'Lambda Invoke',
        States: {
          'Lambda Invoke': {
            Type: 'Task',
            Resource: 'arn:aws:states:::lambda:invoke',
            OutputPath: '$.Payload',
            Parameters: {
              'Payload.$': '$',
              FunctionName: lambda.functionArn,
            },
            Retry: [
              {
                ErrorEquals: [
                  'Lambda.ServiceException',
                  'Lambda.AWSLambdaException',
                  'Lambda.SdkClientException',
                  'Lambda.TooManyRequestsException',
                ],
                IntervalSeconds: 2,
                MaxAttempts: 6,
                BackoffRate: 2,
              },
            ],
            End: true,
          },
        },
      },
      MaxConcurrency: 200,
      Label: 'CsvToDynamoDB',
      End: true,
      ItemBatcher: {
        MaxItemsPerBatch: 25,
      },
    },
  })

GitHub Link

Step Function - definition

In this part we define the state machine itself: its type, its permissions and its steps.

Here, you need to create a role which can:

  • Invoke the Lambda function
  • Get the object from S3
  • Start Step Functions executions (the Distributed Map state starts child workflow executions, which is why the state machine needs this permission on itself)
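The definitionBody below is built from a chainable called chain. A minimal sketch of how the CustomState defined above can be wrapped into one (this is one way to wire it, not necessarily the repository's exact code):

import { Chain } from 'aws-cdk-lib/aws-stepfunctions';

// Wrap the Distributed Map state into a chainable usable as the state machine definition
const chain = Chain.start(csvToDynamoDBState);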

CDK code

import { DefinitionBody, StateMachine, StateMachineType } from 'aws-cdk-lib/aws-stepfunctions';
import { Effect, PolicyDocument, PolicyStatement, Role, ServicePrincipal } from 'aws-cdk-lib/aws-iam';

new StateMachine(scope, 'CsvToDynamoDBStateMachine', {
    stateMachineName: 'parallel-processing-csv-to-dynamodb',
    stateMachineType: StateMachineType.STANDARD,
    definitionBody: DefinitionBody.fromChainable(chain),
    role: new Role(scope, 'StateMachineRole', {
      assumedBy: new ServicePrincipal('states.amazonaws.com'),
      roleName: 'parallel-processing-csv-to-dynamodb-role',
      inlinePolicies: {
        LambdaInvokePolicy: new PolicyDocument({
          statements: [
            new PolicyStatement({
              effect: Effect.ALLOW,
              actions: ['lambda:InvokeFunction'],
              resources: [lambda.functionArn],
            }),
          ],
        }),
        S3GetObjectPolicy: new PolicyDocument({
          statements: [
            new PolicyStatement({
              effect: Effect.ALLOW,
              actions: ['s3:GetObject'],
              resources: [bucket.arnForObjects('*')],
            }),
          ],
        }),
        StepFunctionsStartExecutionPolicy: new PolicyDocument({
          statements: [
            new PolicyStatement({
              effect: Effect.ALLOW,
              actions: ['states:StartExecution', 'states:DescribeExecution', 'states:StopExecution'],
              resources: ['*'],
            }),
          ],
        }),
      },
    }),
  })

GitHub Link

Here is the repository with all the information about how to deploy it to your own AWS account. Link
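Once everything is deployed, you start an execution passing the bucket and key that the ItemReader expects in its input. A minimal sketch using the AWS SDK v3 (the state machine ARN and the object key below are placeholders):

import { SFNClient, StartExecutionCommand } from '@aws-sdk/client-sfn'

const sfn = new SFNClient({})

// The Map state reads $.bucket and $.key from the execution input
await sfn.send(
  new StartExecutionCommand({
    stateMachineArn: 'arn:aws:states:us-east-1:123456789012:stateMachine:parallel-processing-csv-to-dynamodb',
    input: JSON.stringify({
      bucket: 'csv.data-lake.example.com',
      key: 'items.csv',
    }),
  })
)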

Any questions or feedback are welcome!
