What Are Data Pipelines?
A data pipeline is a series of automated steps that move data from a source to a destination. Transformation is often performed along the way so that the data arrives structured and clean enough for analysis.
Components of a Pipeline
A data pipeline is built from a few core elements. These include:
- Data Sources - These can include databases, CSV files, APIs, and other file formats.
- Data Ingestion Methods - These refer to how data is loaded into the pipeline. There are two main methods (a short sketch of both follows this list):
  - Batch Processing - data is collected and loaded in scheduled chunks.
  - Stream Processing - data is processed continuously as it arrives.
- Data Processing - This refers to the techniques and tools used to transform data.
- Data Storage - This refers to where the data is stored, including data warehouses, data lakes, etc. This is usually the final destination of the data.
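To make the two ingestion methods concrete, here is a minimal sketch in Python. It assumes a local sales.csv file and a hypothetical process() helper standing in for the rest of the pipeline; the "stream" is simulated with an ordinary Python iterable rather than a real message queue.

import pandas as pd

def process(df: pd.DataFrame) -> None:
    # Placeholder for the downstream transform/load steps in this sketch.
    print(f"processed {len(df)} row(s)")

# Batch processing: load the data in fixed-size chunks on a schedule.
def ingest_batch(path: str, chunk_size: int = 1000) -> None:
    for chunk in pd.read_csv(path, chunksize=chunk_size):
        process(chunk)

# Stream processing: handle each record as soon as it arrives.
def ingest_stream(records) -> None:
    for record in records:
        process(pd.DataFrame([record]))

if __name__ == "__main__":
    ingest_batch("sales.csv")                   # assumed input file
    ingest_stream([{"id": 1, "amount": 9.99}])  # simulated event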
Key Functions of a Pipeline
The key functions of a pipeline include:
- Extract - pull the raw data out of its source system.
- Transform - clean, reshape, and enrich the data.
- Load - write the prepared data into the destination store.
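As a rough illustration of how these three functions compose, here is a minimal ETL skeleton. The file name, table name, and connection string are placeholders carried over from the examples later in this post, not a fixed API.

import pandas as pd
from sqlalchemy import create_engine

def extract(path: str) -> pd.DataFrame:
    # Extract: pull raw data from the source (a CSV file in this sketch).
    return pd.read_csv(path)

def transform(df: pd.DataFrame) -> pd.DataFrame:
    # Transform: drop incomplete rows; real pipelines do much more here.
    return df.dropna()

def load(df: pd.DataFrame, table: str, conn_url: str) -> None:
    # Load: write the prepared data into the destination database.
    engine = create_engine(conn_url)
    df.to_sql(table, engine, if_exists="append", index=False)

if __name__ == "__main__":
    data = transform(extract("sales.csv"))
    load(data, "sales", "postgresql://username:password@localhost:5432/etl_db")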
Considerations When Designing a Pipeline
When designing a data pipeline, the following factors should be considered:
- Scalability - the pipeline should handle growing data volumes without a redesign.
- Maintainability - the code should be easy to understand, test, and change.
- Security - credentials and data should be protected, both at rest and in transit (see the sketch after this list).
- Automation - the pipeline should run on a schedule or trigger rather than by hand.
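On the security point, one common practice is keeping credentials out of the source code. The sketch below reads the connection string from a DATABASE_URL environment variable; the variable name is an assumption for illustration, not a requirement of SQLAlchemy.

import os
from sqlalchemy import create_engine

# Assumed environment variable holding the full connection string, e.g.
# "postgresql://username:password@localhost:5432/etl_db".
db_url = os.environ.get("DATABASE_URL")
if db_url is None:
    raise RuntimeError("Set the DATABASE_URL environment variable first")

engine = create_engine(db_url)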
Python ETL Implementation
In this section, we will see how to implement a simple ETL pipeline to read data from a CSV file and an API, then write the data to a PostgreSQL database.
1. Reading from a CSV File
Before building an ETL pipeline to read from a CSV file, we need:
- A CSV file (generated using Mockaroo for dummy data).
- A PostgreSQL database (created using Aiven and connected using DBeaver).
The following Python script demonstrates how to read from a CSV file and store the data in a PostgreSQL database:
import pandas as pd
from sqlalchemy import create_engine
# Create a connection to PostgreSQL (replace the placeholder credentials with your own)
engine = create_engine("postgresql://username:password@localhost:5432/etl_db")
# Read CSV file
df = pd.read_csv("sales.csv")
# Data Cleaning and Transformation
df = df.dropna()
df = df.rename(columns={'id': 'sales_id'})
# Load data into PostgreSQL
df.to_sql("sales", engine, if_exists="append", index=False)
The script above extracts the data from the CSV file, applies a couple of simple transformations, and loads the result into the sales table, which is the ETL cycle in miniature.
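To confirm the load worked, one option is to read the table back and count the rows; a quick sanity check, assuming the same connection details and the sales table created above:

import pandas as pd
from sqlalchemy import create_engine

engine = create_engine("postgresql://username:password@localhost:5432/etl_db")

# Count the rows that landed in the sales table after the load.
count = pd.read_sql("SELECT COUNT(*) AS n FROM sales", engine)
print(count["n"].iloc[0], "rows loaded")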
2. Reading from an API
We will fetch data from a sample JSON endpoint hosted in the LuxDevHQ Data Engineering Guide repository (the full URL appears in the script below). This dummy data represents staff members from a fictional company.
The ETL process is as follows:
import requests
import pandas as pd
from sqlalchemy import create_engine
# API URL
url = "https://raw.githubusercontent.com/LuxDevHQ/LuxDevHQDataEngineeringGuide/refs/heads/main/samplejson.json"
# Fetch data from the API and fail fast on HTTP errors
response = requests.get(url, timeout=30)
response.raise_for_status()
data = response.json()
# Transform data
df = pd.DataFrame(data)
df = df[['name', 'position', 'country']]
df.columns = ['full_name', 'role', 'country']
# Load data into PostgreSQL
engine = create_engine("postgresql://username:password@localhost:5432/etl_db")
df.to_sql("staff_data", engine, if_exists="append", index=False)
Conclusion
We have learned about data pipelines, their components, key functions, design considerations, and how to implement a simple ETL pipeline to read data from a CSV file and an API into a PostgreSQL database.