DEV Community

Jaime Hernández
Jaime Hernández

Posted on

What is Kafka and How to Implement it in NestJS

Apache Kafka

Apache Kafka is a distributed and scalable data processing platform used for real-time data streaming. It serves as a messaging and event streaming platform, facilitating the secure, reliable, and real-time transfer of data between applications and distributed systems.

Kafka operates on a publish-subscribe model, where data producers publish messages to a topic, and consumers subscribe to that topic to receive messages. Kafka can handle large volumes of real-time data and distribute them to multiple consumers in parallel.

Commonly, Kafka finds applications in enterprise solutions, especially those requiring high-speed, high-availability communication between distributed systems. Additionally, Kafka is a flexible platform that can be integrated with various programming languages and tools, boasting an active developer community contributing to its improvement and expansion.

Key features of Kafka include:

  • Horizontal scalability
  • Fault tolerance
  • Data replication
  • Storage capacity
  • Integration with data processing tools like Apache Spark, Apache Flink, and Apache Storm

Implementing Kafka in NestJS

To implement Kafka in NestJS, follow these steps:

  1. Step one
  2. Step two
  3. Step three
  • 1. Install the @nestjs/microservices and kafkajs packages using npm:
npm install @nestjs/microservices kafkajs
Enter fullscreen mode Exit fullscreen mode
  • 2. Create a Kafka module in NestJS using the NestJS generator:
nest generate module kafka
Enter fullscreen mode Exit fullscreen mode
  • 3. Import the KafkaModule in your main application:
import { KafkaModule } from './kafka/kafka.module';
@Module({
  imports: [KafkaModule],
  controllers: [AppController],
  providers: [AppService],
})
export class AppModule {}
Enter fullscreen mode Exit fullscreen mode
  • 4. Create a Kafka service in the KafkaModule:
import { Injectable } from '@nestjs/common';
import { Kafka } from 'kafkajs';
@Injectable()
export class KafkaService {
  private kafka;
  constructor() {
    this.kafka = new Kafka({
      clientId: 'your-client-id',
      brokers: ['kafka-broker1', 'kafka-broker2'],
    });
  }
  async sendMessage(topic: string, message: string) {
    const producer = this.kafka.producer();
    await producer.connect();
    await producer.send({
      topic,
      messages: [{ value: message }],
    });
    await producer.disconnect();
  }
  async consumeMessages(topic: string) {
    const consumer = this.kafka.consumer({ groupId: 'group-id' });
    await consumer.connect();
    await consumer.subscribe({ topic, fromBeginning: true });
    await consumer.run({
      eachMessage: async ({ topic, partition, message }) => {
        // Handle the received message
        console.log({
          topic,
          partition,
          value: message.value.toString(),
        });
      },
    });
  }
}
Enter fullscreen mode Exit fullscreen mode
  • 5. Add a controller in the KafkaModule to handle HTTP requests:
import { Controller, Get, Post, Body, Param } from '@nestjs/common';
import { KafkaService } from './kafka.service';
@Controller('kafka')
export class KafkaController {
  constructor(private readonly kafkaService: KafkaService) {}
  @Post('send/:topic')
  async sendKafkaMessage(@Param('topic') topic: string, @Body() message: { value: string }) {
    await this.kafkaService.sendMessage(topic, message.value);
  }
  @Get('consume/:topic')
  async consumeKafkaMessages(@Param('topic') topic: string) {
    await this.kafkaService.consumeMessages(topic);
  }
}
Enter fullscreen mode Exit fullscreen mode
  • 6. Add the controller to the KafkaModule:
@Module({
  controllers: [KafkaController],
  providers: [KafkaService],
})
export class KafkaModule {}
Enter fullscreen mode Exit fullscreen mode
  • 7. Start your application. These are the basic steps to implement Kafka in NestJS. Of course, there are many other configurations and options to consider, depending on your specific use case.

Top comments (0)