Creating the Spring projects
- Consumer: https://github.com/magnuspedro/KafkaConsumer
- Producer: https://github.com/magnuspedro/KafkaProducer
Description
This tutorial shows how to create a Spring Kotlin application from scratch with a Kafka producer and a Kafka consumer, and how to set up Kafka with Docker. We are also going to use Sleuth to trace our messages across the services, so we can keep track of each transaction.
Installation
To create the new Spring projects I'm going to use start.spring.io with Kotlin.
The first service is going to be the producer, configured as in the following image:
The consumer, which is going to be our second service, is going to have the same configuration as the producer, but with "consumer" in its name.
We are going to use Sleuth because it traces our requests across the services and prints the trace IDs in our logs.
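If Sleuth was not selected on start.spring.io, the starter can also be added to the pom.xml by hand. A minimal sketch, assuming the Spring Cloud BOM manages the version:
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-sleuth</artifactId>
</dependency>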
Now we are adding the Logback library to the project for logging. All we have to do is add the dependency to the pom.xml so Maven knows we want it, and run mvn clean install to install it.
<dependency>
    <groupId>ch.qos.logback</groupId>
    <artifactId>logback-core</artifactId>
    <version>1.2.6</version>
</dependency>
Setting Up Kafka with Docker
In the first step, we are going to set up Kafka with Docker, using a docker-compose.yml file.
version: '2'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:latest
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
    ports:
      - 22181:2181
  kafka:
    image: confluentinc/cp-kafka:latest
    depends_on:
      - zookeeper
    ports:
      - 29092:29092
    environment:
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
The first service defined in our docker-compose is ZooKeeper (you can start everything with docker-compose up -d). ZooKeeper allows you to create a Kafka cluster and takes care of the connection and distribution between brokers. The second service is Kafka itself.
Configuration
- PLAINTEXT: We are setting the protocols to plaintext, which means messages are not encrypted. It's not recommended for production, but it's fine for our tests.
- Kafka Exposed Port: 29092
- Zookeeper Exposed Port: 22181
- Kafka Replication Factor: The replication factor is the number of brokers each partition is replicated to; in our case it is 1, since we only have one broker.
For a better understanding of the variables and Kafka itself, I would recommend this article.
Configuring The Producer
To be able to produce Kafka messages, we first need to implement a few configurations so we can send them to our Kafka server.
Properties Config
First, we are going to change the application.properties file to an application.yml, just because I like it better; if you prefer, you can keep the application.properties.
For the properties configuration, we are going to create properties for the Kafka address and topic.
kafka:
  bootstrapAddress: localhost:29092
  topics:
    product: product
We are using localhost:29092, as configured in our container.
Kafka Config
For the Kafka config file, we will use the address to create a Map that holds our server address, and then create a new topic with the name we defined in our application.yml file.
Since this is a @Configuration class, it is picked up when the application starts, so after Spring finishes booting the topic will be created.
import org.apache.kafka.clients.admin.AdminClientConfig
import org.apache.kafka.clients.admin.NewTopic
import org.springframework.beans.factory.annotation.Value
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.kafka.core.KafkaAdmin

@Configuration
class KafkaConfig(
    @Value("\${kafka.bootstrapAddress}")
    private val servers: String,
    @Value("\${kafka.topics.product}")
    private val topic: String
) {
    @Bean
    fun kafkaAdmin(): KafkaAdmin {
        val configs: MutableMap<String, Any?> = HashMap()
        configs[AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG] = servers
        return KafkaAdmin(configs)
    }

    @Bean
    fun produto(): NewTopic {
        return NewTopic(topic, 1, 1.toShort())
    }
}
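The NewTopic arguments are the topic name, the number of partitions (1) and the replication factor (1, as a Short). If you find the constructor hard to read, Spring Kafka's TopicBuilder can declare the same bean; a minimal sketch, with an illustrative bean name:
// Equivalent topic bean inside KafkaConfig, using org.springframework.kafka.config.TopicBuilder
@Bean
fun productTopic(): NewTopic =
    TopicBuilder.name(topic)
        .partitions(1)
        .replicas(1)
        .build()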
Producer Config
Now we have the topic configuration, but we still need to produce messages to post to the topic; for that, we are going to create a producer.
import com.magnuspedro.kafka.producer.kafkaProducer.config.serializer.ProductSerializer
import org.apache.kafka.clients.producer.ProducerConfig
import org.apache.kafka.common.serialization.StringSerializer
import org.springframework.beans.factory.annotation.Value
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.kafka.core.DefaultKafkaProducerFactory
import org.springframework.kafka.core.KafkaTemplate
import org.springframework.kafka.core.ProducerFactory

@Configuration
class KafkaProducerConfig(
    @Value("\${kafka.bootstrapAddress}")
    private val servers: String
) {
    @Bean
    fun producerFactory(): ProducerFactory<String, Any> {
        val configProps: MutableMap<String, Any> = HashMap()
        configProps[ProducerConfig.BOOTSTRAP_SERVERS_CONFIG] = servers
        configProps[ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java
        configProps[ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG] = ProductSerializer::class.java
        return DefaultKafkaProducerFactory(configProps)
    }

    @Bean
    fun kafkaTemplate(): KafkaTemplate<String, Any> {
        return KafkaTemplate(producerFactory())
    }
}
The producer has even more configuration: it takes the server address and two serializers. The key serializer, set with KEY_SERIALIZER_CLASS_CONFIG, is a plain string serializer, and the value serializer, set with VALUE_SERIALIZER_CLASS_CONFIG, is a custom serializer created to convert our product entity into JSON.
We then expose a KafkaTemplate bean built with our configured producer factory.
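With this in place, any component can inject the template and publish a product. A minimal usage sketch (topic name and values are just examples matching the rest of this article):
// Send a value with no key to the "product" topic
kafkaTemplate.send("product", Product(name = "Test for Dev.To", sku = "1232343"))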
Custom serializer
For our custom serializer, we are going to use Jackson to write a byte array from our entity. It has to be a byte array because we are implementing the Serializer interface from the Kafka client library.
import com.fasterxml.jackson.databind.ObjectMapper
import com.magnuspedro.kafka.producer.kafkaProducer.entities.Product
import org.apache.kafka.common.errors.SerializationException
import org.apache.kafka.common.serialization.Serializer
import org.slf4j.LoggerFactory

class ProductSerializer : Serializer<Product> {
    private val objectMapper = ObjectMapper()
    private val log = LoggerFactory.getLogger(javaClass)

    override fun serialize(topic: String?, data: Product?): ByteArray? {
        log.info("Serializing...")
        return objectMapper.writeValueAsBytes(
            data ?: throw SerializationException("Error when serializing Product to ByteArray[]")
        )
    }

    override fun close() {}
}
For the serialization, we only accept data that is not null; otherwise we throw a SerializationException.
Our Product file is going to be a simple Kotlin data class:
import com.fasterxml.jackson.annotation.JsonProperty

data class Product(
    @JsonProperty("name")
    val name: String,
    @JsonProperty("sku")
    val sku: String?
)
We are adding the @JsonProperty annotation so Jackson knows the name of each field when converting the object.
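To see what Jackson produces for this entity, here is a small sketch; the values mirror the test message used later in the Results section:
// Serializing a Product to JSON with a plain ObjectMapper
val json = ObjectMapper().writeValueAsString(Product(name = "Test for Dev.To", sku = "1232343"))
println(json) // {"name":"Test for Dev.To","sku":"1232343"}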
Test Controller
At last, we need something to send messages to the topic. I have chosen to create a controller, so we can change our objects and send as many as we want.
import com.magnuspedro.kafka.producer.kafkaProducer.entities.Product
import org.slf4j.LoggerFactory
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.beans.factory.annotation.Value
import org.springframework.http.HttpStatus
import org.springframework.http.ResponseEntity
import org.springframework.kafka.core.KafkaTemplate
import org.springframework.kafka.support.KafkaHeaders
import org.springframework.messaging.Message
import org.springframework.messaging.support.MessageBuilder
import org.springframework.validation.annotation.Validated
import org.springframework.web.bind.annotation.PostMapping
import org.springframework.web.bind.annotation.RequestBody
import org.springframework.web.bind.annotation.RequestMapping
import org.springframework.web.bind.annotation.RestController

@RestController
@RequestMapping("/product")
class LogController(
    @Value("\${kafka.topics.product}") val topic: String,
    @Autowired
    private val kafkaTemplate: KafkaTemplate<String, Any>
) {
    private val log = LoggerFactory.getLogger(javaClass)

    @PostMapping
    fun post(@Validated @RequestBody product: Product): ResponseEntity<Any> {
        return try {
            log.info("Receiving product request")
            log.info("Sending message to Kafka {}", product)
            val message: Message<Product> = MessageBuilder
                .withPayload(product)
                .setHeader(KafkaHeaders.TOPIC, topic)
                .setHeader("X-Custom-Header", "Custom header here")
                .build()
            kafkaTemplate.send(message)
            log.info("Message sent with success")
            ResponseEntity.ok().build()
        } catch (e: Exception) {
            log.error("Exception: {}", e)
            ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body("Error to send message")
        }
    }
}
In the controller, we build a Message object, so we can add Kafka headers and custom headers when sending the message to the topic. We are going to add only one custom header and check whether it shows up on our consumer.
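For reference, the request used in the Results section is a plain POST to /product with a JSON body like { "name": "Test for Dev.To", "sku": "1232343" }; any HTTP client works (I use Insomnia later on).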
That finishes all the configuration we need for a simple Kafka producer that sends JSON messages.
Kafka Consumer
For the Kafka consumer, we are going to need the same basic configuration as the producer: we change application.properties to application.yml and create the same Product entity, so we can convert it back from JSON.
If you are going to run both projects on the same machine, you need to change the port of one of them (a server.port entry in application.yml does it; see the sketch after the snippet below). While we are in the application.yml file, we also set the root logging level:
logging:
  level:
    root: info
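A minimal sketch of the port override, with 8081 as an arbitrary example value:
server:
  port: 8081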
Kafka Consumer Config
Our consumer config file is going to be similar to our producer.
import com.magnuspedro.kafka.consumer.KafkaConsumer.config.deserializer.ProductDeserializer
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.StringDeserializer
import org.springframework.beans.factory.annotation.Value
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.kafka.annotation.EnableKafka
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory
import org.springframework.kafka.core.ConsumerFactory
import org.springframework.kafka.core.DefaultKafkaConsumerFactory
import org.springframework.kafka.listener.ContainerProperties

@EnableKafka
@Configuration
class KafkaConsumerConfig(
    @Value("\${kafka.bootstrapAddress}")
    private val servers: String
) {
    @Bean
    fun consumerFactory(): ConsumerFactory<String?, Any?> {
        val props: MutableMap<String, Any> = HashMap()
        props[ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG] = servers
        props[ConsumerConfig.GROUP_ID_CONFIG] = "ppr"
        props[ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java
        props[ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG] = ProductDeserializer::class.java
        props[ConsumerConfig.AUTO_OFFSET_RESET_CONFIG] = "earliest"
        return DefaultKafkaConsumerFactory(props)
    }

    @Bean
    fun kafkaListenerContainerFactory(): ConcurrentKafkaListenerContainerFactory<String, Any>? {
        val factory = ConcurrentKafkaListenerContainerFactory<String, Any>()
        factory.consumerFactory = consumerFactory()
        factory.containerProperties.ackMode = ContainerProperties.AckMode.MANUAL_IMMEDIATE
        factory.containerProperties.isSyncCommits = true
        return factory
    }
}
Most of the configuration, as mentioned before, mirrors the producer, and likewise we have a custom deserializer for the value.
Configuration
A committed message is a message that has already been read and confirmed as consumed.
- GROUP_ID_CONFIG: The ID of the consumer group the consumer belongs to.
- AUTO_OFFSET_RESET_CONFIG: Tells the consumer where to start when it has no committed offset: earliest means it gets all the messages from the beginning, and latest means it only gets messages produced after it joins.
- AckMode.MANUAL_IMMEDIATE: Sets the commit of the messages to be done manually, committing immediately when acknowledged.
Custom Deserializer
For our custom deserializer, we are going to implement the Deserializer Class from Spring Kafka and override the deserialize
method.
import com.fasterxml.jackson.databind.ObjectMapper
import com.magnuspedro.kafka.consumer.KafkaConsumer.entities.Product
import org.apache.kafka.common.errors.SerializationException
import org.apache.kafka.common.serialization.Deserializer
import org.slf4j.LoggerFactory
import kotlin.text.Charsets.UTF_8

class ProductDeserializer : Deserializer<Product> {
    private val objectMapper = ObjectMapper()
    private val log = LoggerFactory.getLogger(javaClass)

    override fun deserialize(topic: String?, data: ByteArray?): Product? {
        log.info("Deserializing...")
        return objectMapper.readValue(
            String(
                data ?: throw SerializationException("Error when deserializing byte[] to Product"), UTF_8
            ), Product::class.java
        )
    }

    override fun close() {}
}
For the deserialization, we convert the byte array we received to a string with the UTF_8 charset and use Jackson to convert it back to our entity. As in the serializer, we throw an exception if the data is null.
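A quick sketch of the deserializer in isolation; the JSON mirrors the test message from the Results section:
// Turning a JSON byte array back into a Product
val product = ProductDeserializer().deserialize(
    "product",
    """{"name":"Test for Dev.To","sku":"1232343"}""".toByteArray(Charsets.UTF_8)
)
println(product) // Product(name=Test for Dev.To, sku=1232343)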
Kafka Consumer
To consume messages we first need to create a consumer, and for that Spring Kafka offers an annotation that creates a listener for the topic.
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.slf4j.LoggerFactory
import org.springframework.kafka.annotation.KafkaListener
import org.springframework.kafka.support.Acknowledgment
import org.springframework.stereotype.Component

@Component
class Consumer {
    private val logger = LoggerFactory.getLogger(javaClass)

    @KafkaListener(topics = ["\${kafka.topics.product}"], groupId = "ppr")
    fun listenGroupFoo(consumerRecord: ConsumerRecord<Any, Any>, ack: Acknowledgment) {
        logger.info("Message received {}", consumerRecord)
        ack.acknowledge()
    }
}
After annotating our method with @KafkaListener, we log the consumerRecord and acknowledge it (commit the message), so each message is consumed only once.
Results
Now for the results: I'm going to run both applications and send a POST request with the JSON object (using Insomnia) to the producer service, which sends the message to Kafka, where it is then consumed by the consumer.
The producer logs show us that the message was sent to the topic.
Pay attention to the trace ID b79dc0a696fa6550, which stays the same even though we are receiving the message in another service.
The log line is too big to fit on my monitor, so I'm pasting it here as plain text:
2022-02-07 22:27:09.178 INFO [,b79dc0a696fa6550,d95cf5541abb5fb9] 83720 --- [ntainer#0-0-C-1] c.m.k.c.K.infra.consumer.Consumer : Message received ConsumerRecord(topic = product, partition = 0, leaderEpoch = 0, offset = 104, CreateTime = 1644283629123, serialized key size = -1, serialized value size = 42, headers = RecordHeaders(headers = [RecordHeader(key = X-Custom-Header, value = [67, 117, 115, 116, 111, 109, 32, 104, 101, 97, 100, 101, 114, 32, 104, 101, 114, 101]), RecordHeader(key = spring_json_header_types, value = [123, 34, 88, 45, 67, 117, 115, 116, 111, 109, 45, 72, 101, 97, 100, 101, 114, 34, 58, 34, 106, 97, 118, 97, 46, 108, 97, 110, 103, 46, 83, 116, 114, 105, 110, 103, 34, 125])], isReadOnly = false), key = null, value = Product(name=Test for Dev.To, sku=1232343))
There we have it: our message was propagated to our Kafka consumer and converted from JSON back into our Product entity.