In the world of Data Engineering, there is a temptation to rely entirely on "magic" the UI buttons and high-level abstractions that hide how things work. However, when a pipeline fails or a scheduler hits an AirflowTaskTimeout, the engineer who understands the "bare metal" is the one who fixes it.
In this guide, we are going back to basics: configuring Airflow 3 via the .cfg, setting up a production-grade Aiven Postgres bridge, and demystifying the mechanics of XComs through the lens of kwargs.
The Foundation: Hard-Coding Your Database
Airflow is not just a scheduler; it is a database-backed application. Before writing a single DAG, your metadata environment must be solid.
Preparing the Handshake
Whether you are using a local instance or a managed cloud provider like Aiven, your Postgres environment needs a dedicated identity. Isolation is key to security in Data Engineering.
SQL
-- Execute in your Postgres terminal to set up the Airflow Backend
CREATE USER airflow_user WITH PASSWORD 'secure_password';
CREATE DATABASE airflow_db;
GRANT ALL PRIVILEGES ON DATABASE airflow_db TO airflow_user;
Installing the Translators (Drivers)
Airflow doesn't speak "Postgres" natively; it uses Python drivers.
- psycopg2-binary: The standard synchronous driver for most operations.
- asyncpg: Essential for Airflow 3ās asynchronous capabilities and high-performance execution loops.
pip install psycopg2-binary asyncpg
Configuration as Code: The airflow.cfg
While the Airflow UI is convenient, defining connections in airflow.cfgfollows the Configuration as Code (CaC) principle. This makes your environment reproducible, portable, and easier to manage in a CI/CD pipeline.
Locate your airflow.cfg (usually in ~/airflow/) and find the [database] and[connections]sections.
[database]
# Pointing Airflow's own metadata to your local or remote Postgres
sql_alchemy_conn = postgresql+psycopg2://airflow_user:password@localhost:5432/airflow_db
[connections]
# Defining an external Aiven Postgres connection via URI
# Note: 'sslmode=require' is critical for cloud security
AIRFLOW_CONN_AIVEN_PROD = "postgres://avnadmin:pass@pg-damaa.aivencloud.com:24848/defaultdb?sslmode=require"
The Data Bridge: XComs Decoded
In Airflow, tasks run in isolation. They cannot share Python variables in memory. To move small amounts of data (metadata, IDs, or status flags) between tasks, we use** XComs** (Cross-Communications).
The "Old School" Manual Way: kwargs['ti']
In traditional PythonOperator development, every function receives a "suitcase" called kwargs. Inside this suitcase is the Task Instance (ti), which acts as your API to the XCom table in Postgres.
def extract_ticker_data(**kwargs):
# Pull the Task Instance from the context suitcase
ti = kwargs['ti']
scraped_data = {"ticker": "BTC", "price": 64000}
# Manually pushing into the Postgres xcom table
ti.xcom_push(key='raw_crypto_data', value=scraped_data)
def transform_ticker_data(**kwargs):
ti = kwargs['ti']
# Manually reaching into the xcom table to pull data from a specific task
data = ti.xcom_pull(task_ids='extract_task', key='raw_crypto_data')
processed_price = data['price'] * 130
ti.xcom_push(key='price_kes', value=processed_price)
The "Modern" Way: TaskFlow API
Airflow 3 emphasizes the TaskFlow API (@task). Here, the complexity of ti.xcom_pull is abstracted away. Airflow treats the return value of a function as an implicit XCom push.
@task
def extract():
return {"ticker": "BTC", "price": 64000} # Automatic XCom Push to 'return_value'
@task
def transform(data):
# Airflow automatically pulls the XCom and passes it as 'data'
return data['price'] * 130
The "Double-Write" Conflict: Why return and xcom_push Clash
This is the most critical technical nuance for any Airflow developer. You cannot useti.xcom_push and return for the same value without consequences.
The Technical Conflict
When you use TaskFlow, Airflow maps the return statement to a specific XCom key: return_value.
If you write:
@task
def my_task(**kwargs):
ti = kwargs['ti']
data = "Success_Flag"
ti.xcom_push(key='return_value', value=data) # Manual Write #1
return data # Automatic Write #2
What happens in Postgres?
Redundant SQL Commands: Airflow issues two separate SQL INSERT or UPDATE commands to the xcom table for the exact same task and key.Database Bloat: You are doubling the metadata overhead for every single task execution.Race Conditions: In high-concurrency environments, these redundant writes can cause locking issues in your Postgres backend, leading to the very "timeouts" you want to avoid.
The Best Practice
Use
returnfor the primary output of your task. It is cleaner and optimized for TaskFlow.Use
ti.xcom_pushONLY if you need to push additional, separate pieces of metadata (e.g., a row count and a file path) that are not the main return object.
Conclusion: Engineering for Performance
To build resilient pipelines in Airflow 3, you must respect the metadata database. By configuring your connections at the .cfg level and understanding the "double-write" conflict of XComs, you ensure your Postgres backend remains lean and your scheduler remains fast.
Mastering the manual kwargs['ti']gives you the control; mastering the return statement gives you the efficiency.
Top comments (0)