DEV Community

Nathan Fallet
Nathan Fallet

Posted on

Stop wrapping your RabbitMQ code in runBlocking

You're using Kotlin. You love coroutines. Your entire codebase is suspend functions, Flow, and structured concurrency.

Then you need to consume messages from RabbitMQ.

And suddenly, you're writing this:

runBlocking {
    processMessage(message)
}
Enter fullscreen mode Exit fullscreen mode

Congratulations. You just mass-deployed an anti-pattern.

The problem nobody warns you about

The official RabbitMQ Java client was designed in 2007. Kotlin coroutines were released in 2018. These two worlds don't mix well.

Here's what consuming messages looks like with the Java client:

val factory = ConnectionFactory().apply {
    host = "localhost"
    username = "guest"
    password = "guest"
}

val connection = factory.newConnection()
val channel = connection.createChannel()

channel.basicConsume("orders", false, object : DefaultConsumer(channel) {
    override fun handleDelivery(
        consumerTag: String?,
        envelope: Envelope?,
        properties: AMQP.BasicProperties?,
        body: ByteArray?
    ) {
        val message = String(body!!, Charsets.UTF_8)

        // You want to call a suspend function here
        // But you can't. This is a callback on a Java thread.

        processOrder(message) // Compile error if suspend

        channel.basicAck(envelope!!.deliveryTag, false)
    }
})
Enter fullscreen mode Exit fullscreen mode

See the problem? handleDelivery is a regular function called by the Java client's internal thread pool. You can't just sprinkle suspend on it.

The "solution" everyone uses

So you reach for runBlocking:

override fun handleDelivery(
    consumerTag: String?,
    envelope: Envelope?,
    properties: AMQP.BasicProperties?,
    body: ByteArray?
) {
    val message = String(body!!, Charsets.UTF_8)

    runBlocking {
        processOrder(message)  // Compiles!
        updateInventory(message)
        notifyShipping(message)
    }

    channel.basicAck(envelope!!.deliveryTag, false)
}
Enter fullscreen mode Exit fullscreen mode

It compiles. It runs. You ship it.

But here's what you actually did:

  1. Blocked a threadrunBlocking parks the current thread until all coroutines complete
  2. Defeated the purpose of coroutines — You're not saving resources; you're using MORE (coroutine overhead + blocked thread)
  3. Created backpressure issues — The Java client has a limited thread pool. Block those threads, and message consumption stalls.

You wanted non-blocking message processing. You got blocking message processing with extra steps.

"But I use launch instead!"

Some developers try to be clever:

val scope = CoroutineScope(Dispatchers.Default)

override fun handleDelivery(
    consumerTag: String?,
    envelope: Envelope?,
    properties: AMQP.BasicProperties?,
    body: ByteArray?
) {
    val message = String(body!!, Charsets.UTF_8)

    scope.launch {
        processOrder(message)
        updateInventory(message)
        notifyShipping(message)
    }

    channel.basicAck(envelope!!.deliveryTag, false)  // ACK before processing!
}
Enter fullscreen mode Exit fullscreen mode

Now you have a different problem: you're acknowledging the message before processing completes. If your app crashes mid-processing, that message is gone forever.

Move the ACK inside the coroutine? Now you're calling channel.basicAck() from a different thread than the one that received the delivery — and the Java client's channels are not thread-safe.

There's no clean way out.

What coroutine-first actually looks like

Here's the same consumer with Kourier:

fun main() = runBlocking {
    val connection = createAMQPConnection(this, "amqp://guest:guest@localhost:5672/")
    val channel = connection.openChannel()

    channel.queueDeclare("orders", durable = true)
    val consumer = channel.basicConsume("orders")

    for (delivery in consumer) {
        val message = delivery.message.body.decodeToString()

        processOrder(message)      // suspend function - works!
        updateInventory(message)   // suspend function - works!
        notifyShipping(message)    // suspend function - works!

        channel.basicAck(delivery.message)
    }
}
Enter fullscreen mode Exit fullscreen mode

