KafkaJS is a pure-JavaScript Apache Kafka client for Node.js — no native bindings required. Its API lets you produce, consume, and manage topics straight from JavaScript.
Producer: Send Events
// Create a Kafka client and producer, then publish a single scraped product.
import { Kafka } from "kafkajs";

const kafka = new Kafka({ clientId: "scraper", brokers: ["localhost:9092"] });
const producer = kafka.producer();
await producer.connect();

// The key routes the message: records with the same key always land on
// the same partition, preserving per-product ordering.
const record = {
  key: "product-123",
  value: JSON.stringify({ title: "Widget", price: 29.99, url: "https://..." }),
  headers: { source: "amazon", scrapedAt: Date.now().toString() },
};

await producer.send({ topic: "scraped-products", messages: [record] });
// Batch send: deliver to several topics in a single broker round trip.
const productMessages = products.map((p) => ({ key: p.id, value: JSON.stringify(p) }));
const metricsMessage = {
  value: JSON.stringify({ total: products.length, duration: elapsed }),
};

await producer.sendBatch({
  topicMessages: [
    { topic: "scraped-products", messages: productMessages },
    { topic: "scrape-metrics", messages: [metricsMessage] },
  ],
});
Consumer: Process Events
// Consume product events; all members of the group share the topic's partitions.
const consumer = kafka.consumer({ groupId: "product-processors" });
await consumer.connect();
await consumer.subscribe({ topics: ["scraped-products"], fromBeginning: false });

await consumer.run({
  eachMessage: async ({ topic, partition, message }) => {
    // Tombstone records (compacted topics, explicit deletes) have a null
    // value — calling .toString()/JSON.parse on them would throw and crash
    // the consumer, so skip them explicitly.
    if (message.value === null) return;
    const product = JSON.parse(message.value.toString());
    console.log(`[${partition}] ${product.title}: $${product.price}`);
    // Process: save to DB, check price alerts, etc.
    await db.product.upsert({
      where: { url: product.url },
      update: { price: product.price },
      create: product,
    });
  },
});
Batch Processing
// Batch processing: trade per-message granularity for bulk-write throughput.
await consumer.run({
  eachBatch: async ({ batch, resolveOffset, heartbeat, isRunning, isStale }) => {
    // Bail out if the consumer is shutting down or the batch became stale
    // after a rebalance — processing it anyway risks duplicate writes.
    if (!isRunning() || isStale()) return;

    // Tombstones (null values) cannot be JSON-parsed; drop them first.
    const products = batch.messages
      .filter((m) => m.value !== null)
      .map((m) => JSON.parse(m.value.toString()));

    // Bulk insert
    await db.product.createMany({ data: products, skipDuplicates: true });

    // Mark every message processed, then signal liveness to the broker so
    // the session doesn't time out while chewing through large batches.
    for (const message of batch.messages) {
      resolveOffset(message.offset);
    }
    await heartbeat();
  },
});
Admin: Manage Topics
// Admin client: create and inspect topics and consumer groups.
const admin = kafka.admin();
await admin.connect();

// Create topic with 6 partitions, replicated 3x, keeping data for 7 days.
const SEVEN_DAYS_MS = String(7 * 24 * 60 * 60 * 1000); // "604800000"
await admin.createTopics({
  topics: [
    {
      topic: "scraped-products",
      numPartitions: 6,
      replicationFactor: 3,
      configEntries: [{ name: "retention.ms", value: SEVEN_DAYS_MS }],
    },
  ],
});

// List topics
const topics = await admin.listTopics();

// Describe consumer group
const groupDesc = await admin.describeGroups(["product-processors"]);

// Get offsets
const offsets = await admin.fetchOffsets({
  groupId: "product-processors",
  topics: ["scraped-products"],
});
Error Handling and Retries
// Client-level retry/backoff configuration.
// Each retry waits roughly initialRetryTime * multiplier^attempt (capped at
// maxRetryTime), with random jitter controlled by the separate `factor` option.
const kafka = new Kafka({
  clientId: "scraper",
  brokers: ["localhost:9092"],
  retry: {
    initialRetryTime: 300, // first backoff, in ms
    retries: 10,           // attempts before the request is failed
    maxRetryTime: 30000,   // backoff ceiling, in ms
    // NOTE: the exponential-growth option is `multiplier`, not `factor`.
    // In KafkaJS, `factor` is the 0..1 randomization jitter (default 0.2);
    // the original `factor: 2` almost certainly meant doubling backoff.
    multiplier: 2,
  },
  connectionTimeout: 3000, // ms to wait for a broker connection
  requestTimeout: 25000,   // ms to wait for a broker response
});
Stream scraped data in real-time? My Apify tools + Kafka = enterprise data pipelines.
Custom streaming solution? Email spinov001@gmail.com
Top comments (0)