Data engineers know the pain: cron jobs that fail silently, Python ETL scripts that break on schema changes, Airflow clusters that need a dedicated ops team just to keep running.
n8n is a self-hosted workflow automation platform that gives you a visual pipeline builder with scheduling, retry logic, error notifications, and 400+ integrations — without per-operation pricing or vendor lock-in. If you already self-host tools, n8n fits right in: it runs in Docker, workflows are JSON you can version in Git, and there's no usage meter ticking.
Here are 5 data engineering workflows you can steal today. Each comes with import-ready JSON.
1. Daily ETL Pipeline with Error Alerts
What it does: Pulls data from a source API on a schedule, transforms and validates it in a Code node, upserts into Postgres, and sends a Slack alert if anything fails.
Why it beats cron + script: Built-in retry, visual step-by-step debugging, structured error output, and Slack alerts without wrapping everything in try/except.
Nodes: Schedule Trigger → HTTP Request (paginated) → Code (transform + validate) → Postgres (upsert) → IF (error?) → Slack (#data-alerts)
{
"name": "Daily ETL Pipeline with Error Alerts",
"nodes": [
{
"parameters": {
"rule": { "interval": [{ "field": "cronExpression", "expression": "0 5 * * *" }] }
},
"name": "5AM Daily",
"type": "n8n-nodes-base.scheduleTrigger",
"typeVersion": 1,
"position": [250, 300]
},
{
"parameters": {
"url": "https://api.yoursource.com/v1/records",
"authentication": "headerAuth",
"options": {
"pagination": {
"paginationType": "offsetPaginate",
"limitField": "limit",
"offsetField": "offset",
"pageSize": 200
}
}
},
"name": "Fetch Source Data",
"type": "n8n-nodes-base.httpRequest",
"typeVersion": 4,
"position": [450, 300]
},
{
"parameters": {
"jsCode": "const items = $input.all().flatMap(i => i.json.data || [i.json]);\nconst valid = items.map(r => ({\n id: r.id,\n name: r.name?.trim() || null,\n amount: parseFloat(r.amount) || 0,\n created_at: new Date(r.created_at).toISOString(),\n loaded_at: new Date().toISOString()\n})).filter(r => r.id);\nreturn valid.map(json => ({ json }));"
},
"name": "Transform + Validate",
"type": "n8n-nodes-base.code",
"typeVersion": 2,
"position": [650, 300]
},
{
"parameters": {
"operation": "upsert",
"schema": "public",
"table": "source_records",
"columns": {
"mappingMode": "autoMapInputData",
"matchingColumns": ["id"]
}
},
"name": "Upsert to Postgres",
"type": "n8n-nodes-base.postgres",
"typeVersion": 2,
"position": [850, 300]
}
],
"connections": {
"5AM Daily": { "main": [[{ "node": "Fetch Source Data", "type": "main", "index": 0 }]] },
"Fetch Source Data": { "main": [[{ "node": "Transform + Validate", "type": "main", "index": 0 }]] },
"Transform + Validate": { "main": [[{ "node": "Upsert to Postgres", "type": "main", "index": 0 }]] }
}
}
Customise: Replace the HTTP Request URL and auth. Adjust the Code node transform for your schema. Change the Postgres table name. Add an error handler node for Slack alerts on failure.
2. Data Quality Monitor
What it does: After your ETL loads data, this workflow queries the warehouse for quality issues — nulls, duplicates, out-of-range values, unexpected row counts. Sends a Slack digest only when issues exist. Silent on clean runs.
Nodes: Schedule → Postgres (quality check queries) → Code (evaluate rules, build report) → IF (issues?) → Slack (#data-quality) → Gmail (data lead)
{
"name": "Data Quality Monitor",
"nodes": [
{
"parameters": {
"rule": { "interval": [{ "field": "cronExpression", "expression": "0 6 * * *" }] }
},
"name": "6AM Daily",
"type": "n8n-nodes-base.scheduleTrigger",
"typeVersion": 1,
"position": [250, 300]
},
{
"parameters": {
"operation": "executeQuery",
"query": "SELECT\n COUNT(*) as total_rows,\n COUNT(*) FILTER (WHERE name IS NULL) as null_names,\n COUNT(*) FILTER (WHERE amount < 0) as negative_amounts,\n COUNT(*) FILTER (WHERE created_at > NOW()) as future_dates,\n COUNT(*) - COUNT(DISTINCT id) as duplicate_ids\nFROM source_records\nWHERE loaded_at >= NOW() - INTERVAL '25 hours'"
},
"name": "Quality Check Queries",
"type": "n8n-nodes-base.postgres",
"typeVersion": 2,
"position": [450, 300]
},
{
"parameters": {
"jsCode": "const q = $input.first().json;\nconst issues = [];\nif (q.null_names > 0) issues.push('⚠️ ' + q.null_names + ' null names');\nif (q.negative_amounts > 0) issues.push('⚠️ ' + q.negative_amounts + ' negative amounts');\nif (q.future_dates > 0) issues.push('🚨 ' + q.future_dates + ' future-dated records');\nif (q.duplicate_ids > 0) issues.push('🚨 ' + q.duplicate_ids + ' duplicate IDs');\nif (q.total_rows < 100) issues.push('⚠️ Low row count: ' + q.total_rows + ' (expected 100+)');\nreturn [{ json: { issues, issueCount: issues.length, total_rows: q.total_rows, summary: issues.join('\\n') } }];"
},
"name": "Evaluate Rules",
"type": "n8n-nodes-base.code",
"typeVersion": 2,
"position": [650, 300]
},
{
"parameters": {
"conditions": {
"number": [{ "value1": "={{ $json.issueCount }}", "operation": "larger", "value2": 0 }]
}
},
"name": "Issues Found?",
"type": "n8n-nodes-base.if",
"typeVersion": 1,
"position": [850, 300]
},
{
"parameters": {
"channel": "#data-quality",
"text": "=*Data Quality Alert — {{ new Date().toDateString() }}*\n\nTable: source_records | Rows loaded: {{ $json.total_rows }}\n\n{{ $json.summary }}\n\nCheck the warehouse and ETL logs."
},
"name": "Slack Alert",
"type": "n8n-nodes-base.slack",
"typeVersion": 1,
"position": [1050, 200]
}
],
"connections": {
"6AM Daily": { "main": [[{ "node": "Quality Check Queries", "type": "main", "index": 0 }]] },
"Quality Check Queries": { "main": [[{ "node": "Evaluate Rules", "type": "main", "index": 0 }]] },
"Evaluate Rules": { "main": [[{ "node": "Issues Found?", "type": "main", "index": 0 }]] },
"Issues Found?": { "main": [[{ "node": "Slack Alert", "type": "main", "index": 0 }], []] }
}
}
Pro tip: Add more checks over time — referential integrity, freshness (no new rows in 6h), distribution outliers. Each rule is one if statement in the Code node.
3. Paginated API-to-Warehouse Pipeline with Run Logging
What it does: Pulls data from a REST API with cursor/offset pagination, deduplicates against existing warehouse data, batch-inserts new records, and logs each pipeline run (start time, records fetched, status) to a pipeline_runs table.
Why you want run logging: When your pipeline fails at 3 AM, you want to know exactly which run failed, how many records it was processing, and how long it ran. The run log also doubles as a data freshness check.
Nodes: Schedule → HTTP Request (paginate) → Code (transform + deduplicate) → Postgres (batch upsert) → Postgres (INSERT into pipeline_runs)
{
"name": "Paginated API to Warehouse",
"nodes": [
{
"parameters": {
"rule": { "interval": [{ "field": "hours", "hoursInterval": 1 }] }
},
"name": "Hourly",
"type": "n8n-nodes-base.scheduleTrigger",
"typeVersion": 1,
"position": [250, 300]
},
{
"parameters": {
"url": "https://api.source.com/events",
"sendQuery": true,
"queryParameters": {
"parameters": [
{ "name": "limit", "value": "200" },
{ "name": "since", "value": "={{ $now.minus({ hours: 2 }).toISO() }}" }
]
},
"authentication": "headerAuth"
},
"name": "Fetch Events",
"type": "n8n-nodes-base.httpRequest",
"typeVersion": 4,
"position": [450, 300]
},
{
"parameters": {
"jsCode": "const events = $input.first().json.data || [];\nconst runStart = new Date().toISOString();\nconst transformed = events.map(e => ({\n event_id: e.id,\n event_type: e.type,\n user_id: e.user_id,\n properties: JSON.stringify(e.properties || {}),\n occurred_at: new Date(e.timestamp).toISOString(),\n loaded_at: runStart\n}));\nreturn transformed.map(json => ({ json }));"
},
"name": "Transform Events",
"type": "n8n-nodes-base.code",
"typeVersion": 2,
"position": [650, 300]
},
{
"parameters": {
"operation": "executeQuery",
"query": "INSERT INTO pipeline_runs (source, records_fetched, run_at, status)\nVALUES ('api_events', {{ $('Fetch Events').item.json.data.length || 0 }}, NOW(), 'success')"
},
"name": "Log Run",
"type": "n8n-nodes-base.postgres",
"typeVersion": 2,
"position": [850, 300]
}
],
"connections": {
"Hourly": { "main": [[{ "node": "Fetch Events", "type": "main", "index": 0 }]] },
"Fetch Events": { "main": [[{ "node": "Transform Events", "type": "main", "index": 0 }]] },
"Transform Events": { "main": [[{ "node": "Log Run", "type": "main", "index": 0 }]] }
}
}
4. dbt Cloud Job Trigger + Status Notifier
What it does: After your raw data lands in the warehouse, this workflow triggers a dbt Cloud job via API, waits for it to complete, then posts the result (success/failure + runtime) to Slack. Replaces manual dbt Cloud checks and custom polling scripts.
Nodes: Schedule (or HTTP trigger from ETL workflow) → HTTP Request (trigger dbt job) → Wait 5m → HTTP Request (poll run status) → IF (success?) → Slack success → Slack failure
{
"name": "dbt Cloud Trigger + Status Notifier",
"nodes": [
{
"parameters": {
"rule": { "interval": [{ "field": "cronExpression", "expression": "30 5 * * *" }] }
},
"name": "5:30 AM Daily",
"type": "n8n-nodes-base.scheduleTrigger",
"typeVersion": 1,
"position": [250, 300]
},
{
"parameters": {
"method": "POST",
"url": "https://cloud.getdbt.com/api/v2/accounts/YOUR_ACCOUNT_ID/jobs/YOUR_JOB_ID/run/",
"authentication": "headerAuth",
"sendBody": true,
"bodyParameters": {
"parameters": [{ "name": "cause", "value": "n8n scheduled trigger" }]
}
},
"name": "Trigger dbt Job",
"type": "n8n-nodes-base.httpRequest",
"typeVersion": 4,
"position": [450, 300]
},
{
"parameters": { "amount": 5, "unit": "minutes" },
"name": "Wait 5 min",
"type": "n8n-nodes-base.wait",
"typeVersion": 1,
"position": [650, 300]
},
{
"parameters": {
"url": "=https://cloud.getdbt.com/api/v2/accounts/YOUR_ACCOUNT_ID/runs/{{ $('Trigger dbt Job').item.json.data.id }}/",
"authentication": "headerAuth"
},
"name": "Poll Run Status",
"type": "n8n-nodes-base.httpRequest",
"typeVersion": 4,
"position": [850, 300]
},
{
"parameters": {
"conditions": {
"string": [{ "value1": "={{ $json.data.status_humanized }}", "operation": "equals", "value2": "Success" }]
}
},
"name": "Success?",
"type": "n8n-nodes-base.if",
"typeVersion": 1,
"position": [1050, 300]
},
{
"parameters": {
"channel": "#data-eng",
"text": "=✅ dbt job finished successfully in {{ Math.round($json.data.duration_humanized) }}s. Models updated: {{ $json.data.run_steps?.length || 'N/A' }}"
},
"name": "Slack Success",
"type": "n8n-nodes-base.slack",
"typeVersion": 1,
"position": [1250, 200]
},
{
"parameters": {
"channel": "#data-eng",
"text": "=🚨 dbt job FAILED. Status: {{ $json.data.status_humanized }}. Check: https://cloud.getdbt.com/runs/{{ $json.data.id }}"
},
"name": "Slack Failure",
"type": "n8n-nodes-base.slack",
"typeVersion": 1,
"position": [1250, 400]
}
],
"connections": {
"5:30 AM Daily": { "main": [[{ "node": "Trigger dbt Job", "type": "main", "index": 0 }]] },
"Trigger dbt Job": { "main": [[{ "node": "Wait 5 min", "type": "main", "index": 0 }]] },
"Wait 5 min": { "main": [[{ "node": "Poll Run Status", "type": "main", "index": 0 }]] },
"Poll Run Status": { "main": [[{ "node": "Success?", "type": "main", "index": 0 }]] },
"Success?": {
"main": [
[{ "node": "Slack Success", "type": "main", "index": 0 }],
[{ "node": "Slack Failure", "type": "main", "index": 0 }]
]
}
}
}
Extension: Add a second poll loop if 5 minutes isn't enough for your models. Use a Split In Batches + Loop node pattern to poll up to 10 times with 2-minute waits.
5. Analytics Slack Bot — Query on Demand
What it does: A Slack slash command hits an n8n webhook. n8n parses the query type (revenue, signups, churn), fetches fresh data from your warehouse, formats it, and replies to Slack instantly. Your whole team can pull data without writing SQL or bothering the data team.
Nodes: Webhook (Slack slash command) → Code (parse command) → Switch (query type) → Postgres (query by type) → Code (format response) → HTTP Request (Slack reply_url)
{
"name": "Analytics Slack Bot",
"nodes": [
{
"parameters": {
"httpMethod": "POST",
"path": "analytics-bot",
"responseMode": "responseNode"
},
"name": "Slack Slash Command",
"type": "n8n-nodes-base.webhook",
"typeVersion": 1,
"position": [250, 300]
},
{
"parameters": {
"respondWith": "text",
"responseText": "Fetching your data... 📊"
},
"name": "Immediate 200 OK",
"type": "n8n-nodes-base.respondToWebhook",
"typeVersion": 1,
"position": [450, 300]
},
{
"parameters": {
"jsCode": "const body = $('Slack Slash Command').item.json.body || {};\nconst text = (body.text || '').toLowerCase().trim();\nconst replyUrl = body.response_url;\nlet queryType = 'unknown';\nif (text.includes('revenue') || text.includes('mrr')) queryType = 'revenue';\nelse if (text.includes('signup') || text.includes('user')) queryType = 'users';\nelse if (text.includes('churn')) queryType = 'churn';\nreturn [{ json: { queryType, replyUrl, text } }];"
},
"name": "Parse Command",
"type": "n8n-nodes-base.code",
"typeVersion": 2,
"position": [650, 300]
},
{
"parameters": {
"rules": {
"values": [
{ "outputKey": "revenue", "conditions": { "string": [{ "value1": "={{ $json.queryType }}", "operation": "equals", "value2": "revenue" }] } },
{ "outputKey": "users", "conditions": { "string": [{ "value1": "={{ $json.queryType }}", "operation": "equals", "value2": "users" }] } },
{ "outputKey": "churn", "conditions": { "string": [{ "value1": "={{ $json.queryType }}", "operation": "equals", "value2": "churn" }] } }
]
},
"fallbackOutput": "extra"
},
"name": "Route Query",
"type": "n8n-nodes-base.switch",
"typeVersion": 3,
"position": [850, 300]
}
],
"connections": {
"Slack Slash Command": { "main": [[{ "node": "Immediate 200 OK", "type": "main", "index": 0 }]] },
"Immediate 200 OK": { "main": [[{ "node": "Parse Command", "type": "main", "index": 0 }]] },
"Parse Command": { "main": [[{ "node": "Route Query", "type": "main", "index": 0 }]] }
}
}
Setup notes: Configure the Slack slash command to point to your n8n webhook URL. The response_url in the body lets n8n send a delayed follow-up reply after it fetches the data — this is the right pattern for Slack commands that take >3 seconds.
Why n8n fits data engineering workflows
| Feature | n8n | Cron + Python | Airflow |
|---|---|---|---|
| Setup time | Minutes (Docker) | Minutes | Hours/days |
| Ops overhead | Low (self-hosted) | Low | High |
| Visual debugging | ✅ | ❌ | Limited |
| Built-in retry | ✅ | ❌ (manual) | ✅ |
| 400+ integrations | ✅ | Manual API code | Via operators |
| Git-versionable | ✅ (JSON export) | ✅ | ✅ |
| Per-operation cost | $0 (self-hosted) | $0 | $0 |
n8n isn't a replacement for Airflow on complex DAG-heavy pipelines — but for the 80% of data engineering work that's "fetch this API, transform it, load it, alert me if it breaks," n8n is dramatically faster to build and maintain.
Get pre-built versions
Two of these workflows are available as pre-built, documented templates at stripeai.gumroad.com:
- Webhook to Database ($12) — battle-tested pattern for any webhook → transform → Postgres/MySQL pipeline
- Daily Report Generator ($19) — schedule → multi-source data → formatted HTML report → email/Slack
Download, import the JSON, swap in your credentials, and you're live in minutes.
Built with n8n. All workflow JSONs are import-ready — paste into your n8n instance via Settings → Import Workflow.
Top comments (0)