Khalid Edaoudi

Building a Spring Boot Application with Spring Cloud Stream for Kafka Stream Processing

1. Download Kafka

Download Kafka 2.7.0 (the kafka_2.13-2.7.0 binary, where 2.13 is the Scala version) from https://kafka.apache.org/downloads.

2. Start Kafka

To start Kafka, we should first run ZooKeeper (it coordinates the Kafka broker instances in our setup).

We add the start command so that the output opens in a new prompt. bin\windows\zookeeper-server-start.bat is the script that runs ZooKeeper based on the configuration file config\zookeeper.properties; if you are on Linux, the equivalent script is bin/zookeeper-server-start.sh.
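
Run from the Kafka installation folder, the Windows command looks like this (on Linux, use the .sh script with the same properties file):

start bin\windows\zookeeper-server-start.bat config\zookeeper.properties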

ZooKeeper is now running on port 2181.

We start Kafka in the same way.
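
Again from the installation folder, a sketch of the Windows command (the Linux equivalent is bin/kafka-server-start.sh config/server.properties):

start bin\windows\kafka-server-start.bat config\server.properties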

Sometimes Kafka won't start because wmic (Windows Management Instrumentation Command-line) is missing; the script uses it to detect whether the OS is 32-bit or 64-bit when setting the default heap size. To fix that, open the kafka-server-start.bat file in your editor and replace its whole content with the following code:

@echo off
rem Licensed to the Apache Software Foundation (ASF) under one or more
rem contributor license agreements.  See the NOTICE file distributed with
rem this work for additional information regarding copyright ownership.
rem The ASF licenses this file to You under the Apache License, Version 2.0
rem (the "License"); you may not use this file except in compliance with
rem the License.  You may obtain a copy of the License at
rem
rem     http://www.apache.org/licenses/LICENSE-2.0
rem
rem Unless required by applicable law or agreed to in writing, software
rem distributed under the License is distributed on an "AS IS" BASIS,
rem WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
rem See the License for the specific language governing permissions and
rem limitations under the License.


IF [%1] EQU [] (
    echo USAGE: %0 server.properties
    EXIT /B 1
)

SetLocal
IF ["%KAFKA_LOG4J_OPTS%"] EQU [""] (
    set KAFKA_LOG4J_OPTS=-Dlog4j.configuration=file:%~dp0../../config/log4j.properties
)

REM --- Force a 64-bit heap, removing the wmic-based detection ---
IF ["%KAFKA_HEAP_OPTS%"] EQU [""] (
    set KAFKA_HEAP_OPTS=-Xmx1G -Xms1G
)

"%~dp0kafka-run-class.bat" kafka.Kafka %*
EndLocal

You should now have Kafka running on port 9092.


3. kafka-console-producer & kafka-console-consumer

kafka-console-producer and kafka-console-consumer are Kafka's console clients for testing Kafka from the command line; let's try that.
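
Note: by default the broker auto-creates topics on first use, so this step is optional, but if auto-creation is disabled in your setup you can create the R1 topic explicitly first:

bin\windows\kafka-topics.bat --create --topic R1 --bootstrap-server localhost:9092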

First, we start the consumer to listen to a topic named R1 using the bin\windows\kafka-console-consumer.bat script.

We pass the address of a running Kafka instance with --bootstrap-server and the name of the topic to listen to with --topic.
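
A sketch of the command, assuming the broker runs locally on the default port 9092:

bin\windows\kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic R1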

Now we have a consumer which is subscribed to the topic R1.

Let's create a producer using the bin\windows\kafka-console-producer.bat script:
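
Again assuming a local broker on port 9092 (Kafka 2.7's console producer accepts --bootstrap-server; older versions use --broker-list instead):

bin\windows\kafka-console-producer.bat --bootstrap-server localhost:9092 --topic R1

Each line typed at the > prompt is then sent as a message to R1.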

On the left, the producer is sending the message "kafka is great" to a Kafka R1 topic, while on the right, the consumer receives the same message in real-time. This illustrates Kafka’s core messaging capability: reliable, real-time communication between distributed systems.


Documentation : https://kafka.apache.org/documentation/#quickstart

4. Create Java Application

We're going to create a Spring Boot project using Spring Initializr.
We will use Spring Boot 3.5.8 with Jar packaging and Java 17, choosing Maven as our build and dependency management tool.

As dependencies, add the following (a sketch of the resulting pom.xml dependencies follows the list):

  • Lombok
  • Spring Web
  • Spring for Apache Kafka
  • Spring for Apache Kafka Streams
  • Cloud Stream
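
The important dependencies in the generated pom.xml end up roughly like this (a sketch; the exact content may differ slightly, and versions are managed by the Spring Boot and Spring Cloud BOMs):

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream-binder-kafka</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-streams</artifactId>
</dependency>
<dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
    <optional>true</optional>
</dependency>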

Then click on Generate, unzip the file, and open the project in your favorite Java IDE (I recommend IntelliJ IDEA, which is the editor I used in this demo).

5. Create The PageEvent Entity

Inside the main package, create another package named entities, then create a class named PageEvent.java.


package com.example.kafka.entities;

import lombok.*;

import java.util.Date;

@Getter @Setter @AllArgsConstructor @NoArgsConstructor @ToString
public class PageEvent {
    private String name;
    private String user;
    private Date date;
    private long duration;
}

ATTENTION: Lombok annotation processing may not work out of the box with recent Spring Boot versions; to fix that, we specify the Lombok version explicitly in the pom.xml file.

<dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
    <version>1.18.32</version>
    <optional>true</optional>
</dependency>

And also in the plugins section, inside the maven-compiler-plugin configuration:

<configuration>
    <annotationProcessorPaths>
        <path>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.32</version>
        </path>
    </annotationProcessorPaths>
</configuration>

6. Create PageEventRestController

Create a PageEventRestController.java class in a new package named web.

This REST controller exposes an endpoint that publishes PageEvent messages to any Kafka topic. The key part is StreamBridge, a Spring Cloud Stream utility that lets you send messages programmatically without binding the controller to a specific producer function. When the /publish/{topic}/{name} endpoint is called, the controller creates a new PageEvent object, then uses streamBridge.send(topic, pageEvent) to dynamically route the event to the given Kafka topic. This makes it easy to produce messages on demand, especially for testing or building flexible producer APIs.

package com.example.kafka.web;


import com.example.kafka.entities.PageEvent;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.function.StreamBridge;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RestController;

import java.util.Date;
import java.util.Random;

@RestController
public class PageEventRestController {

    @Autowired
    private StreamBridge streamBridge;

    @GetMapping("/publish/{topic}/{name}")
    public PageEvent publish(@PathVariable String topic, @PathVariable String name) {
        PageEvent pageEvent = new PageEvent(
                name,
                Math.random() > 0.5 ? "U1" : "U2",
                new Date(),
                new Random().nextInt(9000));
        streamBridge.send(topic, pageEvent);
        return pageEvent;
    }

}

Then run the KafkaApplication class.

Now open a console consumer subscribed to the topic R1 and test the publish endpoint, which represents our producer.
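
For example, with the application running on the default port 8080, hit the endpoint from a browser or with curl (the topic R1 and the name blog are just sample values):

curl http://localhost:8080/publish/R1/blog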

The producer publishes a message and the consumer receives it instantly, confirming that everything is working correctly.

7. Create PageEventConsumer

Now we will create a consumer inside our Spring Boot application; name it PageEventService.java.


package com.example.kafka.services;

import com.example.kafka.entities.PageEvent;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Service;

import java.util.function.Consumer;

@Service
public class PageEventService {

    @Bean
    public Consumer<PageEvent> pageEventConsumer() {
        return (input) -> {
            System.out.println("**************************************");
            System.out.println(input.toString());
            System.out.println("**************************************");
        };
    }

}

This class defines a Kafka consumer using Spring Cloud Stream’s functional programming model.
Spring Cloud Stream allows you to create message-driven microservices by exposing functions like Consumer<T> that automatically bind to Kafka topics.

In this example, the PageEventService declares a Consumer<PageEvent> bean named pageEventConsumer.
When a PageEvent message arrives from Kafka, Spring Cloud Stream passes it to this function. The consumer then prints the event details between separators, making it easy to verify that the message was received and processed.

This setup provides a clean, minimal way to build real-time Kafka consumers without dealing directly with low-level Kafka APIs—Spring Cloud Stream handles the boilerplate, letting you focus on processing your data.

Then open your application.properties (or application.yml) file and add the following properties:

spring.cloud.function.definition=pageEventConsumer
spring.cloud.stream.bindings.pageEventConsumer-in-0.destination=R1
spring.cloud.stream.bindings.pageEventConsumer-in-0.group=group1

This configuration tells Spring Cloud Stream which function should be activated, which Kafka topic it should listen to, and which consumer group it belongs to.

  • spring.cloud.function.definition=pageEventConsumer
    This selects the functional bean named pageEventConsumer as the active consumer function. Spring Cloud Stream will bind this function to Kafka automatically.

  • spring.cloud.stream.bindings.pageEventConsumer-in-0.destination=R1
    This maps the input channel of the function to the Kafka topic R1.
    Whenever a message is published to the R1 topic, it will be delivered to pageEventConsumer.

  • spring.cloud.stream.bindings.pageEventConsumer-in-0.group=group1
    This assigns the consumer to the Kafka consumer group named group1.

Now run the application and test it (open localhost:8080/publish/R1/blog).

8. Create Producer Poller

In Spring Cloud Stream, a producer poller is a mechanism that automatically triggers a Supplier function at a fixed interval so it can generate and publish new messages to a topic without needing an external event.
Instead of manually calling the producer or waiting for an HTTP request, the poller acts like a timer: every few milliseconds or seconds, it executes the Supplier, collects the returned object, and sends it to the configured message channel. Let's declare such a Supplier bean (for example, next to the consumer bean in PageEventService):

    @Bean
    public Supplier<PageEvent> pageEventSupplier() {
        return () -> new PageEvent(
                Math.random() > 0.5 ? "P1" : "P2",
                Math.random() > 0.5 ? "U1" : "U2",
                new Date(),
                new Random().nextInt(9000));
    }

In the application.properties file, I set the spring.cloud.stream.poller.fixed-delay property to 100, which means 100 ms is the time interval between each new message.

spring.cloud.function.definition=pageEventConsumer;pageEventSupplier

spring.cloud.stream.bindings.pageEventConsumer-in-0.destination=R1
spring.cloud.stream.bindings.pageEventConsumer-in-0.group=group1

spring.cloud.stream.bindings.pageEventSupplier-out-0.destination=R2
spring.cloud.stream.bindings.pageEventSupplier-out-0.group=group2
spring.cloud.stream.poller.fixed-delay=100
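
To watch what the poller publishes, you can subscribe a console consumer to the R2 topic (same command as before, just a different topic name):

bin\windows\kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic R2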

9. Summary

In this tutorial, we explored the essential building blocks of using Kafka with Spring Boot and Spring Cloud Stream, starting from installing Kafka locally, testing message flow with console producers and consumers, and finally building a real-time event streaming application in Java.

You learned how to:

  • Install and run Zookeeper and Kafka on your machine

  • Use kafka-console-producer and kafka-console-consumer to test topics manually

  • Build a REST producer using StreamBridge

  • Implement a Kafka consumer using Spring Cloud Stream's functional model

  • Create a Producer Poller with a Supplier to automatically publish messages on a fixed schedule

Together, these components demonstrate how powerful and simple it can be to create event-driven microservices using Spring Cloud Stream. With only a few lines of configuration and a handful of functional beans, your application becomes capable of consuming, producing, transforming, and streaming data in real time, all without the complexity of low-level Kafka APIs.

From here, you can extend your project by adding Kafka Streams, function compositions, message filtering, windowing, or even advanced analytics pipelines. Event-driven architectures open the door to creating scalable, reactive systems that handle data as it flows.
