DEV Community

Cover image for nestjs with kafkajs Producer and Consumer
tkssharma
tkssharma

Posted on

nestjs with kafkajs Producer and Consumer

NestJS with Kafkajs: A Powerful Combination for Building Scalable Applications

https://github.com/tkssharma/nestjs-kafka-monorepo

Introduction

Kafka is a distributed streaming platform that is widely used for real-time data processing and messaging. NestJS, on the other hand, is a progressive Node.js framework for building scalable and efficient applications. Combining Kafka with NestJS can create a powerful and scalable solution for various use cases.

Integrating Kafka with NestJS

To integrate Kafka with NestJS, we can use the @nestjs/microservices package. This package provides a convenient way to create microservices that communicate using different protocols, including Kafka.

Creating a Kafka Producer

To create a Kafka producer in NestJS, we can use the @nestjs/microservices module and the KafkaOptions interface. Here's an example:

import { KafkaOptions, Transport } from '@nestjs/microservices';

@Injectable()
export class KafkaProducerService {
  private readonly kafkaClient: ClientKafka;

  constructor() {
    this.kafkaClient = new ClientKafka({
      transport: Transport.KAFKA,
      options: {
        client: {
          brokers: ['localhost:9092'],
        },
      },
    });
  }

  async send(message: any) {
    await this.kafkaClient.emit('topic-name', message);
  }
}
Enter fullscreen mode Exit fullscreen mode

Creating a Kafka Consumer

To create a Kafka consumer in NestJS, we can use the @nestjs/microservices module and the KafkaOptions interface. Here's an example:

import { KafkaOptions, Transport } from '@nestjs/microservices';

@Injectable()
export class KafkaConsumerService {
  private readonly kafkaClient: ClientKafka;

  constructor() {
    this.kafkaClient = new ClientKafka({
      transport: Transport.KAFKA,
      options: {
        client: {
          brokers: ['localhost:9092'],
        },
      },
    });
  }

  @EventPattern('topic-name')
  async handleEvent(message: any) {
    // Process the message
  }
}
Enter fullscreen mode Exit fullscreen mode

Using Kafka Producer and Consumer

Once you have created the producer and consumer, you can use them to send and receive messages. Here's an example:

@Controller('kafka')
export class KafkaController {
  constructor(private readonly kafkaProducerService: KafkaProducerService) {}

  @Post()
  async sendMessage(@Body() message: any) {
    await this.kafkaProducerService.send(message);
    return { message: 'Message sent successfully' };
  }
}
Enter fullscreen mode Exit fullscreen mode

Integrate nestjs with simple kafkajs library

import { Module } from '@nestjs/common';
import { KafkaService } from './kafka.service';
import { AppConfigModule } from '@app/config';
import { KafkaConsumerService } from './kafka.consumer.service';
import { KafkaProducerService } from './kafka.producer.service';

@Module({
  imports: [AppConfigModule],
  providers: [KafkaConsumerService, KafkaProducerService],
  exports: [KafkaConsumerService, KafkaProducerService],
})
export class KafkaModule { }

Enter fullscreen mode Exit fullscreen mode
import { AppConfigService } from '@app/config';
import { Injectable, OnApplicationShutdown } from '@nestjs/common';
import { ConfigService } from '@nestjs/config';
import { ConsumerConfig, ConsumerSubscribeTopic, KafkaMessage } from 'kafkajs';
import { KafkaConsumer } from './consumer.service';

interface KafkajsConsumerOptions {
  topic: ConsumerSubscribeTopic;
  config: ConsumerConfig;
  onMessage: (message: KafkaMessage) => Promise<void>;
}

export interface IConsumer {
  connect: () => Promise<void>;
  disconnect: () => Promise<void>;
  consume: (message: any) => Promise<void>;
}


@Injectable()
export class KafkaConsumerService implements OnApplicationShutdown {
  private readonly consumers: IConsumer[] = [];

  constructor(
    private readonly configService: AppConfigService,
  ) { }

  async consume({ topic, config, onMessage }: KafkajsConsumerOptions) {
    const consumer = new KafkaConsumer(
      topic,
      config,
      this.configService.kafka.broker
    );
    await consumer.connect();
    await consumer.consume(onMessage);
    this.consumers.push(consumer);
  }

  async onApplicationShutdown() {
    for (const consumer of this.consumers) {
      await consumer.disconnect();
    }
  }
}

Enter fullscreen mode Exit fullscreen mode

Consumer service

import { Consumer, ConsumerConfig, ConsumerSubscribeTopic, Kafka, KafkaMessage, Producer } from "kafkajs";
import { IProducer } from "./kafka.producer.service";
import { Logger } from "@nestjs/common";
import { IConsumer } from "./kafka.consumer.service";
import * as retry from 'async-retry';

export const sleep = (timeout: number) => {
  return new Promise<void>((resolve) => setTimeout(resolve, timeout));
};

export class KafkaConsumer implements IConsumer {
  private readonly kafka: Kafka;
  private readonly consumer: Consumer;
  private readonly logger: Logger;

  constructor(
    private readonly topic: ConsumerSubscribeTopic,
    config: ConsumerConfig,
    broker: string) {
    this.kafka = new Kafka({
      brokers: [broker]
    })
    this.consumer = this.kafka.consumer(config);
    this.logger = new Logger(`${topic}-${config.groupId}`)
  }
  async consume(onMessage: (message: KafkaMessage) => Promise<void>) {
    await this.consumer.subscribe(this.topic)
    await this.consumer.run({
      eachMessage: async ({ message, partition }) => {
        this.logger.debug(`Processing message partition: ${partition}`);
        try {
          await retry(async () => onMessage(message), {
            retries: 3,
            onRetry: (error, attempt) =>
              this.logger.error(
                `Error consuming message, executing retry ${attempt}/3...`,
                error,
              ),
          });
        } catch (err) {
          // handle failure of message 
          // write then to DB table or log them 
          // better write to DATABASE 
        }
      },
    })
  }

  async connect() {
    try {
      await this.consumer.connect();
    } catch (err) {
      this.logger.error('Failed to connect to Kafka. trying again ...', err);
      await sleep(5000);
      await this.connect();
    }

  }
  async disconnect() {
    this.consumer.disconnect()
  }
}
Enter fullscreen mode Exit fullscreen mode

Conclusion

By integrating Kafka with NestJS, you can build scalable and distributed applications that can handle high volumes of data and real-time processing. The @nestjs/microservices module provides a convenient way to interact with Kafka, making it easy to incorporate into your NestJS projects.

Top comments (0)