DEV Community

Cover image for Work with Apache Kafka in Spring Boot
Neeraj Kumar
Neeraj Kumar

Posted on

Work with Apache Kafka in Spring Boot

Step 1: Create a New Spring Boot Starter Project

Start by creating a new Spring Boot Starter Project using STS (Spring Tool Suite). While configuring the project, select Spring Web, Spring for Apache Kafka, and Spring Boot DevTools as dependencies.
Step 2: Enable Kafka in the Main Class

To integrate Apache Kafka with Spring Boot, add the @EnableKafka annotation to the main application class:

package com.dev.spring.kafka;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.kafka.annotation.EnableKafka;

/**
 * Application entry point for the Kafka demo.
 *
 * <p>The class is the primary configuration source handed to {@link SpringApplication}
 * and carries both the Spring Boot and the Kafka enabling annotations.
 */
@EnableKafka
@SpringBootApplication
public class SpringBoot2ApacheKafkaTestApplication {

    /**
     * Starts the Spring application using this class as the primary source.
     *
     * @param args command-line arguments, forwarded unchanged to Spring Boot
     */
    public static void main(String[] args) {
        final Class<?> primarySource = SpringBoot2ApacheKafkaTestApplication.class;
        SpringApplication.run(primarySource, args);
    }
}
Enter fullscreen mode Exit fullscreen mode

Step 3: Create a Custom MessageRepository Class

Next, create a MessageRepository class to store incoming messages.

package com.dev.spring.kafka.message.repository;

import java.util.ArrayList;
import java.util.List;
import org.springframework.stereotype.Component;

@Component
public class MessageRepository {

    // In-memory store of every message received so far, in arrival order.
    // addMessage is invoked from the Kafka listener while getAllMessages is invoked
    // from the REST controller, so every access is guarded by the list's own monitor.
    private final List<String> list = new ArrayList<>();

    /**
     * Appends a message to the end of the store.
     *
     * @param message the message text to remember
     */
    public void addMessage(String message) {
        synchronized (list) {
            list.add(message);
        }
    }

    /**
     * Returns all stored messages rendered with {@link List#toString()},
     * e.g. {@code [first, second]}, or {@code []} when nothing has been stored.
     *
     * @return the bracketed, comma-separated list of stored messages
     */
    public String getAllMessages() {
        // Hold the lock while iterating so a concurrent add cannot invalidate the traversal.
        synchronized (list) {
            return list.toString();
        }
    }
}

Enter fullscreen mode Exit fullscreen mode

Step 4: Create a MessageProducer Class

Create a MessageProducer class to send messages to the Kafka topic.

package com.dev.spring.kafka.sender;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

/**
 * Publishes plain-text messages to the Kafka topic named by the
 * {@code myapp.kafka.topic} property.
 */
@Component
public class MessageProducer {

    // One shared, immutable logger for the class rather than one per instance.
    private static final Logger log = LoggerFactory.getLogger(MessageProducer.class);

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    /** Destination topic, injected from the {@code myapp.kafka.topic} property. */
    @Value("${myapp.kafka.topic}")
    private String topic;

    /**
     * Logs the message and hands it to the {@link KafkaTemplate} for the configured topic.
     *
     * <p>The value returned by {@code KafkaTemplate.send} is not inspected, so this
     * method does not itself confirm that the broker accepted the record.
     *
     * @param message the payload to publish
     */
    public void sendMessage(String message) {
        // Parameterized logging avoids building the string when INFO is disabled.
        log.info("MESSAGE SENT FROM PRODUCER END -> {}", message);
        kafkaTemplate.send(topic, message);
    }
}
Enter fullscreen mode Exit fullscreen mode

Step 5: Create a MessageConsumer Class

Now, create a MessageConsumer class to consume messages from the Kafka topic.

package com.dev.spring.kafka.consumer;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
import com.dev.spring.kafka.message.repository.MessageRepository;

/**
 * Receives messages from the Kafka topic named by the {@code myapp.kafka.topic}
 * property and records each one in the {@link MessageRepository}.
 */
@Component
public class MessageConsumer {

    // One shared, immutable logger for the class rather than one per instance.
    private static final Logger log = LoggerFactory.getLogger(MessageConsumer.class);

    @Autowired
    private MessageRepository messageRepo;

    /**
     * Listener callback: logs the received payload and stores it in the repository.
     *
     * <p>The listener subscribes with the fixed consumer group id {@code xyz}.
     *
     * @param message the record value delivered by the listener
     */
    @KafkaListener(topics = "${myapp.kafka.topic}", groupId = "xyz")
    public void consume(String message) {
        // Parameterized logging avoids building the string when INFO is disabled.
        log.info("MESSAGE RECEIVED AT CONSUMER END -> {}", message);
        messageRepo.addMessage(message);
    }
}

Enter fullscreen mode Exit fullscreen mode

Step 6: Create a KafkaRestController Class

Finally, create a KafkaRestController class to handle REST requests for sending and retrieving messages.

package com.dev.spring.kafka.controller;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import com.dev.spring.kafka.message.repository.MessageRepository;
import com.dev.spring.kafka.sender.MessageProducer;

/**
 * REST endpoints for publishing a message to Kafka and for listing the messages
 * that have been stored in the {@link MessageRepository}.
 *
 * <p>Both endpoints echo caller-supplied text, so they declare {@code text/plain}
 * as their response type to keep browsers from interpreting the body as HTML.
 */
@RestController
public class KafkaRestController {

    @Autowired
    private MessageProducer producer;

    @Autowired
    private MessageRepository messageRepo;

    /**
     * Publishes the given text through the {@link MessageProducer}.
     *
     * @param message value of the required {@code msg} query parameter
     * @return a confirmation string that quotes the submitted message
     */
    @GetMapping(value = "/send", produces = "text/plain")
    public String sendMsg(@RequestParam("msg") String message) {
        producer.sendMessage(message);
        return "'" + message + "' sent successfully!";
    }

    /**
     * Returns every message currently held by the repository.
     *
     * @return the repository's string rendering of all stored messages
     */
    @GetMapping(value = "/getAll", produces = "text/plain")
    public String getAllMessages() {
        return messageRepo.getAllMessages();
    }
}
Enter fullscreen mode Exit fullscreen mode

Step 7: Create the application.yml File

Lastly, configure your application by creating an application.yml file with the following settings:

# Application configuration for the Kafka demo.

# Port setting for the server.
server:
  port: 9090

spring:
  kafka:
    # Producer side: broker address and String serializers for record keys and values.
    producer:
      bootstrap-servers: localhost:9092
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer

    # Consumer side: same broker address, String deserializers for keys and values.
    consumer:
      bootstrap-servers: localhost:9092
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer

# Custom property: topic name resolved through the ${myapp.kafka.topic} placeholder
# by both MessageProducer and MessageConsumer.
myapp:
  kafka:
    topic: myKafkaTest
Enter fullscreen mode Exit fullscreen mode

Top comments (0)