DEV Community

Cover image for Event-Driven Python on AWS
William Raffaelle
William Raffaelle

Posted on

Event-Driven Python on AWS

Tonight I finished my first data engineering project on AWS. This project was based off of a challenge on the A Cloud Guru website. The challenge involves automating an ETL processing pipeline for COVID-19 data using Python and cloud services. This was my introduction to data engineering and ETL processing on AWS. I got to use my Python skills and learn about different cloud services in completing this project.

ETL

To begin, I created a Python compute job. The job runs once a day thanks to a CloudWatch rule. This means that the function will process COVID-19 data that is updated daily.

DATA

Two datasets were used in this project:

  1. New York Times repository updated daily
  2. Johns Hopkins dataset

The Python function downloads each dataset using the Pandas library. Additionally, a merge is done on both datasets to ensure that days that do not exist in both datasets are removed.

url_data = (r'https://raw.githubusercontent.com/nytimes/covid-19-data/master/us.csv')
data_csv = pd.read_csv(url_data)
url_data2 = (r'https://raw.githubusercontent.com/datasets/covid-19/master/data/time-series-19-covid-combined.csv')
data_csv2 = pd.read_csv(url_data2)
data_csv = data_csv.merge(data_csv2, how='inner', on='date')

DATA CLEANING

Next, the data was transformed. To do so, any non-US data was removed from the dataset.

LOAD

The data was then loaded into a DynamoDB table using the Boto3 put_item method.

DynamoDB

NOTIFY

An SNS topic was created to notify individuals that the database had been loaded. This message includes the number of rows updated in the database.

ETL NOTIFICATION

ERROR HANDLING

Some error handling was implemented to speed up data processing and ensure that the compute job responds to malformed data properly.
To speed up ETL processing, the function determines whether it is to perform an initial load or an update. To do so, the function checks whether the row is already in the database or not. If the row is already in the database, the processing stops. This way only the current day's numbers are updated and not the entire database.

r = table.get_item(Key={'date' : row.date})
if r.get('Item') == None:
batch.put_item(json.loads(row.to_json(), parse_float=Decimal))
new += 1

Second, the function checks for malformed data. If the date format is incorrect, or the case or death numbers are not inputted as numbers, the processing stops and a notification is sent stating that the data is malformed.

SNS Notification

Using Systems Manager Parameter Store the function is able to detect if processing was cancelled last time it ran. If this is the case and the data is still malformed, the function will skip the malformed rows and continue with processing.

INFRASTRUCTURE

A CloudFormation template was then written to define the project's resources. This way the project can be deployed in any AWS environment. The resources include the following: Lambda function, CloudWatch rule, SNS trigger, DynamoDB table.

DASHBOARD

To build a dashboard AWS Quicksight was used. The dashboard includes the following: sum of cases by date, sum of deaths by date, and sum of recovered by date.

Cases

Deaths

Recovered

ADDITIONAL

A second Lambda function was written to download the updated DynamoDB table to a .csv file that is uploaded to S3. This way, QuickSight can use the latest data to build a dashboard. This function is triggered by a CloudWatch rule. It runs daily immediately after the first function runs and after the database has been updated.

The code for this project can be found on [GitHub].(https://github.com/wraffaelle98/Event-Driven-Python-on-AWS)

Top comments (0)