DEV Community

Dev Cookies

Building Microservices with Kafka: Bidirectional Communication with Multiple Topics

In this guide, we build two Spring Boot microservices that communicate with each other through Apache Kafka. Each service acts as both a producer and a consumer, and the two exchange messages over three Kafka topics. Along the way we follow common microservice design practices: each topic is owned by the service that produces to it, and each service keeps a modular package structure.


✨ Use Case Overview

Services:

  • Order Service
  • Notification Service

Topics:

  • order-topic: Produced by Order Service
  • payment-topic: Produced by Order Service
  • notification-topic: Produced by Notification Service

Communication:

Source Service       | Kafka Topic        | Target Service
---------------------|--------------------|---------------------
Order Service        | order-topic        | Notification Service
Order Service        | payment-topic      | Notification Service
Notification Service | notification-topic | Order Service
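
The routing table above can also be written down as plain Java, which makes it easy to see which topics each service has to subscribe to. This is only a sketch: `Route` and `Routes` are hypothetical helper types for illustration and are not part of either service built in this guide.

```java
import java.util.List;

// Hypothetical helper types for illustration; not part of the services in this guide.
record Route(String source, String topic, String target) {}

final class Routes {
    // The three routes from the communication table.
    static final List<Route> ALL = List.of(
            new Route("Order Service", "order-topic", "Notification Service"),
            new Route("Order Service", "payment-topic", "Notification Service"),
            new Route("Notification Service", "notification-topic", "Order Service"));

    private Routes() {}

    // Topics the given service has to subscribe to.
    static List<String> consumedBy(String service) {
        return ALL.stream()
                .filter(route -> route.target().equals(service))
                .map(Route::topic)
                .toList();
    }

    // Topics the given service publishes to.
    static List<String> producedBy(String service) {
        return ALL.stream()
                .filter(route -> route.source().equals(service))
                .map(Route::topic)
                .toList();
    }
}
```

For example, `Routes.consumedBy("Notification Service")` yields the two topics that the Notification Service listens to in Step 4.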

🌐 Prerequisites

  • Java 17+
  • Spring Boot 3+
  • Docker (for Kafka)
  • Maven

🚀 Step 1: Kafka with Docker

docker-compose.yml

version: '3.8'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.0.1
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181

  kafka:
    image: confluentinc/cp-kafka:7.0.1
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
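
Before starting the services, it can help to confirm that the broker port published by the compose file is actually accepting connections. The following is a minimal sketch using only the JDK; `BrokerCheck` is a hypothetical helper name, and the check only proves that the TCP port is open, not that Kafka is fully ready to serve requests.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical helper for illustration; not part of the services in this guide.
final class BrokerCheck {
    private BrokerCheck() {}

    // Returns true if a TCP connection to host:port succeeds within timeoutMillis.
    // This does not speak the Kafka protocol; it only checks that the port is open.
    static boolean isReachable(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            return false;
        }
    }
}
```

With the compose file above, the call would be `BrokerCheck.isReachable("localhost", 9092, 1000)`.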

📄 Step 2: Project Structure

kafka-microservices/
├── docker-compose.yml
├── order-service/
│   └── src/main/
│       ├── java/com/example/order/
│       │   ├── config/
│       │   ├── controller/
│       │   ├── service/
│       │   ├── consumer/
│       │   └── OrderServiceApplication.java
│       └── resources/application.yml
└── notification-service/
    └── src/main/
        ├── java/com/example/notification/
        │   ├── config/
        │   ├── controller/
        │   ├── service/
        │   ├── consumer/
        │   └── NotificationServiceApplication.java
        └── resources/application.yml

📅 Step 3: Order Service Code

OrderServiceApplication.java

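package com.example.order;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class OrderServiceApplication {
    public static void main(String[] args) {
        SpringApplication.run(OrderServiceApplication.class, args);
    }
}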

config/KafkaTopicConfig.java

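package com.example.order.config;

import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.TopicBuilder;

// Declares the topics owned (produced to) by Order Service.
@Configuration
public class KafkaTopicConfig {
    @Bean
    public NewTopic orderTopic() {
        return TopicBuilder.name("order-topic").partitions(1).replicas(1).build();
    }

    @Bean
    public NewTopic paymentTopic() {
        return TopicBuilder.name("payment-topic").partitions(1).replicas(1).build();
    }
}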

controller/OrderController.java

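package com.example.order.controller;

import com.example.order.service.OrderService;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequestMapping("/api/order")
public class OrderController {

    private final OrderService orderService;

    public OrderController(OrderService orderService) {
        this.orderService = orderService;
    }

    // Publishes the order details to order-topic.
    @PostMapping("/create")
    public String createOrder(@RequestParam String orderDetails) {
        orderService.sendOrder(orderDetails);
        return "Order sent";
    }

    // Publishes the payment details to payment-topic.
    @PostMapping("/pay")
    public String makePayment(@RequestParam String paymentDetails) {
        orderService.sendPayment(paymentDetails);
        return "Payment sent";
    }
}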

service/OrderService.java

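package com.example.order.service;

import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Service
public class OrderService {
    private final KafkaTemplate<String, String> kafkaTemplate;

    public OrderService(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    // send() is asynchronous: it returns before the broker acknowledges the record.
    public void sendOrder(String message) {
        kafkaTemplate.send("order-topic", message);
    }

    public void sendPayment(String message) {
        kafkaTemplate.send("payment-topic", message);
    }
}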

consumer/NotificationConsumer.java

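package com.example.order.consumer;

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class NotificationConsumer {
    // Receives messages published by Notification Service.
    @KafkaListener(topics = "notification-topic", groupId = "order-group")
    public void handleNotification(String message) {
        System.out.println("OrderService received notification: " + message);
    }
}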

application.yml

server:
  port: 8081

spring:
  kafka:
    bootstrap-servers: localhost:9092
    consumer:
      group-id: order-group
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer

📅 Step 4: Notification Service Code

NotificationServiceApplication.java

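package com.example.notification;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class NotificationServiceApplication {
    public static void main(String[] args) {
        SpringApplication.run(NotificationServiceApplication.class, args);
    }
}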

config/KafkaTopicConfig.java

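package com.example.notification.config;

import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.TopicBuilder;

@Configuration
public class KafkaTopicConfig {
    @Bean
    public NewTopic notificationTopic() {
        return TopicBuilder.name("notification-topic").partitions(1).replicas(1).build();
    }
}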

controller/NotifyController.java

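package com.example.notification.controller;

import com.example.notification.service.NotificationService;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequestMapping("/api/notify")
public class NotifyController {

    private final NotificationService notificationService;

    public NotifyController(NotificationService notificationService) {
        this.notificationService = notificationService;
    }

    // Publishes the message to notification-topic.
    @PostMapping
    public String notifyUser(@RequestParam String message) {
        notificationService.sendNotification(message);
        return "Notification sent";
    }
}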

service/NotificationService.java

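package com.example.notification.service;

import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Service
public class NotificationService {
    private final KafkaTemplate<String, String> kafkaTemplate;

    public NotificationService(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    // Publishes to the topic owned by Notification Service.
    public void sendNotification(String message) {
        kafkaTemplate.send("notification-topic", message);
    }
}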

consumer/OrderEventConsumer.java

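package com.example.notification.consumer;

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class OrderEventConsumer {

    // Receives order events published by Order Service.
    @KafkaListener(topics = "order-topic", groupId = "notification-group")
    public void consumeOrder(String message) {
        System.out.println("NotificationService received order: " + message);
    }

    // Receives payment events published by Order Service.
    @KafkaListener(topics = "payment-topic", groupId = "notification-group")
    public void consumePayment(String message) {
        System.out.println("NotificationService received payment: " + message);
    }
}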

application.yml

server:
  port: 8082

spring:
  kafka:
    bootstrap-servers: localhost:9092
    consumer:
      group-id: notification-group
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer

💡 Testing

Start Kafka:

docker-compose up -d

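Run Services (each in its own terminal, because spring-boot:run keeps running in the foreground):

# Terminal 1, from kafka-microservices/
cd order-service
./mvnw spring-boot:run

# Terminal 2, from kafka-microservices/
cd notification-service
./mvnw spring-boot:run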

Test Endpoints:

curl -X POST "http://localhost:8081/api/order/create?orderDetails=Order123"
curl -X POST "http://localhost:8081/api/order/pay?paymentDetails=PaidOrder123"
curl -X POST "http://localhost:8082/api/notify?message=OrderConfirmed"
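
The same three calls can be made from Java with the JDK's built-in `java.net.http` client, which is handy if you want to script the test. This is a sketch under the assumption that both services run on the ports configured above; `ServiceCalls` is a hypothetical helper name, not part of either service.

```java
import java.io.IOException;
import java.net.URI;
import java.net.URLEncoder;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.charset.StandardCharsets;

// Hypothetical helper for illustration; not part of the services in this guide.
final class ServiceCalls {
    private ServiceCalls() {}

    // Builds an empty-body POST with a single URL-encoded query parameter,
    // mirroring the curl commands above.
    static HttpRequest post(String baseUrl, String param, String value) {
        String query = param + "=" + URLEncoder.encode(value, StandardCharsets.UTF_8);
        return HttpRequest.newBuilder(URI.create(baseUrl + "?" + query))
                .POST(HttpRequest.BodyPublishers.noBody())
                .build();
    }

    // Sends the request and returns the response body as text.
    static String send(HttpRequest request) throws IOException, InterruptedException {
        return HttpClient.newHttpClient()
                .send(request, HttpResponse.BodyHandlers.ofString())
                .body();
    }
}
```

For instance, `ServiceCalls.send(ServiceCalls.post("http://localhost:8081/api/order/create", "orderDetails", "Order123"))` issues the first curl call; it requires the Order Service to be running.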

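Expected logs (the first two lines appear in the Notification Service console, the third in the Order Service console):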

NotificationService received order: Order123
NotificationService received payment: PaidOrder123
OrderService received notification: OrderConfirmed

🚀 Summary: Topic Ownership

Topic              | Created in
-------------------|---------------------
order-topic        | order-service
payment-topic      | order-service
notification-topic | notification-service

Each service creates the topics it produces to. This separation aligns with microservice boundaries and ensures decoupled topic management.
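
In the code above, topic names appear as string literals in the topic config, the producers, and the listeners, so a typo in any one of them would go unnoticed at compile time. One possible way to reduce that risk is a small constants class that also records which service owns each topic. This is a sketch only: `Topics` is a hypothetical class, and because the two services are separate projects, each would need its own copy or a shared module.

```java
import java.util.Map;

// Hypothetical constants class for illustration; not part of the services in this guide.
final class Topics {
    static final String ORDER = "order-topic";
    static final String PAYMENT = "payment-topic";
    static final String NOTIFICATION = "notification-topic";

    // Owner of each topic, as listed in the ownership table.
    private static final Map<String, String> OWNER = Map.of(
            ORDER, "order-service",
            PAYMENT, "order-service",
            NOTIFICATION, "notification-service");

    private Topics() {}

    // Returns the service that creates and produces to the given topic.
    static String ownerOf(String topic) {
        String owner = OWNER.get(topic);
        if (owner == null) {
            throw new IllegalArgumentException("Unknown topic: " + topic);
        }
        return owner;
    }
}
```

Because the names are compile-time `String` constants, they can be used inside annotation attributes, for example `@KafkaListener(topics = Topics.ORDER, groupId = "notification-group")`. To reference the class from other packages, it and its fields would need to be declared `public`.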


