DEV Community

DevCorner

Building a Reactive System with Kafka and Spring WebFlux

Kafka is a distributed event streaming platform. Paired with Spring WebFlux and Reactor Kafka, it lets you build reactive, non-blocking microservices. This guide walks step by step through integrating Kafka with Spring WebFlux for reactive messaging.


1. Why Kafka with Spring WebFlux?

Using Kafka with WebFlux allows us to:

✅ Process messages asynchronously and reactively.

✅ Handle high-throughput event streams efficiently.

✅ Scale consumers using Kafka’s consumer groups.

✅ Avoid blocking threads, improving resource efficiency.


2. Setting Up Kafka in Spring Boot

Step 1: Add Dependencies

Add the following dependencies in pom.xml:

<dependencies>
    <!-- Spring Boot WebFlux -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-webflux</artifactId>
    </dependency>

    <!-- Spring Kafka -->
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
    </dependency>

    <!-- Reactive Kafka -->
    <dependency>
        <groupId>io.projectreactor.kafka</groupId>
        <artifactId>reactor-kafka</artifactId>
        <version>1.3.19</version>
    </dependency>

    <!-- Lombok for reducing boilerplate code -->
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <scope>provided</scope>
    </dependency>
</dependencies>

Step 2: Configure Kafka in application.yml

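Configure the broker address and the consumer and producer defaults in application.yml. These spring.kafka.* properties are read by Spring Kafka's auto-configured beans such as KafkaTemplate; the Reactor Kafka receiver in section 4 is configured in code instead: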

spring:
  kafka:
    bootstrap-servers: localhost:9092
    consumer:
      group-id: reactive-group
      auto-offset-reset: earliest
    producer:
      retries: 3
      acks: all

Note: Make sure Kafka and ZooKeeper are running on your system (a broker started in KRaft mode does not need ZooKeeper).


3. Creating a Reactive Kafka Producer

Step 1: Define a Model Class

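Create a simple DTO (Data Transfer Object) for Kafka messages. The endpoints in this guide send plain strings, so this class is only a starting point for structured payloads: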

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

@Data
@AllArgsConstructor
@NoArgsConstructor
public class MessageDTO {
    private String id;
    private String message;
}

Step 2: Implement Kafka Producer Service

Create a producer that wraps Spring Kafka's KafkaTemplate in a Mono, so the send result can be composed reactively instead of being waited on:

import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
import reactor.core.publisher.Mono;

@Service
public class ReactiveKafkaProducer {
    private final KafkaTemplate<String, String> kafkaTemplate;

    public ReactiveKafkaProducer(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    public Mono<String> sendMessage(String topic, String message) {
        // The supplier defers the send until something subscribes to the Mono.
        // completable() converts Spring Kafka 2.x's ListenableFuture; on Spring Kafka 3.x,
        // send() already returns a CompletableFuture, so drop the completable() call.
        return Mono.fromFuture(() -> kafkaTemplate.send(topic, message).completable())
                   .map(result -> "Message sent to " + result.getRecordMetadata().topic());
    }
}
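The producer above goes through Spring Kafka's KafkaTemplate. For comparison, here is a minimal sketch of the same send built on Reactor Kafka's KafkaSender, assuming the reactor-kafka 1.3.x API from the dependency list. The class name ReactorKafkaSenderSketch and the senderProps helper are hypothetical, and the settings simply mirror the acks and retries values from application.yml:

```java
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import reactor.core.publisher.Mono;
import reactor.kafka.sender.KafkaSender;
import reactor.kafka.sender.SenderOptions;
import reactor.kafka.sender.SenderRecord;

// Hypothetical alternative to ReactiveKafkaProducer, built on Reactor Kafka's KafkaSender.
final class ReactorKafkaSenderSketch {

    private final KafkaSender<String, String> sender;

    ReactorKafkaSenderSketch(String bootstrapServers) {
        SenderOptions<String, String> options = SenderOptions.create(senderProps(bootstrapServers));
        this.sender = KafkaSender.create(options);
    }

    // Producer settings mirroring the application.yml values (acks=all, retries=3).
    static Map<String, Object> senderProps(String bootstrapServers) {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        props.put(ProducerConfig.RETRIES_CONFIG, 3);
        return props;
    }

    Mono<String> sendMessage(String topic, String message) {
        // The second argument is correlation metadata that comes back in the SenderResult.
        SenderRecord<String, String, String> record =
                SenderRecord.create(new ProducerRecord<String, String>(topic, message), message);
        return sender.send(Mono.just(record))
                     .next() // one record in, so take the single result
                     .map(result -> "Message sent to " + result.recordMetadata().topic());
    }

    // Release the underlying Kafka producer when the application shuts down.
    void close() {
        sender.close();
    }
}
```

Nothing is sent until the returned Mono is subscribed, which WebFlux does when a controller method returns it.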

Step 3: Create a Reactive REST API for Sending Messages

Expose an endpoint in ReactiveKafkaController.java:

import org.springframework.web.bind.annotation.*;
import reactor.core.publisher.Mono;

@RestController
@RequestMapping("/kafka")
public class ReactiveKafkaController {
    private final ReactiveKafkaProducer kafkaProducer;

    public ReactiveKafkaController(ReactiveKafkaProducer kafkaProducer) {
        this.kafkaProducer = kafkaProducer;
    }

    @PostMapping("/send")
    public Mono<String> sendMessage(@RequestParam String topic, @RequestParam String message) {
        return kafkaProducer.sendMessage(topic, message);
    }
}

Test it using cURL or Postman:

curl -X POST "http://localhost:8080/kafka/send?topic=reactive-topic&message=Hello%20Kafka"
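The same request can be issued from Java. The sketch below uses only the JDK's java.net.http client (Java 11+) and shows why the message has to be percent-encoded before it goes into the query string. The class name SendEndpointClient and its helper methods are hypothetical; the URL and parameters are the ones used by the /kafka/send endpoint above:

```java
import java.net.URI;
import java.net.URLEncoder;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.charset.StandardCharsets;

// Hypothetical client for the /kafka/send endpoint.
final class SendEndpointClient {

    // Percent-encode a query value. URLEncoder emits '+' for spaces, so swap in %20;
    // a literal '+' in the input has already become %2B at that point.
    static String encode(String value) {
        return URLEncoder.encode(value, StandardCharsets.UTF_8).replace("+", "%20");
    }

    // Build the request URI with both query parameters encoded.
    static URI sendUri(String baseUrl, String topic, String message) {
        return URI.create(baseUrl + "/kafka/send?topic=" + encode(topic)
                + "&message=" + encode(message));
    }

    public static void main(String[] args) throws Exception {
        HttpRequest request = HttpRequest
                .newBuilder(sendUri("http://localhost:8080", "reactive-topic", "Hello Kafka"))
                .POST(HttpRequest.BodyPublishers.noBody())
                .build();
        HttpResponse<String> response = HttpClient.newHttpClient()
                .send(request, HttpResponse.BodyHandlers.ofString());
        System.out.println(response.statusCode() + " " + response.body());
    }
}
```

Running main requires the Spring Boot application to be listening on port 8080; sendUri and encode can be used on their own.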

4. Creating a Reactive Kafka Consumer

Step 1: Implement Kafka Consumer Service

Use KafkaReceiver from Reactor Kafka for non-blocking message consumption:

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.stereotype.Service;
import reactor.core.publisher.Flux;
import reactor.kafka.receiver.KafkaReceiver;
import reactor.kafka.receiver.ReceiverOptions;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

@Service
public class ReactiveKafkaConsumer {

    public Flux<String> consumeMessages(String topic) {
        // Reactor Kafka does not read spring.kafka.* from application.yml,
        // so the consumer settings are repeated here.
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "reactive-group");
        // Without this, a group with no committed offsets starts at the latest offset
        // and misses messages that were sent before the consumer connected.
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

        ReceiverOptions<String, String> receiverOptions = ReceiverOptions.create(props)
                .subscription(Collections.singleton(topic));

        // Each call creates a new receiver that joins the consumer group and runs until
        // the subscriber cancels. Records are not acknowledged, so no offsets are committed.
        return KafkaReceiver.create(receiverOptions)
                            .receive()
                            .map(record -> "Received: " + record.value());
    }
}
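The consumer hardcodes its broker address and group id. One way to avoid that is a small helper that builds the same settings from parameters, sketched below. It uses Kafka's plain string property names instead of the ConsumerConfig constants, so it compiles without Kafka on the classpath, and the resulting map could be passed to ReceiverOptions.create(...). The class name ConsumerPropsSketch and the method consumerProps are hypothetical; auto.offset.reset is set to earliest to match application.yml:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper that builds the consumer settings from parameters.
final class ConsumerPropsSketch {

    // Kafka accepts deserializers as fully qualified class names.
    static final String STRING_DESERIALIZER =
            "org.apache.kafka.common.serialization.StringDeserializer";

    static Map<String, Object> consumerProps(String bootstrapServers, String groupId) {
        Map<String, Object> props = new HashMap<>();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("group.id", groupId);
        props.put("key.deserializer", STRING_DESERIALIZER);
        props.put("value.deserializer", STRING_DESERIALIZER);
        // Start from the beginning of the topic when the group has no committed offsets.
        props.put("auto.offset.reset", "earliest");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(consumerProps("localhost:9092", "reactive-group"));
    }
}
```

In a Spring application the two arguments would typically come from configuration, for example constructor parameters annotated with @Value.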

Step 2: Expose a Reactive API for Consuming Messages

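Add the consuming endpoint to ReactiveKafkaController.java, next to the /send endpoint from section 3. Both endpoints must live in the same class; declaring ReactiveKafkaController twice would not compile:

import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.*;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

@RestController
@RequestMapping("/kafka")
public class ReactiveKafkaController {
    private final ReactiveKafkaProducer kafkaProducer;
    private final ReactiveKafkaConsumer kafkaConsumer;

    public ReactiveKafkaController(ReactiveKafkaProducer kafkaProducer,
                                   ReactiveKafkaConsumer kafkaConsumer) {
        this.kafkaProducer = kafkaProducer;
        this.kafkaConsumer = kafkaConsumer;
    }

    @PostMapping("/send")
    public Mono<String> sendMessage(@RequestParam String topic, @RequestParam String message) {
        return kafkaProducer.sendMessage(topic, message);
    }

    // Server-Sent Events keep the response open and flush each record as it arrives.
    @GetMapping(value = "/consume", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<String> consumeMessages(@RequestParam String topic) {
        return kafkaConsumer.consumeMessages(topic);
    }
}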

Test the consumer using cURL. The -N flag disables output buffering, and the stream stays open until you press Ctrl+C:

curl -N -X GET "http://localhost:8080/kafka/consume?topic=reactive-topic"

5. Testing the Full Reactive Kafka Workflow

  1. Start ZooKeeper, then Kafka (each in its own terminal):
   bin/zookeeper-server-start.sh config/zookeeper.properties
   bin/kafka-server-start.sh config/server.properties
  2. Create a Kafka topic:
   bin/kafka-topics.sh --create --topic reactive-topic --bootstrap-server localhost:9092
  3. Send a message:
   curl -X POST "http://localhost:8080/kafka/send?topic=reactive-topic&message=Hello%20Kafka"
  4. Start consuming messages (keep this terminal open and send more messages from another one to watch them arrive):
   curl -N -X GET "http://localhost:8080/kafka/consume?topic=reactive-topic"

6. Key Takeaways

Non-blocking Kafka producer and consumer exposed through Spring WebFlux endpoints.

Consumer-side backpressure handled by Reactor Kafka's KafkaReceiver.

Scalable, event-driven architecture for microservices.
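To make the backpressure point concrete, here is a minimal sketch of demand-driven consumption. It deliberately uses the JDK's built-in java.util.concurrent.Flow API rather than Reactor Kafka, so it runs without a broker; Reactor's Flux follows the same Reactive Streams request(n) protocol. The names BackpressureSketch, OneAtATimeSubscriber, and consumeAll are hypothetical:

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

final class BackpressureSketch {

    // Subscriber that asks for one item at a time, so the publisher cannot outrun it.
    static final class OneAtATimeSubscriber implements Flow.Subscriber<String> {
        final List<String> received = new CopyOnWriteArrayList<>();
        final CountDownLatch done = new CountDownLatch(1);
        private Flow.Subscription subscription;

        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            this.subscription = subscription;
            subscription.request(1); // initial demand: a single item
        }

        @Override
        public void onNext(String item) {
            received.add(item);
            subscription.request(1); // signal readiness for the next item
        }

        @Override
        public void onError(Throwable error) {
            done.countDown();
        }

        @Override
        public void onComplete() {
            done.countDown();
        }
    }

    // Publishes the records and returns them in the order the subscriber received them.
    static List<String> consumeAll(List<String> records) {
        OneAtATimeSubscriber subscriber = new OneAtATimeSubscriber();
        try (SubmissionPublisher<String> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(subscriber);
            records.forEach(publisher::submit); // buffered until the subscriber requests them
        } // close() signals onComplete once the buffered items are delivered
        try {
            if (!subscriber.done.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("Timed out waiting for completion");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return subscriber.received;
    }

    public static void main(String[] args) {
        System.out.println(consumeAll(List.of("first", "second", "third")));
    }
}
```

The key line is the request(1) call in onNext: the publisher only delivers what has been requested, which is the same contract a KafkaReceiver Flux honors toward its downstream subscriber.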


Next Steps

Now that we’ve covered Kafka + Spring WebFlux, I'll create a similar guide for RabbitMQ + Spring WebFlux next. Let me know if you want any additional details!
