DEV Community

Nathan Fallet
Nathan Fallet

Posted on

Part 3: AMQP Setup and Calls - Ktor Native Worker Tutorial

In this part, we'll explore how to set up AMQP (Advanced Message Queuing Protocol) with RabbitMQ for asynchronous
message processing. This enables a worker pattern where HTTP requests queue messages that are processed asynchronously.

Architecture Overview

The messaging system consists of:

  • MessageBroker: Interface defining message broker operations
  • RabbitMqMessageBroker: RabbitMQ implementation using the Kourier AMQP client
  • MessageHandler: Interface for message handlers
  • SendNotificationHandler: Handler for processing notification events
  • SendNotificationEvent: Data class representing notification events

Constants Configuration

File: src/commonMain/kotlin/me/nathanfallet/ktornativeworkertutorial/Constants.kt

object Constants {

    const val RABBITMQ_EXCHANGE = "notifications_exchange"
    const val RABBITMQ_QUEUE = "notifications_queue"
    const val RABBITMQ_ROUTING_KEY = "notifications.send"

}
Enter fullscreen mode Exit fullscreen mode

These constants define:

  • Exchange name for routing messages
  • Queue name for storing messages
  • Routing key for binding the queue to the exchange

MessageBroker Interface

File: src/commonMain/kotlin/me/nathanfallet/ktornativeworkertutorial/messaging/MessageBroker.kt

interface MessageBroker {
    suspend fun initialize()
    suspend fun publish(exchange: String, routingKey: String, message: String)
    suspend fun startConsuming(queue: String, handler: MessageHandler)
}
Enter fullscreen mode Exit fullscreen mode

This interface defines three core operations:

  • initialize(): Set up the connection and declare exchanges/queues
  • publish(): Send a message to an exchange with a routing key
  • startConsuming(): Start consuming messages from a queue with a handler

RabbitMQ Implementation

File: src/commonMain/kotlin/me/nathanfallet/ktornativeworkertutorial/messaging/RabbitMqMessageBroker.kt

class RabbitMqMessageBroker(
    private val coroutineScope: CoroutineScope,
    private val host: String,
    private val port: Int,
    private val user: String,
    private val password: String,
) : MessageBroker {
    private lateinit var amqpConnection: AMQPConnection
    private lateinit var amqpChannel: AMQPChannel

    override suspend fun initialize() {
        amqpConnection = createRobustAMQPConnection(coroutineScope) {
            server {
                host = this@RabbitMqMessageBroker.host
                port = this@RabbitMqMessageBroker.port
                user = this@RabbitMqMessageBroker.user
                password = this@RabbitMqMessageBroker.password
            }
        }

        amqpChannel = amqpConnection.openChannel().also {
            it.exchangeDeclare {
                name = Constants.RABBITMQ_EXCHANGE
                type = BuiltinExchangeType.TOPIC
            }
            it.queueDeclare {
                name = Constants.RABBITMQ_QUEUE
                durable = true
            }
            it.queueBind {
                queue = Constants.RABBITMQ_QUEUE
                exchange = Constants.RABBITMQ_EXCHANGE
                routingKey = Constants.RABBITMQ_ROUTING_KEY
            }
        }
    }

    override suspend fun publish(exchange: String, routingKey: String, message: String) {
        amqpChannel.basicPublish(
            body = message.toByteArray(),
            exchange = exchange,
            routingKey = routingKey,
            properties = properties {
                deliveryMode = 2u // Make message persistent
            }
        )
    }

    override suspend fun startConsuming(queue: String, handler: MessageHandler) {
        amqpChannel.basicConsume(
            queue = queue,
            noAck = false,
            onDelivery = { delivery ->
                handler(delivery.message.routingKey, delivery.message.body.decodeToString())
                amqpChannel.basicAck(delivery.message.deliveryTag, false)
            }
        )
    }
}
Enter fullscreen mode Exit fullscreen mode

Initialization Flow

  1. Connection Creation:

    • Uses createRobustAMQPConnection() for automatic reconnection
    • Configured with RabbitMQ server credentials
    • Scoped to a coroutine scope for lifecycle management
  2. Channel Setup:

    • Opens an AMQP channel on the connection
    • Declares a TOPIC exchange for flexible routing
    • Declares a durable queue (survives server restarts)
    • Binds the queue to the exchange with a routing key
  3. Publishing Messages:

    • Converts message string to bytes
    • Sets delivery mode to 2 (persistent messages)
    • Routes message via exchange and routing key
  4. Consuming Messages:

    • Sets up a consumer on the queue
    • Uses manual acknowledgment (noAck = false)
    • Delegates message handling to the provided handler
    • Acknowledges message after successful processing (basicAck)

Using the Kourier AMQP Client

The project uses the Kourier AMQP client (dev.kourier:amqp-client-robust), which
provides:

  • Kotlin Multiplatform support
  • Native compatibility (JVM and Native targets)
  • Robust connection with automatic reconnection
  • Coroutine-based API

