You're using Kotlin. You love coroutines. Your entire codebase is suspend functions, Flow, and structured concurrency.
Then you need to consume messages from RabbitMQ.
And suddenly, you're writing this:
runBlocking {
processMessage(message)
}
Congratulations. You just mass-deployed an anti-pattern.
The problem nobody warns you about
The official RabbitMQ Java client was designed in 2007. Kotlin coroutines were released in 2018. These two worlds don't mix well.
Here's what consuming messages looks like with the Java client:
val factory = ConnectionFactory().apply {
host = "localhost"
username = "guest"
password = "guest"
}
val connection = factory.newConnection()
val channel = connection.createChannel()
channel.basicConsume("orders", false, object : DefaultConsumer(channel) {
override fun handleDelivery(
consumerTag: String?,
envelope: Envelope?,
properties: AMQP.BasicProperties?,
body: ByteArray?
) {
val message = String(body!!, Charsets.UTF_8)
// You want to call a suspend function here
// But you can't. This is a callback on a Java thread.
processOrder(message) // Compile error if suspend
channel.basicAck(envelope!!.deliveryTag, false)
}
})
See the problem? handleDelivery is a regular function called by the Java client's internal thread pool. You can't just sprinkle suspend on it.
The "solution" everyone uses
So you reach for runBlocking:
override fun handleDelivery(
consumerTag: String?,
envelope: Envelope?,
properties: AMQP.BasicProperties?,
body: ByteArray?
) {
val message = String(body!!, Charsets.UTF_8)
runBlocking {
processOrder(message) // Compiles!
updateInventory(message)
notifyShipping(message)
}
channel.basicAck(envelope!!.deliveryTag, false)
}
It compiles. It runs. You ship it.
But here's what you actually did:
-
Blocked a thread —
runBlockingparks the current thread until all coroutines complete - Defeated the purpose of coroutines — You're not saving resources; you're using MORE (coroutine overhead + blocked thread)
- Created backpressure issues — The Java client has a limited thread pool. Block those threads, and message consumption stalls.
You wanted non-blocking message processing. You got blocking message processing with extra steps.
"But I use launch instead!"
Some developers try to be clever:
val scope = CoroutineScope(Dispatchers.Default)
override fun handleDelivery(
consumerTag: String?,
envelope: Envelope?,
properties: AMQP.BasicProperties?,
body: ByteArray?
) {
val message = String(body!!, Charsets.UTF_8)
scope.launch {
processOrder(message)
updateInventory(message)
notifyShipping(message)
}
channel.basicAck(envelope!!.deliveryTag, false) // ACK before processing!
}
Now you have a different problem: you're acknowledging the message before processing completes. If your app crashes mid-processing, that message is gone forever.
Move the ACK inside the coroutine? Now you're calling channel.basicAck() from a different thread than the one that received the delivery — and the Java client's channels are not thread-safe.
There's no clean way out.
What coroutine-first actually looks like
Here's the same consumer with Kourier:
fun main() = runBlocking {
val connection = createAMQPConnection(this, "amqp://guest:guest@localhost:5672/")
val channel = connection.openChannel()
channel.queueDeclare("orders", durable = true)
val consumer = channel.basicConsume("orders")
for (delivery in consumer) {
val message = delivery.message.body.decodeToString()
processOrder(message) // suspend function - works!
updateInventory(message) // suspend function - works!
notifyShipping(message) // suspend function - works!
channel.basicAck(delivery.message)
}
}
Wait — there's a runBlocking at the top! Am I a hypocrite?
No. That's just the entry point — your main function needs somewhere to start the coroutine world. The difference is inside the loop: every suspend function you call actually suspends. The thread isn't blocked while processOrder waits for a database response. Other coroutines can run. That's the whole point.
With the Java client's callback, you're stuck in a non-suspend function. Here, you're in a proper coroutine context where suspension works as intended.
No callbacks. No thread-safety gymnastics. Just a for loop over messages.
Prefer callbacks? Kourier also supports a callback-style API with full suspend support:
channel.basicConsume(
queue = "orders",
onDelivery = { delivery ->
// This is a suspend lambda — call any suspend function directly
val message = delivery.message.body.decodeToString()
processOrder(message)
updateInventory(message)
notifyShipping(message)
channel.basicAck(delivery.message)
}
)
Unlike the Java client's callback, onDelivery is a proper suspend lambda. No runBlocking wrapper needed. No thread-safety tricks. Just suspend functions doing what suspend functions do.
Want automatic reconnection when the network hiccups? One word change:
val connection = createRobustAMQPConnection(this, "amqp://guest:guest@localhost:5672/")
// Everything else stays the same
The robust client handles reconnection and channel recovery automatically. No custom retry logic, no connection state machines.
The null safety bonus
Did you notice the !! operators in the Java client code?
val message = String(body!!, Charsets.UTF_8)
channel.basicAck(envelope!!.deliveryTag, false)
The Java client returns platform types. Every parameter in handleDelivery is technically nullable from Kotlin's perspective, even though they're never actually null in practice.
You either:
- Scatter
!!everywhere (and pray) - Add defensive
?.letblocks (verbose) - Suppress warnings and hope for the best
Kourier gives you actual Kotlin types:
for (delivery in consumer) {
val body: ByteArray = delivery.message.body // Non-nullable
val message: String = body.decodeToString() // Clean conversion
val tag: Long = delivery.message.deliveryTag // Non-nullable
}
No force unwrapping. No null checks. Just types that mean what they say.
"But is it production ready?"
Fair question. Here's what Kourier brings to production:
- Automatic reconnection — Connection drops happen. Kourier reconnects and resumes consuming.
- Publisher confirms — Ensure messages reach the broker before proceeding.
- OpenTelemetry integration — Distributed tracing out of the box.
- Listed on RabbitMQ's official client page — Not some random GitHub repo.
It's not a wrapper around the Java client. It's a pure Kotlin implementation of the AMQP 0-9-1 protocol, built for coroutines from day one.
The migration path
You don't have to rewrite everything at once.
Start with one consumer. The one that's been causing backpressure issues. The one where you've been fighting thread pools. Replace the Java client callback with Kourier's suspend-based consumer.
See how it feels. Check the metrics. Then decide.
// build.gradle.kts
implementation("dev.kourier:amqp-client:0.3.2")
// If you want automatic reconnection
implementation("dev.kourier:amqp-client-robust:0.3.2")
Stop fighting your language
Kotlin gave you coroutines for a reason. Structured concurrency, cancellation, backpressure — these aren't features you should have to work around.
Every runBlocking in a message handler is a sign that your tools don't match your language. Every !! on a delivery callback is friction that shouldn't exist.
You wouldn't use a Java HTTP client in a Ktor app. Why use a Java AMQP client in a coroutine-based system?
Kourier is a pure Kotlin AMQP/RabbitMQ client with native coroutine support. Star it, try it, break it — and open an issue when you do.
Top comments (0)