Wait — there's a runBlocking at the top! Am I a hypocrite?

No. That's just the entry point — your main function needs somewhere to start the coroutine world. The difference is inside the loop: every suspend function you call actually suspends. The thread isn't blocked while processOrder waits for a database response. Other coroutines can run. That's the whole point.

With the Java client's callback, you're stuck in a non-suspend function. Here, you're in a proper coroutine context where suspension works as intended.

No callbacks. No thread-safety gymnastics. Just a for loop over messages.

Prefer callbacks? Kourier also supports a callback-style API with full suspend support:

channel.basicConsume(
    queue = "orders",
    onDelivery = { delivery ->
        // This is a suspend lambda — call any suspend function directly
        val message = delivery.message.body.decodeToString()
        processOrder(message)
        updateInventory(message)
        notifyShipping(message)
        channel.basicAck(delivery.message)
    }
)
Enter fullscreen mode Exit fullscreen mode

Unlike the Java client's callback, onDelivery is a proper suspend lambda. No runBlocking wrapper needed. No thread-safety tricks. Just suspend functions doing what suspend functions do.

Want automatic reconnection when the network hiccups? One word change:

val connection = createRobustAMQPConnection(this, "amqp://guest:guest@localhost:5672/")
// Everything else stays the same
Enter fullscreen mode Exit fullscreen mode

The robust client handles reconnection and channel recovery automatically. No custom retry logic, no connection state machines.

The null safety bonus

Did you notice the !! operators in the Java client code?

val message = String(body!!, Charsets.UTF_8)
channel.basicAck(envelope!!.deliveryTag, false)
Enter fullscreen mode Exit fullscreen mode

The Java client returns platform types. Every parameter in handleDelivery is technically nullable from Kotlin's perspective, even though they're never actually null in practice.

You either:

  • Scatter !! everywhere (and pray)
  • Add defensive ?.let blocks (verbose)
  • Suppress warnings and hope for the best

Kourier gives you actual Kotlin types:

for (delivery in consumer) {
    val body: ByteArray = delivery.message.body        // Non-nullable
    val message: String = body.decodeToString()        // Clean conversion
    val tag: Long = delivery.message.deliveryTag       // Non-nullable
}
Enter fullscreen mode Exit fullscreen mode

No force unwrapping. No null checks. Just types that mean what they say.

"But is it production ready?"

Fair question. Here's what Kourier brings to production:

  • Automatic reconnection — Connection drops happen. Kourier reconnects and resumes consuming.
  • Publisher confirms — Ensure messages reach the broker before proceeding.
  • OpenTelemetry integration — Distributed tracing out of the box.
  • Listed on RabbitMQ's official client page — Not some random GitHub repo.

It's not a wrapper around the Java client. It's a pure Kotlin implementation of the AMQP 0-9-1 protocol, built for coroutines from day one.

The migration path

You don't have to rewrite everything at once.

Start with one consumer. The one that's been causing backpressure issues. The one where you've been fighting thread pools. Replace the Java client callback with Kourier's suspend-based consumer.

See how it feels. Check the metrics. Then decide.

// build.gradle.kts
implementation("dev.kourier:amqp-client:0.3.2")

// If you want automatic reconnection
implementation("dev.kourier:amqp-client-robust:0.3.2")
Enter fullscreen mode Exit fullscreen mode

Stop fighting your language

Kotlin gave you coroutines for a reason. Structured concurrency, cancellation, backpressure — these aren't features you should have to work around.

Every runBlocking in a message handler is a sign that your tools don't match your language. Every !! on a delivery callback is friction that shouldn't exist.

You wouldn't use a Java HTTP client in a Ktor app. Why use a Java AMQP client in a coroutine-based system?


Kourier is a pure Kotlin AMQP/RabbitMQ client with native coroutine support. Star it, try it, break it — and open an issue when you do.

Top comments (0)