DEV Community

Dev Cookies

Building Microservices with Kafka: Bidirectional Communication with Multiple Topics (KRaft)

In this guide, we will build two Spring Boot microservices that communicate with each other using Apache Kafka. Each service will act as both a Producer and a Consumer, and they will interact using multiple Kafka topics. We'll also follow real-world microservice design principles like proper topic ownership and modular structure.


✨ Use Case Overview

Services:

  • Order Service
  • Notification Service

Topics:

  • order-topic: Produced by Order Service
  • payment-topic: Produced by Order Service
  • notification-topic: Produced by Notification Service

Communication:

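| Source Service       | Kafka Topic        | Target Service       |
|----------------------|--------------------|----------------------|
| Order Service        | order-topic        | Notification Service |
| Order Service        | payment-topic      | Notification Service |
| Notification Service | notification-topic | Order Service        |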
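
The communication table can also be written down as data, which makes it easy to ask which topics a given service has to subscribe to. The following is a minimal sketch in plain Java; the `TopicRoutes` class, the `Route` record, and the helper methods are illustrative assumptions and are not part of either service.

```java
import java.util.List;

public class TopicRoutes {
    /** One row of the communication table: source service, topic, target service. */
    public record Route(String source, String topic, String target) {}

    // Rows copied from the table above (hypothetical representation).
    public static final List<Route> ROUTES = List.of(
            new Route("order-service", "order-topic", "notification-service"),
            new Route("order-service", "payment-topic", "notification-service"),
            new Route("notification-service", "notification-topic", "order-service"));

    /** Topics the given service consumes, in table order. */
    public static List<String> topicsConsumedBy(String service) {
        return ROUTES.stream()
                .filter(r -> r.target().equals(service))
                .map(Route::topic)
                .toList();
    }

    /** Topics the given service produces to, in table order. */
    public static List<String> topicsProducedBy(String service) {
        return ROUTES.stream()
                .filter(r -> r.source().equals(service))
                .map(Route::topic)
                .toList();
    }

    public static void main(String[] args) {
        System.out.println(topicsConsumedBy("notification-service")); // [order-topic, payment-topic]
        System.out.println(topicsProducedBy("notification-service")); // [notification-topic]
    }
}
```

Each service appears as both a source and a target, which is what makes the communication bidirectional.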

🌐 Prerequisites

  • Java 17+
  • Spring Boot 3+
  • Docker (for Kafka)
  • Maven

🚀 Step 1: Kafka with Docker (KRaft Mode, No ZooKeeper)

docker-compose.yml

version: '3.8'
services:
  kafka:
    image: bitnami/kafka:latest
    ports:
      - "9092:9092"
    environment:
      - KAFKA_CFG_NODE_ID=1
      - KAFKA_CFG_PROCESS_ROLES=broker,controller
      - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=1@kafka:9093
      - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093
      - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092
      - KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
      - KAFKA_CFG_LOG_DIRS=/bitnami/kafka/data
      - ALLOW_PLAINTEXT_LISTENER=yes
    volumes:
      - kafka_data:/bitnami/kafka

volumes:
  kafka_data:
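
Before starting the services, it can help to confirm that something is listening on the advertised listener, `localhost:9092`. The sketch below is a plain TCP check using only the JDK; the `BrokerCheck` class and the 2-second timeout are assumptions for illustration. It only shows that the port accepts connections, not that the broker has finished starting.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

public class BrokerCheck {
    /** Returns true if a TCP connection to host:port succeeds within timeoutMillis. */
    public static boolean isReachable(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Connection refused, timed out, or host could not be resolved.
            return false;
        }
    }

    public static void main(String[] args) {
        // localhost:9092 matches KAFKA_CFG_ADVERTISED_LISTENERS in docker-compose.yml.
        boolean up = isReachable("localhost", 9092, 2000);
        System.out.println(up
                ? "Port 9092 is accepting connections"
                : "Nothing is listening on localhost:9092");
    }
}
```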

📄 Step 2: Project Structure

kafka-microservices/
├── docker-compose.yml
├── order-service/
│   ├── src/main/java/com/example/order/
│   │   ├── config/
│   │   ├── controller/
│   │   ├── service/
│   │   ├── consumer/
│   │   └── OrderServiceApplication.java
│   └── src/main/resources/application.yml
└── notification-service/
    ├── src/main/java/com/example/notification/
    │   ├── config/
    │   ├── controller/
    │   ├── service/
    │   ├── consumer/
    │   └── NotificationServiceApplication.java
    └── src/main/resources/application.yml

📅 Step 3: Order Service Code

OrderServiceApplication.java

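package com.example.order;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class OrderServiceApplication {
    public static void main(String[] args) {
        SpringApplication.run(OrderServiceApplication.class, args);
    }
}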

config/KafkaTopicConfig.java

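package com.example.order.config;

import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.TopicBuilder;

// Declares the topics this service owns; Spring Boot's auto-configured
// KafkaAdmin creates any NewTopic bean that does not exist yet at startup.
@Configuration
public class KafkaTopicConfig {
    @Bean
    public NewTopic orderTopic() {
        return TopicBuilder.name("order-topic").partitions(1).replicas(1).build();
    }

    @Bean
    public NewTopic paymentTopic() {
        return TopicBuilder.name("payment-topic").partitions(1).replicas(1).build();
    }
}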

controller/OrderController.java

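package com.example.order.controller;

import com.example.order.service.OrderService;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

// REST entry points that publish order and payment events to Kafka.
@RestController
@RequestMapping("/api/order")
public class OrderController {

    private final OrderService orderService;

    public OrderController(OrderService orderService) {
        this.orderService = orderService;
    }

    // POST /api/order/create?orderDetails=... publishes to order-topic.
    @PostMapping("/create")
    public String createOrder(@RequestParam String orderDetails) {
        orderService.sendOrder(orderDetails);
        return "Order sent";
    }

    // POST /api/order/pay?paymentDetails=... publishes to payment-topic.
    @PostMapping("/pay")
    public String makePayment(@RequestParam String paymentDetails) {
        orderService.sendPayment(paymentDetails);
        return "Payment sent";
    }
}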

service/OrderService.java

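package com.example.order.service;

import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

// Producer side of the Order Service. Both sends are fire-and-forget:
// the returned future is ignored, so failures are not reported to the caller.
@Service
public class OrderService {
    private final KafkaTemplate<String, String> kafkaTemplate;

    public OrderService(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    public void sendOrder(String message) {
        kafkaTemplate.send("order-topic", message);
    }

    public void sendPayment(String message) {
        kafkaTemplate.send("payment-topic", message);
    }
}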
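
The `send` calls in `OrderService` do not look at the outcome, so the controller answers "Order sent" even if the broker is unreachable. In Spring for Apache Kafka 3.x, `KafkaTemplate.send` returns a `CompletableFuture`, so the result can be observed with `handle` or `whenComplete`. The sketch below shows that pattern only; the `Sender` interface is a hypothetical stand-in for `KafkaTemplate` (with a `String` acknowledgement instead of Spring's `SendResult`) so that the example runs without a broker.

```java
import java.util.concurrent.CompletableFuture;

public class SendResultSketch {
    /** Hypothetical stand-in for KafkaTemplate.send(topic, value); not a Spring type. */
    public interface Sender {
        CompletableFuture<String> send(String topic, String message);
    }

    /** Sends to order-topic and maps success or failure to a status line. */
    public static CompletableFuture<String> sendOrder(Sender sender, String message) {
        return sender.send("order-topic", message)
                .handle((ack, error) -> error == null
                        ? "sent to " + ack
                        : "failed: " + error.getMessage());
    }

    public static void main(String[] args) {
        // Fake senders: one acknowledges immediately, one simulates an unavailable broker.
        Sender ok = (topic, msg) -> CompletableFuture.completedFuture(topic + "@0");
        Sender down = (topic, msg) ->
                CompletableFuture.failedFuture(new IllegalStateException("broker unavailable"));

        System.out.println(sendOrder(ok, "Order123").join());   // sent to order-topic@0
        System.out.println(sendOrder(down, "Order123").join()); // failed: broker unavailable
    }
}
```

In the real service the same callback could log the failure or return an error response instead of a fixed string.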

consumer/NotificationConsumer.java

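package com.example.order.consumer;

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class NotificationConsumer {
    @KafkaListener(topics = "notification-topic", groupId = "order-group")
    public void handleNotification(String message) {
        System.out.println("OrderService received notification: " + message);
    }
}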

application.yml

server:
  port: 8081

spring:
  kafka:
    bootstrap-servers: localhost:9092
    consumer:
      group-id: order-group
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer

📅 Step 4: Notification Service Code

NotificationServiceApplication.java

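package com.example.notification;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class NotificationServiceApplication {
    public static void main(String[] args) {
        SpringApplication.run(NotificationServiceApplication.class, args);
    }
}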

config/KafkaTopicConfig.java

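package com.example.notification.config;

import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.TopicBuilder;

@Configuration
public class KafkaTopicConfig {
    @Bean
    public NewTopic notificationTopic() {
        return TopicBuilder.name("notification-topic").partitions(1).replicas(1).build();
    }
}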

controller/NotifyController.java

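package com.example.notification.controller;

import com.example.notification.service.NotificationService;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequestMapping("/api/notify")
public class NotifyController {

    private final NotificationService notificationService;

    public NotifyController(NotificationService notificationService) {
        this.notificationService = notificationService;
    }

    // POST /api/notify?message=... publishes to notification-topic.
    @PostMapping
    public String notifyUser(@RequestParam String message) {
        notificationService.sendNotification(message);
        return "Notification sent";
    }
}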

service/NotificationService.java

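package com.example.notification.service;

import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

// Producer side of the Notification Service.
@Service
public class NotificationService {
    private final KafkaTemplate<String, String> kafkaTemplate;

    public NotificationService(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    public void sendNotification(String message) {
        kafkaTemplate.send("notification-topic", message);
    }
}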

consumer/OrderEventConsumer.java

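package com.example.notification.consumer;

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

// Consumer side of the Notification Service: one listener per topic,
// both in the notification-group consumer group.
@Component
public class OrderEventConsumer {

    @KafkaListener(topics = "order-topic", groupId = "notification-group")
    public void consumeOrder(String message) {
        System.out.println("NotificationService received order: " + message);
    }

    @KafkaListener(topics = "payment-topic", groupId = "notification-group")
    public void consumePayment(String message) {
        System.out.println("NotificationService received payment: " + message);
    }
}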

application.yml

server:
  port: 8082

spring:
  kafka:
    bootstrap-servers: localhost:9092
    consumer:
      group-id: notification-group
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer

💡 Testing

Start Kafka:

docker-compose up -d

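Run Services (each in its own terminal, from the kafka-microservices/ directory, because spring-boot:run stays in the foreground):

# Terminal 1
cd order-service
./mvnw spring-boot:run

# Terminal 2
cd notification-service
./mvnw spring-boot:run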

Test Endpoints:

curl -X POST "http://localhost:8081/api/order/create?orderDetails=Order123"
curl -X POST "http://localhost:8081/api/order/pay?paymentDetails=PaidOrder123"
curl -X POST "http://localhost:8082/api/notify?message=OrderConfirmed"
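
The same three requests can be sent from Java, which also takes care of URL-encoding values that contain spaces or other reserved characters. This is a minimal sketch using the JDK's built-in `java.net.http.HttpClient`; the `SmokeTest` class and its helper methods are hypothetical names, and `main` assumes both services are already running on ports 8081 and 8082.

```java
import java.net.URI;
import java.net.URLEncoder;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.charset.StandardCharsets;

public class SmokeTest {
    /** Builds http://localhost:{port}{path}?{param}={value} with the value URL-encoded. */
    public static URI endpoint(int port, String path, String param, String value) {
        String encoded = URLEncoder.encode(value, StandardCharsets.UTF_8);
        return URI.create("http://localhost:" + port + path + "?" + param + "=" + encoded);
    }

    /** Sends a POST with an empty body, like `curl -X POST`, and returns the response body. */
    public static String post(HttpClient client, URI uri) throws Exception {
        HttpRequest request = HttpRequest.newBuilder(uri)
                .POST(HttpRequest.BodyPublishers.noBody())
                .build();
        return client.send(request, HttpResponse.BodyHandlers.ofString()).body();
    }

    public static void main(String[] args) throws Exception {
        HttpClient client = HttpClient.newHttpClient();
        // Same endpoints and parameters as the curl commands above.
        System.out.println(post(client, endpoint(8081, "/api/order/create", "orderDetails", "Order123")));
        System.out.println(post(client, endpoint(8081, "/api/order/pay", "paymentDetails", "PaidOrder123")));
        System.out.println(post(client, endpoint(8082, "/api/notify", "message", "OrderConfirmed")));
    }
}
```

For example, `endpoint(8081, "/api/order/create", "orderDetails", "Order 123")` encodes the space as `+`, giving `...?orderDetails=Order+123`.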

Logs:

NotificationService received order: Order123
NotificationService received payment: PaidOrder123
OrderService received notification: OrderConfirmed

🚀 Summary: Topic Ownership

| Topic              | Created in           |
|--------------------|----------------------|
| order-topic        | order-service        |
| payment-topic      | order-service        |
| notification-topic | notification-service |

Each service creates the topics it produces to. This separation aligns with microservice boundaries and ensures decoupled topic management.
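
One way to keep this rule from eroding is to check that no topic is declared by more than one service. The following is a minimal sketch, assuming the per-service topic declarations are available as a map; the `TopicOwnership` class and `sharedTopics` method are hypothetical and not part of the services above.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class TopicOwnership {
    /** Returns the topics declared by more than one service, sorted by name. */
    public static List<String> sharedTopics(Map<String, List<String>> declaredByService) {
        Map<String, Integer> ownerCount = new TreeMap<>();
        for (List<String> topics : declaredByService.values()) {
            for (String topic : topics) {
                ownerCount.merge(topic, 1, Integer::sum);
            }
        }
        return ownerCount.entrySet().stream()
                .filter(e -> e.getValue() > 1)
                .map(Map.Entry::getKey)
                .toList();
    }

    public static void main(String[] args) {
        // Declarations taken from the ownership table above.
        Map<String, List<String>> declared = Map.of(
                "order-service", List.of("order-topic", "payment-topic"),
                "notification-service", List.of("notification-topic"));
        System.out.println(sharedTopics(declared)); // []
    }
}
```

An empty result means every topic has exactly one owning service.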

