Batrudin Jamaludin

Building Scalable Data Pipelines with Python – A Complete Guide

What is a Data Pipeline?

Ever wondered how data moves from a source to its destination, or how messy data is turned into clean, analysis-ready data without doing it all manually? Well, that is what a data pipeline is for. It's a series of steps that automate the movement of data from various sources to a destination, ensuring it arrives structured, clean, and ready for analysis.

Key Functions of a Data Pipeline

  • Extract: The process of retrieving raw data from multiple sources (databases, APIs, files, etc.).
  • Transform: Clean, filter, aggregate, or modify the data to make it useful.
  • Load: Store the transformed data in a destination system such as a data warehouse, database, or analytics tool (a minimal sketch of all three steps follows this list).
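
The sketch below is a minimal, generic skeleton of those three functions. The source file, transformation, and destination table are placeholders for illustration, not part of a specific project:

import pandas as pd
from sqlalchemy import create_engine

def extract(csv_path):
    # Extract: read raw data from a source, here a CSV file
    return pd.read_csv(csv_path)

def transform(df):
    # Transform: clean the data, e.g. drop rows with missing values
    return df.dropna()

def load(df, table_name, engine):
    # Load: write the cleaned data to the destination table
    df.to_sql(table_name, engine, if_exists="append", index=False)

if __name__ == "__main__":
    engine = create_engine("postgresql://username:password@localhost:5432/etl_db")
    load(transform(extract("source.csv")), "staging_table", engine)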

Hands-on Python ETL Implementation

In this section, we will implement ETL pipeline examples in Python:

Reading from CSV and Writing to PostgreSQL

  1. Generate sales data (I have used Mockaroo to generate dummy data).
  2. Create a PostgreSQL database. I have used aiven.io to store my transformed data and DBeaver to connect to the database.
  3. Implement the ETL process in Python. Make sure to install the necessary libraries (pandas, sqlalchemy, and a PostgreSQL driver such as psycopg2-binary).
import pandas as pd
from sqlalchemy import create_engine

# Database connection (replace username/password with your own credentials)
engine = create_engine("postgresql://username:password@localhost:5432/etl_db")

# Extract
df = pd.read_csv("sales.csv")

# Transform the data
df = df.rename(columns={'id': 'sales_id'})  # Rename columns
df = df.dropna()  # Drop null values

# Load
df.to_sql("sales", engine, if_exists="append", index=False)

print("Data successfully loaded into PostgreSQL!")
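
The example above reads the whole CSV into memory at once, which is fine for small files. For larger files, pandas can process the data in chunks so the pipeline scales without running out of memory. This is a minimal sketch assuming the same sales.csv file and sales table; the chunk size of 10,000 rows is an arbitrary choice:

import pandas as pd
from sqlalchemy import create_engine

engine = create_engine("postgresql://username:password@localhost:5432/etl_db")

# Read and load the CSV in chunks instead of all at once
for chunk in pd.read_csv("sales.csv", chunksize=10_000):
    chunk = chunk.rename(columns={'id': 'sales_id'})  # Same transform as above
    chunk = chunk.dropna()
    chunk.to_sql("sales", engine, if_exists="append", index=False)

print("All chunks loaded into PostgreSQL!")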

Reading from API and Writing to Database

We are going to use this endpoint to fetch JSON data (Staff data of a fictitious company): Sample JSON Data.

Python Code to Fetch and Load API Data:

import requests
import pandas as pd
from sqlalchemy import create_engine

# Step 1: Fetch data from API
url = "https://raw.githubusercontent.com/LuxDevHQ/LuxDevHQDataEngineeringGuide/refs/heads/main/samplejson.json"
response = requests.get(url)
data = response.json()

# Step 2: Extract - Convert JSON data to DataFrame
df = pd.DataFrame(data)

# Step 3: Transform - Clean and reshape the data
df = df[['name', 'position', 'country']]  # Select relevant columns
df.columns = ['full_name', 'role', 'country']  # Rename columns for clarity

# Step 4: Load - Insert the data into PostgreSQL
engine = create_engine("postgresql://username:password@localhost:5432/etl_db")
df.to_sql("api_staff", engine, if_exists="append", index=False)

print("API Data Loaded Successfully!")
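
In a real pipeline the API call is the step most likely to fail, so it is worth guarding it. This is a minimal sketch that adds a timeout and basic error handling around the same request; the 30-second timeout is an arbitrary choice:

import requests
import pandas as pd

url = "https://raw.githubusercontent.com/LuxDevHQ/LuxDevHQDataEngineeringGuide/refs/heads/main/samplejson.json"

try:
    # Fail fast if the endpoint hangs, and raise on HTTP error status codes
    response = requests.get(url, timeout=30)
    response.raise_for_status()
    df = pd.DataFrame(response.json())
except requests.RequestException as exc:
    print(f"Failed to fetch API data: {exc}")
    df = pd.DataFrame()  # Empty DataFrame so downstream steps can decide what to do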

Congratulations! 🎉

If you've followed each step correctly, you've successfully implemented an ETL data pipeline using Python!

This hands-on example demonstrates how to automate the process of moving data from CSV files and APIs into a database, streamlining your data processing workflows and making them more efficient and scalable.
