DEV Community

Cover image for Airflow vs n8n for API-driven data pipelines
Raizan
Raizan

Posted on • Originally published at chasebot.online

Airflow vs n8n for API-driven data pipelines

What You'll Need

  • n8n Cloud or self-hosted n8n instance
  • Hetzner VPS or Contabo VPS for hosting (if self-hosting)
  • DigitalOcean as an alternative hosting option
  • Python 3.8+ (for Airflow)
  • PostgreSQL or MySQL database
  • Basic understanding of REST APIs and workflow automation concepts

Table of Contents


Understanding Airflow and n8n

I've spent the last few years building data pipelines, and the Airflow vs n8n question keeps coming up. Both tools solve the same fundamental problem—orchestrating tasks and workflows—but they approach it from completely different angles.

Apache Airflow is a workflow orchestration platform built by Airbnb, now an Apache project. It's written in Python, code-first, and treats workflows as directed acyclic graphs (DAGs). Think of it as a framework for engineers who want maximum control and don't mind writing Python code to define their pipelines.

n8n, on the other hand, is a visual workflow automation platform that's been gaining serious traction. It's node-based, has a browser-friendly UI, and integrates with hundreds of APIs out of the box. You can build complex workflows without touching code, though it supports code nodes when you need them.

For API-driven data pipelines specifically, this distinction matters. A lot.


Architecture and Deployment Models

Here's where the philosophies diverge.

Airflow's Architecture:

Airflow uses a distributed architecture with a scheduler, executor, and metadata database. The scheduler reads your DAGs and decides what to execute. Executors actually run the tasks—you can use the LocalExecutor for development, CeleryExecutor for distributed task queues, or KubernetesExecutor for container-based scaling.

This complexity gives you power. You get fine-grained control over task dependencies, retries, backoffs, and custom operators. But it also means you're responsible for maintaining multiple components.

n8n's Architecture:

n8n runs as a single application (though it supports clustering). It uses a database to store workflow definitions and execution history. There's no separate scheduler/executor split—the application handles orchestration internally. When you deploy n8n Cloud, you're getting fully managed infrastructure. If you self-host on Hetzner VPS or DigitalOcean, it's a single Docker container or Node.js process.

For API-driven pipelines, this matters. n8n's simplicity means you can have a workflow hitting 50 different APIs up and running in an afternoon. Airflow would take longer to set up, but it'd handle 50,000 API calls across thousands of tasks more elegantly at scale.


Building Your First API-Driven Pipeline

Let me show you how both tools handle a common scenario: fetch data from an API, transform it, and save it to a database.

Airflow Approach

In Airflow, you define a DAG in Python:

from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator
import requests
import json
import psycopg2
from psycopg2.extras import execute_values

default_args = {
    'owner': 'data_team',
    'retries': 2,
    'retry_delay': timedelta(minutes=5),
    'start_date': datetime(2024, 1, 1),
}

dag = DAG(
    'fetch_user_data_pipeline',
    default_args=default_args,
    description='Fetch user data from API and store in PostgreSQL',
    schedule_interval='0 */6 * * *',
    catchup=False,
)

def fetch_from_api():
    url = 'https://jsonplaceholder.typicode.com/users'
    try:
        response = requests.get(url, timeout=30)
        response.raise_for_status()
        data = response.json()
        with open('/tmp/user_data.json', 'w') as f:
            json.dump(data, f)
        print(f"Successfully fetched {len(data)} users")
        return len(data)
    except requests.exceptions.RequestException as e:
        print(f"API request failed: {e}")
        raise

def transform_and_validate(ti):
    with open('/tmp/user_data.json', 'r') as f:
        users = json.load(f)

    validated_users = []
    for user in users:
        if all(key in user for key in ['id', 'name', 'email', 'phone']):
            validated_users.append({
                'user_id': user['id'],
                'name': user['name'],
                'email': user['email'],
                'phone': user['phone'],
                'company': user.get('company', {}).get('name', 'Unknown'),
                'fetched_at': datetime.now().isoformat()
            })

    with open('/tmp/validated_users.json', 'w') as f:
        json.dump(validated_users, f)
    print(f"Validated {len(validated_users)} out of {len(users)} users")
    return len(validated_users)

