Beyond the UI: Mastering Airflow 3 with Bare-Metal Postgres and TaskFlow

In the world of Data Engineering, there is a temptation to rely entirely on "magic": the UI buttons and high-level abstractions that hide how things work. However, when a pipeline fails or a task hits an AirflowTaskTimeout, the engineer who understands the "bare metal" is the one who fixes it.

In this guide, we are going back to basics: configuring Airflow 3 via the .cfg, setting up a production-grade Aiven Postgres bridge, and demystifying the mechanics of XComs through the lens of kwargs.

The Foundation: Hard-Coding Your Database

Airflow is not just a scheduler; it is a database-backed application. Before writing a single DAG, your metadata environment must be solid.

Preparing the Handshake

Whether you are using a local instance or a managed cloud provider like Aiven, your Postgres environment needs a dedicated identity. Isolation is key to security in Data Engineering.

-- Execute in your Postgres terminal to set up the Airflow Backend
CREATE USER airflow_user WITH PASSWORD 'secure_password';
CREATE DATABASE airflow_db;
GRANT ALL PRIVILEGES ON DATABASE airflow_db TO airflow_user;


Installing the Translators (Drivers)

Airflow doesn't speak "Postgres" natively; it uses Python drivers.

  • psycopg2-binary: The standard synchronous driver for most operations.
  • asyncpg: Essential for Airflow 3’s asynchronous capabilities and high-performance execution loops.

pip install psycopg2-binary asyncpg
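To confirm the driver and credentials work before Airflow ever touches the database, a quick round-trip with SQLAlchemy (already an Airflow dependency) is enough. A minimal sketch, assuming the airflow_user and airflow_db created in the SQL step above:

from sqlalchemy import create_engine, text

# Connection string built from the role and database created above
engine = create_engine(
    "postgresql+psycopg2://airflow_user:secure_password@localhost:5432/airflow_db"
)

# A trivial query proves the driver, credentials, and network path all work
with engine.connect() as conn:
    print(conn.execute(text("SELECT version()")).scalar())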

Configuration as Code: The airflow.cfg

While the Airflow UI is convenient, defining connections in airflow.cfg follows the Configuration as Code (CaC) principle. This makes your environment reproducible, portable, and easier to manage in a CI/CD pipeline.

Locate your airflow.cfg (usually in ~/airflow/) and find the [database] and [connections] sections.

[database]
# Pointing Airflow's own metadata to your local or remote Postgres
sql_alchemy_conn = postgresql+psycopg2://airflow_user:secure_password@localhost:5432/airflow_db

[connections]
# Defining an external Aiven Postgres connection via URI
# Note: 'sslmode=require' is critical for cloud security
AIRFLOW_CONN_AIVEN_PROD = "postgres://avnadmin:pass@pg-damaa.aivencloud.com:24848/defaultdb?sslmode=require"
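Once defined, the connection is consumed inside tasks through the Postgres provider hook. A minimal sketch, assuming the apache-airflow-providers-postgres package is installed and that the conn id aiven_prod resolves from the AIRFLOW_CONN_AIVEN_PROD entry above:

from airflow.decorators import task
from airflow.providers.postgres.hooks.postgres import PostgresHook

@task
def check_aiven_connection():
    # 'aiven_prod' is the conn id derived from AIRFLOW_CONN_AIVEN_PROD
    hook = PostgresHook(postgres_conn_id="aiven_prod")
    # get_first returns the first row of the query result
    print(hook.get_first("SELECT version();"))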

The Data Bridge: XComs Decoded

In Airflow, tasks run in isolation. They cannot share Python variables in memory. To move small amounts of data (metadata, IDs, or status flags) between tasks, we use **XComs** (Cross-Communications).

The "Old School" Manual Way: kwargs['ti']

In traditional PythonOperator development, every function receives a "suitcase" called kwargs. Inside this suitcase is the Task Instance (ti), which acts as your API to the XCom table in Postgres.

def extract_ticker_data(**kwargs):
    # Pull the Task Instance from the context suitcase
    ti = kwargs['ti'] 
    scraped_data = {"ticker": "BTC", "price": 64000}

    # Manually pushing into the Postgres xcom table
    ti.xcom_push(key='raw_crypto_data', value=scraped_data)

def transform_ticker_data(**kwargs):
    ti = kwargs['ti']
    # Manually pull from the xcom table; task_ids must match the upstream task's task_id
    data = ti.xcom_pull(task_ids='extract_task', key='raw_crypto_data')

    processed_price = data['price'] * 130  # illustrative USD-to-KES conversion rate
    ti.xcom_push(key='price_kes', value=processed_price)
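For context, here is roughly how those callables get wired into a DAG; this is a sketch rather than the article's full pipeline. The task_id 'extract_task' is the value that must match the xcom_pull call above, and the PythonOperator import path assumes Airflow 3's standard provider (in Airflow 2 it lives under airflow.operators.python):

from datetime import datetime

from airflow import DAG
from airflow.providers.standard.operators.python import PythonOperator

with DAG(dag_id="crypto_manual_xcom", start_date=datetime(2024, 1, 1), schedule=None, catchup=False):
    extract = PythonOperator(task_id="extract_task", python_callable=extract_ticker_data)
    transform = PythonOperator(task_id="transform_task", python_callable=transform_ticker_data)

    # 'extract_task' above is exactly what transform_ticker_data pulls from
    extract >> transform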

The "Modern" Way: TaskFlow API

Airflow 3 emphasizes the TaskFlow API (@task). Here, the complexity of ti.xcom_pull is abstracted away. Airflow treats the return value of a function as an implicit XCom push.

from airflow.decorators import task  # Airflow 3 also exposes this via airflow.sdk

@task
def extract():
    return {"ticker": "BTC", "price": 64000}  # Automatic XCom push under the 'return_value' key

@task
def transform(data):
    # Airflow automatically pulls the XCom and passes it as 'data'
    return data['price'] * 130
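The wiring happens when the tasks are composed inside a DAG: passing extract()'s return value into transform() is what creates both the XCom hand-off and the dependency. A minimal, self-contained sketch (the dag id and schedule are placeholders):

from datetime import datetime

from airflow.decorators import dag, task

@dag(dag_id="crypto_taskflow", schedule=None, start_date=datetime(2024, 1, 1), catchup=False)
def crypto_pipeline():

    @task
    def extract():
        return {"ticker": "BTC", "price": 64000}

    @task
    def transform(data):
        return data["price"] * 130

    # Passing the return value wires the XCom and the task dependency
    transform(extract())

crypto_pipeline()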

The "Double-Write" Conflict: Why return and xcom_push Clash

This is the most critical technical nuance for any Airflow developer. You cannot use ti.xcom_push and return for the same value without consequences.

The Technical Conflict

When you use TaskFlow, Airflow maps the return statement to a specific XCom key: return_value.

If you write:

@task
def my_task(**kwargs):
    ti = kwargs['ti']
    data = "Success_Flag"

    ti.xcom_push(key='return_value', value=data) # Manual Write #1
    return data                                  # Automatic Write #2

What happens in Postgres?

  • Redundant SQL Commands: Airflow issues two separate SQL INSERT or UPDATE commands to the xcom table for the exact same task and key.

  • Database Bloat: You are doubling the metadata overhead for every single task execution.

  • Race Conditions: In high-concurrency environments, these redundant writes can cause locking issues in your Postgres backend, leading to the very "timeouts" you want to avoid.
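If you want to see what actually lands in the backend, you can query the metadata database directly. A minimal sketch, assuming the [database] connection string from earlier and the default xcom table columns (dag_id, task_id, key, timestamp); exact column names can vary between Airflow versions:

from sqlalchemy import create_engine, text

# Same connection string as sql_alchemy_conn in airflow.cfg
engine = create_engine(
    "postgresql+psycopg2://airflow_user:secure_password@localhost:5432/airflow_db"
)

with engine.connect() as conn:
    rows = conn.execute(
        text("SELECT task_id, key, timestamp FROM xcom "
             "WHERE dag_id = :dag_id ORDER BY timestamp DESC"),
        {"dag_id": "crypto_taskflow"},  # placeholder dag_id
    ).fetchall()
    for row in rows:
        print(row)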

The Best Practice

  • Use return for the primary output of your task. It is cleaner and optimized for TaskFlow.

  • Use ti.xcom_push ONLY if you need to push additional, separate pieces of metadata (e.g., a row count and a file path) that are not the main return object, as in the sketch below.
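In practice that looks like this: the main payload travels through return, while a separate piece of metadata gets its own key. The function and key names (extract_prices, row_count) are hypothetical:

from airflow.decorators import task

@task
def extract_prices(**context):
    ti = context["ti"]
    records = [{"ticker": "BTC", "price": 64000}]

    # Additional, separate metadata under its own key
    ti.xcom_push(key="row_count", value=len(records))

    # Primary output, stored automatically under the 'return_value' key
    return records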

Conclusion: Engineering for Performance

To build resilient pipelines in Airflow 3, you must respect the metadata database. By configuring your connections at the .cfg level and understanding the "double-write" conflict of XComs, you ensure your Postgres backend remains lean and your scheduler remains fast.

Mastering the manual kwargs['ti'] gives you the control; mastering the return statement gives you the efficiency.
