Alex Spinov
KafkaJS Has a Free API That Brings Event Streaming to Any Node.js App

KafkaJS is a pure JavaScript client for Apache Kafka, built for Node.js. Its API lets you produce, consume, and manage topics with no native dependencies.

Producer: Send Events

import { Kafka } from "kafkajs";

const kafka = new Kafka({ clientId: "scraper", brokers: ["localhost:9092"] });
const producer = kafka.producer();

await producer.connect();

// Send a single message
await producer.send({
  topic: "scraped-products",
  messages: [
    {
      key: "product-123",
      value: JSON.stringify({ title: "Widget", price: 29.99, url: "https://..." }),
      headers: { source: "amazon", scrapedAt: Date.now().toString() },
    },
  ],
});

// Batch send
await producer.sendBatch({
  topicMessages: [
    {
      topic: "scraped-products",
      messages: products.map(p => ({ key: p.id, value: JSON.stringify(p) })),
    },
    {
      topic: "scrape-metrics",
      messages: [{ value: JSON.stringify({ total: products.length, duration: elapsed }) }],
    },
  ],
});

Consumer: Process Events

const consumer = kafka.consumer({ groupId: "product-processors" });

await consumer.connect();
await consumer.subscribe({ topics: ["scraped-products"], fromBeginning: false });

await consumer.run({
  eachMessage: async ({ topic, partition, message }) => {
    const product = JSON.parse(message.value.toString());
    console.log(`[${partition}] ${product.title}: $${product.price}`);

    // Process: save to DB, check price alerts, etc.
    await db.product.upsert({
      where: { url: product.url },
      update: { price: product.price },
      create: product,
    });
  },
});
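Note that `message.value` is a Buffer (and can be null for tombstone messages), and `JSON.parse` throws on malformed payloads, so the decode step is worth guarding. A sketch (`parseProduct` is illustrative, not a KafkaJS API):

```javascript
// Decode a Kafka message value defensively: tombstones (null values) and
// malformed JSON both come back as null instead of crashing the consumer.
function parseProduct(value) {
  if (value == null) return null;
  try {
    return JSON.parse(value.toString());
  } catch {
    return null; // malformed payload: skip it, or route to a dead-letter topic
  }
}
```

Inside `eachMessage` you would call `parseProduct(message.value)` and skip null results; an unhandled throw there causes the consumer to retry and eventually crash.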

Batch Processing

await consumer.run({
  eachBatch: async ({ batch, resolveOffset, heartbeat }) => {
    const products = batch.messages.map(m => JSON.parse(m.value.toString()));

    // Bulk insert
    await db.product.createMany({ data: products, skipDuplicates: true });

    // Mark processed
    for (const message of batch.messages) {
      resolveOffset(message.offset);
    }
    await heartbeat();
  },
});
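A very large batch can exceed your database's insert limits, so it can help to split the decoded messages into fixed-size chunks (and call `heartbeat()` between chunks on long batches). `chunk` below is a generic helper, not part of KafkaJS:

```javascript
// Split an array into fixed-size chunks for bounded bulk inserts.
function chunk(items, size) {
  const out = [];
  for (let i = 0; i < items.length; i += size) {
    out.push(items.slice(i, i + size));
  }
  return out;
}

// chunk([1, 2, 3, 4, 5], 2) → [[1, 2], [3, 4], [5]]
```

In `eachBatch` you would loop over `chunk(products, 500)`, run one `createMany` per chunk, and heartbeat after each one so the broker doesn't consider the consumer dead mid-batch.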

Admin: Manage Topics

const admin = kafka.admin();
await admin.connect();

// Create topic
await admin.createTopics({
  topics: [{
    topic: "scraped-products",
    numPartitions: 6,
    replicationFactor: 3, // requires a cluster with at least 3 brokers
    configEntries: [{ name: "retention.ms", value: "604800000" }], // 7 days
  }],
});

// List topics
const topics = await admin.listTopics();

// Describe consumer group
const groupDesc = await admin.describeGroups(["product-processors"]);

// Get offsets
const offsets = await admin.fetchOffsets({ groupId: "product-processors", topics: ["scraped-products"] });
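Combining the group's committed offsets with the partitions' latest offsets (from `admin.fetchTopicOffsets`, which reports each partition's latest offset as `high`) gives you an estimate of consumer lag. A sketch, assuming those response shapes; note KafkaJS returns offsets as strings, and a committed offset of `-1` means the group has never committed:

```javascript
// Estimate per-partition lag: latest offset on the partition minus the
// group's committed offset. Offsets arrive as strings in KafkaJS responses.
function computeLag(topicOffsets, committed) {
  const committedByPartition = new Map(
    committed.map(p => [p.partition, Number(p.offset)])
  );
  return topicOffsets.map(p => {
    const c = committedByPartition.get(p.partition) ?? -1;
    return {
      partition: p.partition,
      lag: Number(p.high) - Math.max(c, 0), // treat "never committed" as 0
    };
  });
}
```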

Error Handling and Retries

const kafka = new Kafka({
  clientId: "scraper",
  brokers: ["localhost:9092"],
  retry: {
    initialRetryTime: 300,
    retries: 10,
    maxRetryTime: 30000,
    multiplier: 2, // exponential backoff multiplier (`factor` is the jitter, not the exponent)
  },
  connectionTimeout: 3000,
  requestTimeout: 25000,
});
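With these settings, each retry roughly doubles the wait, capped at `maxRetryTime`. A rough sketch of the resulting delays (`retryDelay` is illustrative and ignores the randomized jitter KafkaJS applies via `factor`):

```javascript
// Approximate KafkaJS retry delay: initialRetryTime * multiplier^attempt,
// capped at maxRetryTime. Jitter is omitted for clarity.
function retryDelay(attempt, { initialRetryTime = 300, multiplier = 2, maxRetryTime = 30000 } = {}) {
  return Math.min(initialRetryTime * Math.pow(multiplier, attempt), maxRetryTime);
}

// [0, 1, 2, 7].map(a => retryDelay(a)) → [300, 600, 1200, 30000]
```

Ten retries at these delays span well over a minute, which is usually enough to ride out a broker restart or leader election.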

Want to stream scraped data in real time? My Apify tools plus Kafka make a solid foundation for enterprise data pipelines.

Custom streaming solution? Email spinov001@gmail.com
