NestJS with Kafkajs: A Powerful Combination for Building Scalable Applications
https://github.com/tkssharma/nestjs-kafka-monorepo
Introduction
Kafka is a distributed streaming platform that is widely used for real-time data processing and messaging. NestJS, on the other hand, is a progressive Node.js framework for building scalable and efficient applications. Combining Kafka with NestJS can create a powerful and scalable solution for various use cases.
Integrating Kafka with NestJS
To integrate Kafka with NestJS, we can use the @nestjs/microservices
package. This package provides a convenient way to create microservices that communicate using different protocols, including Kafka.
Creating a Kafka Producer
To create a Kafka producer in NestJS, we can use the @nestjs/microservices
module and the KafkaOptions
interface. Here's an example:
import { KafkaOptions, Transport } from '@nestjs/microservices';
@Injectable()
export class KafkaProducerService {
private readonly kafkaClient: ClientKafka;
constructor() {
this.kafkaClient = new ClientKafka({
transport: Transport.KAFKA,
options: {
client: {
brokers: ['localhost:9092'],
},
},
});
}
async send(message: any) {
await this.kafkaClient.emit('topic-name', message);
}
}
Creating a Kafka Consumer
To create a Kafka consumer in NestJS, we can use the @nestjs/microservices
module and the KafkaOptions
interface. Here's an example:
import { KafkaOptions, Transport } from '@nestjs/microservices';
@Injectable()
export class KafkaConsumerService {
private readonly kafkaClient: ClientKafka;
constructor() {
this.kafkaClient = new ClientKafka({
transport: Transport.KAFKA,
options: {
client: {
brokers: ['localhost:9092'],
},
},
});
}
@EventPattern('topic-name')
async handleEvent(message: any) {
// Process the message
}
}
Using Kafka Producer and Consumer
Once you have created the producer and consumer, you can use them to send and receive messages. Here's an example:
@Controller('kafka')
export class KafkaController {
constructor(private readonly kafkaProducerService: KafkaProducerService) {}
@Post()
async sendMessage(@Body() message: any) {
await this.kafkaProducerService.send(message);
return { message: 'Message sent successfully' };
}
}
Integrate nestjs with simple kafkajs library
import { Module } from '@nestjs/common';
import { KafkaService } from './kafka.service';
import { AppConfigModule } from '@app/config';
import { KafkaConsumerService } from './kafka.consumer.service';
import { KafkaProducerService } from './kafka.producer.service';
@Module({
imports: [AppConfigModule],
providers: [KafkaConsumerService, KafkaProducerService],
exports: [KafkaConsumerService, KafkaProducerService],
})
export class KafkaModule { }
import { AppConfigService } from '@app/config';
import { Injectable, OnApplicationShutdown } from '@nestjs/common';
import { ConfigService } from '@nestjs/config';
import { ConsumerConfig, ConsumerSubscribeTopic, KafkaMessage } from 'kafkajs';
import { KafkaConsumer } from './consumer.service';
interface KafkajsConsumerOptions {
topic: ConsumerSubscribeTopic;
config: ConsumerConfig;
onMessage: (message: KafkaMessage) => Promise<void>;
}
export interface IConsumer {
connect: () => Promise<void>;
disconnect: () => Promise<void>;
consume: (message: any) => Promise<void>;
}
@Injectable()
export class KafkaConsumerService implements OnApplicationShutdown {
private readonly consumers: IConsumer[] = [];
constructor(
private readonly configService: AppConfigService,
) { }
async consume({ topic, config, onMessage }: KafkajsConsumerOptions) {
const consumer = new KafkaConsumer(
topic,
config,
this.configService.kafka.broker
);
await consumer.connect();
await consumer.consume(onMessage);
this.consumers.push(consumer);
}
async onApplicationShutdown() {
for (const consumer of this.consumers) {
await consumer.disconnect();
}
}
}
Consumer service
import { Consumer, ConsumerConfig, ConsumerSubscribeTopic, Kafka, KafkaMessage, Producer } from "kafkajs";
import { IProducer } from "./kafka.producer.service";
import { Logger } from "@nestjs/common";
import { IConsumer } from "./kafka.consumer.service";
import * as retry from 'async-retry';
export const sleep = (timeout: number) => {
return new Promise<void>((resolve) => setTimeout(resolve, timeout));
};
export class KafkaConsumer implements IConsumer {
private readonly kafka: Kafka;
private readonly consumer: Consumer;
private readonly logger: Logger;
constructor(
private readonly topic: ConsumerSubscribeTopic,
config: ConsumerConfig,
broker: string) {
this.kafka = new Kafka({
brokers: [broker]
})
this.consumer = this.kafka.consumer(config);
this.logger = new Logger(`${topic}-${config.groupId}`)
}
async consume(onMessage: (message: KafkaMessage) => Promise<void>) {
await this.consumer.subscribe(this.topic)
await this.consumer.run({
eachMessage: async ({ message, partition }) => {
this.logger.debug(`Processing message partition: ${partition}`);
try {
await retry(async () => onMessage(message), {
retries: 3,
onRetry: (error, attempt) =>
this.logger.error(
`Error consuming message, executing retry ${attempt}/3...`,
error,
),
});
} catch (err) {
// handle failure of message
// write then to DB table or log them
// better write to DATABASE
}
},
})
}
async connect() {
try {
await this.consumer.connect();
} catch (err) {
this.logger.error('Failed to connect to Kafka. trying again ...', err);
await sleep(5000);
await this.connect();
}
}
async disconnect() {
this.consumer.disconnect()
}
}
Conclusion
By integrating Kafka with NestJS, you can build scalable and distributed applications that can handle high volumes of data and real-time processing. The @nestjs/microservices
module provides a convenient way to interact with Kafka, making it easy to incorporate into your NestJS projects.
Top comments (1)
i dont understand this implementation, nestjs is doing this under the hood already all you need is the config and a controller as consumer for the topics