DEV Community

Dev Cookies
Dev Cookies

Posted on

Spring Boot microservices architecture: Producer and a Consumer — communicate using Apache Kafka

✅ Use Case

  • Producer Service (order-service) sends messages (e.g., order events) to Kafka topics.
  • Consumer Service (notification-service) listens to those topics and processes them (e.g., sends notifications).

🔧 Tech Stack

  • Java 17+
  • Spring Boot 3.x
  • Spring Kafka
  • Kafka/Zookeeper (Dockerized)

🗂️ Folder Structure (Root)

kafka-microservices/
├── docker-compose.yml
├── order-service/
│   ├── src/
│   │   └── main/java/com/example/order/
│   │       ├── controller/
│   │       ├── service/
│   │       ├── config/
│   │       └── OrderServiceApplication.java
│   └── resources/
│       └── application.yml
├── notification-service/
│   ├── src/
│   │   └── main/java/com/example/notification/
│   │       ├── consumer/
│   │       ├── config/
│   │       └── NotificationServiceApplication.java
│   └── resources/
│       └── application.yml
Enter fullscreen mode Exit fullscreen mode

📦 1. docker-compose.yml (Kafka + Zookeeper)

version: '3.8'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.0.1
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
    ports:
      - "2181:2181"

  kafka:
    image: confluentinc/cp-kafka:7.0.1
    ports:
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
Enter fullscreen mode Exit fullscreen mode

📦 2. order-serviceProducer

OrderServiceApplication.java

package com.example.order;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class OrderServiceApplication {
    public static void main(String[] args) {
        SpringApplication.run(OrderServiceApplication.class, args);
    }
}
Enter fullscreen mode Exit fullscreen mode

controller/OrderController.java

package com.example.order.controller;

import com.example.order.service.OrderService;
import org.springframework.web.bind.annotation.*;

@RestController
@RequestMapping("/api/orders")
public class OrderController {

    private final OrderService orderService;
    public OrderController(OrderService orderService) {
        this.orderService = orderService;
    }

    @PostMapping
    public String createOrder(@RequestParam String message) {
        orderService.sendOrder(message);
        return "Order sent: " + message;
    }
}
Enter fullscreen mode Exit fullscreen mode

service/OrderService.java

package com.example.order.service;

import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Service
public class OrderService {
    private final KafkaTemplate<String, String> kafkaTemplate;

    public OrderService(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    public void sendOrder(String message) {
        kafkaTemplate.send("order-topic", message);
    }
}
Enter fullscreen mode Exit fullscreen mode

config/KafkaConfig.java

package com.example.order.config;

import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.admin.TopicBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class KafkaConfig {
    @Bean
    public NewTopic orderTopic() {
        return TopicBuilder.name("order-topic").partitions(1).replicas(1).build();
    }
}
Enter fullscreen mode Exit fullscreen mode

resources/application.yml

server:
  port: 8081

spring:
  kafka:
    bootstrap-servers: localhost:9092
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
Enter fullscreen mode Exit fullscreen mode

📦 3. notification-serviceConsumer

NotificationServiceApplication.java

package com.example.notification;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class NotificationServiceApplication {
    public static void main(String[] args) {
        SpringApplication.run(NotificationServiceApplication.class, args);
    }
}
Enter fullscreen mode Exit fullscreen mode

consumer/OrderConsumer.java

package com.example.notification.consumer;

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class OrderConsumer {

    @KafkaListener(topics = "order-topic", groupId = "notification-group")
    public void consume(String message) {
        System.out.println("Received order: " + message);
    }
}
Enter fullscreen mode Exit fullscreen mode

resources/application.yml

server:
  port: 8082

spring:
  kafka:
    bootstrap-servers: localhost:9092
    consumer:
      group-id: notification-group
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
Enter fullscreen mode Exit fullscreen mode

🏁 Run Instructions

1. Start Kafka and Zookeeper

docker-compose up -d
Enter fullscreen mode Exit fullscreen mode

2. Start Microservices (In separate terminals)

cd order-service
./mvnw spring-boot:run

cd ../notification-service
./mvnw spring-boot:run
Enter fullscreen mode Exit fullscreen mode

3. Test via CURL or Postman

curl -X POST "http://localhost:8081/api/orders?message=HelloKafka"
Enter fullscreen mode Exit fullscreen mode

👉 Output will appear in notification-service terminal:

Received order: HelloKafka
Enter fullscreen mode Exit fullscreen mode

✅ Summary

Service Port Role Kafka Role
order-service 8081 Producer Sends to topic
notification-service 8082 Consumer Listens to topic

Top comments (0)