DEV Community

Vinicius Carvalho
Vinicius Carvalho

Posted on

3 2

Kotlin coroutine based KafkaProducer extension

Getting rid of Callback hell in KafkaProducer

Kotlin's coroutines provides a nice way to write async code. It makes it easy to write and compose asynchronous computation using a very light-weight model.

This post is not about what coroutines are, the link with the docs have a very deep and easy to read explanation of that. Instead I'm offering a solution to using KafkaProducer.send method.

The issue is send() leverages a Callback strategy, and we all know that there's a special place in hell for those who use callbacks.

Fortunately Kotlin coroutines offer a solution: suspendCoroutine function, that allows us to transform a callback into a suspend function call.

Receiver functions are also another nice treat of Kotlin language. It allows us to augment regular types with custom functions.

I decided to call the new function dispatch instead of send because I find a bit confusing when people decide to extend original function using the same name, and imports can get a bit messy.

So the extension function you need to write is very simple:

suspend inline fun <reified K : Any, reified V : Any> KafkaProducer<K, V>.dispatch(record: ProducerRecord<K, V>) =
    suspendCoroutine<RecordMetadata> { continuation ->
        val callback = Callback { metadata, exception ->
            if (metadata == null) {
                continuation.resumeWithException(exception!!)
            } else {
                continuation.resume(metadata)
            }
        }
        this.send(record, callback)
    }
Enter fullscreen mode Exit fullscreen mode

Now you can just use it from your regular KafkaProducer instance:

val props = Properties()
props["bootstrap.servers"] = "localhost:9092"
props["key.serializer"] = StringSerializer::class.java
props["value.serializer"] = JacksonSerializer::class.java
val kafkaProducer = KafkaProducer<String, SensorReading>(props)
async {
    kafkaProducer.dispatch(ProducerRecord("sample", SensorReading("Bedroom", 72.0, false)))
}

Enter fullscreen mode Exit fullscreen mode

Just remember that you can only call a suspend function within the boundaries of a coroutine, hence the need for async, same could be achieved with launch or runBlocking for testing.

Happy coding!

Sentry mobile image

Mobile Vitals: A first step to Faster Apps

Slow startup times, UI hangs, and frozen frames frustrate users—but they’re also fixable. Mobile Vitals help you measure and understand these performance issues so you can optimize your app’s speed and responsiveness. Learn how to use them to reduce friction and improve user experience.

Read the guide →

Top comments (2)

Collapse
 
asafmesika profile image
asaf mesika

Did you manage to git rid of Threads in Consumer side as well?

Collapse
 
gklijs profile image
Gerard Klijs

Consumer is not thread safe, so I don't see how, unless you can 'force' a coroutine to always run on the same underlying thread. But that would kind of defeat the purpose for coroutines.

A Workflow Copilot. Tailored to You.

Pieces.app image

Our desktop app, with its intelligent copilot, streamlines coding by generating snippets, extracting code from screenshots, and accelerating problem-solving.

Read the docs

👋 Kindness is contagious

Please leave a ❤️ or a friendly comment on this post if you found it helpful!

Okay