DEV Community

Gathuru_M

TaskFlow API vs. Traditional Operators: Practical Airflow ETL Pipeline

In my last article, we went over the foundational pillars of Apache Airflow—DAGs, Tasks, and why orchestration beats manual scripts.

For this practical Airflow project, we will build an ETL pipeline to aggregate market data from the Massive API. Instead of writing it just one way, we will write two versions of the same DAG:

  1. The Traditional Approach: Using classic standard operators (PythonOperator) and manual XCom pulling/pushing.
  2. The Modern Approach: Using the TaskFlow API (@dag and @task decorators) to make the code clean and Pythonic.

If you’re confused about the difference or wondering which one you should use in your projects, let’s break down both.


The Goal: Aggregating Data from Massive API

The pipeline does three basic things:

  • Extract: Pulls raw daily open/close asset data from our market API client.
  • Transform: Normalizes the heavy, nested JSON payload into a clean structure using pandas.
  • Load: Inserts the structured records into a cloud PostgreSQL database.
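
Here is a minimal sketch of those three steps in plain Python, outside Airflow. The payload shape, the daily_prices table, and every function name are assumptions made for illustration. The standard library's sqlite3 stands in for the cloud PostgreSQL database, and a hand-written flattener stands in for pandas (where pd.json_normalize does a similar job).

```python
import sqlite3


def extract() -> dict:
    """Return a hard-coded payload in place of a real API call (assumed shape)."""
    return {
        "ticker": "BTC",
        "trade_date": "2026-06-07",
        "prices": {"open": 64000.0, "close": 65000.0},
    }


def transform(payload: dict) -> dict:
    """Flatten the nested 'prices' object into top-level columns."""
    return {
        "ticker": payload["ticker"],
        "trade_date": payload["trade_date"],
        "open_price": payload["prices"]["open"],
        "close_price": payload["prices"]["close"],
    }


def load(record: dict, conn: sqlite3.Connection) -> None:
    """Insert one flattened record; sqlite3 stands in for PostgreSQL here."""
    conn.execute(
        "CREATE TABLE IF NOT EXISTS daily_prices "
        "(ticker TEXT, trade_date TEXT, open_price REAL, close_price REAL)"
    )
    conn.execute(
        "INSERT INTO daily_prices "
        "VALUES (:ticker, :trade_date, :open_price, :close_price)",
        record,
    )
    conn.commit()


conn = sqlite3.connect(":memory:")
load(transform(extract()), conn)
rows = conn.execute(
    "SELECT ticker, trade_date, open_price, close_price FROM daily_prices"
).fetchall()
print(rows)
```

Keeping each step as a small function with plain inputs and outputs is what makes it easy to wrap them in Airflow tasks later, whichever style you pick.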

Approach 1: The Traditional Way (Standard Operators)

When Airflow was first built, you had to explicitly define every single task using an Operator class and manually stitch them together using the bitshift operators (>>).

The trickiest part here is data sharing. Because tasks run in isolation, we have to use explicit XComs (cross-communications) to pass our API payload from the extract task to the transform task.

Here is how the traditional DAG looks:

from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta
import pandas as pd
# (Assume our custom API extraction and DB loading logic are imported here)

default_args = {
    'owner': 'my_name',
    'start_date': datetime(2026, 6, 1),
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

def _extract(ti):
    # Hard-coded sample payload standing in for the Massive API response
    raw_data = {"ticker": "BTC", "price": 65000, "timestamp": "2026-06-07T00:00:00Z"}
    # We MUST explicitly push to XCom so the next task can see it
    ti.xcom_push(key='raw_market_data', value=raw_data)

def _transform(ti):
    # We MUST explicitly pull from XCom
    raw_data = ti.xcom_pull(key='raw_market_data', task_ids='extract_task')
    print(f"Transforming data for: {raw_data['ticker']}")
    # Transformation logic goes here...
    return raw_data  # Returning automatically pushes to default XCom

with DAG(
    dag_id='market_data_traditional_v1',
    default_args=default_args,
    schedule='@hourly',  # 'schedule' replaces the deprecated 'schedule_interval' (Airflow 2.4+)
    catchup=False
) as dag:

    extract_task = PythonOperator(
        task_id='extract_task',
        python_callable=_extract
    )

    transform_task = PythonOperator(
        task_id='transform_task',
        python_callable=_transform
    )

    # Stitching the pipeline dependency together
    extract_task >> transform_task


The Verdict on Traditional:

It works perfectly, but it feels heavy. You have to write a lot of boilerplate just to set up the tasks, and passing data means accepting the Task Instance (ti) and remembering the exact key and task_ids strings. If you mistype one of those strings, xcom_pull does not complain. It returns None, and the task fails later when it tries to use the missing data.
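
The string-typo hazard can be reproduced without Airflow. The sketch below is a toy in-memory stand-in, not Airflow's implementation: FakeTaskInstance and the shared store dict are hypothetical names. It assumes the real behaviour that an xcom_pull which finds nothing returns None rather than raising.

```python
class FakeTaskInstance:
    """Toy stand-in for Airflow's task instance: XComs kept in a plain dict."""

    def __init__(self, store: dict, task_id: str):
        self._store = store  # shared between all fake task instances
        self.task_id = task_id

    def xcom_push(self, key: str, value) -> None:
        # Values are stored under (pushing task's id, key).
        self._store[(self.task_id, key)] = value

    def xcom_pull(self, task_ids: str, key: str = "return_value"):
        # A lookup that finds nothing yields None instead of an error.
        return self._store.get((task_ids, key))


store = {}
FakeTaskInstance(store, "extract_task").xcom_push(
    "raw_market_data", {"ticker": "BTC"}
)

ti = FakeTaskInstance(store, "transform_task")
good = ti.xcom_pull(task_ids="extract_task", key="raw_market_data")
typo = ti.xcom_pull(task_ids="extrct_task", key="raw_market_data")  # misspelled

print(good)
print(typo)

# The failure only surfaces when the missing data is used.
try:
    typo["ticker"]
except TypeError as err:
    failure = str(err)
print(failure)
```

One way to reduce the risk in a real DAG is to keep task IDs and XCom keys in module-level constants and reference those instead of retyping the strings.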


Approach 2: The Modern Way (TaskFlow API)

Introduced in Airflow 2.0, the TaskFlow API uses Python decorators (@dag and @task). It fundamentally changes how we write pipelines by making Airflow handle XComs silently behind the scenes.

Look at how clean this version is:

from airflow.decorators import dag, task
from datetime import datetime, timedelta

default_args = {
    'owner': 'my_name',
    'start_date': datetime(2026, 6, 1),
    'retries': 1,
}

@dag(dag_id='market_data_taskflow_v1', default_args=default_args, schedule='@hourly', catchup=False)
def market_data_etl():

    @task()
    def extract():
        raw_data = {"ticker": "BTC", "price": 65000, "timestamp": "2026-06-07T00:00:00Z"}
        return raw_data  # No manual xcom_push! Airflow handles it.

    @task()
    def transform(raw_data: dict):
        # No manual xcom_pull! We just treat it like a regular Python variable.
        print(f"Transforming data for: {raw_data['ticker']}")
        return raw_data

    # These two lines set up the dependency AND pass the data!
    market_data = extract()
    transform(market_data)

# Instantiate the DAG
market_data_etl_dag = market_data_etl()


The Verdict on TaskFlow:

This feels like writing native Python! You don’t have to instantiate PythonOperator manually, and dependencies are implicitly built simply by passing outputs into inputs (transform(extract())).
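
What makes this work is that, inside a @dag function, calling a @task-decorated function does not run it. The call returns a reference to the task's future output (an XComArg in Airflow), and passing that reference into another task is what registers the dependency. Below is a toy model of that idea, not Airflow's implementation; ToyTaskRef, toy_task, and registry are hypothetical names.

```python
class ToyTaskRef:
    """Placeholder returned at DAG-definition time instead of real data."""

    def __init__(self, task_id: str, upstream: list):
        self.task_id = task_id
        self.upstream = upstream  # task_ids this task depends on


registry = {}  # task_id -> ToyTaskRef, filled in as tasks are "called"


def toy_task(func):
    """Decorator: calling the function records a task instead of running it."""

    def wrapper(*args):
        # Any ToyTaskRef passed as an argument becomes an upstream dependency.
        upstream = [a.task_id for a in args if isinstance(a, ToyTaskRef)]
        ref = ToyTaskRef(func.__name__, upstream)
        registry[ref.task_id] = ref
        return ref

    return wrapper


@toy_task
def extract():
    return {"ticker": "BTC"}


@toy_task
def transform(raw_data):
    return raw_data


result = transform(extract())

print(type(result).__name__)           # a reference, not the dict
print(registry["transform"].upstream)  # dependency inferred from the argument
print(registry["extract"].upstream)
```

The practical consequence is that you cannot inspect the value of extract() while the DAG is being defined; the real data only exists when the tasks run.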


Seeing Both in the Airflow UI

Once I deployed both DAG files to my local environment running on WSL, they popped up immediately in my Airflow Web UI dashboard.

Even though the code looks completely different, Airflow builds the same graph for both: two tasks connected by a single dependency.

Traditional DAG

When I ran them, the logs for the TaskFlow DAG showed the transform task receiving the extracted payload as an ordinary argument, with no missing parameters.


Which One Should You Use?

After building this Massive API project using both paradigms, here is my takeaway for fellow beginners:

  • Use TaskFlow API whenever you are working strictly with Python functions (PythonOperator). It reduces boilerplate code significantly, makes your code highly readable, and saves you from the headache of managing raw XCom keys.
  • Use Standard Operators when you need to interact with external tools directly via specialized operators (like PostgresOperator, S3CreateObjectOperator, or BashOperator). TaskFlow is amazing for Python-native workflows, but standard classes are still essential for interacting with cloud infra and heavy enterprise databases.

What's Next?

Now that our pipeline can be scheduled and orchestrated automatically, another major question popped up: where do we deploy this safely so that it runs the same way on a remote server as it does on my local machine? Right now, everything relies on my specific local Python setup and WSL configuration. If I send this code to a teammate, they might hit a wall of environment errors.

In the next part of this series, we are diving into Docker to containerize our database and ETL processes so they can run seamlessly anywhere!

Which style do you prefer writing in Airflow? Let me know your thoughts in the comments!
