Previously we set up a basic Kafka environment and a Spring Boot Web API that pushes a basic string message to Kafka. Let's spend the time in this article reading that value back.
Continuing on from where we left off, we need to create another Spring Application using the https://start.spring.io website:
You can omit the Spring Web dependency this time around!
We aren't going to add anything fancy yet; just generate the code base so it's ready for us to start a new application!
Set Up Spring Cloud
First, let's set up the Spring Cloud dependencies. In your pom.xml, make the following changes:
- Add a new property indicating the version for Spring Cloud:
<properties>
    <java.version>18</java.version>
    <spring-cloud-release.version>2021.0.1</spring-cloud-release.version>
</properties>
Next we want to reference the Spring Cloud POM. Add the following XML block under the project tag:
<dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-dependencies</artifactId>
            <version>${spring-cloud-release.version}</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
    </dependencies>
</dependencyManagement>
This imports the Spring Cloud 2021.0.1 bill of materials (BOM), so any Spring Cloud dependencies we add can omit their version.
For our String-based consumer we just need to add two core dependencies and one test dependency:
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-streams</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream-binder-kafka</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream</artifactId>
    <scope>test</scope>
    <classifier>test-binder</classifier>
    <type>test-jar</type>
</dependency>
Consumers and Processors
Now we can add in the code that will basically be a data pipeline, taking a message from a topic and then doing something with it before passing it on to another topic.
Create a new class KafkaConfiguration.java with the following beans:
package com.example.demo;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import reactor.core.publisher.Flux;

import java.util.function.Consumer;
import java.util.function.Function;

@Configuration
public class KafkaConfiguration {

    private static final Logger log = LoggerFactory.getLogger(KafkaConfiguration.class);

    @Bean
    public Function<Flux<String>, Flux<String>> employeeProcessor() {
        return employee -> employee.map(emp -> {
            log.info("Employee is {}", emp);
            return emp;
        }).log();
    }

    @Bean
    public Consumer<String> employeeConsumer() {
        return (value) -> log.info("Consumed {}", value);
    }
}
Let's walk the code!
We have a basic Logger in place to output values as they happen:
private static final Logger log = LoggerFactory.getLogger(KafkaConfiguration.class);
Next we have a bean that acts as both the input and the output for messages coming from the topic:
@Bean
public Function<Flux<String>, Flux<String>> employeeProcessor() {
    return employee -> employee.map(emp -> {
        log.info("Employee is {}", emp);
        return emp;
    }).log();
}
Our method signature represents a (data in/data out) pipeline:
Function<Flux<String>, Flux<String>>
Here we are basically saying that the messages we read in will be strings (the first type argument, Flux<String>) and that the messages we then send out will also be strings (the second type argument, Flux<String>).
The body of our processor just reads the message and prints it to the log (a couple of times: once through our own log.info call and again through Reactor's log() operator):
return employee -> employee.map(emp -> {
    log.info("Employee is {}", emp);
    return emp;
}).log();
Typically we might call another method to do more with the incoming data, perhaps creating a different output. For now we just send the message on its way as-is.
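To make the "call another method" idea concrete, here is a minimal sketch of a transformation that could replace the pass-through step. The class name EmployeeNameFormatter and the constant FORMAT_NAME are hypothetical and not part of the original project, and the sketch assumes the incoming value looks like "Last, First". It uses only the JDK so it can be run on its own.

```java
import java.util.function.Function;

// Hypothetical helper, not part of the original project.
final class EmployeeNameFormatter {

    // Turns "Last, First" into "First Last"; values without a comma are only trimmed.
    static final Function<String, String> FORMAT_NAME = raw -> {
        String[] parts = raw.split(",", 2);
        if (parts.length < 2) {
            return raw.trim();
        }
        return parts[1].trim() + " " + parts[0].trim();
    };

    public static void main(String[] args) {
        System.out.println(FORMAT_NAME.apply("Marshall, Andrew")); // prints "Andrew Marshall"
    }
}
```

Because Flux.map accepts any Function, the processor body could then become something like employee.map(EmployeeNameFormatter.FORMAT_NAME), so the message written to the output topic would differ from the one that was read.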
The next bean is a consumer. For now it just logs the String message:
@Bean
public Consumer<String> employeeConsumer() {
    return (value) -> log.info("Consumed {}", value);
}
Before we can run this application you may notice something is missing: the topics aren't defined yet.
When we use Spring Cloud Function to create these messaging apps, we rely on the binding capabilities of Spring Cloud Stream: we configure these beans and bind them to the underlying topics of the message bus.
Open the src/main/resources/application.yml file and let's do that now!
spring:
  cloud:
    function:
      definition: employeeConsumer;employeeProcessor
    stream:
      bindings:
        employeeProcessor-in-0:
          destination: employee
        employeeProcessor-out-0:
          destination: employees-processed
        employeeConsumer-in-0:
          destination: employees-processed
      kafka:
        binder:
          producer-properties:
            value.serializer: org.apache.kafka.common.serialization.StringSerializer
            key.serializer: org.apache.kafka.common.serialization.StringSerializer
          consumer-properties:
            value.deserializer: org.apache.kafka.common.serialization.StringDeserializer
            key.deserializer: org.apache.kafka.common.serialization.StringDeserializer
First in our config, we've created function definitions:
spring:
  cloud:
    function:
      definition: employeeConsumer;employeeProcessor
As you can see these are simply the names of the beans we just defined. We will use these as reference points later on in the configuration.
Next we've defined the actual Bean-to-Topic bindings:
spring:
  cloud:
    stream:
      bindings:
        employeeProcessor-in-0:
          destination: employee
        employeeProcessor-out-0:
          destination: employees-processed
        employeeConsumer-in-0:
          destination: employees-processed
Using the definitions (employeeProcessor, employeeConsumer) we have mapped the topics accordingly. So, the employeeProcessor has 2 mappings:
- Message In: employee
- Message Out: employees-processed
Meanwhile the employeeConsumer has 1 binding:
- Message In: employees-processed
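The binding keys in the YAML are not arbitrary: Spring Cloud Stream derives them from the function bean name using the pattern functionName-in-index for inputs and functionName-out-index for outputs, with index 0 for functions that have a single input and output. The sketch below only illustrates that naming rule. The BindingNames class is hypothetical; the framework does this internally and you never need to write it yourself.

```java
// Hypothetical illustration of the binding naming convention; not a Spring API.
final class BindingNames {

    // Name of the input binding for a function bean, e.g. "employeeProcessor-in-0".
    static String input(String functionName, int index) {
        return functionName + "-in-" + index;
    }

    // Name of the output binding for a function bean, e.g. "employeeProcessor-out-0".
    static String output(String functionName, int index) {
        return functionName + "-out-" + index;
    }

    public static void main(String[] args) {
        System.out.println(input("employeeProcessor", 0));  // prints "employeeProcessor-in-0"
        System.out.println(output("employeeProcessor", 0)); // prints "employeeProcessor-out-0"
        System.out.println(input("employeeConsumer", 0));   // prints "employeeConsumer-in-0"
    }
}
```

A Consumer only has an input binding, which is why employeeConsumer appears once in the configuration while the Function-based employeeProcessor appears twice.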
This means that anything that lands on the employee topic gets picked up by the employeeProcessor. The employeeProcessor prints out the message and then sends it to the employees-processed topic.
Any messages arriving on the employees-processed topic then get picked up by the employeeConsumer which, again, just prints the message out to the logs.
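If it helps to see that flow without Kafka in the picture, here is a rough in-memory sketch of the same pipeline. It is a simplification under stated assumptions: two plain queues stand in for the employee and employees-processed topics, a list stands in for the logger, and the PipelineSketch class is hypothetical. No Spring or Kafka classes are involved.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.function.Consumer;
import java.util.function.Function;

// In-memory stand-in for the two topics; hypothetical, for illustration only.
final class PipelineSketch {

    // Sends one message through the processor and then the consumer,
    // returning the log lines in the order they were written.
    static List<String> run(String message) {
        List<String> logLines = new ArrayList<>();
        Queue<String> employee = new ArrayDeque<>();           // stands in for the "employee" topic
        Queue<String> employeesProcessed = new ArrayDeque<>(); // stands in for "employees-processed"

        Function<String, String> employeeProcessor = emp -> {
            logLines.add("Employee is " + emp);
            return emp; // passed on unchanged, as in the article
        };
        Consumer<String> employeeConsumer = value -> logLines.add("Consumed " + value);

        employee.add(message);
        while (!employee.isEmpty()) {
            employeesProcessed.add(employeeProcessor.apply(employee.poll()));
        }
        while (!employeesProcessed.isEmpty()) {
            employeeConsumer.accept(employeesProcessed.poll());
        }
        return logLines;
    }

    public static void main(String[] args) {
        run("Marshall, Andrew").forEach(System.out::println);
    }
}
```

In the real application the binder does the moving between topics for us; the only code we own is the two lambdas.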
Lastly in our configuration we are defining some basic properties for Kafka in general:
spring:
  cloud:
    stream:
      kafka:
        binder:
          producer-properties:
            value.serializer: org.apache.kafka.common.serialization.StringSerializer
            key.serializer: org.apache.kafka.common.serialization.StringSerializer
          consumer-properties:
            value.deserializer: org.apache.kafka.common.serialization.StringDeserializer
            key.deserializer: org.apache.kafka.common.serialization.StringDeserializer
Quite simply put, we treat messages coming off and going onto the topics as Strings!
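For a feel of what "as Strings" means on the wire: Kafka's StringSerializer and StringDeserializer use UTF-8 by default, so a message is just the UTF-8 bytes of the text. The sketch below mimics that round trip with the JDK only. The StringWireFormat class is hypothetical and does not call the Kafka classes themselves.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical JDK-only imitation of a UTF-8 string round trip; not the Kafka classes.
final class StringWireFormat {

    // What a String-based producer would put on the topic: the UTF-8 bytes of the value.
    static byte[] serialize(String value) {
        return value.getBytes(StandardCharsets.UTF_8);
    }

    // What a String-based consumer would hand to our bean: the bytes decoded as UTF-8.
    static String deserialize(byte[] bytes) {
        return new String(bytes, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] wire = serialize("Marshall, Andrew");
        System.out.println(wire.length);       // prints 16
        System.out.println(deserialize(wire)); // prints "Marshall, Andrew"
    }
}
```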
Starting the application
You should now be able to run the application with:
$ ./mvnw spring-boot:run
Provided the original web application from the previous article is still running, we should be able to send a message from the API and see it being picked up by our consumer:
Employee is Marshall, Andrew
Consumed Marshall, Andrew
In the next article we will introduce Avro as the serializer, along with the Schema Registry, and show you how to send binary messages!