As organizations become increasingly data-driven, the scale of their pipelines has grown from modest daily batches to continuous, high-volume streams. What appears to be overwhelming complexity is, in practice, a matter of structure and discipline imposed through the right tools. Apache Airflow embodies this principle as a batch-oriented orchestration framework: it lets you build scheduled, reliable data pipelines in Python and integrates with the diverse technologies that make up modern data ecosystems.
What Airflow actually does
Apache Airflow is an open source tool used to write, schedule, and manage workflows as code. Whenever you have actions that depend on one another and must be performed in a specific order, you can define them as a workflow in Airflow.
Workflows in Airflow are modelled as DAGs (Directed Acyclic Graphs). A DAG is simply a collection of tasks with defined dependencies between them. The "acyclic" part just means there are no circular loops; Task A might trigger Task B, but Task B can never circle back and trigger Task A. This constraint keeps pipelines predictable and debuggable.
At its core, Airflow does three things well: it defines workflows as DAGs, schedules and executes tasks on a timeline, and tracks state and dependencies so you always know what ran, when, and whether it succeeded.
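To make the acyclic constraint concrete, here is a minimal sketch that uses Python's standard-library graphlib module rather than Airflow itself. The task names are illustrative assumptions; the point is only that a dependency graph without cycles has a valid run order, while a graph with a cycle has none.

```python
from graphlib import CycleError, TopologicalSorter

# Each key maps a task to the set of tasks it depends on (illustrative names).
pipeline = {
    "transform": {"extract"},
    "load": {"transform"},
}

# A valid DAG can always be flattened into an execution order.
order = list(TopologicalSorter(pipeline).static_order())
# order == ["extract", "transform", "load"]

# Task A depends on Task B and Task B depends on Task A: no valid order exists.
cyclic = {"task_a": {"task_b"}, "task_b": {"task_a"}}
try:
    list(TopologicalSorter(cyclic).static_order())
    cycle_detected = False
except CycleError:
    cycle_detected = True
```

Airflow applies the same idea when it loads a DAG file: a graph with a circular dependency cannot be ordered, so it is rejected instead of being scheduled.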
Setting up Airflow on Linux
A stable Airflow deployment starts with a clean Python environment. Skipping this step is a common source of dependency conflicts down the road.
Start by creating and activating a virtual environment:
python3 -m venv airflow_env
source airflow_env/bin/activate
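Then install Airflow and initialise its metadata database, the internal database Airflow uses to track DAG runs, task states, and other run metadata. These commands target Airflow 2.x (from 2.7 onward, airflow db migrate replaces the deprecated airflow db init):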
pip install apache-airflow
airflow db init
Once that's done, start the webserver and scheduler as separate processes:
airflow webserver --port 8080
airflow scheduler
The web UI will be available at http://localhost:8080. If this is a fresh installation, you'll need to create an admin user before you can log in:
airflow users create \
--username admin \
--firstname Admin \
--lastname User \
--role Admin \
--email admin@example.com
Getting your DAG to appear in the UI
A surprisingly common stumbling block is writing a DAG that simply doesn't show up in the Airflow interface. If that happens to you, run through this checklist before assuming something deeper is wrong.
Your DAG file must live in Airflow's DAGs folder (~/airflow/dags/ by default), have a .py extension, contain no syntax errors, and instantiate the DAG object at the global scope of the file (not inside a function). The scheduler re-parses the DAGs folder periodically, so a new file can take a few minutes to become visible; if you don't want to wait, restart the scheduler and refresh the UI.
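The checklist above can be partly automated. The sketch below is a rough heuristic built on the standard-library ast module, not an Airflow feature: check_dag_source and its messages are hypothetical names, and it only recognises a direct DAG(...) assignment or a with DAG(...) block at the top level of the file.

```python
import ast


def _is_dag_call(node):
    """True if node is a call such as DAG(...) or models.DAG(...)."""
    if not isinstance(node, ast.Call):
        return False
    func = node.func
    name = func.id if isinstance(func, ast.Name) else getattr(func, "attr", None)
    return name == "DAG"


def check_dag_source(filename, source):
    """Return a list of problems; an empty list means the basic checks passed."""
    problems = []
    if not filename.endswith(".py"):
        problems.append("file does not have a .py extension")
    try:
        tree = ast.parse(source)
    except SyntaxError as exc:
        problems.append(f"syntax error on line {exc.lineno}")
        return problems
    # Only top-level statements are inspected. This heuristic misses DAGs
    # returned from helper functions or created with the @dag decorator.
    found = False
    for stmt in tree.body:
        if isinstance(stmt, ast.Assign) and _is_dag_call(stmt.value):
            found = True
        elif isinstance(stmt, ast.With):
            found = found or any(_is_dag_call(item.context_expr) for item in stmt.items)
    if not found:
        problems.append("no DAG(...) instantiated at module level")
    return problems
```

Because the source is only parsed, never imported, the check runs without Airflow installed. A file that passes can still fail to load for other reasons, such as a missing import.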
Designing the ETL pipeline
The pipeline in this guide follows a classic three-stage structure:
- Extract — fetch stock data from an external API (such as Polygon.io)
- Transform — clean and reshape the data using Pandas
- Load — write the results to a PostgreSQL database
TaskFlow API vs. Operators and XComs
Airflow offers two styles for wiring tasks together. The TaskFlow API uses Python decorators (@task) and handles data passing automatically; it's clean and concise, but abstracts away some of what's happening under the hood. Operators with XComs, by contrast, give you explicit control over how data flows between tasks.
XCom (short for "cross-communication") is Airflow's built-in mechanism for passing small pieces of data between tasks; the values are stored in the metadata database. Here's how the pattern looks across all three stages:
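from io import StringIO

import pandas as pd

# fetch_api_data() and process_data() are helper functions defined elsewhere.
# The task_ids values assume the tasks are named "extract" and "transform".

# Extract
def extract(**context):
    data = fetch_api_data()
    context['ti'].xcom_push(key='raw_data', value=data)

# Transform
def transform(**context):
    raw = context['ti'].xcom_pull(task_ids='extract', key='raw_data')
    df = process_data(raw)
    context['ti'].xcom_push(key='clean_data', value=df.to_json())

# Load
def load(**context):
    data = context['ti'].xcom_pull(task_ids='transform', key='clean_data')
    # Wrap the JSON string in StringIO; recent pandas versions deprecate passing it directly
    df = pd.read_json(StringIO(data))
    df.to_sql(...)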
One important caveat: XCom is designed for small payloads only. If you're passing large DataFrames between tasks, store them in external storage (an S3 bucket or a staging table) and pass only a reference through XCom.
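The reference-passing approach can be sketched without Airflow. In this illustration a plain dict stands in for the XCom store and a temporary directory stands in for S3 or a staging table; the key name, file name, and sample row are all made up for the example.

```python
import json
import tempfile
from pathlib import Path

xcom = {}  # stand-in for Airflow's XCom store (assumption for illustration)


def transform(rows, staging_dir):
    """Write the full payload to external storage and push only its location."""
    path = Path(staging_dir) / "clean_data.json"
    path.write_text(json.dumps(rows))
    xcom["clean_data_path"] = str(path)  # small reference, not the data itself


def load():
    """Pull the reference and read the payload back from storage."""
    path = Path(xcom["clean_data_path"])
    return json.loads(path.read_text())


with tempfile.TemporaryDirectory() as staging_dir:
    transform([{"ticker": "XYZ", "close": 10.0}], staging_dir)
    rows = load()
```

Only a short path string ever passes through the stand-in XCom, so its size stays constant however large the dataset grows.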
Connecting to PostgreSQL
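Airflow manages external connections through its Connections store. Add your PostgreSQL connection via the CLI, using the Airflow connection type (postgres) as the URI scheme rather than a SQLAlchemy dialect string such as postgresql+psycopg2:
airflow connections add postgres_default \
    --conn-uri "postgres://airflow_user:password@localhost:5432/stocks_db"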
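In your DAG, import PostgresHook from the Postgres provider package (apache-airflow-providers-postgres) and reference this connection by ID:
from airflow.providers.postgres.hooks.postgres import PostgresHook
hook = PostgresHook(postgres_conn_id="postgres_default")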
The connection ID in your DAG must match exactly what you registered in Airflow's Connections store. If they don't match, the task will fail with a connection error — and the error message won't always make it obvious why.
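A related source of confusing connection errors is a password containing characters such as @ or /, which must be percent-encoded inside a connection URI. The sketch below uses only the standard library; build_conn_uri is a hypothetical helper and the credentials are placeholders.

```python
from urllib.parse import quote, unquote, urlsplit


def build_conn_uri(user, password, host, port, database):
    """Assemble a postgres:// URI, percent-encoding the credentials."""
    return (
        f"postgres://{quote(user, safe='')}:{quote(password, safe='')}"
        f"@{host}:{port}/{database}"
    )


uri = build_conn_uri("airflow_user", "p@ss/word", "localhost", 5432, "stocks_db")

# Splitting the URI again confirms each component survives the round trip.
parts = urlsplit(uri)
decoded_password = unquote(parts.password)
```

Without the encoding step, the @ in the password would be read as the separator between credentials and host, and the connection would point at the wrong server.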
Fixing database permission errors
If your pipeline fails with a message like ERROR: permission denied for schema public, the issue is almost certainly that your database user lacks the necessary privileges. Fix it by granting the required permissions in PostgreSQL:
GRANT ALL ON SCHEMA public TO airflow_user;
ALTER DEFAULT PRIVILEGES IN SCHEMA public GRANT ALL ON TABLES TO airflow_user;
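This is easy to miss because the user might be able to connect to the database but still be blocked from creating or writing to tables in the public schema. On PostgreSQL 15 and later this is the default behaviour: ordinary users no longer receive the CREATE privilege on the public schema automatically.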
Handling API constraints
When fetching data from an external API, you may encounter a 403 NOT_AUTHORIZED error even when your credentials are correct. This usually means your API subscription doesn't cover the time range you're requesting. Many free or basic tiers restrict historical data access.
A simple fix is to narrow your query window to recent data only:
from datetime import datetime, timedelta, timezone
end = datetime.now(timezone.utc)
start = end - timedelta(hours=1)
If you need broader historical access, you'll need to upgrade your API plan.
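If the requested range is computed elsewhere in your code, one option is to clamp it to whatever your plan allows before calling the API. This is a minimal sketch: clamp_window and max_lookback are hypothetical names, and the actual limit depends on your provider and subscription.

```python
from datetime import datetime, timedelta, timezone


def clamp_window(start, end, max_lookback):
    """Move start forward if it falls outside the allowed lookback from end."""
    earliest_allowed = end - max_lookback
    return max(start, earliest_allowed), end


# A request for 30 days of history is narrowed to the last hour.
end = datetime.now(timezone.utc)
start, end = clamp_window(end - timedelta(days=30), end, timedelta(hours=1))
```

A window that already fits inside the limit is returned unchanged, so the helper is safe to apply to every request.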
Scheduling and time management
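Airflow stores and schedules everything in UTC internally, which is worth keeping in mind when debugging timing issues. When defining a DAG, always specify start_date, schedule_interval, and catchup explicitly (from Airflow 2.4 onward the argument is named schedule, and schedule_interval is deprecated):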
start_date=datetime(2025, 1, 1)
schedule_interval='@hourly'
catchup=False
Setting catchup=False is particularly important. Without it, Airflow will attempt to backfill all the runs it "missed" between start_date and now, which can trigger dozens or hundreds of unexpected runs the first time you enable a DAG.
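The size of an accidental backfill is simple arithmetic. The sketch below is not Airflow's scheduler logic, just a count of the complete schedule intervals between start_date and the moment the DAG is enabled; missed_runs is a hypothetical helper and the dates are illustrative.

```python
from datetime import datetime, timedelta, timezone


def missed_runs(start_date, now, interval):
    """Count the complete schedule intervals between start_date and now."""
    if now <= start_date:
        return 0
    return (now - start_date) // interval


start_date = datetime(2025, 1, 1, tzinfo=timezone.utc)
enabled_at = datetime(2025, 1, 8, tzinfo=timezone.utc)

# One week of hourly intervals: 7 * 24 = 168 runs queued at once.
backlog = missed_runs(start_date, enabled_at, timedelta(hours=1))
```

With catchup=False, Airflow skips that backlog and creates a run only for the most recent interval.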
The DAG execution lifecycle
Understanding what happens when a DAG runs helps enormously when something goes wrong. The sequence is: the scheduler parses your DAG file, creates a DAG Run, queues the individual tasks, hands them to the executor, and then updates task states (such as success, failed, or up_for_retry) as they complete.
You can configure retry behaviour in the default_args dictionary:
from datetime import timedelta

default_args = {
    "retries": 2,
    "retry_delay": timedelta(minutes=3),
}
This means any failed task will automatically retry twice, with a three-minute gap between attempts.
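To see what those two settings imply, here is a small simulation in plain Python. It is an illustration of the retry semantics, not Airflow's implementation; simulate_attempts is a hypothetical helper, and the state names mirror the ones Airflow shows in the UI.

```python
from datetime import timedelta


def simulate_attempts(outcomes, retries, retry_delay):
    """Walk through attempt outcomes (True means the attempt succeeded).

    Returns the sequence of task states and the total time spent waiting
    between attempts.
    """
    history = []
    waited = timedelta(0)
    max_attempts = retries + 1  # the first try plus the configured retries
    for attempt, succeeded in enumerate(outcomes[:max_attempts], start=1):
        if succeeded:
            history.append("success")
            break
        if attempt < max_attempts:
            history.append("up_for_retry")
            waited += retry_delay
        else:
            history.append("failed")
    return history, waited


# A task that fails every time: three attempts in total, six minutes of waiting.
states, waited = simulate_attempts([False, False, False], 2, timedelta(minutes=3))
```

With retries set to 2, a task is attempted at most three times, and it is only marked failed after the final attempt.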
Observability and debugging
Airflow's UI gives you a clear view into what's happening at every level. The Graph View shows task dependencies at a glance, the Grid View (called Tree View before Airflow 2.3) shows run history over time, and clicking into any task gives you access to its full execution logs.
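Logs from scheduled runs are also written to files under $AIRFLOW_HOME/logs by default. To reproduce a task's output from the command line, airflow tasks test runs a single task in isolation, without recording its state in the database, and prints its log to the terminal:
airflow tasks test <dag_id> <task_id> <execution_date>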
When debugging a failure, always start with the task logs. They usually tell you exactly what went wrong, whether it's a Python exception, a connection error, or an API rejection.
Common failure points at a glance
Most issues in an Airflow workflow fall into a handful of categories:
- DAG not appearing in UI: wrong directory, syntax error in the file, or the scheduler hasn't picked up the file yet (restart it if in doubt).
- Connection issues: mismatched connection ID between your DAG code and Airflow's Connections store, or incorrect credentials.
- Database permission errors: the database user hasn't been granted the right privileges on the target schema.
- API failures: rate limiting, subscription restrictions, or requesting data outside the allowed time window.
Moving toward production
Running Airflow locally on SQLite is fine for development, but not suitable for production use. When you're ready to take things further, consider these changes:
- Swap SQLite for PostgreSQL or MySQL as Airflow's metadata database, since SQLite has locking issues under concurrent load.
- Move from the default SequentialExecutor to a CeleryExecutor or KubernetesExecutor to run tasks in parallel.
- Add a remote logging backend (S3, Elasticsearch) so logs persist beyond the local machine.
- Implement secrets management (environment variables, HashiCorp Vault, or Airflow's built-in Secrets Backend) rather than storing credentials in plain text.
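For the environment-variable option, Airflow looks up a connection in a variable named AIRFLOW_CONN_ followed by the upper-cased connection ID. The sketch below only derives that name and checks whether it is set; connection_env_var is a hypothetical helper, not part of Airflow.

```python
import os


def connection_env_var(conn_id):
    """Name of the environment variable Airflow checks for a connection ID."""
    return f"AIRFLOW_CONN_{conn_id.upper()}"


var_name = connection_env_var("postgres_default")
conn_uri = os.environ.get(var_name)
if conn_uri is None:
    # Without the variable, Airflow falls back to the Connections store
    # in the metadata database.
    print(f"{var_name} is not set")
```

Setting the variable in the deployment environment keeps the credentials out of both the DAG code and the metadata database.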
Insight
At a high level, building this pipeline involved: setting up a Linux environment with Airflow, configuring the metadata database, writing a DAG using Operators and XComs, integrating an external stock data API, transforming the results with Pandas, loading into PostgreSQL with the right permissions, wiring up Airflow connections, and monitoring everything through the scheduler and web UI.
The patterns used here are modular tasks, explicit data flow, external system integration, scheduled automation, and observability, and they carry over directly to production-level data pipelines. Choosing Operators over the TaskFlow API also builds a deeper understanding of how Airflow manages execution and state, which pays dividends when things inevitably break in unexpected ways.