Vinicius Fagundes

Building Your First Pipeline: From Concept to Execution

Introduction

We've covered the theory. We've discussed the tools. We've explored the math.

Now it's time to build.

In this article, you'll create a complete data pipeline from scratch. Not a toy example — a real, working pipeline that follows industry patterns.

By the end, you'll have:

  • Extracted data from an API
  • Transformed it using Python
  • Loaded it into a database
  • Orchestrated everything with Airflow

Let's get to work.


What We're Building

We'll build a pipeline that:

  1. Extracts weather data from a public API
  2. Transforms the raw JSON into a clean structure
  3. Loads the data into a PostgreSQL database
  4. Orchestrates the process to run daily

This pattern — Extract, Transform, Load — covers the vast majority of pipelines you'll encounter professionally.

Architecture Overview

[Weather API] → [Python Extract] → [Transform] → [PostgreSQL]
                          ↑
                 [Airflow Scheduler]
           (triggers and monitors each step)

Prerequisites

Before starting, ensure you have:

  • Python 3.8+ installed
  • PostgreSQL installed (or run it in Docker; a one-liner follows this list)
  • Basic command line familiarity
  • A code editor
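
If you don't want a local PostgreSQL install, a throwaway Docker container works. A one-line sketch (the database name and credentials deliberately match the defaults in config/config.py later in this article):

docker run -d --name weather_db \
  -e POSTGRES_DB=weather_db \
  -e POSTGRES_USER=postgres \
  -e POSTGRES_PASSWORD=postgres \
  -p 5432:5432 \
  postgres:16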

Project Setup

Create your project structure:

mkdir weather_pipeline
cd weather_pipeline

# Create virtual environment
python -m venv venv
source venv/bin/activate  # On Windows: venv\Scripts\activate

# Install dependencies (the Airflow docs recommend pinning with a constraints
# file for reproducible installs; plain pip is fine for local experimentation)
pip install requests psycopg2-binary pandas apache-airflow

Create the folder structure:

weather_pipeline/
├── dags/
│   └── weather_dag.py
├── src/
│   ├── __init__.py
│   ├── extract.py
│   ├── transform.py
│   └── load.py
├── config/
│   └── config.py
├── tests/
│   └── test_pipeline.py
└── requirements.txt

Step 1: Configuration

First, centralize your configuration. Never hardcode credentials.

config/config.py

import os

# API Configuration
API_BASE_URL = "https://api.open-meteo.com/v1/forecast"
LOCATIONS = [
    {"name": "London", "lat": 51.5074, "lon": -0.1278},
    {"name": "New York", "lat": 40.7128, "lon": -74.0060},
    {"name": "Tokyo", "lat": 35.6762, "lon": 139.6503},
]

# Database Configuration
DB_CONFIG = {
    "host": os.getenv("DB_HOST", "localhost"),
    "port": os.getenv("DB_PORT", "5432"),
    "database": os.getenv("DB_NAME", "weather_db"),
    "user": os.getenv("DB_USER", "postgres"),
    "password": os.getenv("DB_PASSWORD", "postgres"),
}

Using environment variables for credentials is a best practice. Never commit passwords to version control.
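
If you'd like those variables to come from a local file during development, a common approach is a git-ignored .env file loaded with the python-dotenv package. A minimal sketch (python-dotenv isn't in the dependency list above, so this assumes pip install python-dotenv):

# Optional: at the top of config/config.py, before the os.getenv calls
from pathlib import Path
from dotenv import load_dotenv

# Load KEY=value pairs from a .env file in the project root, if one exists;
# variables already set in the real environment take precedence by default
load_dotenv(Path(__file__).resolve().parent.parent / ".env")

Just remember to add .env to your .gitignore.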


Step 2: Extract

The extraction layer pulls data from the source — in this case, a weather API.

src/extract.py

import requests
from datetime import datetime
from typing import Dict, List, Optional
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


def extract_weather_data(
    base_url: str,
    latitude: float,
    longitude: float,
    location_name: str
) -> Optional[Dict]:
    """
    Extract weather data from Open-Meteo API.

    Args:
        base_url: API base URL
        latitude: Location latitude
        longitude: Location longitude
        location_name: Human-readable location name

    Returns:
        Raw API response as dictionary, or None if failed
    """
    params = {
        "latitude": latitude,
        "longitude": longitude,
        "current_weather": True,
        "hourly": "temperature_2m,relative_humidity_2m,wind_speed_10m",
        "timezone": "auto"
    }

    try:
        logger.info(f"Extracting weather data for {location_name}")
        response = requests.get(base_url, params=params, timeout=30)
        response.raise_for_status()

        data = response.json()
        data["location_name"] = location_name
        data["extracted_at"] = datetime.utcnow().isoformat()

        logger.info(f"Successfully extracted data for {location_name}")
        return data

    except requests.RequestException as e:
        logger.error(f"Failed to extract data for {location_name}: {e}")
        return None


def extract_all_locations(base_url: str, locations: List[Dict]) -> List[Dict]:
    """
    Extract weather data for multiple locations.

    Args:
        base_url: API base URL
        locations: List of location dictionaries with name, lat, lon

    Returns:
        List of raw API responses
    """
    results = []

    for location in locations:
        data = extract_weather_data(
            base_url=base_url,
            latitude=location["lat"],
            longitude=location["lon"],
            location_name=location["name"]
        )
        if data:
            results.append(data)

    logger.info(f"Extracted data for {len(results)}/{len(locations)} locations")
    return results

Key Principles

  • Error handling — Never let one failure crash the entire pipeline
  • Logging — You'll thank yourself when debugging at 2 AM
  • Timeouts — APIs hang; always set timeouts (a retry sketch follows this list)
  • Type hints — Makes code self-documenting
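
On the timeout point: requests never retries on its own, so a transient 502 or a rate limit becomes a lost extraction. A minimal retry-with-backoff sketch using urllib3's Retry (make_session is a hypothetical helper, not part of the files above; the allowed_methods argument assumes urllib3 >= 1.26):

import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry


def make_session(retries: int = 3) -> requests.Session:
    """Build a session that retries transient HTTP errors with backoff."""
    retry = Retry(
        total=retries,
        backoff_factor=1,  # exponential backoff between attempts
        status_forcelist=[429, 500, 502, 503, 504],
        allowed_methods=["GET"],
    )
    session = requests.Session()
    session.mount("https://", HTTPAdapter(max_retries=retry))
    return session

Inside extract_weather_data you would then call make_session().get(base_url, params=params, timeout=30) instead of requests.get(...).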

Step 3: Transform

The transformation layer cleans and structures the raw data.

src/transform.py

import pandas as pd
from datetime import datetime
from typing import Dict, List
import logging

logger = logging.getLogger(__name__)


def transform_current_weather(raw_data: Dict) -> Dict:
    """
    Transform raw API response into clean current weather record.

    Args:
        raw_data: Raw API response dictionary

    Returns:
        Cleaned weather record
    """
    current = raw_data.get("current_weather", {})

    return {
        "location_name": raw_data.get("location_name"),
        "latitude": raw_data.get("latitude"),
        "longitude": raw_data.get("longitude"),
        "temperature_celsius": current.get("temperature"),
        "wind_speed_kmh": current.get("windspeed"),
        "wind_direction_degrees": current.get("winddirection"),
        "weather_code": current.get("weathercode"),
        "observation_time": current.get("time"),
        "extracted_at": raw_data.get("extracted_at"),
        "loaded_at": datetime.utcnow().isoformat()
    }


def transform_hourly_weather(raw_data: Dict) -> List[Dict]:
    """
    Transform raw API response into hourly weather records.

    Args:
        raw_data: Raw API response dictionary

    Returns:
        List of hourly weather records
    """
    hourly = raw_data.get("hourly", {})
    location_name = raw_data.get("location_name")

    times = hourly.get("time", [])
    temperatures = hourly.get("temperature_2m", [])
    humidity = hourly.get("relative_humidity_2m", [])
    wind_speeds = hourly.get("wind_speed_10m", [])

    records = []
    for i in range(len(times)):
        records.append({
            "location_name": location_name,
            "forecast_time": times[i],
            "temperature_celsius": temperatures[i] if i < len(temperatures) else None,
            "relative_humidity_percent": humidity[i] if i < len(humidity) else None,
            "wind_speed_kmh": wind_speeds[i] if i < len(wind_speeds) else None,
            "loaded_at": datetime.utcnow().isoformat()
        })

    return records


def transform_all_data(raw_data_list: List[Dict]) -> Dict[str, pd.DataFrame]:
    """
    Transform all raw data into structured DataFrames.

    Args:
        raw_data_list: List of raw API responses

    Returns:
        Dictionary with 'current' and 'hourly' DataFrames
    """
    current_records = []
    hourly_records = []

    for raw_data in raw_data_list:
        current_records.append(transform_current_weather(raw_data))
        hourly_records.extend(transform_hourly_weather(raw_data))

    current_df = pd.DataFrame(current_records)
    hourly_df = pd.DataFrame(hourly_records)

    # Data quality checks
    logger.info(f"Transformed {len(current_df)} current weather records")
    logger.info(f"Transformed {len(hourly_df)} hourly weather records")

    # Remove duplicates
    current_df = current_df.drop_duplicates(
        subset=["location_name", "observation_time"]
    )
    hourly_df = hourly_df.drop_duplicates(
        subset=["location_name", "forecast_time"]
    )

    return {
        "current": current_df,
        "hourly": hourly_df
    }

Key Principles

  • Flatten nested structures — Databases prefer flat tables
  • Handle missing data — Always assume fields might be missing
  • Deduplicate — Never trust source data to be clean (a value-range sketch follows this list)
  • Separate concerns — One function, one responsibility
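
On the last two points: drop_duplicates catches repeats, but nothing above catches implausible values. A small range-check sketch (check_temperature_range is a hypothetical extension, and the bounds are loose physical limits, not API guarantees):

import logging

import pandas as pd

logger = logging.getLogger(__name__)


def check_temperature_range(
    df: pd.DataFrame,
    column: str = "temperature_celsius",
    low: float = -90.0,
    high: float = 60.0,
) -> pd.DataFrame:
    """Return rows whose value in `column` falls outside [low, high]."""
    suspect = df[(df[column] < low) | (df[column] > high)]
    if not suspect.empty:
        logger.warning(f"{len(suspect)} rows in '{column}' outside [{low}, {high}]")
    return suspect

Whether you drop, fix, or just log suspect rows is a judgment call; what matters is that bad values are surfaced instead of loaded silently.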

Step 4: Load

The load layer persists data to the destination.

src/load.py

import psycopg2
from psycopg2.extras import execute_values
import pandas as pd
from typing import Dict
import logging

logger = logging.getLogger(__name__)


def get_connection(db_config: Dict):
    """Create database connection."""
    return psycopg2.connect(**db_config)


def create_tables(db_config: Dict) -> None:
    """
    Create tables if they don't exist.

    Args:
        db_config: Database configuration dictionary
    """
    create_current_table = """
    CREATE TABLE IF NOT EXISTS current_weather (
        id SERIAL PRIMARY KEY,
        location_name VARCHAR(100),
        latitude DECIMAL(9,6),
        longitude DECIMAL(9,6),
        temperature_celsius DECIMAL(5,2),
        wind_speed_kmh DECIMAL(6,2),
        wind_direction_degrees INTEGER,
        weather_code INTEGER,
        observation_time TIMESTAMP,
        extracted_at TIMESTAMP,
        loaded_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
        UNIQUE(location_name, observation_time)
    );
    """

    create_hourly_table = """
    CREATE TABLE IF NOT EXISTS hourly_weather (
        id SERIAL PRIMARY KEY,
        location_name VARCHAR(100),
        forecast_time TIMESTAMP,
        temperature_celsius DECIMAL(5,2),
        relative_humidity_percent INTEGER,
        wind_speed_kmh DECIMAL(6,2),
        loaded_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
        UNIQUE(location_name, forecast_time)
    );
    """

    conn = get_connection(db_config)
    try:
        with conn.cursor() as cur:
            cur.execute(create_current_table)
            cur.execute(create_hourly_table)
        conn.commit()
        logger.info("Tables created successfully")
    finally:
        conn.close()


def load_dataframe(
    df: pd.DataFrame,
    table_name: str,
    db_config: Dict
) -> int:
    """
    Load DataFrame into a PostgreSQL table with an idempotent insert.

    Args:
        df: DataFrame to load
        table_name: Target table name
        db_config: Database configuration

    Returns:
        Number of rows loaded
    """
    if df.empty:
        logger.warning(f"No data to load into {table_name}")
        return 0

    conn = get_connection(db_config)
    try:
        # Convert NaN to None (which psycopg2 writes as SQL NULL) and numpy
        # scalars to native Python types that psycopg2 can adapt
        df = df.astype(object).where(pd.notnull(df), None)
        columns = df.columns.tolist()
        values = [tuple(row) for row in df.values.tolist()]

        insert_query = f"""
            INSERT INTO {table_name} ({', '.join(columns)})
            VALUES %s
            ON CONFLICT DO NOTHING
        """

        with conn.cursor() as cur:
            execute_values(cur, insert_query, values)
            rows_affected = cur.rowcount

        conn.commit()
        logger.info(f"Loaded {rows_affected} rows into {table_name}")
        return rows_affected

    except Exception as e:
        conn.rollback()
        logger.error(f"Failed to load data into {table_name}: {e}")
        raise
    finally:
        conn.close()


def load_all_data(data: Dict[str, pd.DataFrame], db_config: Dict) -> Dict[str, int]:
    """
    Load all transformed data into database.

    Args:
        data: Dictionary with DataFrames
        db_config: Database configuration

    Returns:
        Dictionary with row counts per table
    """
    create_tables(db_config)

    results = {}
    results["current_weather"] = load_dataframe(
        data["current"], "current_weather", db_config
    )
    results["hourly_weather"] = load_dataframe(
        data["hourly"], "hourly_weather", db_config
    )

    return results

Key Principles

  • Idempotency — Running twice should not create duplicates
  • Conflict handling — INSERT ... ON CONFLICT DO NOTHING makes re-runs safe (a true-upsert variant follows this list)
  • Transaction management — Commit on success, rollback on failure
  • Connection handling — Always close connections
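
ON CONFLICT DO NOTHING prevents duplicates, but it also means a re-run never refreshes rows that already exist. If you want the newest extraction to win, swap in a true upsert with DO UPDATE. A sketch for the current_weather table (column list shortened for readability; EXCLUDED is PostgreSQL's name for the incoming row that hit the unique constraint):

insert_query = """
    INSERT INTO current_weather (location_name, observation_time, temperature_celsius)
    VALUES %s
    ON CONFLICT (location_name, observation_time)
    DO UPDATE SET temperature_celsius = EXCLUDED.temperature_celsius
"""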

Step 5: Putting It Together

Create a main script that runs the full pipeline:

src/main.py

import logging
import sys
from pathlib import Path

# Make the project root importable no matter where the script is run from
sys.path.insert(0, str(Path(__file__).resolve().parent.parent))

from src.extract import extract_all_locations
from src.transform import transform_all_data
from src.load import load_all_data
from config.config import API_BASE_URL, LOCATIONS, DB_CONFIG

logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)


def run_pipeline():
    """Execute the complete ETL pipeline."""
    logger.info("Starting weather pipeline")

    # Extract
    logger.info("Phase 1: Extract")
    raw_data = extract_all_locations(API_BASE_URL, LOCATIONS)

    if not raw_data:
        logger.error("No data extracted. Aborting pipeline.")
        return False

    # Transform
    logger.info("Phase 2: Transform")
    transformed_data = transform_all_data(raw_data)

    # Load
    logger.info("Phase 3: Load")
    results = load_all_data(transformed_data, DB_CONFIG)

    logger.info(f"Pipeline complete. Results: {results}")
    return True


if __name__ == "__main__":
    success = run_pipeline()
    exit(0 if success else 1)

Test it:

python src/main.py
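
If everything is wired up, the output should look roughly like this (timestamps illustrative; hourly row counts depend on the API's forecast window):

2025-01-15 12:00:01 - __main__ - INFO - Starting weather pipeline
2025-01-15 12:00:01 - __main__ - INFO - Phase 1: Extract
2025-01-15 12:00:02 - extract - INFO - Extracting weather data for London
...
2025-01-15 12:00:04 - extract - INFO - Extracted data for 3/3 locations
2025-01-15 12:00:04 - __main__ - INFO - Phase 2: Transform
2025-01-15 12:00:05 - __main__ - INFO - Phase 3: Load
2025-01-15 12:00:06 - __main__ - INFO - Pipeline complete. Results: {'current_weather': 3, 'hourly_weather': 504}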

Step 6: Orchestration with Airflow

Now let's schedule this pipeline to run automatically.

dags/weather_dag.py

from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
import sys
sys.path.insert(0, '/path/to/weather_pipeline')

from src.extract import extract_all_locations
from src.transform import transform_all_data
from src.load import load_all_data
from config.config import API_BASE_URL, LOCATIONS, DB_CONFIG


default_args = {
    'owner': 'data_engineering',
    'depends_on_past': False,
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 3,
    'retry_delay': timedelta(minutes=5),
}


def extract_task(**context):
    """Airflow task for extraction."""
    raw_data = extract_all_locations(API_BASE_URL, LOCATIONS)
    context['task_instance'].xcom_push(key='raw_data', value=raw_data)
    return len(raw_data)


def transform_task(**context):
    """Airflow task for transformation."""
    raw_data = context['task_instance'].xcom_pull(
        task_ids='extract', key='raw_data'
    )
    transformed = transform_all_data(raw_data)

    # Convert DataFrames to dict for XCom
    transformed_dict = {
        'current': transformed['current'].to_dict('records'),
        'hourly': transformed['hourly'].to_dict('records')
    }
    context['task_instance'].xcom_push(key='transformed_data', value=transformed_dict)
    return True


def load_task(**context):
    """Airflow task for loading."""
    import pandas as pd

    transformed_dict = context['task_instance'].xcom_pull(
        task_ids='transform', key='transformed_data'
    )

    transformed_data = {
        'current': pd.DataFrame(transformed_dict['current']),
        'hourly': pd.DataFrame(transformed_dict['hourly'])
    }

    results = load_all_data(transformed_data, DB_CONFIG)
    return results


with DAG(
    dag_id='weather_pipeline',
    default_args=default_args,
    description='Daily weather data pipeline',
    schedule_interval='0 6 * * *',  # 6 AM daily (renamed to `schedule` in Airflow 2.4+)
    start_date=datetime(2025, 1, 1),
    catchup=False,
    tags=['weather', 'etl'],
) as dag:

    extract = PythonOperator(
        task_id='extract',
        python_callable=extract_task,
    )

    transform = PythonOperator(
        task_id='transform',
        python_callable=transform_task,
    )

    load = PythonOperator(
        task_id='load',
        python_callable=load_task,
    )

    # Define dependencies
    extract >> transform >> load

DAG Explanation

  • schedule_interval — Cron expression for when to run
  • retries — Automatic retry on failure
  • XCom — Passes data between tasks; it lives in Airflow's metadata database, so keep payloads small (metadata and modest record sets, not large datasets)
  • Dependencies — extract >> transform >> load defines execution order (a more concise TaskFlow-style version is sketched below)
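
For comparison, Airflow's TaskFlow API expresses the same DAG more concisely: decorated functions pass their return values through XCom automatically, so the explicit push/pull calls disappear. A sketch, assuming Airflow 2.4+ (for the schedule parameter) and the same imports as the file above:

from datetime import datetime

import pandas as pd
from airflow.decorators import dag, task


@dag(schedule="0 6 * * *", start_date=datetime(2025, 1, 1), catchup=False)
def weather_pipeline_taskflow():

    @task
    def extract():
        return extract_all_locations(API_BASE_URL, LOCATIONS)

    @task
    def transform(raw_data):
        frames = transform_all_data(raw_data)
        # DataFrames aren't JSON-serializable, so hand plain records to XCom
        return {name: df.to_dict("records") for name, df in frames.items()}

    @task
    def load(transformed):
        frames = {name: pd.DataFrame(rows) for name, rows in transformed.items()}
        return load_all_data(frames, DB_CONFIG)

    load(transform(extract()))


weather_pipeline_taskflow()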

Step 7: Testing Your Pipeline

Always test your code.

tests/test_pipeline.py

import pytest
from src.extract import extract_weather_data
from src.transform import transform_current_weather
from config.config import API_BASE_URL


class TestExtract:
    def test_extract_weather_data_success(self):
        result = extract_weather_data(
            base_url=API_BASE_URL,
            latitude=51.5074,
            longitude=-0.1278,
            location_name="London"
        )

        assert result is not None
        assert "current_weather" in result
        assert result["location_name"] == "London"

    def test_extract_weather_data_invalid_coordinates(self):
        result = extract_weather_data(
            base_url=API_BASE_URL,
            latitude=9999,
            longitude=9999,
            location_name="Invalid"
        )

        # Open-Meteo rejects out-of-range coordinates with a 400 response,
        # so extract_weather_data should swallow the error and return None
        assert result is None


class TestTransform:
    def test_transform_current_weather(self):
        raw_data = {
            "location_name": "London",
            "latitude": 51.5074,
            "longitude": -0.1278,
            "current_weather": {
                "temperature": 15.5,
                "windspeed": 10.2,
                "winddirection": 180,
                "weathercode": 1,
                "time": "2025-01-15T12:00"
            },
            "extracted_at": "2025-01-15T12:00:00"
        }

        result = transform_current_weather(raw_data)

        assert result["location_name"] == "London"
        assert result["temperature_celsius"] == 15.5
        assert result["wind_speed_kmh"] == 10.2

    def test_transform_handles_missing_data(self):
        raw_data = {
            "location_name": "Unknown",
            "current_weather": {}
        }

        result = transform_current_weather(raw_data)

        assert result["location_name"] == "Unknown"
        assert result["temperature_celsius"] is None

Run tests:

pytest tests/ -v
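
Two notes on running these. First, for the from src.extract ... imports to resolve, pytest needs the project root on its path; dropping an empty conftest.py in the project root is the usual fix. Second, TestExtract calls the live API, which is fine locally but flaky in CI. A sketch of the same failure path with the network stubbed out using the standard library's unittest.mock:

import requests
from unittest.mock import patch

from src.extract import extract_weather_data


def test_extract_returns_none_on_network_error():
    # Simulate a dropped connection without touching the real API
    with patch(
        "src.extract.requests.get",
        side_effect=requests.RequestException("connection dropped"),
    ):
        result = extract_weather_data(
            base_url="https://example.invalid",
            latitude=0.0,
            longitude=0.0,
            location_name="Nowhere",
        )

    assert result is None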

Best Practices Summary

Practice          Why It Matters
--------------    --------------------------
Modular code      Easy to test and maintain
Error handling    Pipelines fail gracefully
Logging           Debug issues quickly
Idempotency       Safe to re-run
Configuration     No hardcoded values
Testing           Catch bugs early
Type hints        Self-documenting code

Common Mistakes to Avoid

  1. No error handling — One bad record crashes everything
  2. Hardcoded credentials — Security nightmare
  3. No logging — Impossible to debug
  4. Monolithic functions — Untestable code
  5. Ignoring data types — Causes silent failures
  6. No idempotency — Duplicates everywhere

What's Next?

You've built your first pipeline. It extracts, transforms, loads, and runs on a schedule.

In the final article, we'll discuss how to continue your learning journey with recommended courses, certifications, and resources.


Series Overview

  1. Data Engineering Uncovered: What It Is and Why It Matters
  2. Pipelines, ETL, and Warehouses: The DNA of Data Engineering
  3. Tools of the Trade: What Powers Modern Data Engineering
  4. The Math You Actually Need as a Data Engineer
  5. Building Your First Pipeline: From Concept to Execution (You are here)
  6. Charting Your Path: Courses and Resources to Accelerate Your Journey

Built your first pipeline? Have questions? Share in the comments.
