TL;DR
Unlock the potential of serverless architecture with AWS Lambda and DynamoDB to revolutionize your sales data processing:
- Streamline data transformations: Say goodbye to complex ETL pipelines.
- Enrich data dynamically: Calculate prices with taxes, totals, and unique IDs automatically.
- Scalability and cost-efficiency: AWS Lambda adapts seamlessly to your workload.
- Embrace serverless efficiency for transformative insights into your sales data.
- Ready to transform your data? Dive into the article for a step-by-step guide!
Introduction
When a large volume of files needs to be processed, costs can skyrocket, and so can the complexity of handling that volume. Many options, such as a server-centric architecture or manual scaling, are not suitable in this case. Imagine being able to automate this process seamlessly, without the need for complex infrastructure. That’s the power of building a serverless file processing system using AWS Lambda and Amazon S3 Events.
Enter AWS Lambda and Amazon S3 Events — powerful tools that, when combined, offer a serverless solution for file processing. In this article, we’ll explore the step-by-step process of setting up a serverless file processing system using AWS Lambda and S3 Events. I will use Terraform as the IaC tool.
You can find all the sources of this project on GitHub.
Donovan1905 / s3-serverless-file-processing
Serverless file processing using Lambda and S3 on AWS
Use case
In the realm of data-driven enterprises, processing and extracting insights from sales data is a pivotal challenge. Imagine a scenario where CSV files containing sales details—client IDs, products, quantities, and prices—arrive in an Amazon S3 bucket. Traditional methods might involve complex setups, but we’re about to redefine efficiency with the simplicity of serverless architecture.
The Challenge:
- Data Transformation Complexity: Simplify the transformation of CSV data into a structured format.
- Dynamic Data Enrichment: Automate tasks like calculating prices with taxes, determining total transaction values, and assigning unique command IDs.
- Scalability and Cost Efficiency: Ensure scalability without the overhead of managing idle infrastructure.
The Solution:
AWS Lambda and DynamoDB take center stage. Lambda responds to CSV uploads, transforming data seamlessly, while DynamoDB becomes the canvas for storing and enriching sales information.
The Use Case:
- CSV Upload to S3: Sales data lands in an S3 bucket, triggering serverless processing.
- Lambda Takes the Stage: Responding to S3 events, Lambda transforms raw CSV data.
- DynamoDB, the Canvas of Transformation: Enriched data is stored with additional computed information.
- Scalability in Action: Lambda scales dynamically, ensuring efficiency and cost-effectiveness.
In the next sections, we delve into the technical details of implementing this serverless file processing system. From configuring Lambda functions to designing DynamoDB tables, let’s unlock the potential of AWS Lambda, DynamoDB and S3 to elevate sales data processing.
CSV processing with Python
To extract and process the CSV file in our Lambda, we are going to use the Python runtime with the csv and boto3 libraries.
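As a quick refresher before the full handler, csv.DictReader yields one dict per row, keyed by the header line. A minimal sketch with made-up rows (the column names mirror the ones the handler expects):

```python
import csv
import io

# Two hypothetical sales lines with a header row, as they would appear in the CSV.
sample = io.StringIO(
    "item_name,unit_price,quantity\n"
    "keyboard,49.90,2\n"
    "mouse,19.90,1\n"
)

for row in csv.DictReader(sample):
    # Each row is a dict: {'item_name': ..., 'unit_price': ..., 'quantity': ...}
    line_total = float(row["unit_price"]) * float(row["quantity"])
    print(row["item_name"], line_total)
```

This per-row dict access is exactly what the handler below relies on to compute line totals and build DynamoDB items.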
from datetime import datetime
from urllib.parse import unquote_plus
import boto3
import os
import botocore
import json
import csv
import uuid

s3 = boto3.client('s3')

def lambda_handler(event, context):
    local_filename = "/tmp/filename.csv"
    dynamodb = boto3.resource('dynamodb')
    item_table = dynamodb.Table(os.environ['ITEM_TABLE'])
    command_table = dynamodb.Table(os.environ['COMMAND_TABLE'])

    BUCKET_NAME = event["Records"][0]["s3"]["bucket"]["name"]
    # Object keys arrive URL-encoded in S3 event notifications
    S3_KEY = unquote_plus(event["Records"][0]["s3"]["object"]["key"])
    print(BUCKET_NAME)
    print(S3_KEY)

    try:
        s3.download_file(BUCKET_NAME, S3_KEY, local_filename)
    except botocore.exceptions.ClientError as e:
        if e.response['Error']['Code'] == "404":
            print("The object does not exist: s3://" + BUCKET_NAME + "/" + S3_KEY)
        raise

    with open(local_filename, mode='r') as csv_file:
        csv_reader = csv.DictReader(csv_file)
        total_price = 0
        command_id = str(uuid.uuid1())
        for row in csv_reader:
            total_price += float(row["unit_price"]) * float(row["quantity"])
            item_table.put_item(
                Item={
                    'id': str(uuid.uuid1()),
                    'command_id': command_id,
                    'item_name': row["item_name"],
                    'unit_price': row["unit_price"],
                    'quantity': row["quantity"],
                    'customer': row["customer"],
                    'salesman': row["salesman"]
                }
            )
        command_table.put_item(
            Item={
                'id': command_id,
                'date': str(datetime.now()),
                # Stored as a string: DynamoDB does not accept Python floats
                'total_price': str(total_price),
                'status': "Pending"
            }
        )
    return {
        "statusCode": 200,
        "body": json.dumps({
            "message": "data processed with command_id " + command_id,
        }),
    }
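For reference, here is a trimmed sketch of the s3:ObjectCreated event payload the handler reads. The bucket and key values are illustrative; real events carry many more fields, and keys with special characters arrive URL-encoded (hence unquote_plus in the handler):

```python
# Minimal shape of an S3 event notification, trimmed to the fields
# the handler actually reads (bucket name and key are made up).
event = {
    "Records": [
        {
            "s3": {
                "bucket": {"name": "my-sales-bucket"},
                "object": {"key": "uploads/sales_sample.csv"},
            }
        }
    ]
}

bucket_name = event["Records"][0]["s3"]["bucket"]["name"]
s3_key = event["Records"][0]["s3"]["object"]["key"]
print(bucket_name, s3_key)  # my-sales-bucket uploads/sales_sample.csv
```

Passing a dict like this to lambda_handler locally (with stubbed AWS clients) is a cheap way to exercise the parsing logic before deploying.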
Next, build the zip package that will be uploaded to Lambda with this script:
# lambda_build_script.sh
python3 -m pip install --platform manylinux2014_x86_64 --implementation cp --only-binary=:all: --upgrade --target venv/lib/python3.11/site-packages/ -r requirements.txt
mkdir lambda_package
cp -r venv/lib/python3.11/site-packages/* lambda_package/
cp -r src lambda_package/
cd lambda_package && zip -r ../lambda.zip * && cd ../
Deploy with Terraform
S3 Buckets
We need two buckets here: one to collect the sales CSV files and one to store the Lambda code. We also define a bucket notification to trigger the Lambda function on certain events, in our case when a new file is uploaded to the S3 bucket.
# /tf/s3.tf
resource "aws_s3_bucket" "builds" {
  bucket = "${var.project_name}-lambda-builds"
}

resource "aws_s3_bucket" "bucket" {
  bucket = local.bucket_name
}
resource "aws_s3_bucket_notification" "s3-lambda-trigger" {
  bucket = aws_s3_bucket.bucket.id

  lambda_function {
    lambda_function_arn = module.lambda.lambda_function_arn
    events              = ["s3:ObjectCreated:*"]
  }

  # S3 validates the invoke permission when the notification is created,
  # so the permission must exist first
  depends_on = [aws_lambda_permission.s3_lambda_permission]
}
IAM roles and policies
In this part, you will need to create the appropriate role and policies to allow your Lambda function to use the S3 bucket, the DynamoDB tables, and CloudWatch Logs.
# /tf/iam.tf
data "aws_iam_policy_document" "assume_role_policy_lambda" {
  statement {
    sid    = ""
    effect = "Allow"

    principals {
      identifiers = ["lambda.amazonaws.com"]
      type        = "Service"
    }

    actions = ["sts:AssumeRole"]
  }
}
data "aws_iam_policy_document" "policy_s3" {
  statement {
    sid    = "S3AccessPolicy"
    effect = "Allow"
    # Broad for the demo; consider narrowing to s3:GetObject in production
    actions = [
      "s3:*"
    ]
    resources = [
      "arn:aws:s3:::${local.bucket_name}",
      "arn:aws:s3:::${local.bucket_name}/*"
    ]
  }

  statement {
    sid    = "DynamoDbAccessPolicy"
    effect = "Allow"
    actions = [
      "dynamodb:PutItem",
      "dynamodb:GetItem"
    ]
    resources = [
      aws_dynamodb_table.command_table.arn,
      aws_dynamodb_table.item_table.arn
    ]
  }

  statement {
    sid    = "LambdaLoggingPolicy"
    effect = "Allow"
    actions = [
      "logs:CreateLogGroup",
      "logs:CreateLogStream",
      "logs:PutLogEvents"
    ]
    resources = [
      "arn:aws:logs:*:*:*"
    ]
  }
}
resource "aws_iam_role" "iam_for_lambda" {
  name               = "${var.project_name}-lambda-role"
  assume_role_policy = data.aws_iam_policy_document.assume_role_policy_lambda.json
}

resource "aws_iam_role_policy" "lambda_policy" {
  policy = data.aws_iam_policy_document.policy_s3.json
  role   = aws_iam_role.iam_for_lambda.id
}
Lambda function
Next, let’s define the Lambda function itself. We upload lambda.zip as an S3 object and reference it in the Lambda definition.
# /tf/lambda.tf
module "lambda" {
  source  = "terraform-aws-modules/lambda/aws"
  version = "5.2.0"

  function_name = "${var.project_name}-process-sales-data"
  runtime       = "python3.11"
  handler       = "src.file_processing_handler.lambda_handler"
  lambda_role   = aws_iam_role.iam_for_lambda.arn
  create_role   = false

  environment_variables = {
    "REGION"        = var.region,
    "ITEM_TABLE"    = aws_dynamodb_table.item_table.name,
    "COMMAND_TABLE" = aws_dynamodb_table.command_table.name
  }

  memory_size = 1028
  timeout     = 30

  create_package = false
  s3_existing_package = {
    bucket = aws_s3_bucket.builds.id
    key    = aws_s3_object.lambda_package.id
  }
}

resource "aws_s3_object" "lambda_package" {
  bucket = aws_s3_bucket.builds.id
  key    = "${filemd5(local.lambda_package)}.zip"
  source = local.lambda_package
}

resource "aws_lambda_permission" "s3_lambda_permission" {
  statement_id  = "AllowS3Invoke"
  action        = "lambda:InvokeFunction"
  function_name = module.lambda.lambda_function_name
  principal     = "s3.amazonaws.com"
  source_arn    = "arn:aws:s3:::${aws_s3_bucket.bucket.id}"
}
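Note that the aws_s3_object key is derived from filemd5, so every new build produces a content-addressed object name and Terraform detects the change automatically. A rough Python equivalent of that naming scheme, with an illustrative file path, would be:

```python
import hashlib

def package_key(path: str) -> str:
    """Mirror Terraform's filemd5()-based key: '<md5-of-file>.zip'."""
    with open(path, "rb") as f:
        digest = hashlib.md5(f.read()).hexdigest()
    return f"{digest}.zip"

# Illustrative: write a fake package and derive its key.
with open("/tmp/lambda.zip", "wb") as f:
    f.write(b"fake package bytes")

key = package_key("/tmp/lambda.zip")
print(key)
```

Any change to lambda.zip yields a different MD5, hence a new S3 key and a fresh Lambda deployment.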
DynamoDB tables
In our scenario, we will use two DynamoDB tables: a command table and an item table.
# /tf/dynamodb.tf
resource "aws_dynamodb_table" "command_table" {
  name         = "${var.project_name}-command-table"
  hash_key     = "id"
  billing_mode = "PAY_PER_REQUEST"

  attribute {
    name = "id"
    type = "S"
  }

  deletion_protection_enabled = true
}

resource "aws_dynamodb_table" "item_table" {
  name         = "${var.project_name}-item-table"
  hash_key     = "id"
  billing_mode = "PAY_PER_REQUEST"

  attribute {
    name = "id"
    type = "S"
  }

  deletion_protection_enabled = true
}
Let’s test it!
First, you will need to build the Lambda sources.
$ python3 -m venv venv
$ source ./venv/bin/activate
$ sh lambda_build_script.sh
You should now have a lambda.zip archive.
Make sure to apply the Terraform configuration with terraform apply.
Now, we will take this CSV example referencing all the products that a client bought (example/sales_sample.csv).
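The sample file isn’t reproduced here, but a hypothetical file with the columns the handler expects (item_name, unit_price, quantity, customer, salesman) would look like the rows below. Writing and re-reading it locally is an easy sanity check of the total-price computation before deploying:

```python
import csv

# Hypothetical sales rows matching the columns the handler expects.
rows = [
    {"item_name": "keyboard", "unit_price": "49.90", "quantity": "2",
     "customer": "ACME", "salesman": "alice"},
    {"item_name": "mouse", "unit_price": "19.90", "quantity": "1",
     "customer": "ACME", "salesman": "alice"},
]

with open("/tmp/sales_sample.csv", "w", newline="") as f:
    writer = csv.DictWriter(f, fieldnames=list(rows[0].keys()))
    writer.writeheader()
    writer.writerows(rows)

# Same total-price computation the Lambda performs.
with open("/tmp/sales_sample.csv") as f:
    total = sum(float(r["unit_price"]) * float(r["quantity"])
                for r in csv.DictReader(f))
print(total)
```

For these two rows the total is 49.90 × 2 + 19.90 = 119.70, which the Lambda would store on the command item.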
And now, let the magic happen! Just drop the CSV file into the S3 bucket and take a look at your DynamoDB tables to see your processed data.
Thanks for reading! I hope this helped you use or understand the power of AWS services. Don’t hesitate to give me your feedback or suggestions.