DEV Community

Varun Joshi
From Python to Production Pipeline: A Practical Guide to Apache Airflow

You have been using Python, and you have written scripts that pull data, clean it and load it somewhere. Maybe you even have a while True loop running on a server somewhere, or a cron job you are afraid to touch.

This tutorial is for you.

Apache Airflow is the tool that takes your Python script from "runs on my machine" to "runs reliably at 6 am every day, retries on failure, sends an alert if something breaks and has a UI to see exactly what happened". It is a standard orchestration tool on many data engineering teams and more approachable than it looks.

By the end of this post you will understand how Airflow thinks, and you will have a working DAG that schedules a real data pipeline.

What Airflow actually is (and isn't)

Airflow is a workflow orchestrator. It does not move the data itself; it schedules and monitors the tasks that do.

Think of it like a very smart cron job that:

  1. Knows the order the tasks should run in.
  2. Retries failed tasks automatically.
  3. Keeps a full history of every run.
  4. Shows you everything in the UI.
  5. Sends alerts when things go wrong.

What it is not: a data processing engine. You still write Python to do the actual work. Airflow decides when to run it and what to do when it fails.

Core Concepts in 2 minutes

Before touching the code, there are three terms you should know:

DAG (Directed Acyclic Graph): This is your pipeline. A DAG is defined in a Python file as a set of tasks and the order they run in. The "acyclic" part just means that the dependencies cannot loop back on themselves.

Task: A single unit of work inside a DAG. It could be running a Python function, executing a SQL query, calling an API or dozens of other things.

Operator: The building block of a task. PythonOperator runs a Python function. BashOperator runs a shell command. PostgresOperator runs SQL. Airflow has an operator for almost everything.
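
To make the "acyclic" part concrete, here is a minimal sketch that uses Python's standard graphlib module (Python 3.9+). It is only an illustration of the idea, not how Airflow is implemented, and the task names are made up for the example.

```python
from graphlib import CycleError, TopologicalSorter

# Each key is a task; the set holds the tasks that must finish before it.
# The task names are illustrative only.
pipeline = {
    "fetch": set(),
    "transform": {"fetch"},
    "save": {"transform"},
}


def run_order(graph):
    """Return one valid execution order, or raise CycleError if a loop exists."""
    return list(TopologicalSorter(graph).static_order())


def has_cycle(graph):
    """True when the dependencies loop back, so the graph is not a DAG."""
    try:
        run_order(graph)
    except CycleError:
        return True
    return False


print(run_order(pipeline))

# Making "fetch" wait on "save" closes the loop: fetch -> transform -> save -> fetch.
looped = {**pipeline, "fetch": {"save"}}
print(has_cycle(looped))
```

A graph with a loop has no valid starting point, which is why an orchestrator refuses it.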

Setting up Airflow Locally

The quickest way to get Airflow running locally is with pip. The commands in this post use the Airflow 2.x CLI, so the install is pinned below version 3:

pip install "apache-airflow<3"
airflow db init
airflow users create \
   --username admin \
   --password admin \
   --firstname your \
   --lastname name \
   --role Admin \
   --email admin@x.com

Start the scheduler and the webserver in two different terminals:

# Terminal 1
airflow scheduler

# Terminal 2
airflow webserver --port 8080

Now visit http://localhost:8080. You will see the Airflow UI with a bunch of example DAGs already there.

Your first DAG: A Real Data Pipeline

Let's build something useful, not just a "Hello world". We will build a pipeline that:

  1. Fetches data from a public API
  2. Cleans and transforms it
  3. Saves it to a CSV file

This mirrors a real ingestion pipeline without enterprise complexity.

Create a file called weather_pipeline.py inside your Airflow dags/ folder.

from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta
import requests
import pandas as pd
import os

default_args = {
    'owner': 'varun',
    'retries': 2,
    'retry_delay': timedelta(minutes=5),
    'email_on_failure': False,
}

# Define the DAG

with DAG(
    dag_id='weather_pipeline',
    default_args=default_args,
    description='Fetch, transform and save daily weather data',
    schedule_interval='0 6 * * *',  # runs daily at 6 am
    start_date=datetime(2024, 1, 1),
    catchup=False,
    tags=['Weather', 'Tutorial'],
) as dag:
    pass

A few things to note:

  • schedule_interval is a cron expression.
  • catchup=False tells Airflow not to run all the historical dates between start_date and today. You almost always want it set to False.
  • retries=2 means Airflow will retry a failed task up to two more times before marking it as failed.
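
To see what catchup is protecting you from, here is a rough sketch in plain Python. It uses a deliberately simplified model (one interval per day starting at 06:00, with a run becoming due only after its interval has ended), and the helper names are invented for this example. Airflow's real scheduler handles far more cases.

```python
from datetime import datetime, timedelta

CRON_FIELDS = ("minute", "hour", "day_of_month", "month", "day_of_week")


def describe_cron(expression):
    """Label the five fields of a standard cron expression."""
    parts = expression.split()
    if len(parts) != len(CRON_FIELDS):
        raise ValueError(f"expected 5 fields, got {len(parts)}")
    return dict(zip(CRON_FIELDS, parts))


def completed_daily_intervals(start_date, now, hour=6):
    """List the start of every daily interval that has fully ended by `now`."""
    first = start_date.replace(hour=hour, minute=0, second=0, microsecond=0)
    if first < start_date:
        first += timedelta(days=1)
    due = []
    current = first
    while current + timedelta(days=1) <= now:
        due.append(current)
        current += timedelta(days=1)
    return due


def runs_to_schedule(start_date, now, catchup):
    """With catchup, every missed interval runs; without it, only the latest."""
    due = completed_daily_intervals(start_date, now)
    return due if catchup else due[-1:]


start = datetime(2024, 1, 1)
now = datetime(2024, 1, 4, 12, 0)

print(describe_cron("0 6 * * *"))
print(len(runs_to_schedule(start, now, catchup=True)))
print(runs_to_schedule(start, now, catchup=False))
```

With a start_date months in the past, the catchup=True list grows to one run per missed day, which is rarely what you want when you first switch a DAG on.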

Adding Tasks

Now add three tasks in place of pass. Each task wraps a plain Python function:

# These functions go above the `with DAG(...)` block.

def fetch_weather(**context):
    """Pull weather from Open-Meteo (free, no API key needed)."""
    url = "https://api.open-meteo.com/v1/forecast"
    params = {
        "latitude": 28.6,
        "longitude": 77.2,
        "daily": "temperature_2m_max,temperature_2m_min,precipitation_sum",
        "timezone": "Asia/Kolkata",
        "forecast_days": 1,
    }
    response = requests.get(url, params=params, timeout=30)
    response.raise_for_status()  # raise an exception on HTTP errors

    data = response.json()
    # Push data to XCom so the next task can access it
    context['ti'].xcom_push(key='raw_weather', value=data)
    print(f"Fetched weather data for {data['daily']['time'][0]}")


def transform_weather(**context):
    """Clean and reshape the raw API response."""
    raw = context['ti'].xcom_pull(key='raw_weather', task_ids='fetch_weather')
    daily = raw['daily']

    df = pd.DataFrame({
        'date': daily['time'],
        'temp_max_c': daily['temperature_2m_max'],
        'temp_min_c': daily['temperature_2m_min'],
        'precipitation_mm': daily['precipitation_sum'],
    })
    context['ti'].xcom_push(key='clean_weather', value=df.to_dict('records'))
    print(f"✓ Transformed {len(df)} rows")


def save_weather(**context):
    """Append the cleaned data to a CSV file."""
    records = context['ti'].xcom_pull(key='clean_weather', task_ids='transform_weather')

    df = pd.DataFrame(records)
    output_path = '/tmp/weather_data.csv'
    file_exists = os.path.exists(output_path)

    df.to_csv(output_path, mode='a', header=not file_exists, index=False)
    print(f"✓ Saved to {output_path}")


    # These lines replace `pass` inside the `with DAG(...)` block.
    t1 = PythonOperator(task_id='fetch_weather', python_callable=fetch_weather)
    t2 = PythonOperator(task_id='transform_weather', python_callable=transform_weather)
    t3 = PythonOperator(task_id='save_weather', python_callable=save_weather)

    # Wire the tasks: fetch, then transform, then save
    t1 >> t2 >> t3

The t1 >> t2 >> t3 line at the bottom is a way of saying "run these in this order". It is one of those things that might seem weird at first, but you will get used to it.

XCom: How tasks talk to each other

You may have noticed xcom_push and xcom_pull in the code.
This is how Airflow tasks pass data to each other.

XCom (cross-communication) is a small key-value store built into Airflow.
One task pushes a value, and the next pulls it by task ID and key.

#Task-1 Push
context['ti'].xcom_push(key='my_data', value={'rows':42})
#Task-2 Pull
data = context['ti'].xcom_pull(key='my_data', task_ids='task_1')

XCom values are stored in Airflow's metadata database, so XCom is not meant for large datasets. Use it for small payloads: counts, file paths, small API responses. For large data, write to S3/GCS and pass the file path via XCom.
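
The "pass the path, not the data" pattern might look roughly like the sketch below. It writes to a local temp file to stay self-contained, which only works when both tasks share a filesystem; in production the path would point at S3/GCS instead. The task names and the FakeTaskInstance class are hypothetical, the latter existing only so the functions can be tried without Airflow.

```python
import json
import os
import tempfile


def extract_large(**context):
    """Write the bulky payload to disk and push only its path to XCom."""
    rows = [{"id": i, "value": i * i} for i in range(1000)]  # stand-in for real data
    fd, path = tempfile.mkstemp(suffix=".json")
    with os.fdopen(fd, "w") as handle:
        json.dump(rows, handle)
    context["ti"].xcom_push(key="rows_path", value=path)


def load_large(**context):
    """Pull the path from XCom and read the payload back from disk."""
    path = context["ti"].xcom_pull(key="rows_path", task_ids="extract_large")
    with open(path) as handle:
        rows = json.load(handle)
    return len(rows)


class FakeTaskInstance:
    """Hypothetical stand-in for Airflow's task instance, for local runs only."""

    def __init__(self):
        self._store = {}

    def xcom_push(self, key, value):
        self._store[key] = value

    def xcom_pull(self, key, task_ids=None):
        return self._store.get(key)


ti = FakeTaskInstance()
extract_large(ti=ti)
print(load_large(ti=ti))
```

Only a short string travels through the metadata database, no matter how large the file grows.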

How to Run

Once the file is in your dags/ folder, Airflow picks it up automatically. You should see it appear at http://localhost:8080.

To trigger it manually:

  1. Find weather_pipeline in the DAG list.
  2. Toggle it on.
  3. Click Trigger DAG.

Look at the Graph view: each task should turn green as it completes. If something fails, the task turns red.

Patterns that Matter in Production

  1. Use execution_date for idempotent pipelines.
def fetch_weather(**context):
    run_date = context['execution_date'].strftime('%Y-%m-%d')
    #fetch data for that specific date

This makes your job re-runnable: if Tuesday's run fails, you can re-run it on Wednesday and it will still fetch Tuesday's data.
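
The write side matters for idempotency too. A CSV opened in append mode gains a duplicate row every time a run is repeated. One possible approach, sketched here with a hypothetical save_partition helper and a made-up sample row, is to write one file per run date and overwrite it on a re-run.

```python
import csv
import os
import tempfile


def save_partition(records, run_date, base_dir):
    """Write one file per run date, replacing any earlier attempt for that date."""
    path = os.path.join(base_dir, f"weather_{run_date}.csv")
    with open(path, "w", newline="") as handle:  # "w" overwrites, unlike append
        writer = csv.DictWriter(handle, fieldnames=list(records[0].keys()))
        writer.writeheader()
        writer.writerows(records)
    return path


base_dir = tempfile.mkdtemp()
rows = [{"date": "2024-01-02", "temp_max_c": 21.5}]  # made-up sample row

first = save_partition(rows, "2024-01-02", base_dir)
second = save_partition(rows, "2024-01-02", base_dir)  # simulated re-run

with open(second) as handle:
    line_count = len(handle.read().splitlines())
print(first == second, line_count)
```

Running the same date twice leaves one file with one header and one row, so retries and manual re-runs are safe.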

  2. Always set retries and retry_delay

Network calls fail. APIs go down. Databases slow down.
Setting retries saves you a 2 am call.

default_args = {
    'retries': 3,
    'retry_delay': timedelta(minutes=10),
}
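
As a mental model for what those two settings buy you, here is a rough plain-Python sketch of a retry loop. Airflow does this for you; the run_with_retries helper and the flaky task are invented for the illustration and are not Airflow code.

```python
import time


def run_with_retries(task, retries=3, retry_delay=0.0, sleep=time.sleep):
    """Call `task` once, then up to `retries` more times if it raises."""
    attempt = 0
    while True:
        attempt += 1
        try:
            return task(), attempt
        except Exception:
            if attempt > retries:
                raise  # out of retries: the failure surfaces
            sleep(retry_delay)


calls = {"count": 0}


def flaky_task():
    """Hypothetical task that fails twice, then succeeds."""
    calls["count"] += 1
    if calls["count"] < 3:
        raise ConnectionError("temporary network error")
    return "ok"


result, attempts = run_with_retries(flaky_task, retries=3)
print(result, attempts)
```

A temporary outage that clears within a couple of attempts never reaches a human; only a task that fails on every attempt does.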
  3. Use on_failure_callback for alerting

def alert_on_failure(context):
    # Send a Slack message, email etc.
    print(f"Task failed: {context['task_instance'].task_id}")

default_args = {
    'on_failure_callback': alert_on_failure,
}
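
A slightly richer alert could be sketched as below. It assumes the callback context carries an "exception" entry and that the task instance exposes dag_id and try_number, as I understand Airflow 2.x to do; check both against your version. The fake context exists only for a local dry run.

```python
from types import SimpleNamespace


def build_failure_message(context):
    """Turn a failure-callback context into a one-line alert."""
    ti = context["task_instance"]
    error = context.get("exception")  # assumed key; None if absent
    return (
        f"Task failed: {ti.dag_id}.{ti.task_id} "
        f"(try {ti.try_number}): {error!r}"
    )


def alert_on_failure(context):
    """Callback body; swap print for a Slack or email call of your choice."""
    print(build_failure_message(context))


# Hypothetical context for a local dry run; Airflow builds the real one.
fake_context = {
    "task_instance": SimpleNamespace(
        dag_id="weather_pipeline", task_id="fetch_weather", try_number=3
    ),
    "exception": TimeoutError("API did not respond"),
}
alert_on_failure(fake_context)
```

Keeping the message building separate from the sending makes the alert text easy to check without a Slack workspace.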
  4. Keep tasks small and focused

One task = one thing. Don't write a single giant function that does everything. Small tasks mean cleaner retries and logs that are easier to read.
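
One way to keep a task small, sketched here as a suggestion rather than a rule, is to put the actual logic in a plain function and leave the Airflow callable as a thin wrapper. The reshape_daily name and the sample payload are made up; the payload only mirrors the shape of the weather API response used earlier.

```python
def reshape_daily(raw):
    """Turn the API's column-style 'daily' block into one record per day."""
    daily = raw["daily"]
    return [
        {
            "date": day,
            "temp_max_c": t_max,
            "temp_min_c": t_min,
            "precipitation_mm": rain,
        }
        for day, t_max, t_min, rain in zip(
            daily["time"],
            daily["temperature_2m_max"],
            daily["temperature_2m_min"],
            daily["precipitation_sum"],
        )
    ]


def transform_weather(**context):
    """Thin Airflow wrapper: pull, reshape, push."""
    raw = context["ti"].xcom_pull(key="raw_weather", task_ids="fetch_weather")
    context["ti"].xcom_push(key="clean_weather", value=reshape_daily(raw))


# Made-up payload in the same shape as the API response, for a quick check.
sample = {
    "daily": {
        "time": ["2024-01-02"],
        "temperature_2m_max": [21.5],
        "temperature_2m_min": [9.0],
        "precipitation_sum": [0.0],
    }
}
print(reshape_daily(sample))
```

The plain function can be tested in a normal Python session, with no scheduler or webserver running.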

That's it

You went from running Python code manually on your machine to scheduling it with automatic retries, step-by-step task execution and failure notifications.

The learning curve is real, but it is mostly in the setup. Once you have written two or three DAGs, the patterns click and you are not going back to cron.
