What is a Data Pipeline?
Ever wondered how data is moved from a source to its destination, or how messy data is turned into something clean and ready for analysis without doing it manually? Well, that is exactly what a data pipeline does. It is a series of steps that automate the movement of data from various sources to a destination, ensuring that the data arrives structured, clean, and ready for analysis.
Key Functions of a Data Pipeline
- Extract: The process of retrieving raw data from multiple sources (databases, APIs, files, etc.).
- Transform: Clean, filter, aggregate, or modify the data to make it useful.
- Load: Store the transformed data in a destination system (data warehouse, database, or analytics tool).
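To make these three stages concrete, here is a minimal sketch of the pattern in Python (the file names, and the choice of dropna() as the only transformation, are illustrative assumptions rather than part of any specific pipeline):

import pandas as pd

def extract(source_path):
    # Extract: read raw records from a source (a CSV file in this sketch)
    return pd.read_csv(source_path)

def transform(df):
    # Transform: clean the data, e.g. drop incomplete rows
    return df.dropna()

def load(df, target_path):
    # Load: write the cleaned data to a destination (a file here, but it
    # could just as well be a database table or a data warehouse)
    df.to_csv(target_path, index=False)

if __name__ == "__main__":
    load(transform(extract("raw_data.csv")), "clean_data.csv")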
Hands-on Python ETL Implementation
In this section, we will implement example ETL pipelines using Python:
Reading from CSV and Writing to PostgreSQL
- Generate sales data (I have used Mockaroo to generate dummy data).
- Create a PostgreSQL database. I have used aiven.io to store my transformed data and DBeaver to connect to the database.
- Implement the ETL process using Python. Make sure to install the necessary libraries (pandas and sqlalchemy).
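A quick note before the script: since the database in this walkthrough lives on a hosted service (aiven.io) rather than on your machine, the connection string you pass to SQLAlchemy will not be a plain localhost URL. The sketch below is a hypothetical example only; the user, password, host, port, and database name are placeholders you would copy from your own service dashboard, and you will also need a PostgreSQL driver such as psycopg2 installed for SQLAlchemy to connect.

from sqlalchemy import create_engine

# Hypothetical hosted-PostgreSQL connection string; every value here is a
# placeholder, and managed services typically require SSL.
engine = create_engine(
    "postgresql://avnadmin:password@my-service.aivencloud.com:12345/defaultdb?sslmode=require"
)

The script below uses a simple localhost URL to keep the example readable; swap in your own connection string if your database is hosted remotely.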
import pandas as pd
from sqlalchemy import create_engine
# Database connection
engine = create_engine("postgresql://username:password@localhost:5432/etl_db")
# Extract
df = pd.read_csv("sales.csv")
# Transform the data
df = df.rename(columns={'id': 'sales_id'}) # Rename columns
df = df.dropna() # Drop null values
# Load
df.to_sql("sales", engine, if_exists="append", index=False)
print("Data successfully loaded into PostgreSQL!")
Reading from API and Writing to Database
We are going to use a sample JSON endpoint to fetch staff data for a fictitious company (the URL is shown in the code below).
Python Code to Fetch and Load API Data:
import requests
import json
import pandas as pd
from sqlalchemy import create_engine
# Step 1: Fetch data from API
url = "https://raw.githubusercontent.com/LuxDevHQ/LuxDevHQDataEngineeringGuide/refs/heads/main/samplejson.json"
response = requests.get(url)
data = response.json()
# Step 2: Extract - Convert JSON data to DataFrame
df = pd.DataFrame(data)
# Step 3: Transform - Clean and reshape the data
df = df[['name', 'position', 'country']] # Select relevant columns
df.columns = ['full_name', 'role', 'country'] # Rename columns for clarity
# Step 4: Load - Insert the data into PostgreSQL
engine = create_engine("postgresql://username:password@localhost:5432/etl_db")
df.to_sql("api_staff", engine, if_exists="append", index=False)
print("API Data Loaded Successfully!")
Congratulations! 🎉
If you've followed each step correctly, you've successfully implemented an ETL data pipeline using Python!
This hands-on example demonstrates how to automate the process of moving data from CSV files and APIs into a database, streamlining your data processing workflows and making them more efficient and scalable.