DEV Community

DEV-AI
DEV-AI

Posted on

Implementing Asynchronous Request-Reply Communication in Spring Boot with Kafka

When implementing asynchronous communication between services using Kafka in a Spring Boot application, where a service produces a request and waits for a response, it is best to encapsulate the Kafka communication logic into a reusable abstraction layer or library. Below is the best approach to achieve this:


Best Approach

1. Use Kafka Request-Reply Pattern

Kafka is inherently asynchronous and not designed for direct request-response communication. However, you can implement a request-reply pattern by:

  • Producing a message to a Kafka topic (e.g., request-topic).
  • The consumer processes the message and produces a response to a response-topic.
  • The producer listens for the response on the response-topic using a unique correlation ID.

2. Spring Kafka Library

Spring Boot's Kafka integration (spring-kafka) provides tools to build this pattern effectively. The library supports producing, consuming, and implementing correlation IDs for request-reply communication.

3. Leverage Correlation ID

Use a correlation ID to match the request with the response. This ID is sent with the request message and used to filter the response.


Steps to Implement

1. Dependency Setup

Add the following dependencies in your pom.xml:

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter</artifactId>
</dependency>
Enter fullscreen mode Exit fullscreen mode

2. Kafka Configuration

Define the producer and consumer factories, as well as templates for Kafka:

@Configuration
public class KafkaConfig {

    @Bean
    public ProducerFactory<String, String> producerFactory() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return new DefaultKafkaProducerFactory<>(configProps);
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        configProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        configProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        configProps.put(ConsumerConfig.GROUP_ID_CONFIG, "request-reply-group");
        return new DefaultKafkaConsumerFactory<>(configProps);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }
}
Enter fullscreen mode Exit fullscreen mode

3. Implement Request-Reply Logic

Producer Service

The producer sends a request message and listens for the response with a CompletableFuture:

@Service
public class KafkaRequestReplyService {

    private final KafkaTemplate<String, String> kafkaTemplate;
    private final Map<String, CompletableFuture<String>> pendingRequests = new ConcurrentHashMap<>();

    @Value("${kafka.request.topic}")
    private String requestTopic;

    @Value("${kafka.response.topic}")
    private String responseTopic;

    public KafkaRequestReplyService(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    public CompletableFuture<String> sendRequest(String request) {
        String correlationId = UUID.randomUUID().toString();
        CompletableFuture<String> future = new CompletableFuture<>();
        pendingRequests.put(correlationId, future);

        // Send request with correlation ID as a header
        kafkaTemplate.send(requestTopic, request)
                .addCallback(
                        success -> System.out.println("Request sent successfully"),
                        failure -> future.completeExceptionally(failure)
                );

        return future;
    }

    @KafkaListener(topics = "${kafka.response.topic}", groupId = "request-reply-group")
    public void listenForResponse(ConsumerRecord<String, String> record) {
        String correlationId = record.headers().lastHeader("correlationId").value().toString();
        String response = record.value();

        CompletableFuture<String> future = pendingRequests.remove(correlationId);

        if (future != null) {
            future.complete(response);
        }
    }
}
Enter fullscreen mode Exit fullscreen mode

Key Points:

  • A unique correlation ID is generated for each request.
  • Responses are matched with the corresponding future by the correlation ID.
  • The KafkaListener listens for responses and resolves the future.

Consumer Service

The consumer processes the request and sends a response with the same correlation ID:

@Service
public class KafkaConsumerService {

    private final KafkaTemplate<String, String> kafkaTemplate;

    @Value("${kafka.response.topic}")
    private String responseTopic;

    public KafkaConsumerService(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    @KafkaListener(topics = "${kafka.request.topic}", groupId = "request-reply-group")
    public void processRequest(ConsumerRecord<String, String> record) {
        String request = record.value();
        String correlationId = record.headers().lastHeader("correlationId").value().toString();

        // Process the request (business logic)
        String response = "Processed: " + request;

        // Send the response back with the same correlation ID
        kafkaTemplate.send(responseTopic, response)
                .addCallback(
                        success -> System.out.println("Response sent successfully"),
                        failure -> System.err.println("Failed to send response")
                );
    }
}
Enter fullscreen mode Exit fullscreen mode

4. Properties Configuration

Add the Kafka configuration properties to your application.properties:

spring.kafka.bootstrap-servers=localhost:9092
kafka.request.topic=request-topic
kafka.response.topic=response-topic
Enter fullscreen mode Exit fullscreen mode

5. Testing the Request-Reply

You can initiate a request and wait for the response as follows:

@RestController
@RequestMapping("/api")
public class TestController {

    private final KafkaRequestReplyService kafkaRequestReplyService;

    public TestController(KafkaRequestReplyService kafkaRequestReplyService) {
        this.kafkaRequestReplyService = kafkaRequestReplyService;
    }

    @PostMapping("/send")
    public ResponseEntity<String> sendRequest(@RequestBody String request) {
        try {
            CompletableFuture<String> responseFuture = kafkaRequestReplyService.sendRequest(request);
            String response = responseFuture.get(10, TimeUnit.SECONDS); // Wait for response
            return ResponseEntity.ok(response);
        } catch (Exception e) {
            return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body("Error: " + e.getMessage());
        }
    }
}
Enter fullscreen mode Exit fullscreen mode

Conclusion

The request-reply pattern with a custom implementation using spring-kafka is the most flexible and effective approach for asynchronous communication in a Spring Boot application. You can encapsulate this logic in a service or library for reusability.

Top comments (0)