How to build a Data Pipeline for the first time

I am fascinated by Machine Learning models and the incredible tools they offer our world, whether for making decisions, predicting trends, improving lives, or even saving them. Those models are trained and tested on tremendous amounts of data that are constantly being collected and stored in databases.

I was really curious to learn how raw data gets into programs and what processing it goes through to become useful, even before a brilliant Machine Learning model is trained.

One type of data that we use on a daily basis for predictions is weather observations. Obviously, weather forecasting is mostly based on complicated physics equations and statistical models. Still, I thought it would be fun to build my own weather database for practicing my Data Science skills.

Instead of connecting temperature sensors to my computer and opening my own meteorological station at home (which could be very cool for my next project), I decided to build a simple data pipeline to see how it's done.

My mission was to build a pipeline from scratch: retrieving weather data from the OpenWeatherMap current weather web API, parsing it with the Pandas (Python data analysis) library, and storing it in a local SQLite database.

Here's how I did it:

Step 1: Get an API key

OpenWeatherMap offers a few paid API plans; some are monthly subscriptions with powerful extra features. I was happy to start with the free plan, which offers access to current weather data for any city.

Step 2: Keep your API key private

You wouldn't want a stranger to have your ATM password, right? API keys can also be used to steal from your wallet, especially if you are a paying customer of the service the API provides, or if you have a limited number of API calls. Environment variables come in handy for this problem, since they are variables whose values are set outside the program (highly recommended tutorial here). By making my API key an environment variable, it stays outside of my program and can be used without being exposed in my public GitHub repository. I saved my API key into a new file and called it .env. This is what it looks like inside:

api-token = "typeyourapikeyhere"
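Once the key lives in .env, the program reads it back at runtime instead of hard-coding it. Here is a minimal sketch of that mechanism, using the python-dotenv package exactly as Step 4 does:

from dotenv import load_dotenv
import os

load_dotenv()                          # copy the variables from .env into the environment
api_key = os.environ.get("api-token")  # the key is available to the code, but never committed to Git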

Now we can start coding our pipeline. Before coding, I took some time to design my program and eventually decided to separate it into files by responsibility.
My program design guidelines were:

  • Each function does one thing.
  • Each function is tested on its own, to make sure it actually does what it's supposed to do. This also saves a big headache later, when bugs pile up and you have to figure out which function is the problematic one. Here I chose to use doctest for small, simple tests.
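To show what such a doctest looks like, here is a tiny, hypothetical helper (not part of the pipeline): the expected result is written directly in the docstring, and doctest.testmod() runs the example and compares the output.

def fahrenheit_to_celsius(temp_f):
    """ Convert a temperature from Fahrenheit to Celsius.

    >>> fahrenheit_to_celsius(212)
    100.0
    """
    return (temp_f - 32) * 5 / 9


if __name__ == "__main__":
    import doctest
    doctest.testmod()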

For the next steps, make sure that:

  • Python 3 and pip are installed already.
  • If you are using the files on my GitHub repository, you can install the dependencies by running the following command:
pip install -r requirements.txt
  • In the folder where you'll run my program, you need a folder called data_cache. This is where all the data will be saved before it is stored in the database.
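If the data_cache folder doesn't exist yet, a one-liner is enough to create it (a small sketch, not part of the original files):

import os

os.makedirs("data_cache", exist_ok=True)  # creates the folder only if it is missing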

Step 3: Create a new empty Database

In a Python file (create_db.py), I created a SQLite database with the sqlite3 library:

import sqlite3


def create_DB_connection():
    """ Connect to a database and create cursor object for executing SQL statements.
    """
    connection = sqlite3.connect('CurrentWeatherDB.db', uri=True)
    print("Connection with database established ..........")
    connection.cursor()
    return connection


def close_connection(connection):
    """ Close connection with the database.
    """
    connection.commit()
    connection.close()

Step 4: Retrieve the data and save as a json file

At this point (in get_weather_data_from_OpenWeatherMap.py) you will be able to get the data in json format and save it as a json file in the data_cache folder. Each json file is named after the "dt" value, which stands for the observation's datetime. Note that the datetime is given as a Unix epoch timestamp.

from dotenv import load_dotenv
import os
import requests
import json


def get_current_weather_data_from_OpenWeatherMap(id):
    """ Returns current weather data by city id (Tel Aviv by default)
    """
    load_dotenv()
    json_data = requests.get(
        "http://api.openweathermap.org/data/2.5/weather?id=" + id + "&appid=" + os.environ.get("api-token")).json()
    print("Getting current weather data from OpenWeatherMap.org..........")
    return json_data


def write_weather_data_in_json_file(json_data):
    """ Save current weather data to a json file.
    Name the file by the Unix Timestamp.
    """
    name = 'data' + str(json_data['dt'])
    filename = r"data_cache/%s.json" % name
    with open(filename, 'w') as f:
        json.dump(json_data, f)
    return filename
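Unix timestamps are not very readable on their own, so here is a quick sketch of converting one (the dt value from the sample response shown in Step 5) into a human-readable datetime:

from datetime import datetime, timezone

print(datetime.fromtimestamp(1600689959, tz=timezone.utc))
# 2020-09-21 12:05:59+00:00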

Step 5: From json file to dictionary to Pandas Dataframe

This might seem a bit Sisyphean, but I preferred to break the process down into as many "baby steps" as possible. To me, it's organized, clear, and helps keep track of each step.

"""
>>> convert_json_to_dict("data1600689959.json")
{'coord': {'lon': 34.78, 'lat': 32.08}, 'weather': [{'id': 800, 'main': 'Clear', 'description': 'clear sky', 'icon': '01d'}], 'base': 'stations', 'main': {'temp': 303.84, 'feels_like': 303.4, 'temp_min': 302.04, 'temp_max': 305.37, 'pressure': 1010, 'humidity': 52}, 'visibility': 10000, 'wind': {'speed': 5.7, 'deg': 310}, 'clouds': {'all': 0}, 'dt': 1600689959, 'sys': {'type': 1, 'id': 6845, 'country': 'IL', 'sunrise': 1600658915, 'sunset': 1600702755}, 'timezone': 10800, 'id': 293397, 'name': 'Tel Aviv', 'cod': 200}
>>> convert_dict_to_df("data1600689959.json")
0 10000
Name: visibility, dtype: int64
>>> convert_dict_to_df("data1601987692.json")
"""
import pandas as pd
import json


def convert_json_to_dict(filename):
    """ Convert json file to python dictionary
    """
    with open(filename, 'r') as JSON:
        json_dict = json.load(JSON)
    return json_dict


def convert_dict_to_df(filename):
    """ Convert python dictionary to pandas dataframe
    """
    return pd.json_normalize(convert_json_to_dict(filename))


if __name__ == "__main__":
    import doctest
    doctest.testmod()
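The reason the column names in the next step look like main.temp and wind.speed is that pd.json_normalize flattens nested json objects into dot-separated column names. A small self-contained sketch:

import pandas as pd

sample = {"main": {"temp": 303.84, "humidity": 52}, "wind": {"speed": 5.7}, "dt": 1600689959}
print(sorted(pd.json_normalize(sample).columns))
# ['dt', 'main.humidity', 'main.temp', 'wind.speed']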

Step 6: ETL with Pandas

The ETL procedure lets us extract and transform the data according to our analysis needs, and then load it into our data warehouse. To make my data useful for future Data Science projects, I made sure my database would contain the parameters needed for daily temperature prediction (current temperature in Celsius, minimum and maximum temperatures in Celsius, humidity, pressure and wind). I also chose the datetime ("dt") column to be the row index, acting as a primary key for my database. All of the transform functions live in transform.py.
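As a quick sanity check before diving into the transform functions below: OpenWeatherMap returns temperatures in Kelvin by default, so the unit conversion is just a subtraction. Using main.temp from the sample response in Step 5:

print(303.84 - 273.15)  # roughly 30.69 °C, a warm day in Tel Aviv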

"""
>>> convert_kelvin_to_celsius(df_test_convert_kelvin_to_celsius)
temp temp_min temp_max pressure humidity wind_speed wind_deg dt name radiation
0 71.85 16.85 69.85 12 56 110 180 1600234345 Tel Aviv high
1 15.85 24.85 59.85 13 67 90 189 1600456555 Tel Aviv low
2 59.85 25.85 60.85 14 64 98 178 1600555666 Tel Aviv medium
3 38.85 -55.15 71.85 15 63 99 177 1600567778 Tel Aviv average
4 35.85 14.85 -41.15 11 62 100 190 1600776889 Tel Aviv low
>>> keep_columns(df_test_keep_columns)
main.temp main.temp_min main.temp_max main.pressure main.humidity wind.speed wind.deg dt name
0 345 290 343 12 56 110 180 1600234345 Tel Aviv
1 289 298 333 13 67 90 189 1600456555 Tel Aviv
2 333 299 334 14 64 98 178 1600555666 Tel Aviv
3 312 218 345 15 63 99 177 1600567778 Tel Aviv
4 309 288 232 11 62 100 190 1600776889 Tel Aviv
"""
import pandas as pd

# Fake data for testing
data_test_keep_columns = {'main.temp': [345, 289, 333, 312, 309],
                          'main.temp_min': [290, 298, 299, 218, 288],
                          'main.temp_max': [343, 333, 334, 345, 232],
                          'main.pressure': [12, 13, 14, 15, 11],
                          'main.humidity': [56, 67, 64, 63, 62],
                          'wind.speed': [110, 90, 98, 99, 100],
                          'wind.deg': [180, 189, 178, 177, 190],
                          'dt': [1600234345, 1600456555, 1600555666, 1600567778, 1600776889],
                          'name': 'Tel Aviv',
                          'radiation': ['high', 'low', 'medium', 'average', 'low']}
df_test_keep_columns = pd.DataFrame(data_test_keep_columns)

data_test_convert_kelvin_to_celsius = {'temp': [345, 289, 333, 312, 309],
                                       'temp_min': [290, 298, 299, 218, 288],
                                       'temp_max': [343, 333, 334, 345, 232],
                                       'pressure': [12, 13, 14, 15, 11],
                                       'humidity': [56, 67, 64, 63, 62],
                                       'wind_speed': [110, 90, 98, 99, 100],
                                       'wind_deg': [180, 189, 178, 177, 190],
                                       'dt': [1600234345, 1600456555, 1600555666, 1600567778, 1600776889],
                                       'name': 'Tel Aviv',
                                       'radiation': ['high', 'low', 'medium', 'average', 'low']}
df_test_convert_kelvin_to_celsius = pd.DataFrame(data_test_convert_kelvin_to_celsius)


def keep_columns(pandas_df):
    """ Keep only the necessary columns of the dataframe
    """
    return pandas_df[['main.temp', 'main.temp_min', 'main.temp_max', 'main.pressure', 'main.humidity', 'wind.speed', 'wind.deg', 'dt', 'name']]


def change_col_name(pandas_df_kept_columns):
    """ Change column names to match the SQL database.
    """
    return pandas_df_kept_columns.rename(columns={'main.temp': 'temp', 'main.temp_min': 'temp_min', 'main.temp_max': 'temp_max', 'main.pressure': 'pressure', 'main.humidity': 'humidity', 'wind.speed': 'wind_speed', 'wind.deg': 'wind_deg'})


def convert_kelvin_to_celsius(pandas_df_kept_renamed_columns):
    """ Convert the temp, temp_min, temp_max measurement unit from Kelvin to Celsius.
    """
    columns = ["temp", "temp_min", "temp_max"]
    for column in columns:
        pandas_df_kept_renamed_columns[column] = pandas_df_kept_renamed_columns[column] - 273.15
    return pandas_df_kept_renamed_columns


def set_datetime_col_as_row_index(pandas_df_kept_renamed_columns_in_celsius):
    """ Set the datetime column to be the dataframe row index.
    """
    return pandas_df_kept_renamed_columns_in_celsius.set_index('dt')


if __name__ == "__main__":
    import doctest
    doctest.testmod()

Step 7: Update the Database

Now that we have the current weather data saved in a dataframe, we can easily load it into our database using the Pandas library (update_db.py):

import sqlite3
import pandas as pd


def update_db(pandas_df, connection):
    """ Upload pandas dataframe to sql database
    """
    pandas_df.to_sql('Weather', connection, if_exists='append', index=True)
    print("Uploaded to database!")


if __name__ == "__main__":
    import doctest
    doctest.testmod()
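To double-check that rows really land in the database, here is a short read-back sketch (using the database file and table name from the code above):

import sqlite3
import pandas as pd

connection = sqlite3.connect('CurrentWeatherDB.db')
# pull the five most recent observations out of the Weather table
print(pd.read_sql_query("SELECT * FROM Weather ORDER BY dt DESC LIMIT 5", connection))
connection.close()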

For flexibility, I used the argparse library, which lets you run main.py from the command line and pass it a city id as an optional argument. So even though Tel Aviv is the default city, the user can still run the program for any city in the world. For example, if you would like to get the weather data for Detroit, US:

python3 src/main.py --city_id "4990729"

When running main.py, steps 3-7 are executed:

from create_db import create_DB_connection, close_connection
from create_pandas_df_from_json import convert_dict_to_df
from update_db import update_db
from transform import keep_columns, change_col_name, convert_kelvin_to_celsius, set_datetime_col_as_row_index
from get_weather_data_from_OpenWeatherMap import get_current_weather_data_from_OpenWeatherMap, write_weather_data_in_json_file
import argparse
import time

parser = argparse.ArgumentParser()
parser.add_argument("--city_id", default="293397",
                    help="Fetch weather data for a specified city. Find the id of the city on OpenWeatherMap.org")
parser.add_argument("--frequency", default=900, type=int,
                    help="How often the program runs, in seconds")
args = parser.parse_args()

if args.city_id:
    print("city id inserted")
if args.frequency:
    print("program frequency inserted")


def start():
    returned_connection = create_DB_connection()
    filename = write_weather_data_in_json_file(
        get_current_weather_data_from_OpenWeatherMap(args.city_id))
    new_df_to_sql = set_datetime_col_as_row_index(convert_kelvin_to_celsius(
        change_col_name(keep_columns(convert_dict_to_df(filename)))))
    update_db(new_df_to_sql, returned_connection)
    close_connection(returned_connection)


while True:
    start()
    time.sleep(args.frequency)

Done!

And there you have it! A pipeline you can build on your own. Eventually, the program is meant to run on a schedule to build up a database over time for my next Data Science project. For now, the Windows Task Scheduler is a great way to start, but I recommend checking out Python scheduling libraries as well. There are some wonderful tutorials out there, just waiting to be explored.
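For example, one option (not used in this post, which relies on a plain while loop) is the third-party schedule package, which turns the sleep loop into a declarative interval:

# pip install schedule
import schedule
import time


def start():
    # placeholder: in the real project this would be the start() function
    # from main.py, which fetches, transforms and loads one observation
    print("running the pipeline once")


schedule.every(15).minutes.do(start)  # queue the job every 15 minutes

while True:
    schedule.run_pending()  # run any job that is due
    time.sleep(1)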

Edit 10/18/20:

  • The program runs on a default schedule of every 15 minutes by using a while loop, and you can set the frequency to any interval you would like. For example, to run it every 5 seconds, run the command:
python3 src/main.py --frequency 5
  • All the Python files are currently stored in a local folder called src. In the next post I will explain how to create a Docker image that copies only the Python files and dependencies that are necessary for this project.


Top comments (10)

Waylon Walker

This might seem a bit Sisyphean, but I preferred to break down the process into as many "baby steps" as possible. To me, it's organized, clear and helps keeping track of each step

Love this idea. I often make my pipeline nodes from a single line as well! It's more about each node doing a single operation that is understandable. I like that I can go to your start function and read through exactly what the pipeline does.

This is not too far off from how I would run projects before moving to kedro. Not that it's much different with kedro, just a bit more is taken care of for me, which is really nice as these things grow in run time and number of nodes. Going all in on a 40hr pipeline only to see errors can be a nightmare to debug without being able to see what happened.

Rafael Ferreira

Excellent article, thanks!

shellyalmo

Thank you very much!

Sergey

Hello! Thank you. I will be glad to read your articles about python.

shellyalmo

Thank you Sergey! I will keep writing more tutorials like this for sure. Hopefully I will include Docker and python scheduler next time.

Konstructa

Really good for beginning Python users. Thank you!

shellyalmo

Glad to hear! Thank you

Eric Fleming

Great article! Those scheduler links at the bottom were also super helpful.

shellyalmo

Thank you Eric! I'm glad it helped!

Skyvia

Great example of building a data pipeline from scratch. Here are some useful tips that you can also consider - skyvia.com/blog/what-is-data-pipeline

