Data Pipeline Orchestration with Apache Airflow
A dedicated repository for orchestrating enterprise data workflows, automating dependencies, and scheduling ETL pipelines using Apache Airflow. This repository serves as the centralized automation layer that handles job scheduling, failure retries, and sequential execution for data engineering tasks.
Project Overview
This project showcases an automated Apache Airflow Directed Acyclic Graph (DAG) designed to manage a modular Python ETL pipeline. Instead of relying on manual script invocation or cron jobs, this architecture leverages Airflow's core workflow management capabilities to build a resilient, observable data infrastructure.
Core Architecture Components
- Orchestrator (DAG.py): The DAG definition file. It declares the tasks, their dependencies, and the schedule; Airflow's scheduler reads it to trigger, retry, and monitor each task.
- Pipeline Modules: The Python modules that each task calls to run one pipeline stage: API integrations for extraction, custom transformation wrappers, and database loader clients.
Workflow Topology (DAG)
The workflow splits the weather processing pipeline into three decoupled tasks that run in sequence. Each task runs as its own Airflow task instance, so a failure is caught at the task that raised it: under Airflow's default all_success trigger rule, a downstream task does not start until its upstream task has succeeded.
[extract_weather_data] ──► [transform_weather_data] ──► [load_weather_data]
Task Definitions
extract_weather_data: Calls the extraction module to fetch the raw weather payload from the source API.
transform_weather_data: Cleans and reshapes the raw payload, mapping its loosely structured keys onto a fixed set of typed fields suitable for a relational table.
load_weather_data: Passes the structured records to the database loader client, which writes them to the target analytical database.
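The three stages can be sketched as plain Python functions that a PythonOperator task could wrap. This is a hypothetical illustration: the payload keys, column names, and the in-memory SQLite table stand in for the real source API and target database, which this README does not specify.

```python
import sqlite3
from typing import Any, Dict


def extract_weather_data() -> Dict[str, Any]:
    # Hypothetical stand-in for the API call: returns a nested, loosely structured payload.
    return {
        "name": "example_city",
        "dt": 1704067200,
        "main": {"temp": 22.5, "humidity": 60},
    }


def transform_weather_data(raw: Dict[str, Any]) -> Dict[str, Any]:
    # Map nested source keys onto a fixed, flat set of typed columns.
    return {
        "city": raw["name"],
        "observed_at": raw["dt"],
        "temperature_c": float(raw["main"]["temp"]),
        "humidity_pct": int(raw["main"]["humidity"]),
    }


def load_weather_data(conn: sqlite3.Connection, row: Dict[str, Any]) -> None:
    # Create the (hypothetical) target table if needed and insert the row;
    # "with conn" commits on success and rolls back if an exception is raised.
    with conn:
        conn.execute(
            """CREATE TABLE IF NOT EXISTS weather (
                   city TEXT,
                   observed_at INTEGER,
                   temperature_c REAL,
                   humidity_pct INTEGER)"""
        )
        conn.execute(
            "INSERT INTO weather VALUES (:city, :observed_at, :temperature_c, :humidity_pct)",
            row,
        )


if __name__ == "__main__":
    conn = sqlite3.connect(":memory:")
    load_weather_data(conn, transform_weather_data(extract_weather_data()))
    print(conn.execute("SELECT * FROM weather").fetchall())
```

Keeping each stage a small function with explicit inputs and outputs makes it testable outside Airflow; the DAG only has to wire the functions together.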
Infrastructure & Tech Stack
Core Engine: Apache Airflow
Runtime Environment: Python 3.8 to 3.11 (the versions supported by Airflow 2.8.1) on Ubuntu / Linux
Execution Libraries: airflow.models, airflow.operators.python
Schedule Target: Hourly (@hourly)
Getting Started & Local Installation
Follow these steps to deploy and execute this DAG locally within your Apache Airflow environment on an Ubuntu system.
Clone the Repository
git clone https://github.com/wangecindovu-lab/DAGS-using-Apache-Airflow.git
cd DAGS-using-Apache-Airflow
Configure Airflow Environment Variables
Set the Airflow home directory before initializing the metadata database:
export AIRFLOW_HOME=~/airflow
Install Apache Airflow Dependencies
pip install "apache-airflow==2.8.1" --constraint "https://raw.githubusercontent.com/apache/airflow/constraints-2.8.1/constraints-3.10.txt"
The constraints file must match your Python minor version: replace 3.10 in the URL with yours (for example, constraints-3.11.txt on Python 3.11).
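Because Airflow publishes one constraints file per Airflow version and Python minor version, the URL can be derived from the running interpreter. A small sketch that mirrors the pattern in Airflow's installation docs; the constraints_url helper name is illustrative.

```python
import sys

AIRFLOW_VERSION = "2.8.1"


def constraints_url(airflow_version: str, python_version: str) -> str:
    # One constraints file exists per (Airflow version, Python minor version) pair.
    return (
        "https://raw.githubusercontent.com/apache/airflow/"
        f"constraints-{airflow_version}/constraints-{python_version}.txt"
    )


if __name__ == "__main__":
    # e.g. "3.10" for Python 3.10.x
    py = f"{sys.version_info.major}.{sys.version_info.minor}"
    print(f'pip install "apache-airflow=={AIRFLOW_VERSION}" --constraint "{constraints_url(AIRFLOW_VERSION, py)}"')
```

Running the script prints the install command for the current interpreter, which avoids pairing a Python 3.11 environment with the 3.10 constraints file.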
Link the DAG to Your Airflow Deployment
With AIRFLOW_HOME set to ~/airflow, Airflow scans ~/airflow/dags for DAG files by default (the dags_folder setting). Copy or symlink DAG.py into that folder:
mkdir -p ~/airflow/dags
cp DAG.py ~/airflow/dags/weather_etl_dag.py
Initialize Airflow Database & Start Services
Initialize the metadata database, create an admin user, and start the webserver and the scheduler:
# Initialize the metadata database (airflow db migrate replaces the deprecated airflow db init)
airflow db migrate
# Create an admin user account (use a strong password outside local testing)
airflow users create \
--username admin \
--firstname Wangeci \
--lastname Ndovu \
--role Admin \
--email Wangecindovu@gmail.com \
--password admin
# Start the web server in the background (accessible via http://localhost:8080)
airflow webserver --port 8080 -D
# Start the scheduler
airflow scheduler
Configuration & Operational Safeguards
The DAG configures the following safeguards in its default arguments and DAG settings:
retries=2: A failed task is retried up to 2 more times (3 attempts in total) before it is marked as failed.
retry_delay=5 minutes: Airflow waits 5 minutes after a failed attempt before retrying, giving external services or target systems time to recover from temporary downtime.
catchup=False: When the DAG is first enabled, the scheduler does not create a run for every past hourly interval since start_date; it schedules only the most recent completed interval.
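A simplified, plain-Python model of what these three settings mean for an hourly DAG. It is not Airflow's scheduler code: it ignores scheduler latency and task runtime, assumes each failed attempt fails immediately, and the dates are arbitrary examples.

```python
from datetime import datetime, timedelta
from typing import List, Tuple

RETRIES = 2
RETRY_DELAY = timedelta(minutes=5)
SCHEDULE = timedelta(hours=1)  # @hourly


def earliest_attempt_starts(first_start: datetime, retries: int, delay: timedelta) -> List[datetime]:
    # Simplified: if every attempt fails instantly, each retry becomes eligible
    # "delay" after the previous one. Total attempts = retries + 1.
    return [first_start + i * delay for i in range(retries + 1)]


def intervals_to_schedule(
    start: datetime, now: datetime, interval: timedelta, catchup: bool
) -> List[Tuple[datetime, datetime]]:
    # A run for a data interval is created only after that interval has fully elapsed.
    complete = []
    begin = start
    while begin + interval <= now:
        complete.append((begin, begin + interval))
        begin += interval
    # catchup=True: one run per missed interval; catchup=False: only the latest one.
    return complete if catchup else complete[-1:]


if __name__ == "__main__":
    start, now = datetime(2024, 1, 1, 0, 0), datetime(2024, 1, 1, 3, 30)
    print(intervals_to_schedule(start, now, SCHEDULE, catchup=True))
    print(intervals_to_schedule(start, now, SCHEDULE, catchup=False))
    print(earliest_attempt_starts(datetime(2024, 1, 1, 12, 0), RETRIES, RETRY_DELAY))
```

With start_date at midnight and the DAG enabled at 03:30, catchup=True would queue three runs (00:00, 01:00, and 02:00 intervals), while catchup=False queues only the 02:00 to 03:00 interval. A task that keeps failing is attempted at roughly 12:00, 12:05, and 12:10 before it is marked failed.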