DEV Community

Pirate Prentice

n8n Kafka Node: Produce, Consume, and Stream Events in Your Workflows (Free JSON)

Apache Kafka is the backbone of event-driven architectures at scale. Whether you're streaming clickstream data, syncing microservices, or building real-time pipelines, the n8n Kafka node lets you produce and consume Kafka messages directly inside your workflows — no custom code required.

This guide covers broker setup, credentials, producing messages, consuming topics, offset management, error handling, and real-world patterns.


What the n8n Kafka Node Does

n8n splits Kafka support across two nodes:

Node | Direction | Use case
Kafka (produce) | n8n → Kafka | Publish events, trigger downstream consumers
Kafka Trigger (consume) | Kafka → n8n | React to messages, process event streams

Consuming is handled by the separate Kafka Trigger node. The standard Kafka node only produces.


Prerequisites

  • A running Kafka broker (local, Confluent Cloud, AWS MSK, Redpanda, etc.)
  • At least one topic created
  • Optional: SSL certificates and/or SASL credentials if your broker requires auth

Step 1: Configure Kafka Credentials

In n8n, go to Credentials → New → Kafka.

Fields

Field | Description
Brokers | Comma-separated list: broker1:9092,broker2:9092
Client ID | Identifier for this n8n client (e.g., n8n-producer)
SSL | Toggle on for TLS connections
SASL | Toggle on for auth; choose PLAIN, SCRAM-SHA-256, or SCRAM-SHA-512
Username / Password | For SASL auth

Local Kafka (no auth)

Brokers: localhost:9092
Client ID: n8n-local
SSL: off
SASL: off

Confluent Cloud

Brokers: pkc-xxxxx.us-east-1.aws.confluent.cloud:9092
Client ID: n8n-confluent
SSL: on
SASL: on, PLAIN
Username: <API key>
Password: <API secret>
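
A malformed broker string is a common cause of connection timeouts, so it can help to sanity-check it before saving the credential. The sketch below is a hypothetical helper (parseBrokers is not part of n8n) that splits a comma-separated list and checks that each entry looks like host:port.

```javascript
// Hypothetical helper, not an n8n API: split and sanity-check a broker string.
function parseBrokers(brokerString) {
  const brokers = brokerString
    .split(',')
    .map((b) => b.trim())
    .filter((b) => b.length > 0);

  if (brokers.length === 0) {
    throw new Error('No brokers given');
  }

  for (const broker of brokers) {
    // Use the last colon so the port is always the final segment.
    const idx = broker.lastIndexOf(':');
    const host = idx === -1 ? '' : broker.slice(0, idx);
    const port = idx === -1 ? NaN : Number(broker.slice(idx + 1));
    if (!host || !Number.isInteger(port) || port < 1 || port > 65535) {
      throw new Error(`Expected host:port, got "${broker}"`);
    }
  }
  return brokers;
}

console.log(parseBrokers('broker1:9092, broker2:9092'));
```

This only checks the shape of the string. It does not prove the broker is reachable.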

Step 2: Produce Messages

Add a Kafka node (not the trigger). Set operation to Send Message.

Core fields

Field | Example | Notes
Topic | orders.created | Must exist unless auto-create is enabled on broker
Message | {{ JSON.stringify($json) }} | Any string; JSON recommended
Key | {{ $json.orderId }} | Optional; controls partition routing
Headers | source=n8n, version=1 | Key-value pairs; useful for tracing
Partition | (leave blank) | Leave blank to let Kafka pick via key hash
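
To see why the key matters, here is a toy illustration of key-hash partitioning. Kafka clients typically use a murmur2 hash. The simple hash below is only a stand-in, and toyPartition is a made-up name, so the partition numbers it produces will not match a real cluster. The point it shows is that the same key always maps to the same partition, which preserves per-key ordering.

```javascript
// Toy stand-in for a key-hash partitioner (NOT Kafka's murmur2 implementation).
function toyPartition(key, numPartitions) {
  let hash = 0;
  for (const ch of String(key)) {
    // Keep the running hash as an unsigned 32-bit integer.
    hash = (hash * 31 + ch.codePointAt(0)) >>> 0;
  }
  return hash % numPartitions;
}

// The same key lands on the same partition every time.
console.log(toyPartition('order-42', 6) === toyPartition('order-42', 6));
```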

Minimal "publish event" example

Topic: user.signup
Message: {"userId":"{{$json.id}}","email":"{{$json.email}}","ts":"{{$now}}"}
Key: {{$json.id}}
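
One caveat with templating values straight into a JSON string: if a value contains a double quote, the result is no longer valid JSON. A safer sketch is to build an object first and serialize it once. buildSignupEvent below is a hypothetical helper, and the field names simply mirror the example above.

```javascript
// Hypothetical helper: build the message from an object so escaping is handled
// by JSON.stringify rather than by string templating.
function buildSignupEvent(user, now = new Date()) {
  return {
    key: String(user.id),
    value: JSON.stringify({
      userId: String(user.id),
      email: user.email,
      ts: now.toISOString(),
    }),
  };
}

const event = buildSignupEvent({ id: 7, email: 'quote"inside@example.com' });
console.log(JSON.parse(event.value).email);
```

In the Kafka node itself, the equivalent is a single expression in the Message field that wraps the object in JSON.stringify, as in the Core fields table.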

Sending an array as multiple messages

The Kafka node sends one message per input item. If the records you want to publish sit in an array field on a single item, split that field into separate items first, for example with a Split Out node or a Code node.
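
If you prefer to do the split in a Code node, a minimal sketch could look like this. splitArrayField is a hypothetical helper, and the orders field name is an assumption for illustration. n8n items are objects shaped like { json: {...} }.

```javascript
// Hypothetical helper: turn one array field into one n8n item per element.
function splitArrayField(items, field) {
  return items.flatMap((item) => {
    const values = Array.isArray(item.json[field]) ? item.json[field] : [];
    return values.map((value) => {
      const isPlainObject =
        value !== null && typeof value === 'object' && !Array.isArray(value);
      // Wrap primitives and nested arrays so every item's json is an object.
      return { json: isPlainObject ? value : { value } };
    });
  });
}

const input = [{ json: { orders: [{ orderId: 'a1' }, { orderId: 'a2' }] } }];
console.log(splitArrayField(input, 'orders'));
```

In a Code node set to run once for all items, the last line would become return splitArrayField($input.all(), 'orders');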


Step 3: Consume Messages with the Kafka Trigger

For consuming, use the Kafka Trigger node as the workflow's starting node.

Trigger fields

Field | Example | Notes
Topic | orders.created | The topic to subscribe to
Group ID | n8n-order-processor | Consumer group; Kafka tracks offsets per group
Session Timeout | 30000 | Milliseconds without a heartbeat before the broker considers the consumer dead
Heartbeat Interval | 3000 | Milliseconds between heartbeats; keep at or below one third of Session Timeout
Read Messages From Beginning | off | Toggle on to start from the earliest offset when the group has no committed offsets
JSON Parse Message | on | Auto-parses message body as JSON
Return Headers | off | Toggle on if you need message headers downstream
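
The relationship between the two timing fields is easy to get wrong. As a rough sketch, the check below flags settings where the heartbeat interval is more than one third of the session timeout. checkConsumerTimeouts is a hypothetical helper, and the one-third rule is the usual guideline rather than something n8n enforces.

```javascript
// Hypothetical helper: sanity-check consumer timing settings (values in ms).
function checkConsumerTimeouts({ sessionTimeout, heartbeatInterval }) {
  const problems = [];
  if (!(sessionTimeout > 0)) {
    problems.push('sessionTimeout must be a positive number of ms');
  }
  if (!(heartbeatInterval > 0)) {
    problems.push('heartbeatInterval must be a positive number of ms');
  }
  if (heartbeatInterval > sessionTimeout / 3) {
    problems.push('heartbeatInterval should be at most one third of sessionTimeout');
  }
  return problems;
}

console.log(checkConsumerTimeouts({ sessionTimeout: 30000, heartbeatInterval: 3000 }));
```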

Consumer group gotchas

  • Kafka Triggers that share a Group ID on the same topic split the partitions between them, so each workflow sees only part of the stream. Use a distinct Group ID per workflow when every workflow needs every message.
  • Read Messages From Beginning only applies when the consumer group has no committed offsets. Turning it on with a new Group ID replays the topic from the earliest retained offset, so reserve it for initial backfill or replay scenarios.
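
The group behavior is easier to see in a small simulation. The sketch below uses a round-robin stand-in for partition assignment. Kafka's real assignors differ in detail, and assignPartitions and the workflow names are made up for illustration.

```javascript
// Round-robin stand-in for how partitions are divided among consumers in ONE group.
function assignPartitions(partitions, consumers) {
  const assignment = Object.fromEntries(consumers.map((c) => [c, []]));
  partitions.forEach((partition, i) => {
    assignment[consumers[i % consumers.length]].push(partition);
  });
  return assignment;
}

const partitions = [0, 1, 2, 3];

// Two workflows sharing one Group ID: the partitions are split between them.
const sharedGroup = assignPartitions(partitions, ['workflowA', 'workflowB']);

// Distinct Group IDs: each workflow is alone in its group and reads every partition.
const ownGroupA = assignPartitions(partitions, ['workflowA']);
const ownGroupB = assignPartitions(partitions, ['workflowB']);

console.log(sharedGroup, ownGroupA, ownGroupB);
```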

Step 4: Handling Offsets and At-Least-Once Delivery

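Offsets are committed by the consumer (n8n), not by the broker, once a message has been handed off for processing. Whether that hand-off waits for the workflow execution to finish depends on the trigger's Parallel Processing option, so check how your n8n version behaves before relying on it. Either way, plan for at-least-once delivery:

  • If n8n crashes before an offset is committed, the message is re-delivered on the next start.
  • Design your downstream steps to be idempotent, meaning safe to run twice with the same message.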

Idempotency patterns

Stripe payment deduplication:

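// In a Code node before calling Stripe.
// Assumes these fields are present on the item; if your trigger output lacks
// offset/partition, use a stable business ID from the message instead.
const idempotencyKey = [$json.topic, $json.partition, $json.kafkaOffset].join('-');
// Pass as the Idempotency-Key header on the Stripe API call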

Database upsert instead of insert:

INSERT INTO orders (id, data) VALUES (:id, :data)
ON CONFLICT (id) DO UPDATE SET data = :data;
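
Where the downstream system offers neither an idempotency key nor an upsert, a dedupe check in front of the side effect gives a similar result. The sketch below is illustrative only: createIdempotentHandler is a hypothetical helper, and the in-memory Set stands in for a durable store such as a database table with a unique constraint.

```javascript
// Hypothetical helper: skip messages whose ID has already been processed.
function createIdempotentHandler(handler, seen = new Set()) {
  return function handle(message) {
    if (seen.has(message.id)) {
      return { skipped: true };
    }
    const result = handler(message);
    // Record the ID only after success so a failed attempt can be retried.
    seen.add(message.id);
    return { skipped: false, result };
  };
}

let charges = 0;
const handle = createIdempotentHandler(() => {
  charges += 1;
  return 'charged';
});

handle({ id: 'order-42' });
handle({ id: 'order-42' }); // simulated redelivery of the same message
console.log(charges);
```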

Step 5: Error Handling

Dead Letter Queue (DLQ) pattern

Route failed message processing to a DLQ topic so you don't lose events:

[Kafka Trigger: orders.created]
  → [Try: process order]
  → [On Error: Kafka Produce → orders.dlq]

In your error branch, republish with the original message plus error context:

// In Code node (error branch)
return [{
  json: {
    originalMessage: $('Kafka Trigger').item.json,
    error: $json.error,
    failedAt: new Date().toISOString(),
    workflowId: $workflow.id,
  }
}];
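
One detail worth handling before the DLQ message is serialized: JSON.stringify on an Error object yields "{}", because an Error's own properties are not enumerable. If the error you capture is a real Error instance, a small converter keeps the details. serializeError is a hypothetical helper, not an n8n function.

```javascript
// Hypothetical helper: convert an error into a plain, JSON-friendly object.
function serializeError(err) {
  if (err instanceof Error) {
    return { name: err.name, message: err.message, stack: err.stack };
  }
  return {
    name: 'NonError',
    message: typeof err === 'string' ? err : JSON.stringify(err),
  };
}

console.log(JSON.stringify(new Error('boom')));
console.log(JSON.stringify(serializeError(new Error('boom'))));
```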

Retry with backoff

The Kafka Trigger will re-deliver on crash, but for soft errors (API rate limits, temporary downtime) use the Retry On Fail setting on individual nodes, or use a Wait node and re-publish to the same topic with an incremented retryCount header.
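
For the re-publish approach, the retry decision can be sketched as a small function that reads the retryCount header, computes an exponential backoff delay, and gives up to the DLQ after a limit. planRetry, the limits, and the assumption that header values arrive as strings are all illustrative.

```javascript
// Hypothetical helper: decide whether to retry and how long to wait first.
function planRetry(headers, { maxRetries = 5, baseDelayMs = 1000, maxDelayMs = 60000 } = {}) {
  // Treat a missing or unparseable header as "never retried".
  const retryCount = Number.parseInt(headers.retryCount ?? '0', 10) || 0;

  if (retryCount >= maxRetries) {
    return { action: 'dlq', retryCount };
  }

  // Exponential backoff: base, 2x base, 4x base, ... capped at maxDelayMs.
  const delayMs = Math.min(baseDelayMs * 2 ** retryCount, maxDelayMs);
  return {
    action: 'retry',
    delayMs,
    headers: { ...headers, retryCount: String(retryCount + 1) },
  };
}

console.log(planRetry({ source: 'n8n', retryCount: '2' }));
```

The delayMs value would feed the Wait node, and the returned headers would go on the re-published message.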


Step 6: Multi-Topic Patterns

Fan-out (one producer, multiple consumers)

One n8n workflow publishes to a topic; multiple separate workflows each have their own Kafka Trigger on the same topic with different Group IDs. Each gets every message independently.

[Workflow A: publishes to payments.processed]
  ↓
[Workflow B: Group=invoicing  → generates invoice]
[Workflow C: Group=analytics  → logs to BigQuery]
[Workflow D: Group=fraud      → checks risk score]

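Topic routing with Switch node

The trigger subscribes to a topic by name rather than by wildcard pattern (verify this on your n8n version), so carry mixed event types on one shared topic and branch on a field in the payload:

[Kafka Trigger: events]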
  → [Switch: $json.type]
      → "order.created"  → [Order workflow]
      → "payment.done"   → [Payment workflow]
      → default          → [Log unknown event]
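
The same routing decision can be expressed in a Code node when the branches are just labels or sub-workflow names. This is a minimal sketch: routeEvent and the route table are hypothetical, and the labels mirror the diagram above.

```javascript
// Hypothetical route table: event type -> downstream target.
const ROUTES = new Map([
  ['order.created', 'Order workflow'],
  ['payment.done', 'Payment workflow'],
]);

// Fall back to a logging branch for anything unrecognized.
function routeEvent(event) {
  return ROUTES.get(event.type) ?? 'Log unknown event';
}

console.log(routeEvent({ type: 'order.created' }));
console.log(routeEvent({ type: 'refund.issued' }));
```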

Step 7: Schema and Validation

Kafka doesn't enforce message schemas by default. Add a Code node right after the Kafka Trigger to validate:

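// With JSON Parse Message on, the payload sits under `message` unless the
// trigger's Only Message option is enabled, so handle both shapes.
const msg = $json.message ?? $json;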

if (!msg.userId || typeof msg.userId !== 'string') {
  throw new Error(`Invalid message: missing userId. Raw: ${JSON.stringify(msg)}`);
}

if (!msg.eventType || !['signup', 'login', 'purchase'].includes(msg.eventType)) {
  throw new Error(`Unknown eventType: ${msg.eventType}`);
}

return [{ json: msg }];

Pair this with a DLQ so invalid messages land in events.invalid rather than crashing the workflow.
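
If you would rather route invalid messages than throw, one option is a validator that returns a result object, which an IF node can branch on. The sketch below assumes the same rules as the validation code above. validateEvent is a hypothetical helper.

```javascript
const ALLOWED_EVENT_TYPES = ['signup', 'login', 'purchase'];

// Hypothetical helper: collect validation errors instead of throwing.
function validateEvent(msg) {
  if (msg === null || typeof msg !== 'object') {
    return { valid: false, errors: ['message is not an object'] };
  }
  const errors = [];
  if (typeof msg.userId !== 'string' || msg.userId === '') {
    errors.push('missing userId');
  }
  if (!ALLOWED_EVENT_TYPES.includes(msg.eventType)) {
    errors.push(`unknown eventType: ${msg.eventType}`);
  }
  return { valid: errors.length === 0, errors };
}

console.log(validateEvent({ userId: 'u1', eventType: 'signup' }));
console.log(validateEvent({ eventType: 'logout' }));
```

Items with valid set to false can then go to a Kafka node that publishes to events.invalid, with the errors array included for debugging.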


Monitoring and Observability

What to watch | How
Consumer lag | kafka-consumer-groups.sh --bootstrap-server <broker> --describe --group <group-id>, or the Confluent dashboard
n8n execution errors | n8n execution log; set up error workflow notifications
Message throughput | Kafka topic metrics (messages in/out per second)
DLQ depth | Monitor your DLQ topic; growing depth means upstream failures
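
Consumer lag is the gap between the newest offset in a partition and the offset the group has committed. As a rough sketch, assuming you have already collected those two numbers per partition (the row shape and computeLag are made up for illustration), the calculation is:

```javascript
// Hypothetical helper: per-partition and total lag from offset pairs.
function computeLag(rows) {
  const perPartition = rows.map(({ partition, currentOffset, logEndOffset }) => ({
    partition,
    // Lag is how far the committed offset trails the end of the log.
    lag: Math.max(0, logEndOffset - currentOffset),
  }));
  const total = perPartition.reduce((sum, row) => sum + row.lag, 0);
  return { perPartition, total };
}

console.log(
  computeLag([
    { partition: 0, currentOffset: 120, logEndOffset: 150 },
    { partition: 1, currentOffset: 300, logEndOffset: 300 },
  ])
);
```

A total that keeps growing means the workflow is processing messages more slowly than they arrive.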

Free Workflow JSON

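Here's a produce + consume pattern to import into n8n as a starting point. After importing, attach your Kafka credential to both Kafka nodes and check each node's settings and the error connection against your n8n version: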

{
  "name": "Kafka Produce + DLQ Pattern",
  "nodes": [
    {
      "id": "trigger",
      "name": "Kafka Trigger",
      "type": "n8n-nodes-base.kafkaTrigger",
      "position": [250, 300],
      "parameters": {
        "topic": "orders.created",
        "groupId": "n8n-order-processor",
        "sessionTimeout": 30000,
        "heartbeatInterval": 3000,
        "readMessagesFromBeginning": false,
        "jsonParseMessage": true
      }
    },
    {
      "id": "validate",
      "name": "Validate Message",
      "type": "n8n-nodes-base.code",
      "position": [450, 300],
      "parameters": {
        "jsCode": "const msg = $json;\nif (!msg.orderId) throw new Error('Missing orderId: ' + JSON.stringify(msg));\nreturn [{ json: msg }];"
      }
    },
    {
      "id": "process",
      "name": "Process Order",
      "type": "n8n-nodes-base.httpRequest",
      "position": [650, 300],
      "parameters": {
        "method": "POST",
        "url": "https://your-api.com/orders",
        "sendBody": true,
        "bodyParameters": { "parameters": [{ "name": "orderId", "value": "={{ $json.orderId }}" }] }
      }
    },
    {
      "id": "dlq",
      "name": "Dead Letter Queue",
      "type": "n8n-nodes-base.kafka",
      "position": [650, 450],
      "parameters": {
        "topic": "orders.dlq",
        "message": "={{ JSON.stringify({ original: $('Kafka Trigger').item.json, error: $json.error, ts: $now }) }}"
      }
    }
  ],
  "connections": {
    "Kafka Trigger": { "main": [[ { "node": "Validate Message", "type": "main", "index": 0 } ]] },
    "Validate Message": { "main": [[ { "node": "Process Order", "type": "main", "index": 0 } ]] },
    "Process Order": { "error": [[ { "node": "Dead Letter Queue", "type": "main", "index": 0 } ]] }
  }
}

To import: n8n → Workflows → Import from JSON → paste above.


Common Errors and Fixes

Error | Cause | Fix
KafkaJSConnectionError: Connection timeout | Wrong broker address or port | Check broker string; verify firewall rules allow 9092
KafkaJSProtocolError: TOPIC_AUTHORIZATION_FAILED | ACL blocks the n8n client | Grant produce/consume ACLs on the topic to the principal n8n authenticates as (the SASL user)
KafkaJSNumberOfRetriesExceeded | Broker unreachable after retries | Check broker health and network reachability from the n8n host
SyntaxError: Unexpected token | JSON.stringify missing on message | Wrap object messages: {{ JSON.stringify($json) }}
Messages not appearing | Wrong Group ID, or offsets already committed for the group | Use a new Group ID with Read Messages From Beginning on
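
The SyntaxError row is worth a short illustration. When an object is coerced to a string without JSON.stringify, it becomes "[object Object]", which a consumer cannot parse. toMessageString below is a hypothetical guard that always yields a string safe to send.

```javascript
// Hypothetical helper: make sure the Kafka message value is a proper string.
function toMessageString(value) {
  return typeof value === 'string' ? value : JSON.stringify(value);
}

const payload = { orderId: 'a1' };

// Implicit coercion loses the data entirely.
console.log(String(payload));

// Explicit serialization round-trips cleanly.
console.log(JSON.parse(toMessageString(payload)).orderId);
```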

Summary

Task | Node / Setting
Publish to Kafka | Kafka node → Send Message
Subscribe to topic | Kafka Trigger node
Consumer group isolation | Unique Group ID per workflow
Replay from start | New Group ID with Read Messages From Beginning: on
Dead letter queue | Error branch → Kafka node → dlq topic
Schema validation | Code node after trigger
Schema validation Code node after trigger

The Kafka node's simplicity hides some nuance around consumer groups and offset management — get those right and you have a production-grade event pipeline running entirely inside n8n.

Questions? Drop them in the comments — especially if you're hitting Confluent Cloud auth issues or MSK IAM auth (which needs a custom SASL mechanism not yet in n8n's UI).

Top comments (1)

Pirate Prentice

Are you running Kafka with Confluent Cloud, AWS MSK, or self-hosted? Curious which setups people are integrating with n8n — MSK IAM auth in particular has some quirks worth covering.