Introduction
We've covered the theory. We've discussed the tools. We've explored the math.
Now it's time to build.
In this article, you'll create a complete data pipeline from scratch. Not a toy example — a real, working pipeline that follows industry patterns.
By the end, you'll have:
- Extracted data from an API
- Transformed it using Python
- Loaded it into a database
- Orchestrated everything with Airflow
Let's get to work.
What We're Building
We'll build a pipeline that:
- Extracts weather data from a public API
- Transforms the raw JSON into a clean structure
- Loads the data into a PostgreSQL database
- Orchestrates the process to run daily
This pattern — Extract, Transform, Load — covers the vast majority of pipelines you'll encounter professionally.
Architecture Overview
[Weather API] → [Python Extract] → [Transform] → [PostgreSQL]
                        ↑
               [Airflow Scheduler]
Prerequisites
Before starting, ensure you have:
- Python 3.8+ installed
- PostgreSQL installed (or use Docker)
- Basic command line familiarity
- A code editor
Project Setup
Create your project structure:
mkdir weather_pipeline
cd weather_pipeline
# Create virtual environment
python -m venv venv
source venv/bin/activate # On Windows: venv\Scripts\activate
# Install dependencies
pip install requests psycopg2-binary pandas apache-airflow
Create the folder structure:
weather_pipeline/
├── dags/
│ └── weather_dag.py
├── src/
│ ├── __init__.py
│ ├── extract.py
│ ├── transform.py
│ └── load.py
├── config/
│ └── config.py
├── tests/
│ └── test_pipeline.py
└── requirements.txt
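The requirements.txt at the root captures these dependencies so the environment can be recreated. The entries below are a reasonable starting point (pin exact versions with pip freeze once your environment works). Note that pytest, used in Step 7, also needs to be installed, and that the Airflow project recommends installing apache-airflow with its official constraints file to avoid dependency conflicts (see the Airflow installation docs).

requirements.txt

requests
psycopg2-binary
pandas
apache-airflow
pytest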
Step 1: Configuration
First, centralize your configuration. Never hardcode credentials.
config/config.py
import os
# API Configuration
API_BASE_URL = "https://api.open-meteo.com/v1/forecast"
LOCATIONS = [
{"name": "London", "lat": 51.5074, "lon": -0.1278},
{"name": "New York", "lat": 40.7128, "lon": -74.0060},
{"name": "Tokyo", "lat": 35.6762, "lon": 139.6503},
]
# Database Configuration
DB_CONFIG = {
"host": os.getenv("DB_HOST", "localhost"),
"port": os.getenv("DB_PORT", "5432"),
"database": os.getenv("DB_NAME", "weather_db"),
"user": os.getenv("DB_USER", "postgres"),
"password": os.getenv("DB_PASSWORD", "postgres"),
}
Using environment variables for credentials is a best practice. Never commit passwords to version control.
Step 2: Extract
The extraction layer pulls data from the source — in this case, a weather API.
src/extract.py
import requests
from datetime import datetime
from typing import Dict, List, Optional
import logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
def extract_weather_data(
base_url: str,
latitude: float,
longitude: float,
location_name: str
) -> Optional[Dict]:
"""
Extract weather data from Open-Meteo API.
Args:
base_url: API base URL
latitude: Location latitude
longitude: Location longitude
location_name: Human-readable location name
Returns:
Raw API response as dictionary, or None if failed
"""
params = {
"latitude": latitude,
"longitude": longitude,
"current_weather": True,
"hourly": "temperature_2m,relative_humidity_2m,wind_speed_10m",
"timezone": "auto"
}
try:
logger.info(f"Extracting weather data for {location_name}")
response = requests.get(base_url, params=params, timeout=30)
response.raise_for_status()
data = response.json()
data["location_name"] = location_name
data["extracted_at"] = datetime.utcnow().isoformat()
logger.info(f"Successfully extracted data for {location_name}")
return data
except requests.RequestException as e:
logger.error(f"Failed to extract data for {location_name}: {e}")
return None
def extract_all_locations(base_url: str, locations: List[Dict]) -> List[Dict]:
"""
Extract weather data for multiple locations.
Args:
base_url: API base URL
locations: List of location dictionaries with name, lat, lon
Returns:
List of raw API responses
"""
results = []
for location in locations:
data = extract_weather_data(
base_url=base_url,
latitude=location["lat"],
longitude=location["lon"],
location_name=location["name"]
)
if data:
results.append(data)
logger.info(f"Extracted data for {len(results)}/{len(locations)} locations")
return results
Key Principles
- Error handling — Never let one failure crash the entire pipeline (a retry sketch follows this list)
- Logging — You'll thank yourself when debugging at 2 AM
- Timeouts — APIs hang; always set timeouts
- Type hints — Makes code self-documenting
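Transient API failures (rate limits, brief outages, slow responses) are common in practice. A minimal retry helper with exponential backoff is one way to absorb them; the attempt count and delays below are illustrative, and the Airflow retries configured in Step 6 add another layer of protection. This is a sketch you could fold into extract.py, not part of the code above.

import time
import logging

import requests

logger = logging.getLogger(__name__)

def get_with_retries(url: str, params: dict, max_attempts: int = 3, backoff_seconds: float = 2.0) -> dict:
    """GET a URL with a timeout, retrying transient failures with exponential backoff."""
    for attempt in range(1, max_attempts + 1):
        try:
            response = requests.get(url, params=params, timeout=30)
            response.raise_for_status()
            return response.json()
        except requests.RequestException as exc:
            if attempt == max_attempts:
                raise  # out of attempts; let the caller decide what to do
            wait = backoff_seconds * (2 ** (attempt - 1))
            logger.warning(f"Attempt {attempt} failed ({exc}); retrying in {wait:.0f}s")
            time.sleep(wait)

extract_weather_data could call get_with_retries in place of the bare requests.get, keeping its own try/except as the final safety net.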
Step 3: Transform
The transformation layer cleans and structures the raw data.
src/transform.py
import pandas as pd
from datetime import datetime
from typing import Dict, List
import logging
logger = logging.getLogger(__name__)
def transform_current_weather(raw_data: Dict) -> Dict:
"""
Transform raw API response into clean current weather record.
Args:
raw_data: Raw API response dictionary
Returns:
Cleaned weather record
"""
current = raw_data.get("current_weather", {})
return {
"location_name": raw_data.get("location_name"),
"latitude": raw_data.get("latitude"),
"longitude": raw_data.get("longitude"),
"temperature_celsius": current.get("temperature"),
"wind_speed_kmh": current.get("windspeed"),
"wind_direction_degrees": current.get("winddirection"),
"weather_code": current.get("weathercode"),
"observation_time": current.get("time"),
"extracted_at": raw_data.get("extracted_at"),
"loaded_at": datetime.utcnow().isoformat()
}
def transform_hourly_weather(raw_data: Dict) -> List[Dict]:
"""
Transform raw API response into hourly weather records.
Args:
raw_data: Raw API response dictionary
Returns:
List of hourly weather records
"""
hourly = raw_data.get("hourly", {})
location_name = raw_data.get("location_name")
times = hourly.get("time", [])
temperatures = hourly.get("temperature_2m", [])
humidity = hourly.get("relative_humidity_2m", [])
wind_speeds = hourly.get("wind_speed_10m", [])
records = []
for i in range(len(times)):
records.append({
"location_name": location_name,
"forecast_time": times[i],
"temperature_celsius": temperatures[i] if i < len(temperatures) else None,
"relative_humidity_percent": humidity[i] if i < len(humidity) else None,
"wind_speed_kmh": wind_speeds[i] if i < len(wind_speeds) else None,
"loaded_at": datetime.utcnow().isoformat()
})
return records
def transform_all_data(raw_data_list: List[Dict]) -> Dict[str, pd.DataFrame]:
"""
Transform all raw data into structured DataFrames.
Args:
raw_data_list: List of raw API responses
Returns:
Dictionary with 'current' and 'hourly' DataFrames
"""
current_records = []
hourly_records = []
for raw_data in raw_data_list:
current_records.append(transform_current_weather(raw_data))
hourly_records.extend(transform_hourly_weather(raw_data))
current_df = pd.DataFrame(current_records)
hourly_df = pd.DataFrame(hourly_records)
# Data quality checks
logger.info(f"Transformed {len(current_df)} current weather records")
logger.info(f"Transformed {len(hourly_df)} hourly weather records")
# Remove duplicates
current_df = current_df.drop_duplicates(
subset=["location_name", "observation_time"]
)
hourly_df = hourly_df.drop_duplicates(
subset=["location_name", "forecast_time"]
)
return {
"current": current_df,
"hourly": hourly_df
}
Key Principles
- Flatten nested structures — Databases prefer flat tables
- Handle missing data — Always assume fields might be missing (a validation sketch follows this list)
- Deduplicate — Never trust source data to be clean
- Separate concerns — One function, one responsibility
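The transform step above only logs record counts as its data quality check. A slightly stronger check is sketched below: it drops records with no temperature and warns about implausible values. The threshold range is an assumption for illustration; adjust it, and the action taken, to your own requirements.

import logging

import pandas as pd

logger = logging.getLogger(__name__)

def check_current_weather_quality(df: pd.DataFrame) -> pd.DataFrame:
    """Drop records with no temperature and warn about implausible values."""
    missing = df["temperature_celsius"].isna()
    if missing.any():
        logger.warning(f"Dropping {missing.sum()} records with missing temperature")
    # Illustrative sanity range in Celsius; tune for your locations
    implausible = df["temperature_celsius"].notna() & ~df["temperature_celsius"].between(-90, 60)
    if implausible.any():
        logger.warning(f"{implausible.sum()} records have implausible temperatures")
    return df[~missing]

You could call this on data["current"] at the end of transform_all_data, just before returning.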
Step 4: Load
The load layer persists data to the destination.
src/load.py
import psycopg2
from psycopg2.extras import execute_values
import pandas as pd
from typing import Dict
import logging
logger = logging.getLogger(__name__)
def get_connection(db_config: Dict):
"""Create database connection."""
return psycopg2.connect(**db_config)
def create_tables(db_config: Dict) -> None:
"""
Create tables if they don't exist.
Args:
db_config: Database configuration dictionary
"""
create_current_table = """
CREATE TABLE IF NOT EXISTS current_weather (
id SERIAL PRIMARY KEY,
location_name VARCHAR(100),
latitude DECIMAL(9,6),
longitude DECIMAL(9,6),
temperature_celsius DECIMAL(5,2),
wind_speed_kmh DECIMAL(6,2),
wind_direction_degrees INTEGER,
weather_code INTEGER,
observation_time TIMESTAMP,
extracted_at TIMESTAMP,
loaded_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
UNIQUE(location_name, observation_time)
);
"""
create_hourly_table = """
CREATE TABLE IF NOT EXISTS hourly_weather (
id SERIAL PRIMARY KEY,
location_name VARCHAR(100),
forecast_time TIMESTAMP,
temperature_celsius DECIMAL(5,2),
relative_humidity_percent INTEGER,
wind_speed_kmh DECIMAL(6,2),
loaded_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
UNIQUE(location_name, forecast_time)
);
"""
conn = get_connection(db_config)
try:
with conn.cursor() as cur:
cur.execute(create_current_table)
cur.execute(create_hourly_table)
conn.commit()
logger.info("Tables created successfully")
finally:
conn.close()
def load_dataframe(
df: pd.DataFrame,
table_name: str,
db_config: Dict
) -> int:
"""
Load DataFrame into PostgreSQL table using upsert.
Args:
df: DataFrame to load
table_name: Target table name
db_config: Database configuration
Returns:
Number of rows loaded
"""
if df.empty:
logger.warning(f"No data to load into {table_name}")
return 0
conn = get_connection(db_config)
try:
columns = df.columns.tolist()
values = [tuple(row) for row in df.values]
insert_query = f"""
INSERT INTO {table_name} ({', '.join(columns)})
VALUES %s
ON CONFLICT DO NOTHING
"""
with conn.cursor() as cur:
execute_values(cur, insert_query, values)
rows_affected = cur.rowcount
conn.commit()
logger.info(f"Loaded {rows_affected} rows into {table_name}")
return rows_affected
except Exception as e:
conn.rollback()
logger.error(f"Failed to load data into {table_name}: {e}")
raise
finally:
conn.close()
def load_all_data(data: Dict[str, pd.DataFrame], db_config: Dict) -> Dict[str, int]:
"""
Load all transformed data into database.
Args:
data: Dictionary with DataFrames
db_config: Database configuration
Returns:
Dictionary with row counts per table
"""
create_tables(db_config)
results = {}
results["current_weather"] = load_dataframe(
data["current"], "current_weather", db_config
)
results["hourly_weather"] = load_dataframe(
data["hourly"], "hourly_weather", db_config
)
return results
Key Principles
- Idempotency — Running twice should not create duplicates
- Upsert pattern — INSERT ... ON CONFLICT handles re-runs (an overwrite variant follows this list)
- Transaction management — Commit on success, rollback on failure
- Connection handling — Always close connections
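ON CONFLICT DO NOTHING keeps whatever was loaded first, which is fine for append-style facts. If you want re-runs to overwrite existing rows with the newest values instead, PostgreSQL's DO UPDATE form of the upsert is the usual answer. The sketch below is a drop-in replacement for insert_query in load_dataframe, written against the current_weather table and the unique constraint defined in create_tables; the column list is spelled out here for clarity, but you could keep building it dynamically as before.

# Upsert variant: when (location_name, observation_time) already exists, overwrite with new values
upsert_query = """
    INSERT INTO current_weather (
        location_name, latitude, longitude, temperature_celsius,
        wind_speed_kmh, wind_direction_degrees, weather_code,
        observation_time, extracted_at, loaded_at
    )
    VALUES %s
    ON CONFLICT (location_name, observation_time) DO UPDATE SET
        temperature_celsius = EXCLUDED.temperature_celsius,
        wind_speed_kmh = EXCLUDED.wind_speed_kmh,
        wind_direction_degrees = EXCLUDED.wind_direction_degrees,
        weather_code = EXCLUDED.weather_code,
        loaded_at = EXCLUDED.loaded_at
"""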
Step 5: Putting It Together
Create a main script that runs the full pipeline:
src/main.py
import logging
import sys
from pathlib import Path

# Make the project root importable regardless of the working directory,
# so that `config.config` resolves correctly
sys.path.append(str(Path(__file__).resolve().parent.parent))

from extract import extract_all_locations
from transform import transform_all_data
from load import load_all_data
from config.config import API_BASE_URL, LOCATIONS, DB_CONFIG
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
def run_pipeline():
"""Execute the complete ETL pipeline."""
logger.info("Starting weather pipeline")
# Extract
logger.info("Phase 1: Extract")
raw_data = extract_all_locations(API_BASE_URL, LOCATIONS)
if not raw_data:
logger.error("No data extracted. Aborting pipeline.")
return False
# Transform
logger.info("Phase 2: Transform")
transformed_data = transform_all_data(raw_data)
# Load
logger.info("Phase 3: Load")
results = load_all_data(transformed_data, DB_CONFIG)
logger.info(f"Pipeline complete. Results: {results}")
return True
if __name__ == "__main__":
success = run_pipeline()
exit(0 if success else 1)
Test it:
python src/main.py
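This assumes the weather_db database already exists (create it once with CREATE DATABASE weather_db; in psql, or with the createdb command). To confirm the load worked, count the rows in each table; a quick sketch that reuses the connection helper, assuming you run it from the project root so the imports resolve:

from src.load import get_connection
from config.config import DB_CONFIG

# Quick sanity check: how many rows landed in each table?
conn = get_connection(DB_CONFIG)
try:
    with conn.cursor() as cur:
        for table in ("current_weather", "hourly_weather"):
            cur.execute(f"SELECT COUNT(*) FROM {table}")
            print(f"{table}: {cur.fetchone()[0]} rows")
finally:
    conn.close()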
Step 6: Orchestration with Airflow
Now let's schedule this pipeline to run automatically.
dags/weather_dag.py
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
import sys
sys.path.insert(0, '/path/to/weather_pipeline')
from src.extract import extract_all_locations
from src.transform import transform_all_data
from src.load import load_all_data
from config.config import API_BASE_URL, LOCATIONS, DB_CONFIG
default_args = {
'owner': 'data_engineering',
'depends_on_past': False,
'email_on_failure': False,
'email_on_retry': False,
'retries': 3,
'retry_delay': timedelta(minutes=5),
}
def extract_task(**context):
"""Airflow task for extraction."""
raw_data = extract_all_locations(API_BASE_URL, LOCATIONS)
context['task_instance'].xcom_push(key='raw_data', value=raw_data)
return len(raw_data)
def transform_task(**context):
"""Airflow task for transformation."""
raw_data = context['task_instance'].xcom_pull(
task_ids='extract', key='raw_data'
)
transformed = transform_all_data(raw_data)
# Convert DataFrames to dict for XCom
transformed_dict = {
'current': transformed['current'].to_dict('records'),
'hourly': transformed['hourly'].to_dict('records')
}
context['task_instance'].xcom_push(key='transformed_data', value=transformed_dict)
return True
def load_task(**context):
"""Airflow task for loading."""
import pandas as pd
transformed_dict = context['task_instance'].xcom_pull(
task_ids='transform', key='transformed_data'
)
transformed_data = {
'current': pd.DataFrame(transformed_dict['current']),
'hourly': pd.DataFrame(transformed_dict['hourly'])
}
results = load_all_data(transformed_data, DB_CONFIG)
return results
with DAG(
dag_id='weather_pipeline',
default_args=default_args,
description='Daily weather data pipeline',
schedule_interval='0 6 * * *', # Run at 6 AM daily
start_date=datetime(2025, 1, 1),
catchup=False,
tags=['weather', 'etl'],
) as dag:
extract = PythonOperator(
task_id='extract',
python_callable=extract_task,
)
transform = PythonOperator(
task_id='transform',
python_callable=transform_task,
)
load = PythonOperator(
task_id='load',
python_callable=load_task,
)
# Define dependencies
extract >> transform >> load
DAG Explanation
- schedule_interval — Cron expression for when to run
- retries — Automatic retry on failure
- XCom — Passes data between tasks (see the caveat below)
- Dependencies — >> defines execution order
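A caveat on XCom: values are stored in Airflow's metadata database, so passing whole DataFrames (serialized to dicts) between tasks is only reasonable for small payloads like this one. For larger volumes, write intermediate results to storage and pass a path or key instead. If you are on Airflow 2, the TaskFlow API expresses the same pipeline more concisely, with return values passed through XCom automatically; a sketch under the same sys.path assumption as the DAG above:

import sys
from datetime import datetime

from airflow.decorators import dag, task

# Same assumption as above: make the project importable
sys.path.insert(0, '/path/to/weather_pipeline')

from src.extract import extract_all_locations
from src.transform import transform_all_data
from src.load import load_all_data
from config.config import API_BASE_URL, LOCATIONS, DB_CONFIG

@dag(
    dag_id='weather_pipeline_taskflow',
    schedule_interval='0 6 * * *',
    start_date=datetime(2025, 1, 1),
    catchup=False,
    tags=['weather', 'etl'],
)
def weather_pipeline_taskflow():

    @task
    def extract():
        # The return value is pushed to XCom automatically
        return extract_all_locations(API_BASE_URL, LOCATIONS)

    @task
    def transform(raw_data):
        transformed = transform_all_data(raw_data)
        # DataFrames are not JSON-serializable, so convert them for XCom
        return {name: df.to_dict('records') for name, df in transformed.items()}

    @task
    def load(transformed_dict):
        import pandas as pd  # local import keeps DAG parsing light
        data = {name: pd.DataFrame(records) for name, records in transformed_dict.items()}
        return load_all_data(data, DB_CONFIG)

    load(transform(extract()))

weather_pipeline_taskflow()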
Step 7: Testing Your Pipeline
Always test your code.
tests/test_pipeline.py
import pytest
from src.extract import extract_weather_data
from src.transform import transform_current_weather
from config.config import API_BASE_URL
class TestExtract:
def test_extract_weather_data_success(self):
result = extract_weather_data(
base_url=API_BASE_URL,
latitude=51.5074,
longitude=-0.1278,
location_name="London"
)
assert result is not None
assert "current_weather" in result
assert result["location_name"] == "London"
def test_extract_weather_data_invalid_coordinates(self):
result = extract_weather_data(
base_url=API_BASE_URL,
latitude=9999,
longitude=9999,
location_name="Invalid"
)
# API should still return something or None
# This tests error handling
assert result is None or "error" in result
class TestTransform:
def test_transform_current_weather(self):
raw_data = {
"location_name": "London",
"latitude": 51.5074,
"longitude": -0.1278,
"current_weather": {
"temperature": 15.5,
"windspeed": 10.2,
"winddirection": 180,
"weathercode": 1,
"time": "2025-01-15T12:00"
},
"extracted_at": "2025-01-15T12:00:00"
}
result = transform_current_weather(raw_data)
assert result["location_name"] == "London"
assert result["temperature_celsius"] == 15.5
assert result["wind_speed_kmh"] == 10.2
def test_transform_handles_missing_data(self):
raw_data = {
"location_name": "Unknown",
"current_weather": {}
}
result = transform_current_weather(raw_data)
assert result["location_name"] == "Unknown"
assert result["temperature_celsius"] is None
Run tests:
pytest tests/ -v
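These tests import from src and config, so run them from the project root; python -m pytest tests/ -v also works because -m puts the current directory on sys.path, and adding an empty conftest.py at the root is the usual long-term fix. Also note that the extraction tests above call the live API, which makes them slow and flaky in CI. A common alternative is to mock the HTTP layer; a minimal sketch using unittest.mock, with an invented response payload:

from unittest.mock import MagicMock, patch

from src.extract import extract_weather_data

def test_extract_weather_data_mocked():
    # Fake the HTTP response so no network call is made
    fake_response = MagicMock()
    fake_response.json.return_value = {"current_weather": {"temperature": 12.3}}
    fake_response.raise_for_status.return_value = None

    with patch("src.extract.requests.get", return_value=fake_response) as mock_get:
        result = extract_weather_data(
            base_url="https://example.invalid/forecast",
            latitude=0.0,
            longitude=0.0,
            location_name="Testville",
        )

    mock_get.assert_called_once()
    assert result["location_name"] == "Testville"
    assert result["current_weather"]["temperature"] == 12.3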
Best Practices Summary
| Practice | Why It Matters |
|---|---|
| Modular code | Easy to test and maintain |
| Error handling | Pipelines fail gracefully |
| Logging | Debug issues quickly |
| Idempotency | Safe to re-run |
| Configuration | No hardcoded values |
| Testing | Catch bugs early |
| Type hints | Self-documenting code |
Common Mistakes to Avoid
- No error handling — One bad record crashes everything
- Hardcoded credentials — Security nightmare
- No logging — Impossible to debug
- Monolithic functions — Untestable code
- Ignoring data types — Causes silent failures
- No idempotency — Duplicates everywhere
What's Next?
You've built your first pipeline. It extracts, transforms, loads, and runs on a schedule.
In the final article, we'll discuss how to continue your learning journey with recommended courses, certifications, and resources.
Series Overview
- Data Engineering Uncovered: What It Is and Why It Matters
- Pipelines, ETL, and Warehouses: The DNA of Data Engineering
- Tools of the Trade: What Powers Modern Data Engineering
- The Math You Actually Need as a Data Engineer
- Building Your First Pipeline: From Concept to Execution (You are here)
- Charting Your Path: Courses and Resources to Accelerate Your Journey
Built your first pipeline? Have questions? Share in the comments.