TLDR
I took on A Cloud Guru's #CloudGuruChallenge and built an event-driven, serverless ETL pipeline for US COVID-19 data on AWS, complete with Infrastructure-as-Code, CI/CD, email notifications, and a small React dashboard.
Introduction
A few weeks ago, I came across A Cloud Guru's #CloudGuruChallenge. The challenge was to create a simple event-driven Extract-Transform-Load (ETL) pipeline using some publicly available US COVID-19 statistics. As the title of the challenge suggests, it is meant to exercise and demonstrate knowledge and usage of AWS services.
I have recently shifted roles from software engineer to data engineer, so getting to work on ETL pipelines is a good learning exercise for me. Thus, I decided to take on this challenge and see what I could come up with. As an added bonus, the challenge lets me experiment with ideas that I have wanted to try out but can't easily explore in my own projects.
The Challenges
I'll summarize them as follows:
- Schedule a daily ETL job that will extract data from two different sources, clean/filter the data, merge the two sources, and store the merged data.
- Subsequent runs should only add/update rows that were added/updated since the last ETL run.
- At each ETL run, notify interested subscribers via email.
- The ETL process should be resilient enough to handle off-by-one issues (when one dataset has an extra row not present in the other one).
- Use Infrastructure-as-Code as much as possible.
- Hook up the ETL results into a dashboard or a reporting tool.
- Use a Continuous Integration/Continuous Deployment (CI/CD) pipeline for updating the infrastructure and application when changes are made to the source code.
Additional challenges I set for myself are the following:
- Make a custom frontend to visualize the transformed data.
- Use as few hard-coded variables in the infrastructure as possible, so that multiple developers can work on separate stacks at the same time without sharing or clashing resources.
I used the Serverless Framework to manage the backend. It provides a higher level of abstraction compared to SAM or CloudFormation. When I need to define custom resources that serverless does not manage, I can still do so by using CloudFormation in my serverless configuration.
```yaml
resources:
  Resources:
    ReactAppBackendUrlParameter:
      Type: AWS::SSM::Parameter
      Properties:
        Name: REACT_APP_BACKEND_URL
        Type: String
        Value: !Join [ '', [ 'https://', !Ref HttpApi, '.execute-api.', !Ref 'AWS::Region', '.', !Ref 'AWS::URLSuffix' ] ]
    OnRefreshDataFromSourcesNotification:
      Type: AWS::SNS::Topic
      Properties:
        DisplayName: US COVID Stats
        Subscription: ${self:custom.subscription.${self:custom.subscriptionEnabled}}
    DataTable: ${self:custom.cfResources.DataTable}
    DataBucket: ${self:custom.cfResources.DataBucket}
    FrontendBucket: ${self:custom.cfResources.FrontendBucket}
    FrontendBucketPolicy: ${self:custom.cfResources.FrontendBucketPolicy}
    CloudFrontOriginAccessIdentity: ${self:custom.cfResources.CloudFrontOriginAccessIdentity}
    FrontendDistribution: ${self:custom.cfResources.FrontendDistribution}
    FrontendBucketParameter: ${self:custom.cfResources.FrontendBucketParameter}
    FrontendDistributionParameter: ${self:custom.cfResources.FrontendDistributionParameter}
  Outputs:
    DataBucketName: ${self:custom.cfOutputs.DataBucketName}
    FrontendBucketName: ${self:custom.cfOutputs.FrontBucketName}
    FrontendDistributionId: ${self:custom.cfOutputs.FrontendDistribution}
    FrontendUrl: ${self:custom.cfOutputs.FrontendUrl}
```
Most of the custom resources above are defined in a separate cloudformation.yml. This allows me to use CloudFormation tools like cfn-lint on that file. The only ones defined in serverless.yml are the ones that depend on serverless-managed resources.
I am also using a serverless plugin that handles the packaging of Python dependencies.
My serverless service consists of the following:
- REST API Gateway
- Lambda function that responds to the API Gateway requests.
- Lambda function that is triggered daily to do the ETL process.
- Lambda function that gets triggered after the ETL process (via Lambda Destinations)
- Multiple custom CloudFormation resources such as DynamoDB table, S3 buckets, CloudFront distribution, SNS topic, etc.
Scheduled ETL
The ETL requirement for this challenge can actually be met with Python's csv module alone. However, most real-world ETL tasks are not as simple as this challenge; they usually involve more sophisticated transforms than simple merging. This is where pandas (a data analysis library for Python) comes in handy.
With pandas, data cleanup and transformation were a breeze. Pulling in such a big dependency (in Lambda's environment) for such a small requirement would waste the power of pandas, so I made sure to use more of its features in my REST API. For example, I calculated the changes for each day and aggregated them by week and by month.
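To give a flavor of the kind of aggregation I mean, here is a minimal sketch. The function name and the column names (date, cases, deaths, recoveries) are illustrative assumptions, not necessarily what the project uses:

```python
import pandas as pd


def aggregate_changes(df: pd.DataFrame) -> dict:
    """Turn cumulative totals into per-day changes, then roll them up."""
    daily = (
        df.assign(date=pd.to_datetime(df["date"]))
        .sort_values("date")
        .set_index("date")[["cases", "deaths", "recoveries"]]
        .diff()  # cumulative totals -> day-over-day changes
        .fillna(0)
        .astype("int")
    )
    return {
        "daily": daily,
        "weekly": daily.resample("W").sum(),   # calendar-week totals
        "monthly": daily.resample("M").sum(),  # calendar-month totals
    }
```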
As the data querying requirements are simple, I chose DynamoDB as the primary datastore. It's fast, easy to use, very cheap, and very scalable.
Incremental Loading
A full load only happens at the first ETL run. Succeeding runs do not need to recreate all rows that did not change. To achieve this, I keep a CSV snapshot of the same transformed data. After the transformation process, I compare the newly-transformed dataset with the previous dataset and get the changed rows.
Only the changed rows are then saved to DynamoDB in a batch write operation. This makes sure the process consumes only as many DynamoDB WCUs as necessary. Also, when the datasets get updated retroactively, the changes to previous rows get stored as well, not just the latest row.
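Here is a sketch of how such a diff-and-load step could look. The helper names are mine, and it assumes the previous snapshot has exactly the same columns (and no duplicate rows) as the newly transformed dataset; the real code may differ:

```python
import json

import boto3
import pandas as pd

dynamodb = boto3.resource("dynamodb")


def changed_rows(new: pd.DataFrame, previous: pd.DataFrame) -> pd.DataFrame:
    """Return only the rows added or modified since the previous snapshot."""
    new = new.reset_index(drop=True)
    # Left-merge on every column: rows with no exact match in the previous
    # snapshot are either brand new or updates to an already-stored date.
    merged = new.merge(previous, how="left", on=list(new.columns), indicator=True)
    return new[merged["_merge"] == "left_only"]


def load_changes(changes: pd.DataFrame, table_name: str) -> None:
    """Batch-write only the changed rows, consuming no more WCUs than needed."""
    table = dynamodb.Table(table_name)
    # Round-tripping through JSON yields plain Python types; this assumes
    # integer/string values, which DynamoDB accepts without Decimal conversion.
    records = json.loads(changes.to_json(orient="records"))
    with table.batch_writer() as batch:  # handles chunking and retries for us
        for record in records:
            batch.put_item(Item=record)
```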
Email Notifications
Email notifications are implemented via SNS. The ETL Lambda (RefreshDataFromSources) is configured to use OnRefreshDataFromSources as an asynchronous destination for both success and failure states. OnRefreshDataFromSources in turn publishes a corresponding message to an SNS topic, which then sends an email to the subscribed addresses.
```yaml
functions:
  RefreshDataFromSources:
    handler: us_covid_stats/etl/handler.refresh_data_from_sources
    events:
      - schedule: rate(1 day)
    destinations:
      onSuccess: OnRefreshDataFromSources
      onFailure: OnRefreshDataFromSources
  OnRefreshDataFromSources:
    handler: us_covid_stats/etl/handler.on_refresh_data_from_sources
```
ETL Resiliency
Since we are dealing with two different data sources that we need to merge on the date, it may happen that one source is more up to date than the other. Our transformation has to account for that and ignore rows that are not present in both datasets.
It's literally just a one-liner with pandas' DataFrame.join(); the rest is trivial cleanup.
```python
from pandas import DataFrame


def merge_cases_with_recoveries(cases: DataFrame, recoveries: DataFrame) -> DataFrame:
    # Inner join on the date so rows missing from either dataset are dropped.
    return (
        cases.join(recoveries, how="inner", on="date")
        .fillna(0)
        .rename(columns={"Recovered": "recoveries"})
        .astype({"recoveries": "int"})
    )
```
The code includes unit tests that check this behavior against different scenarios.
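For instance, a test for the off-by-one scenario could look something like this (a sketch; the import path, fixture column names, and values are assumptions, not the repo's actual tests):

```python
import pandas as pd

# Assumed import path; the real module layout in the repo may differ.
from us_covid_stats.etl.transform import merge_cases_with_recoveries


def test_extra_day_in_cases_is_ignored():
    cases = pd.DataFrame(
        {"date": ["2020-10-01", "2020-10-02"], "cases": [100, 120], "deaths": [5, 6]}
    )
    # Recoveries lag one day behind; the inner join should drop 2020-10-02.
    recoveries = pd.DataFrame(
        {"Recovered": [40.0]}, index=pd.Index(["2020-10-01"], name="date")
    )

    merged = merge_cases_with_recoveries(cases, recoveries)

    assert merged["date"].tolist() == ["2020-10-01"]
    assert merged["recoveries"].tolist() == [40]
```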
Infrastructure-as-Code
I am a fan of Infrastructure-as-Code. If done well, it saves so much time when spinning up and tearing down environments.
In this project, I am using Serverless Framework and CloudFormation to manage the infrastructure. Everything that needs to be provisioned is just two deploy commands away.
- Backend
  - When the backend gets deployed (through serverless deploy), it creates all the backend services it needs plus the infrastructure that the frontend will eventually need (i.e. the S3 bucket and the CloudFront distribution). The autogenerated REST API endpoints, the S3 bucket name, and the CloudFront distribution ID get stored in SSM Parameter Store. This makes it easy for the frontend deployment process to retrieve them.
- Frontend
  - During the build process, it uses SSM to retrieve the autogenerated API endpoints of the backend.
  - During the deployment process, it uses SSM to get the S3 bucket name for storing the built artifacts and the CloudFront distribution ID for invalidating the CDN cache, as sketched below.
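To illustrate, the lookups on the frontend side can be as simple as the following sketch (written in Python for brevity; the real build scripts may use the AWS CLI or Node instead, and the parameter names other than REACT_APP_BACKEND_URL are placeholders):

```python
import boto3

ssm = boto3.client("ssm")


def get_parameter(name: str) -> str:
    """Fetch a value that the backend deployment stored in SSM Parameter Store."""
    return ssm.get_parameter(Name=name)["Parameter"]["Value"]


# REACT_APP_BACKEND_URL comes from the resources block shown earlier;
# the other two names are placeholders for the real parameter names.
backend_url = get_parameter("REACT_APP_BACKEND_URL")
frontend_bucket = get_parameter("FRONTEND_BUCKET_NAME")
distribution_id = get_parameter("FRONTEND_DISTRIBUTION_ID")
```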
Dashboard/Report App
I made a simple React frontend to visualize the transformed data. It is available at https://d21xiw2qs8azw2.cloudfront.net/ (hosted on an S3 bucket behind a CloudFront distribution).
For the REST API, I used pandas in order to provide daily, weekly, and monthly aggregate counts. I made four GET endpoints -- /data, /daily, /weekly, and /monthly. They are all handled by the same Lambda function for simplicity.
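Since a single Lambda sits behind all four endpoints, the handler only has to dispatch on the request path. A minimal sketch, with placeholder response builders standing in for the real pandas-backed ones:

```python
import json


def _placeholder() -> dict:
    # Stand-in: the real functions would query DynamoDB and shape the
    # response with pandas.
    return {}


# Hypothetical mapping from request path to the function that builds the response.
ROUTES = {
    "/data": _placeholder,
    "/daily": _placeholder,
    "/weekly": _placeholder,
    "/monthly": _placeholder,
}


def handle_api_request(event, context):
    """Single Lambda behind all four GET endpoints: dispatch on the request path."""
    path = event.get("rawPath") or event.get("path", "")
    route = ROUTES.get(path)
    if route is None:
        return {"statusCode": 404, "body": json.dumps({"message": "Not found"})}
    return {"statusCode": 200, "body": json.dumps(route())}
```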
Continuous Integration
I used GitHub actions to run multiple checks that run on pull requests and on the main branch. There are checks for frontend, backend, and for cloudformation.yml
files. I've also added GitHub's Code Scanning service and SonarCloud's Code Analysis.
For the frontend, we simply use create-react-app's build process because it does linting, type checking, and bundling in a single command.
For the backend, we check for linting errors using flake8, check for type errors using mypy, run unit tests using pytest, and then build the deployment package using serverless.
With this workflow, I can write a feature and push it as a PR. All the actions will trigger and perform checks on my code. If everything passes, I can merge it to the main branch, which then triggers deployment.
Continuous Deployment
For deployment, I use CodeBuild. The CodeBuild project configurations for both the frontend and the backend are defined in a separate cloudformation.yml.
With this deployment pipeline in place, I can simply merge PRs to the main branch (or commit trivial changes directly to main) and they will deploy automatically.
Sometimes, changes in the ETL logic require re-triggering the ETL process after deployment, so it is useful to make the ETL Lambda also listen to CodeBuild events via CloudWatch Events.
```yaml
functions:
  RefreshDataFromSources:
    handler: us_covid_stats/etl/handler.refresh_data_from_sources
    events:
      - schedule: rate(1 day)
      - cloudwatchEvent:
          event:
            source:
              - aws.codebuild
            detail-type:
              - CodeBuild Build State Change
            detail:
              build-status:
                - SUCCEEDED
```
Things I wanted to add but didn't have time to.
It's a shame I ran out of time. There are still a few things I wanted to try (I may still try them in the future).
- Provide a SageMaker notebook instance with some initial code that loads our CSV snapshot.
- Provide a few Athena named queries that use our CSV snapshot.
- Implement a post-deployment test using CloudWatch Synthetics to verify the app works as intended. This test could also be triggered after each ETL run.
- Provision all of the above using CloudFormation, of course.
- Switch from CloudFormation to CDK.
- Using realtime updates for data that refreshes only once a day is not so exciting. So, I wanted to implement a 1990s-style realtime views counter. Or, maybe a guestbook. :-)
Conclusion
I found this challenge to be a fun one. I learned a lot especially with things that I don't use day-to-day. I look forward to the next one.
Thank you A Cloud Guru for organizing this.