DEV Community

Pedro Magnus

Setting up Spring Kafka With Kotlin

Creating Spring project

Description

This tutorial shows how to create a Spring Kotlin application from scratch, with a Kafka consumer and a Kafka producer, and how to set up Kafka with Docker. We are also going to use Sleuth to trace our messages across the services, so we can keep track of each transaction.

Installation

To create a new Spring project, I'm going to use start.spring.io with Kotlin.

The first service is going to be the producer, configured as in the following image:

Image description

The consumer, which is going to be our second service, is going to have the same configuration as the producer, but with "consumer" in its name.

We are going to use Sleuth because it traces all our requests across the services and prints the trace IDs in our logs.

Now we are adding the Logback library to the project for logging. All we have to do is add the dependency to the pom.xml, so Maven knows we want it, and run mvn clean install to install it.

<dependency>
    <groupId>ch.qos.logback</groupId>
    <artifactId>logback-core</artifactId>
    <version>1.2.6</version>
</dependency>

Setting Up Kafka with Docker

First, we are going to set up Kafka with Docker, using a docker-compose.yml file.

version: '2'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:latest
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
    ports:
      - 22181:2181

  kafka:
    image: confluentinc/cp-kafka:latest
    depends_on:
      - zookeeper
    ports:
      - 29092:29092
    environment:
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1

The first service in our docker-compose file is ZooKeeper. Kafka uses ZooKeeper to coordinate the cluster: it keeps track of the brokers and the cluster metadata. The second service is Kafka itself.

Configuration
  • PLAINTEXT: Both listeners use the PLAINTEXT protocol, which means the traffic is neither encrypted nor authenticated. It's not recommended for production, but it's alright for our tests.
  • Kafka Exposed Port: 29092
  • Zookeeper Exposed Port: 22181
  • Kafka Replication Factor: KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR sets how many brokers keep a copy of each partition of Kafka's internal offsets topic. We only have one broker, so it is 1.
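To confirm that the broker is reachable from the host on the exposed port, a small check with Kafka's AdminClient can help. This is only a minimal sketch, assuming the containers from the docker-compose file are running and the kafka-clients library is on the classpath. The function names adminConfig and listTopicNames are made up for this example.

```kotlin
import org.apache.kafka.clients.admin.AdminClient
import org.apache.kafka.clients.admin.AdminClientConfig
import java.util.concurrent.TimeUnit

// Builds the minimal admin configuration for a given bootstrap address.
fun adminConfig(bootstrapAddress: String): Map<String, Any> =
    mapOf(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG to bootstrapAddress)

// Hypothetical helper: asks the broker for the topic names it knows about.
fun listTopicNames(bootstrapAddress: String): Set<String> =
    AdminClient.create(adminConfig(bootstrapAddress)).use { admin ->
        // Fail after 10 seconds instead of waiting forever on an unreachable broker.
        admin.listTopics().names().get(10, TimeUnit.SECONDS)
    }

fun main() {
    // localhost:29092 is the PLAINTEXT_HOST listener published by docker-compose.
    println(listTopicNames("localhost:29092"))
}
```

If the call times out, the usual suspects are the container not running or the application pointing at kafka:9092, which only resolves inside the Docker network.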

For a better understanding of the variables and Kafka itself, I would recommend this article

Configuring The Producer

To be able to produce Kafka messages, we first need to implement a few configurations so we can send them to our Kafka server.

Properties Config

First, we are going to change the application.properties file to an application.yml, just because I like it better. If you want, you can keep the application.properties.

For the properties configuration, we are going to create properties for the Kafka address and the topic.

kafka:
  bootstrapAddress: localhost:29092
  topics:
    product: product

We are using localhost:29092 because that is the port our Kafka container publishes to the host.

Kafka Config

For the Kafka config file, we use the address to create a Map with our server address for the KafkaAdmin bean, and then declare a new topic with the name we defined in our application.yml file.
Because the class is annotated with @Configuration, Spring registers these beans at startup, and KafkaAdmin creates the topic on the broker if it doesn't exist yet.

import org.apache.kafka.clients.admin.AdminClientConfig
import org.apache.kafka.clients.admin.NewTopic
import org.springframework.beans.factory.annotation.Value
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.kafka.core.KafkaAdmin

@Configuration
class KafkaConfig(
    @Value("\${kafka.bootstrapAddress}")
    private val servers: String,
    @Value("\${kafka.topics.product}")
    private val topic: String
) {

    @Bean
    fun kafkaAdmin(): KafkaAdmin {
        val configs: MutableMap<String, Any?> = HashMap()
        configs[AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG] = servers
        return KafkaAdmin(configs)
    }

    @Bean
    fun productTopic(): NewTopic {
        return NewTopic(topic, 1, 1.toShort())
    }
}
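As an alternative to the positional NewTopic constructor, Spring Kafka also ships a TopicBuilder that names each setting. The sketch below is not part of the original project. It assumes a spring-kafka version that includes TopicBuilder (2.3 or newer), and the function would stand in for the body of the @Bean method in the configuration class.

```kotlin
import org.apache.kafka.clients.admin.NewTopic
import org.springframework.kafka.config.TopicBuilder

// Equivalent to NewTopic(name, 1, 1.toShort()), with the settings spelled out.
fun productTopic(name: String): NewTopic =
    TopicBuilder.name(name)
        .partitions(1) // a single partition
        .replicas(1)   // one copy, which is all a single broker can hold
        .build()

fun main() {
    val topic = productTopic("product")
    println("${topic.name()} partitions=${topic.numPartitions()}")
}
```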

Producer Config

Now we have the topic configuration, but we still need to produce messages to post to the topic. For that, we are going to create a producer.

import com.magnuspedro.kafka.producer.kafkaProducer.config.serializer.ProductSerializer
import org.apache.kafka.clients.producer.ProducerConfig
import org.apache.kafka.common.serialization.StringSerializer
import org.springframework.beans.factory.annotation.Value
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.kafka.core.DefaultKafkaProducerFactory
import org.springframework.kafka.core.KafkaTemplate
import org.springframework.kafka.core.ProducerFactory


@Configuration
class KafkaProducerConfig(
    @Value("\${kafka.bootstrapAddress}")
    private val servers: String
) {
    @Bean
    fun producerFactory(): ProducerFactory<String, Any> {
        val configProps: MutableMap<String, Any> = HashMap()
        configProps[ProducerConfig.BOOTSTRAP_SERVERS_CONFIG] = servers
        configProps[ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java
        configProps[ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG] = ProductSerializer::class.java
        return DefaultKafkaProducerFactory(configProps)
    }

    @Bean
    fun kafkaTemplate(): KafkaTemplate<String, Any> {
        return KafkaTemplate(producerFactory())
    }
}

The producer needs a bit more configuration: it takes the server address and two serializers. KEY_SERIALIZER_CLASS_CONFIG sets the key serializer, which is going to be a StringSerializer, and VALUE_SERIALIZER_CLASS_CONFIG sets the value serializer, a custom one created to convert our Product entity into JSON.

Then we create a KafkaTemplate from our configured producer factory.

Custom serializer

For our custom serializer, we are going to use Jackson to write a byte array from our entity. It has to be a byte array because we are implementing the Serializer interface from the Kafka client library, whose serialize method returns one.

import com.fasterxml.jackson.databind.ObjectMapper
import com.magnuspedro.kafka.producer.kafkaProducer.entities.Product
import org.apache.kafka.common.errors.SerializationException
import org.apache.kafka.common.serialization.Serializer
import org.slf4j.LoggerFactory


class ProductSerializer : Serializer<Product> {
    private val objectMapper = ObjectMapper()
    private val log = LoggerFactory.getLogger(javaClass)

    override fun serialize(topic: String?, data: Product?): ByteArray? {
        log.info("Serializing...")
        return objectMapper.writeValueAsBytes(
            data ?: throw SerializationException("Error when serializing Product to ByteArray[]")
        )
    }

    override fun close() {}
}

For the serialization, we only accept data that is not null; otherwise we throw a SerializationException.

Our Product file is going to be a simple Kotlin data class:

import com.fasterxml.jackson.annotation.JsonProperty

data class Product(
    @JsonProperty("name")
    val name: String,
    @JsonProperty("sku")
    val sku: String?
)

We are adding the @JsonProperty annotation so Jackson knows the JSON name of each field when converting it.
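To see what the serializer actually produces, it can help to run Jackson on its own, without Kafka. The following is a minimal sketch, assuming only jackson-databind on the classpath. It redeclares Product so the snippet stands alone, and the sample values are the ones used later in the Results section.

```kotlin
import com.fasterxml.jackson.annotation.JsonProperty
import com.fasterxml.jackson.databind.ObjectMapper

// Same shape as the Product entity above, repeated to keep the sketch self-contained.
data class Product(
    @JsonProperty("name") val name: String,
    @JsonProperty("sku") val sku: String?
)

fun main() {
    val mapper = ObjectMapper()
    val original = Product(name = "Test for Dev.To", sku = "1232343")

    // Product -> JSON bytes, the same call ProductSerializer makes.
    val bytes: ByteArray = mapper.writeValueAsBytes(original)
    println(String(bytes, Charsets.UTF_8))

    // JSON bytes -> Product, which is what the consumer side has to do.
    val restored = mapper.readValue(bytes, Product::class.java)
    println(restored == original)
}
```

For this payload, the consumer log in the Results section reports a serialized value size of 42, which is the byte length of the compact JSON for that product.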

Test Controller

At last, we need something to send messages to the topic. I have chosen to create a controller, so we can change our objects and send as many as we want.

import com.magnuspedro.kafka.producer.kafkaProducer.entities.Product
import org.slf4j.LoggerFactory
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.beans.factory.annotation.Value
import org.springframework.http.HttpStatus
import org.springframework.http.ResponseEntity
import org.springframework.kafka.core.KafkaTemplate
import org.springframework.kafka.support.KafkaHeaders
import org.springframework.messaging.Message
import org.springframework.messaging.support.MessageBuilder
import org.springframework.validation.annotation.Validated
import org.springframework.web.bind.annotation.PostMapping
import org.springframework.web.bind.annotation.RequestBody
import org.springframework.web.bind.annotation.RequestMapping
import org.springframework.web.bind.annotation.RestController


@RestController
@RequestMapping("/product")
class LogController(
    @Value("\${kafka.topics.product}") val topic: String,
    @Autowired
    private val kafkaTemplate: KafkaTemplate<String, Any>
) {
    private val log = LoggerFactory.getLogger(javaClass)

    @PostMapping
    fun post(@Validated @RequestBody product: Product): ResponseEntity<Any> {
        return try {
            log.info("Receiving product request")
            log.info("Sending message to Kafka {}", product)
            val message: Message<Product> = MessageBuilder
                .withPayload(product)
                .setHeader(KafkaHeaders.TOPIC, topic)
                .setHeader("X-Custom-Header", "Custom header here")
                .build()
            kafkaTemplate.send(message)
            log.info("Message sent with success")
            ResponseEntity.ok().build()
        } catch (e: Exception) {
            log.error("Error sending message to Kafka", e)
            ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body("Error to send message")
        }
    }
}

In the controller, we build a Message, so we can add Kafka headers and custom headers when sending it to the topic. We are going to add only one custom header and see if it arrives at our consumer.

That finishes all the configuration we need to implement a simple Kafka producer that sends JSON messages.

Kafka Consumer

For the Kafka consumer, we need the same basic configuration as our producer: we change our application.properties to application.yml and create the same Product entity, so we can convert the message back from JSON.

If you are going to run both projects on the same machine, you need to change the port of one of them. To do so, add the server.port setting to its application.yml file.

server:
  port: 8081 # assumed value; any free port other than the producer's works
logging:
  level:
    root: info

Kafka Consumer Config

Our consumer config file is going to be similar to our producer.

import com.magnuspedro.kafka.consumer.KafkaConsumer.config.deserializer.ProductDeserializer
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.StringDeserializer
import org.springframework.beans.factory.annotation.Value
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.kafka.annotation.EnableKafka
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory
import org.springframework.kafka.core.ConsumerFactory
import org.springframework.kafka.core.DefaultKafkaConsumerFactory
import org.springframework.kafka.listener.ContainerProperties


@EnableKafka
@Configuration
class KafkaConsumerConfig(
    @Value("\${kafka.bootstrapAddress}")
    private val servers: String
) {

    @Bean
    fun consumerFactory(): ConsumerFactory<String?, Any?> {
        val props: MutableMap<String, Any> = HashMap()
        props[ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG] = servers
        props[ConsumerConfig.GROUP_ID_CONFIG] = "ppr"
        props[ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java
        props[ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG] = ProductDeserializer::class.java
        props[ConsumerConfig.AUTO_OFFSET_RESET_CONFIG] = "earliest"
        return DefaultKafkaConsumerFactory(props)
    }

    @Bean
    fun kafkaListenerContainerFactory(): ConcurrentKafkaListenerContainerFactory<String, Any>? {
        val factory = ConcurrentKafkaListenerContainerFactory<String, Any>()
        factory.consumerFactory = consumerFactory()
        factory.containerProperties.ackMode = ContainerProperties.AckMode.MANUAL_IMMEDIATE
        factory.containerProperties.isSyncCommits = true
        return factory
    }
}

Most of the configuration, as said before, is like our producer's, and like the producer, we have a custom deserializer for the value.

Configuration

A committed message is a message that has already been read and confirmed as consumed.

  • GROUP_ID_CONFIG: The ID of the consumer group that the consumer belongs to.
  • AUTO_OFFSET_RESET_CONFIG: Defines where the consumer starts when its group has no committed offset. earliest means it reads all the messages from the beginning of the topic, and latest means it reads only the messages that arrive after it starts.
  • AckMode.MANUAL_IMMEDIATE: The offset is committed manually, as soon as the listener calls acknowledge().
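The interaction between committed offsets and auto.offset.reset can be pictured with a small model. This is a simplified sketch in plain Kotlin, not Kafka code: startingOffset is a hypothetical function, logStart stands for the first offset still available in the partition, and logEnd for the offset the next produced record will get. Kafka also accepts the value none, which this model treats as unsupported.

```kotlin
// Simplified model of where a consumer group starts reading a partition.
fun startingOffset(committed: Long?, autoOffsetReset: String, logStart: Long, logEnd: Long): Long =
    // A committed offset always wins; the reset policy only applies without one.
    committed ?: when (autoOffsetReset) {
        "earliest" -> logStart
        "latest" -> logEnd
        else -> throw IllegalArgumentException("Unsupported auto.offset.reset: $autoOffsetReset")
    }

fun main() {
    println(startingOffset(null, "earliest", 0, 105)) // no commit yet: start of the log
    println(startingOffset(null, "latest", 0, 105))   // no commit yet: only new records
    println(startingOffset(42, "earliest", 0, 105))   // a commit exists: the policy is ignored
}
```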

Custom Deserializer

For our custom deserializer, we are going to implement the Deserializer interface from the Kafka client library and override the deserialize method.

import com.fasterxml.jackson.databind.ObjectMapper
import com.magnuspedro.kafka.consumer.KafkaConsumer.entities.Product
import org.apache.kafka.common.errors.SerializationException
import org.apache.kafka.common.serialization.Deserializer
import org.slf4j.LoggerFactory
import kotlin.text.Charsets.UTF_8


class ProductDeserializer : Deserializer<Product> {
    private val objectMapper = ObjectMapper()
    private val log = LoggerFactory.getLogger(javaClass)

    override fun deserialize(topic: String?, data: ByteArray?): Product? {
        log.info("Deserializing...")
        return objectMapper.readValue(
            String(
                data ?: throw SerializationException("Error when deserializing byte[] to Product"), UTF_8
            ), Product::class.java
        )
    }

    override fun close() {}

}

For the deserialization, we just convert the byte array we received to a string with the UTF_8 charset and use Jackson to convert it back to our entity. Like the serializer, we throw an error if the data is null.

Kafka Consumer

For consuming messages we first need to create a Consumer, and for that Spring Kafka offers an annotation that creates a listener for the topic.

import org.apache.kafka.clients.consumer.ConsumerRecord
import org.slf4j.LoggerFactory
import org.springframework.kafka.annotation.KafkaListener
import org.springframework.kafka.support.Acknowledgment
import org.springframework.stereotype.Component

@Component
class Consumer {
    private val logger = LoggerFactory.getLogger(javaClass)

    @KafkaListener(topics = ["\${kafka.topics.product}"], groupId = "ppr")
    fun listenGroupFoo(consumerRecord: ConsumerRecord<Any, Any>, ack: Acknowledgment) {
        logger.info("Message received {}", consumerRecord)
        ack.acknowledge()
    }
}

After annotating our method with @KafkaListener, we log the consumerRecord and acknowledge it (commit the message), so each message is consumed only once by our group.
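Since the producer attaches a custom header, the listener could also read it from the record. The sketch below is one possible way to do that, assuming only the kafka-clients library. headerAsString is a hypothetical extension function, and the record built in main is a stand-in for what the listener would receive.

```kotlin
import org.apache.kafka.clients.consumer.ConsumerRecord

// Returns the last header with the given key decoded as UTF-8, or null if it is absent.
fun ConsumerRecord<*, *>.headerAsString(key: String): String? =
    headers().lastHeader(key)?.value()?.toString(Charsets.UTF_8)

fun main() {
    // Hypothetical record that mimics the one sent by the producer.
    val record = ConsumerRecord<String?, String>("product", 0, 104L, null, "payload")
    record.headers().add("X-Custom-Header", "Custom header here".toByteArray())

    println(record.headerAsString("X-Custom-Header"))
    println(record.headerAsString("missing"))
}
```

Inside the listener, the same call would be consumerRecord.headerAsString("X-Custom-Header").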

Results

Now for the results, I'm going to run both applications and use Insomnia to send a POST request with the JSON object to the producer service. The producer sends the message to Kafka, and the consumer consumes it.

insomnia request
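For readers without Insomnia, the same request can be sent from a few lines of Kotlin with the JDK's HTTP client (Java 11 or newer). This is a sketch under assumptions: the producer is taken to listen on http://localhost:8080, Spring Boot's default port, and productRequest is a name made up for this example. The JSON body matches the product that appears in the consumer log.

```kotlin
import java.net.URI
import java.net.http.HttpClient
import java.net.http.HttpRequest
import java.net.http.HttpResponse

// Builds the POST request for the /product endpoint of the producer.
fun productRequest(baseUrl: String, json: String): HttpRequest =
    HttpRequest.newBuilder(URI.create("$baseUrl/product"))
        .header("Content-Type", "application/json")
        .POST(HttpRequest.BodyPublishers.ofString(json))
        .build()

fun main() {
    val body = """{"name":"Test for Dev.To","sku":"1232343"}"""
    // Assumed base URL; adjust it if the producer runs on another port.
    val response = HttpClient.newHttpClient()
        .send(productRequest("http://localhost:8080", body), HttpResponse.BodyHandlers.ofString())
    println(response.statusCode())
}
```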

Now we have the producer logs showing us that the message was sent to the topic.

Producer Logs

Pay attention to the Trace ID b79dc0a696fa6550, which is going to be the same in the consumer logs even though we are receiving the message in another service.

Consumer Logs

The message is too big to fit on my monitor so I'm going to pass it here as plain text

2022-02-07 22:27:09.178  INFO [,b79dc0a696fa6550,d95cf5541abb5fb9] 83720 --- [ntainer#0-0-C-1] c.m.k.c.K.infra.consumer.Consumer        : Message received ConsumerRecord(topic = product, partition = 0, leaderEpoch = 0, offset = 104, CreateTime = 1644283629123, serialized key size = -1, serialized value size = 42, headers = RecordHeaders(headers = [RecordHeader(key = X-Custom-Header, value = [67, 117, 115, 116, 111, 109, 32, 104, 101, 97, 100, 101, 114, 32, 104, 101, 114, 101]), RecordHeader(key = spring_json_header_types, value = [123, 34, 88, 45, 67, 117, 115, 116, 111, 109, 45, 72, 101, 97, 100, 101, 114, 34, 58, 34, 106, 97, 118, 97, 46, 108, 97, 110, 103, 46, 83, 116, 114, 105, 110, 103, 34, 125])], isReadOnly = false), key = null, value = Product(name=Test for Dev.To, sku=1232343))

There we have it, our message was propagated to our Kafka consumer and converted from JSON to our Product entity.
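The header values in the log are printed as raw byte arrays, so it is not obvious that the custom header survived the trip. Decoding them as UTF-8 makes it visible. The snippet below is a small stand-alone check in plain Kotlin, with the bytes copied from the log line above. The variable names are only for illustration.

```kotlin
// Value of the X-Custom-Header record header, copied from the consumer log.
val customHeader = byteArrayOf(
    67, 117, 115, 116, 111, 109, 32, 104, 101, 97, 100, 101, 114, 32, 104, 101, 114, 101
)

// Value of the spring_json_header_types record header, copied from the consumer log.
val headerTypes = byteArrayOf(
    123, 34, 88, 45, 67, 117, 115, 116, 111, 109, 45, 72, 101, 97, 100, 101, 114, 34, 58, 34,
    106, 97, 118, 97, 46, 108, 97, 110, 103, 46, 83, 116, 114, 105, 110, 103, 34, 125
)

fun main() {
    println(customHeader.toString(Charsets.UTF_8)) // Custom header here
    println(headerTypes.toString(Charsets.UTF_8))  // {"X-Custom-Header":"java.lang.String"}
}
```

The first value is the text set in the controller, and the second is a header Spring Kafka adds to record the Java type of each custom header.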
