Apache Kafka is the backbone of event-driven architectures at scale. Whether you're streaming clickstream data, syncing microservices, or building real-time pipelines, the n8n Kafka node lets you produce and consume Kafka messages directly inside your workflows — no custom code required.
This guide covers broker setup, credentials, producing messages, consuming topics, offset management, error handling, and real-world patterns.
What the n8n Kafka Node Does
n8n covers Kafka with two nodes, one per direction:
| Operation | Direction | Use case |
|---|---|---|
| Produce | n8n → Kafka | Publish events, trigger downstream consumers |
| Consume (Trigger) | Kafka → n8n | React to messages, process event streams |
Consuming is handled by a separate Kafka Trigger node, not the standard Kafka node, which only produces.
Prerequisites
- A running Kafka broker (local, Confluent Cloud, AWS MSK, Redpanda, etc.)
- At least one topic created
- Optional: SSL certificates and/or SASL credentials if your broker requires auth
Step 1: Configure Kafka Credentials
In n8n, go to Credentials → New → Apache Kafka.
Fields
| Field | Description |
|---|---|
| Brokers | Comma-separated list: broker1:9092,broker2:9092 |
| Client ID | Identifier for this n8n client (e.g., n8n-producer) |
| SSL | Toggle on for TLS connections; paste CA cert, client cert, client key |
| SASL | Toggle for auth; choose PLAIN, SCRAM-SHA-256, or SCRAM-SHA-512 |
| Username / Password | For SASL auth |
Local Kafka (no auth)
Brokers: localhost:9092
Client ID: n8n-local
SSL: off
SASL: off
Confluent Cloud
Brokers: pkc-xxxxx.us-east-1.aws.confluent.cloud:9092
Client ID: n8n-confluent
SSL: on (CA cert from Confluent dashboard)
SASL: on, PLAIN
Username: <API key>
Password: <API secret>
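The KafkaJS error names later in this guide suggest n8n talks to Kafka through the KafkaJS client. As a rough sketch of how the credential fields could map onto a KafkaJS-style client config, assuming hypothetical input field names (buildClientConfig and its field names are illustrative, not n8n internals):

```javascript
// Hypothetical helper: maps the credential form fields to a KafkaJS-style
// client config object. The input field names are assumptions.
function buildClientConfig(fields) {
  const config = {
    clientId: fields.clientId,
    // "broker1:9092, broker2:9092" -> ["broker1:9092", "broker2:9092"]
    brokers: fields.brokers.split(',').map((b) => b.trim()).filter(Boolean),
    ssl: Boolean(fields.ssl),
  };
  if (fields.sasl) {
    config.sasl = {
      mechanism: fields.saslMechanism, // 'plain', 'scram-sha-256' or 'scram-sha-512'
      username: fields.username,
      password: fields.password,
    };
  }
  return config;
}

// The Confluent Cloud settings from above, expressed as a config object.
const confluent = buildClientConfig({
  brokers: 'pkc-xxxxx.us-east-1.aws.confluent.cloud:9092',
  clientId: 'n8n-confluent',
  ssl: true,
  sasl: true,
  saslMechanism: 'plain',
  username: '<API key>',
  password: '<API secret>',
});
console.log(confluent);
```

Seeing the fields as one object makes it easier to compare your n8n credentials with a config that already works in another KafkaJS client.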
Step 2: Produce Messages
Add a Kafka node (not the trigger). Set operation to Send Message.
Core fields
| Field | Example | Notes |
|---|---|---|
| Topic | orders.created | Must exist unless auto-create is enabled on broker |
| Message | {{ JSON.stringify($json) }} | Any string; JSON recommended |
| Key | {{ $json.orderId }} | Optional; controls partition routing |
| Headers | source=n8n, version=1 | Key-value pairs; useful for tracing |
| Partition | (leave blank) | Leave blank to let Kafka pick via key hash |
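To see why the Key matters, here is a toy sketch of key-based partition routing. It uses FNV-1a as a stand-in hash; Kafka's default partitioner uses murmur2, so the partition numbers below will not match a real cluster. The point is only that the same key always maps to the same partition.

```javascript
// Stand-in hash (FNV-1a, 32-bit). NOT the murmur2 hash Kafka clients use.
function fnv1a(str) {
  let hash = 0x811c9dc5;
  for (let i = 0; i < str.length; i++) {
    hash ^= str.charCodeAt(i);
    hash = Math.imul(hash, 0x01000193) >>> 0; // keep it an unsigned 32-bit value
  }
  return hash;
}

// Same key + same partition count => same partition, every time.
function pickPartition(key, numPartitions) {
  return fnv1a(String(key)) % numPartitions;
}

for (const orderId of ['order-1', 'order-2', 'order-1']) {
  console.log(orderId, '-> partition', pickPartition(orderId, 6));
}
```

Because every message for a given orderId lands on one partition, consumers see that order's events in the sequence they were produced.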
Minimal "publish event" example
Topic: user.signup
Message: {"userId":"{{$json.id}}","email":"{{$json.email}}","ts":"{{$now}}"}
Key: {{$json.id}}
Sending an array as multiple messages
The Kafka node sends one message per input item, so multiple items produce multiple messages automatically. If your data arrives as a single item with an array field, split that field into separate items first (for example with a Split Out node or a Code node).
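If you would rather do the split in a Code node, a minimal sketch could look like this. The helper name splitArrayField and the orders field are illustrative assumptions; adapt them to your data.

```javascript
// Turns each item holding an array into one n8n-style item per element.
function splitArrayField(items, fieldName) {
  return items.flatMap((item) =>
    (item.json[fieldName] || []).map((element) => ({ json: element }))
  );
}

// In a Code node ("Run Once for All Items") the call would be roughly:
//   return splitArrayField($input.all(), 'orders');
const input = [{ json: { orders: [{ orderId: 'a1' }, { orderId: 'b2' }] } }];
console.log(splitArrayField(input, 'orders'));
```

Each resulting item then becomes its own Kafka message when it reaches the Kafka node.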
Step 3: Consume Messages with the Kafka Trigger
For consuming, use the Kafka Trigger node as the workflow's starting node.
Trigger fields
| Field | Example | Notes |
|---|---|---|
| Topic | orders.created | The topic to subscribe to |
| Group ID | n8n-order-processor | Consumer group; Kafka tracks offsets per group |
| Session Timeout | 30000 | ms before broker considers consumer dead |
| Heartbeat Interval | 3000 | ms between heartbeats; must be < Session Timeout / 3 |
| Read Messages From Beginning | off | Toggle on to replay from earliest offset |
| JSON Parse Message | on | Auto-parses message body as JSON |
| Return Headers | off | Toggle on if you need message headers downstream |
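The heartbeat rule in the table is easy to check before you save the trigger. A small sketch, with checkConsumerTiming as a hypothetical helper name:

```javascript
// Checks the rule from the table: heartbeat must be < session timeout / 3.
function checkConsumerTiming({ sessionTimeout, heartbeatInterval }) {
  const limit = sessionTimeout / 3;
  return { ok: heartbeatInterval < limit, limit };
}

console.log(checkConsumerTiming({ sessionTimeout: 30000, heartbeatInterval: 3000 }));
console.log(checkConsumerTiming({ sessionTimeout: 6000, heartbeatInterval: 3000 }));
```

The first call passes (3000 is below the 10000 ms limit); the second fails because 3000 is above the 2000 ms limit.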
Consumer group gotchas
- Kafka Triggers that share a Group ID on the same topic split its partitions between them, so each workflow sees only part of the stream. Give every workflow its own Group ID if each one needs every message.
- Read Messages From Beginning only takes effect when the group has no committed offsets yet. On a fresh Group ID it replays the topic from the earliest retained offset, so reserve it for initial backfill or replay scenarios.
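A toy model can make the group behavior concrete. It is not how the broker assigns partitions (real assignment goes through a group coordinator and rebalancing); it only illustrates that every group receives every message, while consumers inside one group share the partitions. All names are made up.

```javascript
// Toy model: each group gets every message; inside a group, each partition
// is owned by exactly one consumer (simple modulo assignment here).
function deliver(messages, groups) {
  const received = {};
  for (const [groupId, consumers] of Object.entries(groups)) {
    for (const consumer of consumers) received[`${groupId}/${consumer}`] = [];
    for (const msg of messages) {
      const owner = consumers[msg.partition % consumers.length];
      received[`${groupId}/${owner}`].push(msg.value);
    }
  }
  return received;
}

const messages = [
  { partition: 0, value: 'm1' },
  { partition: 1, value: 'm2' },
  { partition: 0, value: 'm3' },
];

// Two workflows sharing a group split the stream; a third with its own group sees all of it.
console.log(deliver(messages, {
  'shared-group': ['workflowA', 'workflowB'],
  'own-group': ['workflowC'],
}));
```

In this run workflowA receives m1 and m3, workflowB receives only m2, and workflowC receives all three.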
Step 4: Handling Offsets and At-Least-Once Delivery
The trigger's consumer commits an offset after n8n has processed the message (when the workflow execution completes). This means:
- If the workflow crashes mid-execution, the message will be re-delivered on next start.
- Design your downstream steps to be idempotent — safe to run twice with the same message.
Idempotency patterns
Stripe payment deduplication:
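// In a Code node before calling Stripe.
// Assumes the incoming item exposes kafkaOffset, topic, and partition fields; adjust the names to your trigger output.
const idempotencyKey = $json.kafkaOffset + '-' + $json.topic + '-' + $json.partition;
// Pass as the Idempotency-Key header on the Stripe API call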
Database upsert instead of insert:
INSERT INTO orders (id, data) VALUES (:id, :data)
ON CONFLICT (id) DO UPDATE SET data = :data;
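A third option is to track which message IDs you have already handled and skip repeats. The sketch below keeps the IDs in an in-memory Set purely for illustration; a real workflow would need a durable store (a database table, Redis, and so on), and the id field is an assumed message property.

```javascript
// In-memory stand-in for a durable "already processed" store.
const processed = new Set();

// Runs sideEffect at most once per message id.
function handleOnce(message, sideEffect) {
  if (processed.has(message.id)) return 'skipped';
  sideEffect(message);
  processed.add(message.id);
  return 'processed';
}

const charges = [];
const order = { id: 'order-42', amount: 100 };
console.log(handleOnce(order, (m) => charges.push(m.amount))); // processed
console.log(handleOnce(order, (m) => charges.push(m.amount))); // skipped (redelivery)
console.log(charges.length); // 1
```

Note that a crash between the side effect and recording the ID still allows one duplicate, which is why idempotency keys or upserts at the destination remain the stronger guarantee.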
Step 5: Error Handling
Dead Letter Queue (DLQ) pattern
Route failed message processing to a DLQ topic so you don't lose events:
[Kafka Trigger: orders.created]
→ [Try: process order]
→ [On Error: Kafka Produce → orders.dlq]
In your error branch, republish with the original message plus error context:
// In Code node (error branch)
return [{
  json: {
    originalMessage: $('Kafka Trigger').item.json,
    error: $json.error,
    failedAt: new Date().toISOString(),
    workflowId: $workflow.id,
  }
}];
Retry with backoff
The Kafka Trigger re-delivers after a crash, but soft errors (API rate limits, temporary downtime) need a different approach. Either enable n8n's built-in Retry On Fail setting on the individual node, or add a Wait node and re-publish the message to the same topic with a retryCount header.
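For the re-publish approach, the logic that reads the retryCount header and decides what happens next could be sketched like this. The nextRetry name, the maxRetries and baseDelayMs defaults, and the exponential schedule are all illustrative choices, not n8n settings.

```javascript
// Decides whether to retry (with exponential backoff) or give up to the DLQ.
function nextRetry(headers, { maxRetries = 5, baseDelayMs = 1000 } = {}) {
  const retryCount = Number(headers.retryCount || 0);
  if (retryCount >= maxRetries) {
    return { action: 'dlq', headers };
  }
  return {
    action: 'retry',
    delayMs: baseDelayMs * 2 ** retryCount, // 1s, 2s, 4s, ...
    // Kafka header values are strings/bytes, so store the counter as a string.
    headers: { ...headers, retryCount: String(retryCount + 1) },
  };
}

console.log(nextRetry({ source: 'n8n' }));
console.log(nextRetry({ source: 'n8n', retryCount: '5' }));
```

You could feed delayMs into a Wait node and the returned headers into the Kafka node; once the action is dlq, publish to the dead letter topic instead.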
Step 6: Multi-Topic Patterns
Fan-out (one producer, multiple consumers)
One n8n workflow publishes to a topic; multiple separate workflows each have their own Kafka Trigger on the same topic with different Group IDs. Each gets every message independently.
[Workflow A: publishes to payments.processed]
↓
[Workflow B: Group=invoicing → generates invoice]
[Workflow C: Group=analytics → logs to BigQuery]
[Workflow D: Group=fraud → checks risk score]
Topic routing with Switch node
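A single trigger on a shared topic (here a hypothetical events topic, since the trigger subscribes to one topic name) can dispatch on an event-type field:
[Kafka Trigger: events]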
→ [Switch: $json.type]
→ "order.created" → [Order workflow]
→ "payment.done" → [Payment workflow]
→ default → [Log unknown event]
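If you prefer to keep the routing table in code (for example in a Code node ahead of the Switch), the same dispatch could be sketched as a lookup with a default. The route names simply mirror the diagram above.

```javascript
// Event type -> target branch, mirroring the Switch node diagram.
const routes = new Map([
  ['order.created', 'Order workflow'],
  ['payment.done', 'Payment workflow'],
]);

function routeEvent(event) {
  // Anything without a known type falls through to the default branch.
  return routes.get(event.type) ?? 'Log unknown event';
}

console.log(routeEvent({ type: 'order.created' }));
console.log(routeEvent({ type: 'user.deleted' }));
```

A lookup table like this keeps new event types to a one-line change, at the cost of hiding the routing from the visual canvas.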
Step 7: Schema and Validation
Kafka doesn't enforce message schemas by default. Add a Code node right after the Kafka Trigger to validate:
const msg = $json;
if (!msg.userId || typeof msg.userId !== 'string') {
  throw new Error(`Invalid message: missing userId. Raw: ${JSON.stringify(msg)}`);
}
if (!msg.eventType || !['signup', 'login', 'purchase'].includes(msg.eventType)) {
  throw new Error(`Unknown eventType: ${msg.eventType}`);
}
return [{ json: msg }];
Pair this with a DLQ so invalid messages land in events.invalid rather than crashing the workflow.
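One way to wire that up is a non-throwing variant that sorts messages into valid and invalid sets, so the invalid ones can be sent on to the DLQ topic. This is a sketch reusing the same rules as the validator above; validateEvent and partitionByValidity are hypothetical helper names.

```javascript
const ALLOWED_EVENT_TYPES = ['signup', 'login', 'purchase'];

// Returns a list of problems instead of throwing; an empty list means valid.
function validateEvent(msg) {
  const m = msg || {};
  const errors = [];
  if (typeof m.userId !== 'string' || m.userId === '') errors.push('missing userId');
  if (!ALLOWED_EVENT_TYPES.includes(m.eventType)) errors.push(`unknown eventType: ${m.eventType}`);
  return errors;
}

// Splits a batch so valid messages continue and invalid ones go to the DLQ.
function partitionByValidity(messages) {
  const valid = [];
  const invalid = [];
  for (const msg of messages) {
    const errors = validateEvent(msg);
    if (errors.length === 0) valid.push(msg);
    else invalid.push({ original: msg, errors });
  }
  return { valid, invalid };
}

console.log(partitionByValidity([
  { userId: 'u1', eventType: 'signup' },
  { userId: 'u2', eventType: 'refund' },
]));
```

Keeping the error list alongside the original message gives whoever drains events.invalid enough context to fix or replay it.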
Monitoring and Observability
| What to watch | How |
|---|---|
| Consumer lag | Kafka's built-in kafka-consumer-groups.sh --describe or Confluent dashboard |
| n8n execution errors | n8n execution log; set up error workflow notifications |
| Message throughput | Kafka topic metrics (messages in/out per second) |
| DLQ depth | Monitor your DLQ topic — growing depth means upstream failures |
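Consumer lag is simply the gap between the newest offset in a partition and the offset your group has committed. If you export those two numbers per partition (kafka-consumer-groups.sh --describe prints them as LOG-END-OFFSET and CURRENT-OFFSET), a sketch of the calculation looks like this. The sample numbers are made up.

```javascript
// Lag per partition = log end offset - committed offset.
function consumerLag(partitions) {
  const perPartition = partitions.map((p) => ({
    partition: p.partition,
    lag: Math.max(0, p.logEndOffset - p.committedOffset),
  }));
  const total = perPartition.reduce((sum, p) => sum + p.lag, 0);
  return { perPartition, total };
}

// Invented sample values for a three-partition topic.
console.log(consumerLag([
  { partition: 0, logEndOffset: 1200, committedOffset: 1200 },
  { partition: 1, logEndOffset: 980, committedOffset: 950 },
  { partition: 2, logEndOffset: 1500, committedOffset: 1100 },
]));
```

A total that keeps growing means the workflow processes messages more slowly than they arrive; a single partition with high lag often points to a hot key.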
Free Workflow JSON
Here's a complete produce + consume pattern you can import directly into n8n:
{
  "name": "Kafka Produce + DLQ Pattern",
  "nodes": [
    {
      "id": "trigger",
      "name": "Kafka Trigger",
      "type": "n8n-nodes-base.kafkaTrigger",
      "position": [250, 300],
      "parameters": {
        "topic": "orders.created",
        "groupId": "n8n-order-processor",
        "sessionTimeout": 30000,
        "heartbeatInterval": 3000,
        "readMessagesFromBeginning": false,
        "jsonParseMessage": true
      }
    },
    {
      "id": "validate",
      "name": "Validate Message",
      "type": "n8n-nodes-base.code",
      "position": [450, 300],
      "parameters": {
        "jsCode": "const msg = $json;\nif (!msg.orderId) throw new Error('Missing orderId: ' + JSON.stringify(msg));\nreturn [{ json: msg }];"
      }
    },
    {
      "id": "process",
      "name": "Process Order",
      "type": "n8n-nodes-base.httpRequest",
      "position": [650, 300],
      "parameters": {
        "method": "POST",
        "url": "https://your-api.com/orders",
        "sendBody": true,
        "bodyParameters": { "parameters": [{ "name": "orderId", "value": "={{ $json.orderId }}" }] }
      }
    },
    {
      "id": "dlq",
      "name": "Dead Letter Queue",
      "type": "n8n-nodes-base.kafka",
      "position": [650, 450],
      "parameters": {
        "topic": "orders.dlq",
        "message": "={{ JSON.stringify({ original: $('Kafka Trigger').item.json, error: $json.error, ts: $now }) }}"
      }
    }
  ],
  "connections": {
    "Kafka Trigger": { "main": [[ { "node": "Validate Message", "type": "main", "index": 0 } ]] },
    "Validate Message": { "main": [[ { "node": "Process Order", "type": "main", "index": 0 } ]] },
    "Process Order": { "error": [[ { "node": "Dead Letter Queue", "type": "main", "index": 0 } ]] }
  }
}
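To import: n8n → Workflows → Import from JSON → paste above. After importing, attach your Kafka credentials to both Kafka nodes and confirm that the error output of Process Order is still connected to the Dead Letter Queue node.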
Common Errors and Fixes
| Error | Cause | Fix |
|---|---|---|
| KafkaJSConnectionError: Connection timeout | Wrong broker address or port | Check broker string; verify firewall rules allow the broker port (9092 by default) |
| KafkaJSProtocolError: TOPIC_AUTHORIZATION_FAILED | ACL blocks the n8n client | Grant produce/consume ACLs to the principal (user or API key) n8n authenticates as |
| KafkaJSNumberOfRetriesExceeded | Broker unreachable after retries | Check broker health; increase retry count in credentials |
| SyntaxError: Unexpected token | JSON.stringify missing on message | Wrap object messages: {{ JSON.stringify($json) }} |
| Messages not appearing | Wrong group ID or offset already consumed | Use a new group ID, with Read Messages From Beginning toggled on if you need older messages |
Summary
| Task | Node / Setting |
|---|---|
| Publish to Kafka | Kafka node → Send Message |
| Subscribe to topic | Kafka Trigger node |
| Consumer group isolation | Unique Group ID per workflow |
| Replay from start | Read Messages From Beginning: on |
| Dead letter queue | Error branch → Kafka node → dlq topic |
| Schema validation | Code node after trigger |
The Kafka node's simplicity hides some nuance around consumer groups and offset management — get those right and you have a production-grade event pipeline running entirely inside n8n.
Questions? Drop them in the comments — especially if you're hitting Confluent Cloud auth issues or MSK IAM auth (which needs a custom SASL mechanism not yet in n8n's UI).