
Manny Rivero

Cloud ETL Pipeline in Azure

A close friend in the tech world recommended that I take on the Cloud Resume Challenge by Forrest Brazeal. You can find the challenge here. Forrest created this challenge to help people make career transitions into the world of cloud computing. He lays a great foundation for getting started and has many success stories on his website. I decided to take on the challenge. A little over halfway through, I started to realize that I was becoming more interested in the data engineering side of the cloud. In light of this, I did some digging. It turns out Forrest created another challenge during his days at A Cloud Guru, one definitely more geared towards data engineering and data pipelines. I decided to take a break from the Cloud Resume Challenge and take this one on full time.

The challenge can be found here. In summary, the purpose of the challenge is to harvest data from a couple of different Covid-19 tracking data sources, one from Johns Hopkins and the other from the New York Times. You then essentially set up a data pipeline by creating an event-driven Python function that ingests, transforms, and then loads the processed data into a database of your choice. Notifications need to be set up for both failed and successful pipeline runs, along with some unit testing. Then it all flows down into a reporting and analytics tool, such as Tableau. Oh, and you eventually need to have all your cloud resources configured with IaC (Infrastructure as Code). No pointing and clicking (sad news for all the '90s-born kids out there. We became experts at that once the digital age got ushered in. Someone is always making it harder for us now).

The original challenge proposed that all of the work be done in AWS. However, Microsoft Azure is where I've really honed my skills and gotten certified, so I decided to do an Azure version of this challenge. Below, I will try to concisely discuss what I did for each step, along with code snippets and visuals.

Visual Diagrams

Pipeline

[Diagram: Azure ETL pipeline architecture]

GitHub Actions

[Diagram: GitHub Actions CI/CD workflow]

ETL Job

I set up the ETL job as a pipeline in Azure Data Factory, the main ETL service in Azure. The pipeline runs on a daily schedule at around 9 a.m. Within the pipeline, I set up an Azure Function activity linked to the serverless Python function I had created in Azure Functions (more on that below).
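
For illustration, here is roughly what a daily schedule trigger like this looks like if created programmatically with the azure-mgmt-datafactory SDK. This is a sketch rather than my exact configuration, and the subscription ID, resource group, factory, trigger, and pipeline names are placeholders.

from azure.identity import DefaultAzureCredential
from azure.mgmt.datafactory import DataFactoryManagementClient
from azure.mgmt.datafactory.models import (
    PipelineReference,
    ScheduleTrigger,
    ScheduleTriggerRecurrence,
    TriggerPipelineReference,
    TriggerResource,
)

# Placeholder identifiers -- substitute your own subscription, resource group,
# data factory, and pipeline names
adf_client = DataFactoryManagementClient(DefaultAzureCredential(), "<subscription-id>")

recurrence = ScheduleTriggerRecurrence(
    frequency="Day",
    interval=1,
    start_time="2023-05-01T09:00:00Z",
    time_zone="UTC",
)
trigger = TriggerResource(
    properties=ScheduleTrigger(
        description="Daily run of the Covid ETL pipeline",
        recurrence=recurrence,
        pipelines=[
            TriggerPipelineReference(
                pipeline_reference=PipelineReference(reference_name="<pipeline-name>")
            )
        ],
    )
)
adf_client.triggers.create_or_update("<resource-group>", "<factory-name>", "DailyCovidTrigger", trigger)
# Newer SDK versions expose the long-running start operation as begin_start
adf_client.triggers.begin_start("<resource-group>", "<factory-name>", "DailyCovidTrigger").result()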

Extract, Transform, Load

The challenge calls for extracting/fetching the Covid data from the links provided. The data then needs to be transformed according to the requirements outlined in the challenge, followed by a load into a database of choice. The challenge requires that you use a serverless Python function. My function is hosted on Azure Functions on a serverless consumption plan with a Linux OS; currently, Linux is the only OS Azure supports for serverless Python functions. The database I chose was an Azure SQL Database, which is essentially the Azure PaaS version of MS SQL Server.

For these ETL steps, I used Pandas as the main workhorse, along with pyodbc for the database load. Some abstraction is needed here: the steps shouldn't know about each other's internals, so they need to live in separate files/modules and be imported where needed. I created a separate Python module for the extraction, transformation, and loading steps, turned the code in each file into a function/utility, and then imported the appropriate function in each downstream file, as you can see below. Finally, I wrote a very simple main function that cleanly executes the whole chain.

The main data structure used along the way was a Pandas dataframe, which made it simple and efficient to transform columns and manipulate the data. Several unnecessary columns were dropped, date fields were converted from strings to datetime objects, and the data was filtered to only show U.S. records. The two extracted datasets (NYT Covid data and Johns Hopkins Covid data) were merged to form one final dataset; the Johns Hopkins data had relevant information on Covid recoveries that the NYT dataset did not.

Each function/utility module is listed below:

Fetch_Covid_Data

import pandas as pd


def fetch_nyt_covid_data(src_path: str = "https://raw.githubusercontent.com/nytimes/covid-19-data/master/us.csv") -> pd.DataFrame:
    covid_data = pd.read_csv(src_path)

    return covid_data


def fetch_covid_recovered_data(src_path: str = 'https://raw.githubusercontent.com/datasets/covid-19/master/data/time-series-19-covid-combined.csv') -> pd.DataFrame:
    covid_recovered = pd.read_csv(src_path)

    return covid_recovered

Transform_Covid_Data

from Fetch_Covid_Data import fetch_nyt_covid_data
from Fetch_Covid_Data import fetch_covid_recovered_data
import pandas as pd


def transform() -> pd.DataFrame:
    #Fetch Core Data
    covid_data = fetch_nyt_covid_data()
    covid_recovered = fetch_covid_recovered_data()

    #Convert date columns to a datetime object from string
    covid_data['date'] = covid_data['date'].apply(pd.to_datetime)

    covid_recovered['Date'] = covid_recovered['Date'].apply(pd.to_datetime)

    #Extra date row dropped on merge using inner join
    covid_data = pd.merge(covid_data, covid_recovered, how='inner', left_on='date', right_on='Date')

    #Drop unnecessary columns
    covid_data = covid_data.drop(['Date','Province/State','Confirmed','Deaths'], axis=1)

    #Filter out non-US data
    covid_data = covid_data[covid_data['Country/Region'] == 'US']

    covid_data.rename(columns= {"Country/Region": "Country"}, inplace=True)

    covid_data = covid_data.drop_duplicates()

    covid_data = covid_data.reset_index(drop=True)

    return covid_data


Load_Covid_Data

from Transform_Covid_Data import transform
import pandas as pd
import pyodbc 
import os


def load_covid_data():
    # Connect to the Azure SQL Database; credentials are read from environment
    # variables so they are never hard-coded in the source
    username = os.environ['AZ_TF_SQL_UN']
    password = os.environ['AZ_TF_SQL_PW']
    database= '**********'
    server='******************'
    driver= '{***********}'
    conn = pyodbc.connect('DRIVER='+driver+';SERVER='+server+';PORT=****;DATABASE='+database+';UID='+username+';PWD='+ password)


    # Define table name and schema
    table_name = 'Covid_US'
    schema_name = 'dbo'

    # Define the SQL statement to create the table
    create_table_query = f"""
    IF NOT EXISTS (SELECT * FROM INFORMATION_SCHEMA.TABLES 
        WHERE TABLE_SCHEMA = '{schema_name}' AND TABLE_NAME = '{table_name}')
        BEGIN
        CREATE TABLE {schema_name}.{table_name} ( 
    "Date" Date PRIMARY KEY, Cases INT, Deaths INT, Country VARCHAR(5), Recovered DECIMAL);
        END
        """

    # Execute the SQL statement to create the table if it doesn't exist
    cursor = conn.cursor()
    cursor.execute(create_table_query)
    conn.commit()

    # Loop over each row and insert it into the table if it isn't already there
    covid_data = transform()

    for index, row in covid_data.iterrows():
        cursor.execute(f"""SELECT * FROM {table_name} WHERE Date='{row['date']}' AND Cases='{row['cases']}' AND Deaths='{row['deaths']}' AND Country='{row['Country']}' AND Recovered='{row['Recovered']}'""")
        result = cursor.fetchone()
        if not result:
            # If the row doesn't exist, insert it into the table
            cursor.execute(f"INSERT INTO {table_name} (Date, Cases, Deaths, Country, Recovered) VALUES ('{row['date']}', '{row['cases']}', '{row['deaths']}', '{row['Country']}', '{row['Recovered']}')")
            conn.commit()

    conn.commit()

    conn.close()

    return None

Main Function hosted on Azure Functions

from Load_Covid_Data import load_covid_data
import azure.functions as func


def main(req: func.HttpRequest) -> func.HttpResponse:
    # Kick off the full ETL chain: fetch -> transform -> load into Azure SQL
    load_covid_data()

    return func.HttpResponse("ETL Job Successful")

Notification and Error Handling

The pipeline run needs to send out notifications based on success or failure. I determined the simplest way to do this in Azure was to go into the Azure Data Factory resource I had created, select my pipeline, and set up Azure Monitor alerts: one that fires when a pipeline run succeeds and one that fires when it fails. Each alert sends an email to my personal address with a simple message letting me know the outcome of the run.
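
The failure alert only fires if the pipeline run actually fails, and the Azure Function activity only fails if the function returns an error response. Below is a minimal sketch (not the exact code I deployed) of wrapping the load in a try/except so that an exception surfaces as a 500 response, which fails the activity and triggers the failure alert:

import logging

import azure.functions as func

from Load_Covid_Data import load_covid_data


def main(req: func.HttpRequest) -> func.HttpResponse:
    try:
        load_covid_data()
    except Exception:
        # Log the traceback and return a 500 so the Data Factory activity,
        # and therefore the pipeline run, is marked as failed
        logging.exception("ETL job failed")
        return func.HttpResponse("ETL Job Failed", status_code=500)

    return func.HttpResponse("ETL Job Successful")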

Additionally, the challenge notes that "you should be able to load the entire historical data set into your database the first time the job is run, and then update with only the most recent day’s data thereafter." In the load function module above, I accomplished this by looping through the final Pandas dataframe of Covid data, checking whether each row already exists in the database, and inserting it only if it doesn't.
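
For reference, since Date is the table's primary key, the existence check could also key off the date alone, with pyodbc's "?" placeholders handling the quoting of values. The helper below (upsert_covid_rows is a hypothetical name, not part of the module above) is a sketch of that variant:

import pandas as pd
import pyodbc


def upsert_covid_rows(conn: pyodbc.Connection, covid_data: pd.DataFrame, table_name: str = "Covid_US") -> None:
    # Insert only the rows whose Date is not yet in the table; on the first run
    # this loads the full history, on later runs only the newest day(s)
    cursor = conn.cursor()
    for _, row in covid_data.iterrows():
        cursor.execute(f"SELECT 1 FROM {table_name} WHERE [Date] = ?", row["date"])
        if cursor.fetchone() is None:
            cursor.execute(
                f"INSERT INTO {table_name} ([Date], Cases, Deaths, Country, Recovered) "
                "VALUES (?, ?, ?, ?, ?)",
                row["date"], row["cases"], row["deaths"], row["Country"], row["Recovered"],
            )
    conn.commit()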

Source Control, IaC, and CI/CD

I used Terraform with the AzureRM provider to configure my Azure resources as Infrastructure as Code. I then stored my Terraform files and all of my function modules on GitHub. To go a step further in the challenge, I decided to add a CI/CD stage to this project: with all of the project's files on GitHub, I set up GitHub Actions to redeploy the affected resources whenever I push a change to the repository.

Dashboard

For the reporting dashboard, I was originally going to stay within the Microsoft ecosystem and use Power BI. However, I wanted to use a tool that is considered more universal and not tied to one single provider (Microsoft, AWS, Google, etc.). Tableau seems to have gained traction as the tool many organizations use regardless of their main cloud workhorse or in-house software infrastructure. I have posted a picture of the dashboard below.

[Screenshot: Tableau Covid-19 dashboard]

Personal Reflection and Conclusion

Throughout this challenge, I encountered my fair share of frustrations. Sometimes it's a simple and obvious fix that you keep glossing over; other times it's something that requires more in-depth analysis. You can find yourself staring at a traceback error for what seems like an eternity, have 30 Google tabs open, and still be more lost than when you started. Your code finally compiles after what feels like the hundredth redeploy, only to still be wrong. Sometimes, when attempting a project meant to showcase your skills or improve your resume, you can get in your head with perfectionist thinking and react with frustration when code doesn't run correctly the first time around. There is a slight internal pressure to get it done quickly and without error. However, partway through this project, I began to realize something: this is the life of a data engineer/tech professional/software developer/programmer/fill in the blank. It is one constant journey of trial and error in order to find success. I began to think of the project as a reflection of the daily work of those roles. I love problem solving and troubleshooting, but as I mentioned above, sometimes amidst all the noise of a large project, you get fed up and just want it done instead of truly learning. So I went back to my roots and remembered how I felt when I first learned to code in Python and first learned big data concepts, cloud principles, and more. When I started out, I was hungry to learn and enjoyed finding out more every day. Each error I encountered was an opportunity to grow my skills and increase my knowledge. I began shifting my mindset back to this approach during the challenge. Not only did it make the project more enjoyable, but it cleared out so much of the mental clutter.

The second takeaway from this challenge was realizing the importance of organization and cleanliness in your code, from how you group your modules, to the functions within those modules, to the organization of your overall project. Taking the time to do this well will help you keep your sanity and aid with collaboration. This is why resource templates within cloud environments (ARM templates in Azure, for example) are so important and why Terraform has become so popular. Pointing and clicking your way to victory might be tempting, but trying to store the details of every cloud resource in a Word document is maddening. IaC, as well as reusable modules in Python, is a must.

I want to thank Forrest Brazeal for creating this challenge and providing a great way to showcase some solid data engineering skills. Connect with me on LinkedIn and feel free to send any questions my way.

Cheers
