Dev Cookies

Building Microservices with Kafka: Bidirectional Communication using Spring Cloud Stream and Spring Cloud Function

In this advanced guide, we'll build two Spring Boot microservices communicating through Kafka using Spring Cloud Stream and Spring Cloud Function. Both services act as producers and consumers, each managing their own topics. The project avoids ZooKeeper by using KRaft mode in Kafka.


✨ Use Case Overview

Services:

  • Order Service
  • Notification Service

Topics:

  • order-topic: Produced by Order Service
  • payment-topic: Produced by Order Service
  • notification-topic: Produced by Notification Service

Communication:

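| Source Service       | Kafka Topic        | Target Service       |
|----------------------|--------------------|----------------------|
| Order Service        | order-topic        | Notification Service |
| Order Service        | payment-topic      | Notification Service |
| Notification Service | notification-topic | Order Service        |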

🌐 Prerequisites

  • Java 17+
  • Spring Boot 3+
  • Spring Cloud 2023.0+ release train, with the spring-cloud-stream-binder-kafka dependency in each service
  • Docker (for Kafka)
  • Maven

🐳 Step 1: Kafka in KRaft Mode using Docker

docker-compose.yml:

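version: '3.8'
services:
  kafka:
    image: bitnami/kafka:latest
    ports:
      - "9092:9092"
    environment:
      # KRaft: this single node acts as both broker and controller, so no ZooKeeper is needed
      - KAFKA_CFG_NODE_ID=1
      - KAFKA_CFG_PROCESS_ROLES=broker,controller
      - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=1@kafka:9093
      # 9092 serves clients; 9093 is the internal controller listener
      - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093
      # Address handed to clients; localhost works for services running on the host
      - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092
      # Map both listener names to a security protocol explicitly
      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
      - KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
      - KAFKA_CFG_LOG_DIRS=/bitnami/kafka/data
      - ALLOW_PLAINTEXT_LISTENER=yes
    volumes:
      - kafka_data:/bitnami/kafka

volumes:
  kafka_data: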
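Before starting the services, it can help to confirm that something is actually listening on the advertised port. The following is a minimal sketch using only the JDK; `BrokerPortCheck` and `isPortOpen` are hypothetical names, not part of the project, and the check only proves that a TCP connection succeeds, not that the broker is healthy.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical helper, not part of the original project.
class BrokerPortCheck {

    // Returns true if a TCP connection to host:port succeeds within timeoutMillis.
    // This only shows that something is listening; it does not speak the Kafka protocol.
    static boolean isPortOpen(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // localhost:9092 is the advertised listener from docker-compose.yml.
        boolean open = isPortOpen("localhost", 9092, 2000);
        System.out.println(open ? "Port 9092 is open" : "Port 9092 is not reachable yet");
    }
}
```

If the port stays unreachable, the container logs are the next place to look.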

📦 Project Structure

kafka-microservices/
├── docker-compose.yml
├── order-service/
│   ├── src/main/java/com/example/order/
│   │   ├── config/
│   │   ├── handlers/
│   │   └── OrderServiceApplication.java
│   └── src/main/resources/
│       └── application.yml
└── notification-service/
    ├── src/main/java/com/example/notification/
    │   ├── config/
    │   ├── handlers/
    │   └── NotificationServiceApplication.java
    └── src/main/resources/
        └── application.yml

🔧 Step 2: Spring Cloud Stream Setup in Order Service

OrderServiceApplication.java

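package com.example.order;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class OrderServiceApplication {
    public static void main(String[] args) {
        SpringApplication.run(OrderServiceApplication.class, args);
    }
}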

handlers/OrderFunctions.java

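package com.example.order.handlers;

import java.util.function.Consumer;
import java.util.function.Function;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class OrderFunctions {

    private static final Logger log = LoggerFactory.getLogger(OrderFunctions.class);

    // Reads from orderProcessor-in-0 and publishes the result to orderProcessor-out-0 (order-topic)
    @Bean
    public Function<String, String> orderProcessor() {
        return input -> {
            log.info("Processing order: {}", input);
            return "Processed Order: " + input;
        };
    }

    // Reads from paymentProcessor-in-0 and publishes the result to paymentProcessor-out-0 (payment-topic)
    @Bean
    public Function<String, String> paymentProcessor() {
        return input -> {
            log.info("Processing payment: {}", input);
            return "Processed Payment: " + input;
        };
    }

    // Consumes messages that Notification Service publishes to notification-topic
    @Bean
    public Consumer<String> notificationListener() {
        return message -> log.info("Received notification: {}", message);
    }
}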
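Because these beans are plain java.util.function types, their logic can be exercised without Spring or a running broker. The sketch below is a standalone copy of the three lambdas with logging left out; the class name `OrderFunctionsSketch` and the list-backed listener are assumptions made for illustration only.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

// Standalone copy of the handler logic (logging omitted) so it runs without Spring or Kafka.
class OrderFunctionsSketch {

    static Function<String, String> orderProcessor() {
        return input -> "Processed Order: " + input;
    }

    static Function<String, String> paymentProcessor() {
        return input -> "Processed Payment: " + input;
    }

    // Collects received notifications into a list instead of logging them.
    static Consumer<String> notificationListener(List<String> sink) {
        return sink::add;
    }

    public static void main(String[] args) {
        System.out.println(orderProcessor().apply("Order123"));
        System.out.println(paymentProcessor().apply("PaidOrder123"));

        List<String> received = new ArrayList<>();
        notificationListener(received).accept("OrderConfirmed");
        System.out.println(received);
    }
}
```

Keeping the handlers free of Kafka-specific types is what makes this kind of quick check possible.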

application.yml

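spring:
  cloud:
    function:
      # Semicolon-separated list of the function beans to bind
      definition: orderProcessor;paymentProcessor;notificationListener
    stream:
      bindings:
        # orderProcessor-in-0 and paymentProcessor-in-0 are not configured here,
        # so each one defaults to a topic named after the binding itself
        orderProcessor-out-0:
          destination: order-topic
        paymentProcessor-out-0:
          destination: payment-topic
        notificationListener-in-0:
          destination: notification-topic
      kafka:
        binder:
          brokers: localhost:9092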
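The binding keys in this file follow Spring Cloud Stream's functional naming convention: the function bean name, then `in` or `out`, then a zero-based index. A Function gets both an input and an output binding, a Consumer only an input. The helper below is a purely illustrative sketch of that convention; `BindingNames` is a hypothetical class, not a Spring API.

```java
// Illustrative helper, not a Spring API: it only mirrors the binding naming convention.
class BindingNames {

    // Name of the input binding for the given function bean and argument index.
    static String input(String functionName, int index) {
        return functionName + "-in-" + index;
    }

    // Name of the output binding for the given function bean and result index.
    static String output(String functionName, int index) {
        return functionName + "-out-" + index;
    }

    public static void main(String[] args) {
        // Bindings configured in the Order Service application.yml
        System.out.println(output("orderProcessor", 0));
        System.out.println(output("paymentProcessor", 0));
        System.out.println(input("notificationListener", 0));

        // Input binding of orderProcessor, which the YAML leaves unconfigured
        System.out.println(input("orderProcessor", 0));
    }
}
```

A binding with no destination uses its own name as the topic, so the last line printed is also the topic that orderProcessor reads from in this setup.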

🔧 Step 3: Spring Cloud Stream Setup in Notification Service

NotificationServiceApplication.java

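package com.example.notification;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class NotificationServiceApplication {
    public static void main(String[] args) {
        SpringApplication.run(NotificationServiceApplication.class, args);
    }
}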

handlers/NotificationFunctions.java

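package com.example.notification.handlers;

import java.util.function.Consumer;
import java.util.function.Function;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class NotificationFunctions {

    private static final Logger log = LoggerFactory.getLogger(NotificationFunctions.class);

    // Consumes messages that Order Service publishes to order-topic
    @Bean
    public Consumer<String> orderListener() {
        return message -> log.info("Received Order: {}", message);
    }

    // Consumes messages that Order Service publishes to payment-topic
    @Bean
    public Consumer<String> paymentListener() {
        return message -> log.info("Received Payment: {}", message);
    }

    // Reads from notifyFunction-in-0 and publishes the result to notifyFunction-out-0 (notification-topic)
    @Bean
    public Function<String, String> notifyFunction() {
        return input -> {
            log.info("Sending notification: {}", input);
            return "Notification sent: " + input;
        };
    }
}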

application.yml

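spring:
  cloud:
    function:
      # Semicolon-separated list of the function beans to bind
      definition: orderListener;paymentListener;notifyFunction
    stream:
      bindings:
        orderListener-in-0:
          destination: order-topic
        paymentListener-in-0:
          destination: payment-topic
        # notifyFunction-in-0 is not configured here,
        # so it defaults to a topic named after the binding itself
        notifyFunction-out-0:
          destination: notification-topic
      kafka:
        binder:
          brokers: localhost:9092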

✅ Test It All!

Start Kafka:

docker-compose up -d

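Start Microservices (each command blocks, so run them in separate terminals from the kafka-microservices/ root):

# Terminal 1
cd order-service && ./mvnw spring-boot:run
# Terminal 2
cd notification-service && ./mvnw spring-boot:run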

Trigger Functions:

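Neither service produces messages on its own. orderProcessor, paymentProcessor, and notifyFunction are Function beans, so each one waits for input on its input binding (orderProcessor-in-0, paymentProcessor-in-0, notifyFunction-in-0). No destination is configured for those bindings, so each defaults to a topic with the same name as the binding. To trigger the flow, publish test messages to those input topics with an ApplicationRunner, a REST endpoint, or the Kafka command-line tools.

Expected Logs (abridged, after sending Order123, PaidOrder123, and OrderConfirmed to the three input topics):

[NotificationService] Received Order: Processed Order: Order123
[NotificationService] Received Payment: Processed Payment: PaidOrder123
[OrderService] Received notification: Notification sent: OrderConfirmed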
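To see what payload each listener ends up with, the three hops can be traced in memory. The sketch below replaces every Kafka topic with a direct method call and reuses the handler lambdas from both services; `FlowSketch` and `run` are hypothetical names, and the sample inputs are illustrative. Each listener receives the processor's return value, so the logged payload carries the processor's prefix.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// In-memory walk-through of the three hops. No Kafka or Spring is involved;
// each "topic" hop is replaced by a direct method call.
class FlowSketch {

    static final Function<String, String> ORDER_PROCESSOR = in -> "Processed Order: " + in;
    static final Function<String, String> PAYMENT_PROCESSOR = in -> "Processed Payment: " + in;
    static final Function<String, String> NOTIFY_FUNCTION = in -> "Notification sent: " + in;

    // Returns the message each listener would log for the given raw inputs.
    static List<String> run(String order, String payment, String notification) {
        List<String> logs = new ArrayList<>();
        // order-topic: Order Service -> Notification Service (orderListener)
        logs.add("Received Order: " + ORDER_PROCESSOR.apply(order));
        // payment-topic: Order Service -> Notification Service (paymentListener)
        logs.add("Received Payment: " + PAYMENT_PROCESSOR.apply(payment));
        // notification-topic: Notification Service -> Order Service (notificationListener)
        logs.add("Received notification: " + NOTIFY_FUNCTION.apply(notification));
        return logs;
    }

    public static void main(String[] args) {
        run("Order123", "PaidOrder123", "OrderConfirmed").forEach(System.out::println);
    }
}
```

In the real setup the hops are asynchronous and the log lines come from two separate processes, so their relative order is not guaranteed.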

📌 Summary

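| Topic              | Produced by          | Consumed by          |
|--------------------|----------------------|----------------------|
| order-topic        | order-service        | notification-service |
| payment-topic      | order-service        | notification-service |
| notification-topic | notification-service | order-service        |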

🧠 Each service manages its own topic configuration and production, following bounded context principles in microservices.
