
Mohamed Amin

Building Scalable Data Pipelines with Python – A Complete Guide.

What Are Data Pipelines?

A data pipeline is a series of automated steps that move data from a source to its destination. Transformation is often performed along the way so that the data arrives clean and structured for analysis.

Components of a Pipeline

A data pipeline is built from the following components:

  1. Data Sources - These can include databases, CSV files, APIs, and other file formats.
  2. Data Ingestion Methods - These refer to how data is loaded into the pipeline. There are two main methods (sketched briefly after this list):
    • Batch Processing
    • Stream Processing
  3. Data Processing - This refers to the techniques and tools used to transform data.
  4. Data Storage - This refers to where the data is stored, including data warehouses, data lakes, etc. This is usually the final destination of the data.
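
To make the two ingestion methods concrete, here is a minimal sketch. The process function and the event_stream generator are hypothetical stand-ins, and sales.csv is just the dummy file used later in this guide:

import pandas as pd

def process(record):
    # Placeholder transformation step (hypothetical)
    print(record)

# Batch processing: load the data in fixed-size chunks on a schedule
for chunk in pd.read_csv("sales.csv", chunksize=1000):
    process(chunk)

# Stream processing: handle each record as soon as it arrives
def event_stream():
    # Stand-in for a real stream source, e.g. a message queue consumer
    yield {"id": 1, "amount": 9.99}

for event in event_stream():
    process(event)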

Key Functions of a Pipeline

The key functions of a pipeline are the three classic ETL steps (a minimal skeleton is shown after the list):

  1. Extract
  2. Transform
  3. Load
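
In code, these three steps often become three small functions wired together. The following is only a rough sketch with made-up data, not a full implementation:

def extract():
    # Pull raw data from a source (file, API, database)
    return [{"id": 1, "amount": 9.99}, {"id": 2, "amount": -1.00}]

def transform(rows):
    # Clean and filter the raw data
    return [row for row in rows if row["amount"] > 0]

def load(rows):
    # Write the prepared data to its destination
    print(f"Loaded {len(rows)} rows")

load(transform(extract()))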

Considerations When Designing a Pipeline

When designing a data pipeline, the following factors should be considered:

  1. Scalability
  2. Maintainability
  3. Security (a short credential-handling sketch follows this list)
  4. Automation
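
As a concrete example of the security point, database credentials should not be hardcoded in the script. A common approach (shown here as a sketch, assuming the credentials have been exported as environment variables) is to build the connection string from the environment:

import os
from sqlalchemy import create_engine

# Read credentials from the environment instead of hardcoding them
db_user = os.environ["DB_USER"]
db_password = os.environ["DB_PASSWORD"]
db_host = os.environ.get("DB_HOST", "localhost")
db_name = os.environ.get("DB_NAME", "etl_db")

engine = create_engine(f"postgresql://{db_user}:{db_password}@{db_host}:5432/{db_name}")

The same idea supports automation: a script written this way can be scheduled with cron or an orchestrator such as Airflow without any code changes.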

Python ETL Implementation

In this section, we will implement a simple ETL pipeline that reads data from a CSV file and from an API, then writes it to a PostgreSQL database.

1. Reading from a CSV File

Before building an ETL pipeline to read from a CSV file, we need:

  • A CSV file (generated using Mockaroo for dummy data).
  • A PostgreSQL database (created using Aiven and connected using DBeaver).

The following Python script demonstrates how to read from a CSV file and store the data in a PostgreSQL database:

import pandas as pd
from sqlalchemy import create_engine

# Create a connection to PostgreSQL (replace the username, password, host,
# and database name with your own, e.g. your Aiven connection details)
engine = create_engine("postgresql://username:password@localhost:5432/etl_db")

# Read CSV file
df = pd.read_csv("sales.csv")

# Data Cleaning and Transformation
df = df.dropna()
df = df.rename(columns={'id': 'sales_id'})

# Load data into PostgreSQL
df.to_sql("sales", engine, if_exists="append", index=False)

The above code successfully migrates data from a CSV file to the database, demonstrating how an ETL pipeline works.
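
As a quick sanity check (reusing the engine object from the script above), you can read the table back into a DataFrame and confirm the rows arrived:

# Verify the load by counting the rows now in the table
check = pd.read_sql("SELECT COUNT(*) AS row_count FROM sales", engine)
print(check)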

2. Reading from an API

We will fetch data from a sample JSON endpoint (the URL is shown in the script below). This dummy data represents staff members from a fictional company.

The ETL process is as follows:

import requests
import pandas as pd
from sqlalchemy import create_engine

# API URL
url = "https://raw.githubusercontent.com/LuxDevHQ/LuxDevHQDataEngineeringGuide/refs/heads/main/samplejson.json"

# Fetch data from the API and fail fast if the request was unsuccessful
response = requests.get(url)
response.raise_for_status()
data = response.json()

# Transform data
df = pd.DataFrame(data)
df = df[['name', 'position', 'country']]
df.columns = ['full_name', 'role', 'country']

# Load data into PostgreSQL
engine = create_engine("postgresql://username:password@localhost:5432/etl_db")
df.to_sql("staff_data", engine, if_exists="append", index=False)
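
To tie this back to the automation consideration, both loads can be wrapped in functions and run from a single entry point that a scheduler (cron, Airflow, etc.) invokes. The function names below are illustrative placeholders for the two scripts above:

def run_csv_load():
    # The CSV-to-PostgreSQL script above, wrapped in a function
    ...

def run_api_load():
    # The API-to-PostgreSQL script above, wrapped in a function
    ...

if __name__ == "__main__":
    run_csv_load()
    run_api_load()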

Conclusion

We have learned about data pipelines, their components, key functions, design considerations, and how to implement a simple ETL pipeline to read data from a CSV file and an API into a PostgreSQL database.
