Anthony Gicheru
Refactoring Airflow Pipelines: From PythonOperator to TaskFlow

Actually Embracing TaskFlow After a Year of Doing It the “Old Way”

1. Introduction: This Isn’t New… But It Feels New

If you’ve been using Airflow for a while, as I have, you probably didn’t start with the TaskFlow API.

You likely started with the classic Airflow 2.x style:

  • PythonOperator
  • **kwargs
  • ti.xcom_push() and ti.xcom_pull()
  • Explicit task chaining with >>

I spent over a year building pipelines this way. And to be clear, it works. It’s stable, production-ready, and widely used.

But here’s the interesting part:

The TaskFlow API has existed since Airflow 2.0. I just didn’t fully adopt it.

Honestly, I ignored TaskFlow for a long time because I thought it was just ‘syntactic sugar’. That’s more common than people admit.

Most production systems and tutorials still rely on operators, so you naturally stay in that pattern. It’s only later, when readability and maintainability start to matter, that TaskFlow becomes interesting.

And once it clicks, it changes how you think about Airflow.


2. Core Concepts: Same Engine, Different Experience

TaskFlow doesn’t replace Airflow concepts; it abstracts them.

You still work with:

  • Tasks
  • DAGs
  • Scheduling
  • XComs

The difference is how you express them.

Traditional Approach

from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime

def extract(**kwargs):
    data = [1, 2, 3]
    kwargs['ti'].xcom_push(key='data', value=data)

def transform(**kwargs):
    ti = kwargs['ti']
    data = ti.xcom_pull(task_ids='extract', key='data')
    return [x * 2 for x in data]

with DAG('traditional_dag', start_date=datetime(2023, 1, 1), schedule='@daily', catchup=False) as dag:

    t1 = PythonOperator(task_id='extract', python_callable=extract)
    t2 = PythonOperator(task_id='transform', python_callable=transform)

    t1 >> t2

This works, but it introduces a lot of orchestration boilerplate into your business logic.

TaskFlow Approach

from airflow.decorators import dag, task
from datetime import datetime

@dag(start_date=datetime(2023, 1, 1), schedule='@daily', catchup=False)
def taskflow_dag():

    @task
    def extract():
        return [1, 2, 3]

    @task
    def transform(data):
        return [x * 2 for x in data]

    transform(extract())

dag = taskflow_dag()

This feels simpler because it is.

TaskFlow removes explicit XCom handling and lets function returns define data flow.
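One related feature worth knowing, as a sketch rather than part of the refactor above: if a task returns a dict, `@task(multiple_outputs=True)` pushes each key as its own XCom, so downstream tasks can consume individual fields.

```python
from airflow.decorators import task

@task(multiple_outputs=True)
def extract():
    # with multiple_outputs=True, each dict key becomes a separate XCom entry
    return {"numbers": [1, 2, 3], "source": "api"}

@task
def transform(numbers):
    return [x * 2 for x in numbers]

# inside a @dag function, the returned XComArg can be subscripted by key:
# data = extract()
# transform(data["numbers"])
```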


3. The Real Shift: From Wiring Tasks to Modeling Data Flow

With the traditional approach, your mental model looks like this:

Task A -> XCom -> Task B -> XCom -> Task C

With TaskFlow, it becomes:

data = extract()
result = transform(data)

[Diagram: Airflow DAG, traditional vs TaskFlow]

Same execution engine. Different abstraction.

The shift is from task orchestration to data flow composition.
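In plain Python terms (no Airflow required), the TaskFlow mental model is ordinary function composition; under TaskFlow, each call below would become a task and each return value an implicit XCom:

```python
def extract():
    # under TaskFlow this would be a @task-decorated function
    return [1, 2, 3]

def transform(data):
    return [x * 2 for x in data]

# the call graph *is* the dependency graph:
# transform depends on extract because it consumes its return value
result = transform(extract())
# result == [2, 4, 6]
```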

4. XComs: Manual vs Automatic

Manual XComs

def extract(**kwargs):
    kwargs['ti'].xcom_push(key='data', value=[1, 2, 3])

def transform(**kwargs):
    ti = kwargs['ti']
    data = ti.xcom_pull(task_ids='extract', key='data')

You manage everything explicitly.

TaskFlow XComs

@task
def extract():
    return [1, 2, 3]

@task
def transform(data):
    return [x * 2 for x in data]

Airflow handles:

  • serialization
  • storage
  • retrieval

You focus on logic.
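What Airflow does for you here is, roughly, a serialization round-trip: with the default XCom backend, return values are serialized (JSON by default in Airflow 2), stored in the metadata database, and deserialized for the downstream task. A minimal sketch of that round-trip:

```python
import json

value = [1, 2, 3]

# on push: the return value is serialized and stored in the metadata DB
stored = json.dumps(value)

# on pull: the downstream task receives the deserialized copy
restored = json.loads(stored)

assert restored == value
```

This is also why return values must be serializable, and why size limits still apply even though you never call xcom_push yourself.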

When You Still Need Control

TaskFlow still allows explicit control when needed, for example when bridging a classic operator into a TaskFlow pipeline:

def extract():
    return {"numbers": [1, 2, 3]}

extract_op = PythonOperator(task_id='extract', python_callable=extract)

@task
def transform(data):
    return [x * 2 for x in data["numbers"]]

# .output is shorthand for XComArg(extract_op): it lets a classic
# operator's return value feed a decorated task like a normal argument
transform(extract_op.output)

5. Real-World Example: Gas Prices ETL Refactor

I didn’t build two versions of this pipeline at once.

I originally built it using the traditional Airflow 2.x approach and later refactored it using TaskFlow.

That’s when the difference became clear.

Pipeline Overview

API -> Extract gas prices -> Transform -> Store in PostgreSQL

GitHub Reference

Full project: GitHub link to the project

It includes both the original DAG and the TaskFlow refactor.

Traditional Version

def fetch_gas_prices(**kwargs):
    # ... call the API and decode the response (omitted) ...
    kwargs['ti'].xcom_push(key='raw_gas_data', value=decoded_data)

def transform_gas_prices(**kwargs):
    raw_data = kwargs['ti'].xcom_pull(
        task_ids='fetch_gas_prices',
        key='raw_gas_data'
    )

This approach tightly couples logic with Airflow internals.

Data must be serialized manually:

json_data = df.to_json(orient='records')

TaskFlow Version

@task
def fetch_gas_prices():
    # ... call the API and decode the response (omitted) ...
    return decoded_data

@task
def transform_gas_prices(raw_data: str):
    # ... build the DataFrame from raw_data (omitted) ...
    return df.to_json(orient='records')

And the pipeline becomes:

raw = fetch_gas_prices()
cleaned = transform_gas_prices(raw)
store_gas_prices(cleaned)

This reads like standard Python.
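Wrapped in a @dag, the refactored pipeline might look like the sketch below (task bodies elided; the store_gas_prices task and the daily schedule are assumptions based on the pipeline overview, not copied from the repo):

```python
from datetime import datetime
from airflow.decorators import dag, task

@dag(start_date=datetime(2023, 1, 1), schedule='@daily', catchup=False)
def gas_prices_etl():

    @task
    def fetch_gas_prices():
        ...  # call the API, return the decoded payload

    @task
    def transform_gas_prices(raw_data: str):
        ...  # clean and reshape, return JSON records

    @task
    def store_gas_prices(cleaned: str):
        ...  # insert into PostgreSQL

    store_gas_prices(transform_gas_prices(fetch_gas_prices()))

gas_prices_etl()
```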

What Changed

The logic stayed the same. The structure changed completely.

Instead of manually managing XComs, data flows naturally between functions.

Before vs After

| Aspect          | Traditional    | TaskFlow     |
| --------------- | -------------- | ------------ |
| Task definition | PythonOperator | @task        |
| Data passing    | Manual XCom    | Automatic    |
| Readability     | Medium         | High         |
| Boilerplate     | High           | Low          |
| Mental model    | Wiring tasks   | Data flow    |

6. Lessons From the Refactor

1. TaskFlow doesn’t remove XComs

It only hides them.

You still need to respect serialization limits:

return big_dataframe  # still not ideal

2. Passing data is easier, but not always better

TaskFlow makes it easy to pass data between tasks, but large payloads should still live in external storage.
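A common way to handle this is to write the payload to external storage and pass only a reference through XCom. A minimal sketch of the pattern, using plain functions and a local file standing in for object storage such as S3:

```python
import json
import tempfile
from pathlib import Path

def extract(storage_dir: str) -> str:
    """Write the (potentially large) payload externally; return only its path."""
    payload = {"prices": list(range(10_000))}  # stand-in for a big dataset
    path = Path(storage_dir) / "gas_prices.json"
    path.write_text(json.dumps(payload))
    return str(path)  # a small string: safe to pass through XCom

def transform(path: str) -> int:
    """The downstream task loads the payload from storage via the reference."""
    payload = json.loads(Path(path).read_text())
    return len(payload["prices"])

with tempfile.TemporaryDirectory() as d:
    ref = extract(d)
    count = transform(ref)

assert count == 10_000
```

The XCom then stays tiny regardless of how large the dataset grows.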

3. Refactoring was mostly structural

Most of the work was:

  • removing **kwargs
  • replacing XCom logic with returns
  • simplifying task boundaries

4. The biggest change is mental

The shift was not technical; it was conceptual.

From:

How do I connect tasks?

to:

How does data flow through this pipeline?

7. Pitfalls to Avoid

  • Don’t push large objects through XCom
  • Don’t mix styles without intention
  • Don’t overuse TaskFlow just because it’s cleaner
  • Don’t forget serialization still exists

8. Conclusion

TaskFlow isn’t new, but adopting it after using the traditional approach makes its benefits clearer.

It moves you from writing orchestration-heavy DAGs to writing clean Python workflows.

And that shift improves:

  • readability
  • maintainability
  • reasoning about pipelines

Key Takeaways

  • TaskFlow simplifies DAG structure without changing Airflow’s core engine
  • XComs still exist but are abstracted
  • The real improvement is cleaner data flow modeling
  • Refactoring old DAGs is one of the best ways to understand it