Dependency Configuration

In build.gradle.kts:

sourceSets {
    commonMain.dependencies {
        implementation(libs.amqp)
        // ... other dependencies
    }
}
Enter fullscreen mode Exit fullscreen mode

And in gradle/libs.versions.toml:

[versions]
amqp = "0.4.0"

[libraries]
amqp = { module = "dev.kourier:amqp-client-robust", version.ref = "amqp" }
Enter fullscreen mode Exit fullscreen mode

Message Handler Interface

File: src/commonMain/kotlin/me/nathanfallet/ktornativeworkertutorial/messaging/MessageHandler.kt

interface MessageHandler {
    suspend operator fun invoke(routingKey: String, body: String)
}
Enter fullscreen mode Exit fullscreen mode

The handler uses the invoke operator, allowing instances to be called like functions. It receives:

  • routingKey: The routing key of the message
  • body: The message body as a string

Notification Event and Handler

Event Data Class (
src/commonMain/kotlin/me/nathanfallet/ktornativeworkertutorial/messaging/handlers/SendNotificationEvent.kt):

@Serializable
data class SendNotificationEvent(
    val title: String,
    val body: String,
    val token: String,
)
Enter fullscreen mode Exit fullscreen mode

Handler Implementation (
src/commonMain/kotlin/me/nathanfallet/ktornativeworkertutorial/messaging/handlers/SendNotificationHandler.kt):

class SendNotificationHandler(
    private val notificationService: NotificationService,
) : MessageHandler {
    override suspend fun invoke(routingKey: String, body: String) {
        val event = Serialization.json.decodeFromString<SendNotificationEvent>(body)
        notificationService.sendNotification(
            token = event.token,
            title = event.title,
            body = event.body,
        )
    }
}
Enter fullscreen mode Exit fullscreen mode

Handler Flow

  1. Receives the message body as JSON string
  2. Deserializes to SendNotificationEvent using Kotlinx Serialization
  3. Calls the NotificationService to send the notification
  4. The service is injected via constructor (dependency injection)

Serialization Setup

File: src/commonMain/kotlin/me/nathanfallet/ktornativeworkertutorial/Serialization.kt

object Serialization {
    val json = Json {
        ignoreUnknownKeys = true
        explicitNulls = false
    }
}
Enter fullscreen mode Exit fullscreen mode

This configured JSON instance is used for:

  • Serializing events before publishing to RabbitMQ
  • Deserializing events when consuming messages
  • Consistent JSON handling across the application

Configuration:

  • ignoreUnknownKeys = true: Allows adding fields without breaking old consumers
  • explicitNulls = false: Omits null values from JSON output

Integration with Application

File: src/commonMain/kotlin/me/nathanfallet/ktornativeworkertutorial/config/MessageBroker.kt

fun Application.configureMessageBroker() = runBlocking {
    val messageBroker by inject<MessageBroker>()
    messageBroker.initialize()
    messageBroker.startConsuming(Constants.RABBITMQ_QUEUE, get<SendNotificationHandler>())
}
Enter fullscreen mode Exit fullscreen mode

This configuration function:

  1. Injects the MessageBroker instance from Koin
  2. Initializes the connection and channel
  3. Starts consuming messages with the SendNotificationHandler

Environment Configuration

The RabbitMQ connection is configured via environment variables in the Koin module:

single<MessageBroker> {
    RabbitMqMessageBroker(
        coroutineScope = this@mainModule,
        host = getEnv("RABBITMQ_HOST") ?: "localhost",
        port = getEnv("RABBITMQ_PORT")?.toIntOrNull() ?: 5672,
        user = getEnv("RABBITMQ_USER") ?: "guest",
        password = getEnv("RABBITMQ_PASSWORD") ?: "guest",
    )
}
Enter fullscreen mode Exit fullscreen mode

Default values:

  • Host: localhost
  • Port: 5672
  • User: guest
  • Password: guest

Message Flow

  1. HTTP request arrives at the API endpoint
  2. Route publishes a SendNotificationEvent to RabbitMQ
  3. Message is stored in the queue
  4. Consumer receives the message
  5. SendNotificationHandler deserializes and processes it
  6. NotificationService sends the FCM notification

This architecture provides:

  • Decoupling of HTTP request handling from notification sending
  • Reliability through message persistence
  • Scalability through async processing
  • Error isolation (notification failures don't fail HTTP requests)

Summary

The AMQP setup demonstrates:

  • Clean interface-based design for message brokers
  • RabbitMQ integration with Kourier AMQP client
  • Message handler pattern for processing events
  • Serialization with Kotlinx Serialization
  • Environment-based configuration
  • Worker pattern for async processing

In the next part, we'll explore how to define HTTP routes that publish messages to this message broker.

Top comments (0)