def load_to_database(ti):
    try:
        conn = psycopg2.connect(
            host='localhost',
            database='pipeline_db',
            user='pipeline_user',
            password='secure_password',
            port=5432
        )
        cursor = conn.cursor()

        with open('/tmp/validated_users.json', 'r') as f:
            users = json.load(f)

        insert_query = """
        INSERT INTO users (user_id, name, email, phone, company, fetched_at)
        VALUES %s
        ON CONFLICT (user_id) DO UPDATE SET
            name = EXCLUDED.name,
            email = EXCLUDED.email,
            phone = EXCLUDED.phone,
            company = EXCLUDED.company,
            fetched_at = EXCLUDED.fetched_at
        """

        values = [
            (u['user_id'], u['name'], u['email'], u['phone'], u['company'], u['fetched_at'])
            for u in users
        ]

        execute_values(cursor, insert_query, values)
        conn.commit()
        cursor.close()
        conn.close()
        print(f"Loaded {len(users)} users into database")
        return len(users)
    except psycopg2.Error as e:
        print(f"Database error: {e}")
        raise

task_fetch = PythonOperator(
    task_id='fetch_api_data',
    python_callable=fetch_from_api,
    dag=dag,
)

task_transform = PythonOperator(
    task_id='transform_data',
    python_callable=transform_and_validate,
    dag=dag,
)

task_load = PythonOperator(
    task_id='load_to_postgres',
    python_callable=load_to_database,
    dag=dag,
)

task_cleanup = BashOperator(
    task_id='cleanup_temp_files',
    bash_command='rm -f /tmp/user_data.json /tmp/validated_users.json',
    dag=dag,
)

task_fetch >> task_transform >> task_load >> task_cleanup
Enter fullscreen mode Exit fullscreen mode

This DAG runs every 6 hours, fetches user data from an API, validates it, loads it into PostgreSQL, and cleans up temporary files. If any task fails, Airflow retries it twice with 5-minute delays.

n8n Approach

With n8n, you'd build the same workflow visually, but here's how you'd export and manage it via JSON (which is what n8n stores internally):

{
  "name": "Fetch User Data Pipeline",
  "nodes": [
    {
      "parameters": {
        "triggerType": "interval",
        "unit": "hours",
        "value": 6
      },
      "id": "schedule-trigger",
      "name": "Schedule",
      "type": "n8n-nodes-base.scheduleTrigger",
      "typeVersion": 1,
      "position": [250, 300]
    },
    {
      "parameters": {
        "url": "https://jsonplaceholder.typicode.com/users",
        "method": "GET",
        "authentication": "none",
        "responseFormat": "json"
      },
      "id": "http-request-api",
      "name": "Fetch User API",
      "type": "n8n-nodes-base.httpRequest",
      "typeVersion": 4,
      "position": [450, 300]
    },
    {
      "parameters": {
        "functionCode": "return items.map(item => {\n  const user = item.json;\n  if (!user.id || !user.name || !user.email || !user.phone) {\n    return null;\n  }\n  return {\n    json: {\n      user_id: user.id,\n      name: user.name,\n      email: user.email,\n      phone: user.phone,\n      company: user.company?.name || 'Unknown',\n      fetched_at: new Date().toISOString()\n    }\n  };\n}).filter(item => item !== null);"
      },
      "id": "code-transform",
      "name": "Transform & Validate",
      "type": "n8n-nodes-base.code",
      "typeVersion": 2,
      "position": [650, 300]
    },
    {
      "parameters": {
        "host": "localhost",
        "port": 5432,
        "database": "pipeline_db",
        "user": "pipeline_user",
        "password": "secure_password",
        "ssl": false,
        "query": "INSERT INTO users (user_id, name, email, phone, company, fetched_at) VALUES (@user_id, @name, @email, @phone, @company, @fetched_at) ON CONFLICT (user_id) DO UPDATE SET name=EXCLUDED.name, email=EXCLUDED.email, phone=EXCLUDED.phone, company=EXCLUDED.company, fetched_at=EXCLUDED.fetched_at"
      },
      "id": "postgres-insert",
      "name": "Load to PostgreSQL",
      "type": "n8n-nodes-base.postgres",
      "typeVersion": 2,
      "position": [850, 300]
    }
  ],
  "connections": {
    "schedule-trigger": {
      "main": [[{ "node": "http-request-api", "type": "main", "index": 0 }]]
    },
    "http-request-api": {
      "main": [[{ "node": "code-transform", "type": "main", "index": 0 }]]
    },
    "code-transform": {
      "main": [[{ "node": "postgres-insert", "type": "main", "index": 0 }]]
    }
  },
  "settings": {
    "saveDataErrorExecution": "all",
    "save
Enter fullscreen mode Exit fullscreen mode

Originally published on Automation Insider.

Top comments (0)