Bansikumar Mendapara

Event-Driven Python on AWS

Around 15 days ago, A Cloud Guru launched the #CloudGuruChallenge - Event-Driven Python on AWS. It might sound like a simple task, but I faced a few challenges and learned a lot along the way. This post walks through how I completed the challenge.

Overview of Challenge

Forrest Brazeal created this challenge. Its main goal is to automate an ETL processing pipeline for COVID-19 data using Python and cloud services. You can find more about the challenge here.

Challenge Steps

Here are the steps I followed for this challenge.

  1. Transformation
  2. Load transformed data into the database
  3. Notify customers when the database gets any updates
  4. Error handling
  5. Trigger the function once a day
  6. Infrastructure as Code
  7. CI/CD pipeline and Source control
  8. QuickSight dashboard

You can find the full working code in my GitHub repository.

Step 1 - Transformation

One of the requirements of this challenge was to put the transformation task in a separate Python module, so I wrote the transformation code as its own module.

First, I filtered the data by country name, because we only want COVID-19 data for the US. Next, I dropped unnecessary columns where present and converted the date column to the pandas datetime type. Finally, to join the two datasets on report date, I set the date column as the index of both data frames. The function is fully decoupled from the rest of the pipeline: it does not care where the data is stored and knows nothing about the database.

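import pandas as pd

def transform(dfNYT, dfJH):
    # Keep only US rows from the Johns Hopkins data; copy so the input is not modified
    dfJH = dfJH[dfJH['Country/Region'] == 'US'].drop(columns='Country/Region').copy()
    # Assumes exactly two columns remain here: the date and the recovered count
    dfJH.columns = ['date', 'recovered']
    dfNYT = dfNYT.copy()  # do not mutate the caller's data frame
    # Parse dates so both frames share the same datetime type
    dfNYT['date'] = pd.to_datetime(dfNYT['date'], format='%Y-%m-%d')
    dfJH['date'] = pd.to_datetime(dfJH['date'], format='%Y-%m-%d')
    # Index both frames by date so they can be joined on it
    dfNYT.set_index('date', inplace=True)
    dfJH.set_index('date', inplace=True)
    dfJH['recovered'] = dfJH['recovered'].astype('int64')
    # The inner join keeps only dates present in both sources
    dfFinal = dfNYT.join(dfJH, how='inner')
    dfFinal.reset_index(inplace=True)
    return dfFinal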
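
To make the join step concrete, here is a minimal sketch with two toy data frames. All values are invented for illustration and are not real COVID-19 figures. An inner join on the date index keeps only the dates that appear in both sources.

```python
import pandas as pd

# Toy stand-ins for the two sources; every number here is made up.
nyt = pd.DataFrame({
    "date": pd.to_datetime(["2020-03-01", "2020-03-02", "2020-03-03"]),
    "cases": [10, 20, 35],
    "deaths": [0, 1, 2],
}).set_index("date")

jh = pd.DataFrame({
    "date": pd.to_datetime(["2020-03-02", "2020-03-03", "2020-03-04"]),
    "recovered": [3, 7, 9],
}).set_index("date")

# Inner join on the shared date index: dates missing from either side are dropped.
joined = nyt.join(jh, how="inner").reset_index()
print(joined)
```

Because the join is an inner join, a day that one source has not published yet simply does not appear in the result.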

Step 2 - Load transformed data into the database

To fulfil this task, I used a Lambda function and an RDS PostgreSQL database. psycopg2 is a PostgreSQL database adapter for Python. It lets us connect to the PostgreSQL database and run SQL queries.

I used Lambda environment variables to store the necessary values, such as the URLs of both CSV files and the database information (endpoint, port, username, password, region). To keep the code structured, I divided this task into small subtasks and wrote a function for each.

  • Database connection - connect to database
  • First-time data insertion - insert all data at once
  • Daily data insertion - only insert new data available
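
As a rough illustration of the first subtask, the database settings could be collected from the environment as below. This is only a sketch: the variable names (DB_ENDPOINT, DB_PORT, and so on) are hypothetical and are not the names used in the repository.

```python
import os

def load_db_config(env=os.environ):
    """Collect database settings from environment variables.

    The variable names used here are hypothetical placeholders.
    """
    return {
        "host": env["DB_ENDPOINT"],
        "port": int(env.get("DB_PORT", "5432")),  # 5432 is the PostgreSQL default
        "user": env["DB_USERNAME"],
        "password": env["DB_PASSWORD"],
        "dbname": env["DB_NAME"],
    }

# Example with an in-memory mapping instead of the real environment.
config = load_db_config({
    "DB_ENDPOINT": "example.rds.amazonaws.com",
    "DB_USERNAME": "etl_user",
    "DB_PASSWORD": "secret",
    "DB_NAME": "covid",
})
print(config["host"], config["port"])
```

The resulting dictionary uses keyword names that psycopg2.connect() accepts, so it could be passed as psycopg2.connect(**config).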

I wanted to minimize manual interaction with the AWS Management Console. Therefore, the Lambda function first checks whether a table called etl exists in the database. If it does not, the function creates the table and performs the first-time data insertion. If the etl table already exists, it checks for new data to insert for the day.

For efficient data insertion, I tried several methods, such as executemany() and mogrify(). In the end, I went with the approach below.
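
One possible way to write the table check is sketched below. It assumes the table lives in the default public schema and that `cursor` is an open psycopg2 cursor; the helper name table_exists is made up for this example.

```python
def table_exists(cursor, table_name="etl"):
    """Return True if a table with this name exists in the public schema."""
    cursor.execute(
        "SELECT EXISTS ("
        "SELECT 1 FROM information_schema.tables "
        "WHERE table_schema = 'public' AND table_name = %s)",
        (table_name,),  # passed as a parameter, never formatted into the SQL
    )
    return bool(cursor.fetchone()[0])
```

The caller can then branch on the result: create the table and load everything when it returns False, or look for new rows when it returns True.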

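data = []
for i in dfFinal.index:
    # One tuple per row; cast NumPy integers to plain Python ints for psycopg2
    row = (dfFinal.loc[i,'date'], int(dfFinal.loc[i,'cases']), int(dfFinal.loc[i,'deaths']), int(dfFinal.loc[i,'recovered']))
    data.append(row)
# One %s placeholder per row, so all rows go into a single INSERT statement
records = ','.join(['%s'] * len(data))
query = "insert into etl (reportdate,cases,deaths,recovered) values {}".format(records)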
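
The idea behind this approach is that each %s stands for one whole row. psycopg2 adapts a Python tuple to a parenthesised list of values, so passing the list of tuples as the query parameters yields a single multi-row INSERT. The sketch below only builds the statement; the helper name build_insert and the sample rows are made up, and the commented execute call assumes an open psycopg2 cursor.

```python
def build_insert(rows):
    """Return a multi-row INSERT statement with one %s placeholder per row."""
    if not rows:
        raise ValueError("no rows to insert")
    placeholders = ",".join(["%s"] * len(rows))
    return (
        "insert into etl (reportdate,cases,deaths,recovered) "
        "values {}".format(placeholders)
    )

# Invented sample rows: (reportdate, cases, deaths, recovered)
rows = [("2020-03-02", 20, 1, 3), ("2020-03-03", 35, 2, 7)]
query = build_insert(rows)
print(query)

# With an open psycopg2 cursor (assumed, not created here):
# cursor.execute(query, rows)
```

Only the placeholders are formatted into the SQL string; the values themselves are still passed as parameters, so psycopg2 handles quoting.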

Step 3 - Notify customers when the database gets any updates

To do this, I used Amazon SNS. I created a function in the Lambda that publishes a message to SNS whenever the database is updated.

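import boto3

# snsARN is assumed to be defined elsewhere in the module
# (for example, read from a Lambda environment variable)
def notify(text):
    try:
        sns = boto3.client('sns')
        # Publish the message to every subscriber of the topic
        sns.publish(TopicArn=snsARN, Message=text)
    except Exception as e:
        print("Not able to send SMS due to {}".format(e))
        exit(1)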

Step 4 - Error handling

For a seamless workflow, error handling is important. My code handles several kinds of errors. For example:

  • If the data contains unexpected or malformed input, the transformation function raises an exception and a notification is sent.
  • If there is an issue while connecting to the database, the database_connection() function sends a notification with the reason.
  • If there is a problem during table creation or data insertion, a notification is sent with the raised exception.
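
The pattern shared by these cases can be sketched as a small wrapper: run a step, and if it fails, send the reason before giving up. The helper run_step and the stand-in functions are hypothetical; in the real Lambda the notify argument would be the SNS function from Step 3.

```python
def run_step(name, step, notify, *args):
    """Run one pipeline step; on failure, notify with the reason and re-raise."""
    try:
        return step(*args)
    except Exception as exc:
        notify("{} failed: {}".format(name, exc))
        raise

# Stand-ins for illustration: a list collects messages instead of SNS.
messages = []

def broken_transform():
    raise ValueError("unexpected value in 'recovered' column")

try:
    run_step("Transformation", broken_transform, messages.append)
except ValueError:
    pass  # the failure has already been reported

print(messages)
```

Re-raising after the notification keeps the failure visible in the Lambda logs instead of silently swallowing it.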

For daily insertion, there is no need to insert the whole dataset again; it is better to insert only the new data. To do that, I look up the maximum (latest) report date in the table and insert only the rows after it. The function also notifies customers with the number of rows added.
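
A minimal sketch of that filtering step is shown below, assuming the latest stored date has already been fetched (for example with select max(reportdate) from etl). The helper name select_new_rows and the sample values are made up.

```python
import pandas as pd

def select_new_rows(df, last_report_date):
    """Return rows dated after the latest date already stored in the table."""
    if last_report_date is None:  # empty table: everything is new
        return df
    return df[df["date"] > pd.Timestamp(last_report_date)]

# Invented sample data for illustration.
df = pd.DataFrame({
    "date": pd.to_datetime(["2020-03-01", "2020-03-02", "2020-03-03"]),
    "cases": [10, 20, 35],
})
new_rows = select_new_rows(df, "2020-03-01")
print(len(new_rows), "new rows")
```

The length of the filtered frame is also the row count that goes into the notification message.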

Step 5 - Trigger the function once a day

These CSV files are updated once a day, so there is no need to check for updates continuously. That is why I configured an event rule that invokes the Lambda function once a day. You can find a tutorial for this here.
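
For readers who prefer code to the console, such a rule could be created with boto3 roughly as follows. This is a sketch under assumptions, not necessarily how the rule in this project was set up: the function name schedule_daily, the rule name, the target ID, and the ARN are all placeholders.

```python
def schedule_daily(events_client, rule_name, lambda_arn):
    """Create (or update) a rule that fires once a day and targets the Lambda."""
    rule = events_client.put_rule(
        Name=rule_name,
        ScheduleExpression="rate(1 day)",  # a cron(...) expression also works
        State="ENABLED",
    )
    events_client.put_targets(
        Rule=rule_name,
        Targets=[{"Id": "etl-lambda", "Arn": lambda_arn}],  # Id is a placeholder
    )
    return rule["RuleArn"]

# Usage (requires AWS credentials, so it is left commented out):
# import boto3
# schedule_daily(boto3.client("events"), "daily-etl", "<lambda function ARN>")
```

The Lambda function also needs a resource-based permission that allows events.amazonaws.com to invoke it; otherwise the rule fires but the invocation is rejected.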

Step 6 - Infrastructure as Code

This was one of the most challenging parts for me, as I had not worked with CloudFormation much. I built the template in YAML one element at a time. In the end, it was not as tough as I expected, because AWS has well-organized, detailed documentation for each resource type. I even ended up creating the RDS instance with CloudFormation, which is why the Lambda function first checks whether the table exists. Overall, this part gave me a better understanding of IaC. You can check my CloudFormation template here.

Step 7 - CI/CD pipeline and Source control

Even though this part was optional, I wanted to do it because I did not want to update AWS manually whenever I changed my Python code or infrastructure. For source control, I used GitHub, as it is very handy. For the CI/CD pipeline, I went with GitHub Actions. Now, whenever I push a change to my GitHub repository, GitHub Actions runs the pipeline and performs the necessary updates.

Step 8 - QuickSight dashboard

The final task was to build visualizations of US case counts, recoveries, death ratio, daily increase in cases, and so on.
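
The arithmetic behind two of these metrics can be sketched in pandas. This is an illustration only: it assumes the case and death counts are cumulative, defines the death ratio as deaths divided by cases, and uses invented numbers. The dashboard itself may compute these differently.

```python
import pandas as pd

# Invented cumulative counts for three days.
df = pd.DataFrame({
    "date": pd.to_datetime(["2020-03-01", "2020-03-02", "2020-03-03"]),
    "cases": [10, 20, 35],
    "deaths": [0, 1, 2],
    "recovered": [1, 3, 7],
})

# Daily increase: difference between consecutive cumulative counts
# (the first day has no previous value, so it is set to 0).
df["daily_cases"] = df["cases"].diff().fillna(0).astype("int64")

# Death ratio: deaths as a percentage of cumulative cases.
df["death_ratio_pct"] = (df["deaths"] / df["cases"] * 100).round(2)

print(df)
```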

QuickSight Dashboard image

Major Challenges and Learning

Throughout this project, I faced many challenges and learned from each of them.

  • For the transformation and database connectivity, I used the pandas and psycopg2 Python modules. These modules are not readily available in Lambda, so I had to add them as Lambda layers. At first I had trouble with that, but eventually I found a curated list of awesome AWS Lambda Layers here, which made the task easy.
  • I wanted to create the table entirely from code, and at first I could not find a way to do this in Lambda. After trying a few things, I got it working.
  • I did not have much experience with CloudFormation, so I first worked through the AWS documentation and eventually got the template working.
  • CI/CD pipelines were new to me, but I found them really interesting. I configured the pipeline with GitHub Actions, and I still want to explore AWS CodePipeline for this.

Conclusion

Overall, I enjoyed this challenge and learned a lot. Many people helped me along the way. I would like to thank Forrest Brazeal for creating this amazing challenge. I would really appreciate any feedback on the challenge or this post. Thank you.

GitHub Repository

Top comments (2)

Rajan Panchal

Great work! Very nice.

Bansikumar Mendapara

Thank you so much.