In this part, we'll explore how to set up AMQP (Advanced Message Queuing Protocol) with RabbitMQ for asynchronous
message processing. This enables a worker pattern where HTTP requests queue messages that are processed asynchronously.
Architecture Overview
The messaging system consists of:
- MessageBroker: Interface defining message broker operations
- RabbitMqMessageBroker: RabbitMQ implementation using the Kourier AMQP client
- MessageHandler: Interface for message handlers
- SendNotificationHandler: Handler for processing notification events
- SendNotificationEvent: Data class representing notification events
Constants Configuration
File: src/commonMain/kotlin/me/nathanfallet/ktornativeworkertutorial/Constants.kt
object Constants {
    const val RABBITMQ_EXCHANGE = "notifications_exchange"
    const val RABBITMQ_QUEUE = "notifications_queue"
    const val RABBITMQ_ROUTING_KEY = "notifications.send"
}
These constants define:
- Exchange name for routing messages
- Queue name for storing messages
- Routing key for binding the queue to the exchange
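To make the role of the routing key more concrete, here is a minimal sketch of how a topic exchange (the exchange type declared in the RabbitMQ implementation later in this part) decides whether a message's routing key matches a queue binding. The function name `topicMatches` is hypothetical, and the code only approximates the AMQP 0-9-1 topic rules, where `*` stands for exactly one dot-separated word and `#` for zero or more. RabbitMQ performs this matching on the server, so the application itself never needs this code.

```kotlin
// Hypothetical helper, for illustration only: approximates topic-exchange matching.
fun topicMatches(pattern: String, routingKey: String): Boolean {
    val p = pattern.split('.')
    val k = if (routingKey.isEmpty()) emptyList() else routingKey.split('.')

    // pi indexes the pattern words, ki indexes the routing key words.
    fun match(pi: Int, ki: Int): Boolean = when {
        pi == p.size -> ki == k.size
        // '#' either matches nothing, or swallows one word and stays in place.
        p[pi] == "#" -> match(pi + 1, ki) || (ki < k.size && match(pi, ki + 1))
        ki == k.size -> false
        p[pi] == "*" || p[pi] == k[ki] -> match(pi + 1, ki + 1)
        else -> false
    }
    return match(0, 0)
}

fun main() {
    println(topicMatches("notifications.send", "notifications.send"))    // true
    println(topicMatches("notifications.*", "notifications.send"))       // true
    println(topicMatches("notifications.*", "notifications.send.email")) // false
    println(topicMatches("notifications.#", "notifications.send.email")) // true
}
```

Since the binding in this tutorial uses the literal key `notifications.send` with no wildcards, only messages published with exactly that routing key reach the queue.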
MessageBroker Interface
File: src/commonMain/kotlin/me/nathanfallet/ktornativeworkertutorial/messaging/MessageBroker.kt
interface MessageBroker {
    suspend fun initialize()
    suspend fun publish(exchange: String, routingKey: String, message: String)
    suspend fun startConsuming(queue: String, handler: MessageHandler)
}
This interface defines three core operations:
- initialize(): Set up the connection and declare exchanges/queues
- publish(): Send a message to an exchange with a routing key
- startConsuming(): Start consuming messages from a queue with a handler
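One benefit of hiding the broker behind an interface is that it can be swapped for a test double. The sketch below is an assumption, not part of the project: `InMemoryMessageBroker` is a hypothetical class with a single queue, no exchange routing, no network and no persistence. The two interfaces are redeclared so the block compiles on its own, and `runBlocking` comes from kotlinx.coroutines.

```kotlin
import kotlinx.coroutines.runBlocking

// Redeclared from the article so the sketch is self-contained.
interface MessageHandler {
    suspend operator fun invoke(routingKey: String, body: String)
}

interface MessageBroker {
    suspend fun initialize()
    suspend fun publish(exchange: String, routingKey: String, message: String)
    suspend fun startConsuming(queue: String, handler: MessageHandler)
}

// Hypothetical test double: messages wait in memory until a consumer is attached.
class InMemoryMessageBroker(private val queueName: String) : MessageBroker {
    private val pending = ArrayDeque<Pair<String, String>>()
    private var consumer: MessageHandler? = null
    private var initialized = false

    override suspend fun initialize() {
        initialized = true
    }

    override suspend fun publish(exchange: String, routingKey: String, message: String) {
        check(initialized) { "initialize() must be called before publish()" }
        val handler = consumer
        if (handler != null) handler(routingKey, message)
        else pending.addLast(routingKey to message)
    }

    override suspend fun startConsuming(queue: String, handler: MessageHandler) {
        require(queue == queueName) { "Unknown queue: $queue" }
        consumer = handler
        // Drain anything published before the consumer was attached.
        while (pending.isNotEmpty()) {
            val (routingKey, body) = pending.removeFirst()
            handler(routingKey, body)
        }
    }
}

fun main() = runBlocking {
    val received = mutableListOf<String>()
    val broker = InMemoryMessageBroker("notifications_queue")
    broker.initialize()
    broker.publish("notifications_exchange", "notifications.send", "first")
    broker.startConsuming("notifications_queue", object : MessageHandler {
        override suspend fun invoke(routingKey: String, body: String) {
            received += "$routingKey:$body"
        }
    })
    broker.publish("notifications_exchange", "notifications.send", "second")
    println(received) // [notifications.send:first, notifications.send:second]
}
```

Code that depends only on `MessageBroker` can be exercised against this double without a running RabbitMQ server.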
RabbitMQ Implementation
File: src/commonMain/kotlin/me/nathanfallet/ktornativeworkertutorial/messaging/RabbitMqMessageBroker.kt
class RabbitMqMessageBroker(
    private val coroutineScope: CoroutineScope,
    private val host: String,
    private val port: Int,
    private val user: String,
    private val password: String,
) : MessageBroker {

    private lateinit var amqpConnection: AMQPConnection
    private lateinit var amqpChannel: AMQPChannel

    override suspend fun initialize() {
        amqpConnection = createRobustAMQPConnection(coroutineScope) {
            server {
                host = this@RabbitMqMessageBroker.host
                port = this@RabbitMqMessageBroker.port
                user = this@RabbitMqMessageBroker.user
                password = this@RabbitMqMessageBroker.password
            }
        }
        amqpChannel = amqpConnection.openChannel().also {
            it.exchangeDeclare {
                name = Constants.RABBITMQ_EXCHANGE
                type = BuiltinExchangeType.TOPIC
            }
            it.queueDeclare {
                name = Constants.RABBITMQ_QUEUE
                durable = true
            }
            it.queueBind {
                queue = Constants.RABBITMQ_QUEUE
                exchange = Constants.RABBITMQ_EXCHANGE
                routingKey = Constants.RABBITMQ_ROUTING_KEY
            }
        }
    }

    override suspend fun publish(exchange: String, routingKey: String, message: String) {
        amqpChannel.basicPublish(
            body = message.toByteArray(),
            exchange = exchange,
            routingKey = routingKey,
            properties = properties {
                deliveryMode = 2u // Make message persistent
            }
        )
    }

    override suspend fun startConsuming(queue: String, handler: MessageHandler) {
        amqpChannel.basicConsume(
            queue = queue,
            noAck = false,
            onDelivery = { delivery ->
                handler(delivery.message.routingKey, delivery.message.body.decodeToString())
                amqpChannel.basicAck(delivery.message.deliveryTag, false)
            }
        )
    }
}
Initialization Flow
- Connection Creation:
  - Uses createRobustAMQPConnection() for automatic reconnection
  - Configured with RabbitMQ server credentials
  - Scoped to a coroutine scope for lifecycle management
- Channel Setup:
  - Opens an AMQP channel on the connection
  - Declares a TOPIC exchange for flexible routing
  - Declares a durable queue (survives server restarts)
  - Binds the queue to the exchange with a routing key
- Publishing Messages:
  - Converts message string to bytes
  - Sets delivery mode to 2 (persistent messages)
  - Routes message via exchange and routing key
- Consuming Messages:
  - Sets up a consumer on the queue
  - Uses manual acknowledgment (noAck = false)
  - Delegates message handling to the provided handler
  - Acknowledges message after successful processing (basicAck)
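The order of the last two steps matters: because the acknowledgment comes after the handler call, a handler that throws never reaches `basicAck`. The sketch below illustrates that ordering only. `FakeChannel` and `deliver` are hypothetical stand-ins, not the Kourier API, and the `try`/`catch` is added here purely so the example can report the outcome (the tutorial code lets the exception propagate instead).

```kotlin
// Hypothetical stand-in for a channel: records which delivery tags were acknowledged.
class FakeChannel {
    val acked = mutableListOf<Long>()
    fun basicAck(deliveryTag: Long) {
        acked += deliveryTag
    }
}

// Mirrors the order used in startConsuming: run the handler first, ack afterwards.
// Returns true if the message was acknowledged.
fun deliver(
    channel: FakeChannel,
    deliveryTag: Long,
    body: String,
    handler: (String) -> Unit,
): Boolean =
    try {
        handler(body)
        channel.basicAck(deliveryTag)
        true
    } catch (e: Exception) {
        false // the handler failed, so the ack above was never sent
    }

fun main() {
    val channel = FakeChannel()
    deliver(channel, 1, "ok") { /* processed successfully */ }
    deliver(channel, 2, "boom") { error("handler failed") }
    println(channel.acked) // [1]
}
```

With manual acknowledgment, RabbitMQ keeps an unacknowledged message and redelivers it once the consumer's channel or connection closes, so a handler may see the same message more than once and is safest when it tolerates duplicates.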
Using the Kourier AMQP Client
The project uses the Kourier AMQP client (dev.kourier:amqp-client-robust), which
provides:
- Kotlin Multiplatform support
- Native compatibility (JVM and Native targets)
- Robust connection with automatic reconnection
- Coroutine-based API
Dependency Configuration
In build.gradle.kts:
sourceSets {
    commonMain.dependencies {
        implementation(libs.amqp)
        // ... other dependencies
    }
}
And in gradle/libs.versions.toml:
[versions]
amqp = "0.4.0"
[libraries]
amqp = { module = "dev.kourier:amqp-client-robust", version.ref = "amqp" }
Message Handler Interface
File: src/commonMain/kotlin/me/nathanfallet/ktornativeworkertutorial/messaging/MessageHandler.kt
interface MessageHandler {
    suspend operator fun invoke(routingKey: String, body: String)
}
The handler uses the invoke operator, allowing instances to be called like functions. It receives:
- routingKey: The routing key of the message
- body: The message body as a string
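As a small illustration of the `invoke` operator, the sketch below calls a handler with function-call syntax. `RecordingHandler` is a hypothetical class used only for this example, the interface is redeclared so the block compiles on its own, and `runBlocking` comes from kotlinx.coroutines.

```kotlin
import kotlinx.coroutines.runBlocking

// Redeclared from the article so the sketch is self-contained.
interface MessageHandler {
    suspend operator fun invoke(routingKey: String, body: String)
}

// Hypothetical handler that just records what it was given.
class RecordingHandler : MessageHandler {
    val calls = mutableListOf<Pair<String, String>>()

    override suspend fun invoke(routingKey: String, body: String) {
        calls += routingKey to body
    }
}

fun main() = runBlocking {
    val handler = RecordingHandler()
    handler("notifications.send", "{}")        // operator call syntax
    handler.invoke("notifications.send", "{}") // equivalent explicit call
    println(handler.calls.size) // 2
}
```

This is the same call shape used inside `startConsuming`, where the broker invokes `handler(...)` directly for each delivery.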
Notification Event and Handler
Event Data Class (src/commonMain/kotlin/me/nathanfallet/ktornativeworkertutorial/messaging/handlers/SendNotificationEvent.kt):
@Serializable
data class SendNotificationEvent(
    val title: String,
    val body: String,
    val token: String,
)
Handler Implementation (src/commonMain/kotlin/me/nathanfallet/ktornativeworkertutorial/messaging/handlers/SendNotificationHandler.kt):
class SendNotificationHandler(
    private val notificationService: NotificationService,
) : MessageHandler {

    override suspend fun invoke(routingKey: String, body: String) {
        val event = Serialization.json.decodeFromString<SendNotificationEvent>(body)
        notificationService.sendNotification(
            token = event.token,
            title = event.title,
            body = event.body,
        )
    }
}
Handler Flow
- Receives the message body as a JSON string
- Deserializes to SendNotificationEvent using Kotlinx Serialization
- Calls the NotificationService to send the notification
- The service is injected via constructor (dependency injection)
Serialization Setup
File: src/commonMain/kotlin/me/nathanfallet/ktornativeworkertutorial/Serialization.kt
object Serialization {
    val json = Json {
        ignoreUnknownKeys = true
        explicitNulls = false
    }
}
This configured JSON instance is used for:
- Serializing events before publishing to RabbitMQ
- Deserializing events when consuming messages
- Consistent JSON handling across the application
Configuration:
- ignoreUnknownKeys = true: Allows adding fields without breaking old consumers
- explicitNulls = false: Omits null values from JSON output
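The effect of both flags can be seen in a short sketch. `ExampleEvent` and its `subtitle` field are hypothetical and exist only for this illustration (the tutorial's `SendNotificationEvent` has no nullable fields). The block assumes the kotlinx.serialization compiler plugin and JSON runtime are on the classpath, and the opt-in annotation is only needed on library versions where `explicitNulls` is still marked experimental.

```kotlin
import kotlinx.serialization.ExperimentalSerializationApi
import kotlinx.serialization.Serializable
import kotlinx.serialization.decodeFromString
import kotlinx.serialization.encodeToString
import kotlinx.serialization.json.Json

// Hypothetical event with a nullable field that has no default value.
@Serializable
data class ExampleEvent(val title: String, val subtitle: String?)

@OptIn(ExperimentalSerializationApi::class)
val json = Json {
    ignoreUnknownKeys = true
    explicitNulls = false
}

fun main() {
    // "addedLater" is not a property of ExampleEvent; it is skipped instead of failing.
    // "subtitle" is absent, which explicitNulls = false treats as null.
    val decoded = json.decodeFromString<ExampleEvent>("""{"title":"Hi","addedLater":1}""")
    println(decoded) // ExampleEvent(title=Hi, subtitle=null)

    // The null subtitle is left out of the output entirely.
    println(json.encodeToString(ExampleEvent("Hi", null))) // {"title":"Hi"}
}
```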
Integration with Application
File: src/commonMain/kotlin/me/nathanfallet/ktornativeworkertutorial/config/MessageBroker.kt
fun Application.configureMessageBroker() = runBlocking {
    val messageBroker by inject<MessageBroker>()
    messageBroker.initialize()
    messageBroker.startConsuming(Constants.RABBITMQ_QUEUE, get<SendNotificationHandler>())
}
This configuration function:
- Injects the MessageBroker instance from Koin
- Initializes the connection and channel
- Starts consuming messages with the SendNotificationHandler
Environment Configuration
The RabbitMQ connection is configured via environment variables in the Koin module:
single<MessageBroker> {
    RabbitMqMessageBroker(
        coroutineScope = this@mainModule,
        host = getEnv("RABBITMQ_HOST") ?: "localhost",
        port = getEnv("RABBITMQ_PORT")?.toIntOrNull() ?: 5672,
        user = getEnv("RABBITMQ_USER") ?: "guest",
        password = getEnv("RABBITMQ_PASSWORD") ?: "guest",
    )
}
Default values:
- Host: localhost
- Port: 5672
- User: guest
- Password: guest
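The fallback behaviour can be checked in isolation with a small sketch. `RabbitMqSettings` and `resolveSettings` are hypothetical names, and the environment lookup is passed in as a function so the example runs without real environment variables; the project's own `getEnv` helper is not reproduced here.

```kotlin
// Hypothetical holder for the resolved connection settings.
data class RabbitMqSettings(
    val host: String,
    val port: Int,
    val user: String,
    val password: String,
)

// Applies the same defaults as the Koin module, using an injectable lookup.
fun resolveSettings(getEnv: (String) -> String?): RabbitMqSettings = RabbitMqSettings(
    host = getEnv("RABBITMQ_HOST") ?: "localhost",
    port = getEnv("RABBITMQ_PORT")?.toIntOrNull() ?: 5672,
    user = getEnv("RABBITMQ_USER") ?: "guest",
    password = getEnv("RABBITMQ_PASSWORD") ?: "guest",
)

fun main() {
    // Host is set, the port is not a valid number, user and password are missing.
    val env = mapOf("RABBITMQ_HOST" to "rabbit.internal", "RABBITMQ_PORT" to "not-a-number")
    println(resolveSettings { env[it] })
    // RabbitMqSettings(host=rabbit.internal, port=5672, user=guest, password=guest)
}
```

Note that `toIntOrNull()` makes an unparsable port fall back silently to 5672. Also, a stock RabbitMQ installation only accepts the guest user on loopback connections, so these defaults are mainly suited to local development.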
Message Flow
- HTTP request arrives at the API endpoint
- Route publishes a SendNotificationEvent to RabbitMQ
- Message is stored in the queue
- Consumer receives the message
- SendNotificationHandler deserializes and processes it
- NotificationService sends the FCM notification
This architecture provides:
- Decoupling of HTTP request handling from notification sending
- Reliability through message persistence
- Scalability through async processing
- Error isolation (notification failures don't fail HTTP requests)
Summary
The AMQP setup demonstrates:
- Clean interface-based design for message brokers
- RabbitMQ integration with Kourier AMQP client
- Message handler pattern for processing events
- Serialization with Kotlinx Serialization
- Environment-based configuration
- Worker pattern for async processing
In the next part, we'll explore how to define HTTP routes that publish messages to this message broker.