DEV Community

Rafael Berçam
Rafael Berçam

Posted on

5 1 1 1 1

🧪 Validando Mensagens no Kafka com Kotlin e Awaitility

Fala pessoal! Neste post, vou te guiar por um exemplo prático de como testar a publicação de mensagens em um tópico Kafka usando Kotlin, Awaitility e o KafkaConsumer. Esse tipo de validação é essencial em sistemas distribuídos para garantir que as mensagens sejam enviadas e recebidas corretamente.

📚 Estrutura do Projeto

A estrutura básica do projeto é assim:

.
├── src
│   ├── main
│   │    └── kotlin
│   │         └── api
│   │              └── KafkaProducerService.kt
│   └── test
│        └── kotlin
│             └── api
│                  └── KafkaApiTest.kt
└── pom.xml (ou build.gradle.kts para Kotlin)
Enter fullscreen mode Exit fullscreen mode

Certifique-se de incluir as seguintes dependências no seu pom.xml (Maven) ou build.gradle.kts (Gradle):

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>3.7.0</version>
</dependency>

<dependency>
    <groupId>org.awaitility</groupId>
    <artifactId>awaitility</artifactId>
    <version>4.2.0</version>
</dependency>

<dependency>
    <groupId>com.fasterxml.jackson.module</groupId>
    <artifactId>jackson-module-kotlin</artifactId>
    <version>2.16.0</version>
</dependency>
Enter fullscreen mode Exit fullscreen mode

🤔 O Que é o Awaitility?

O Awaitility é uma biblioteca Java/Kotlin projetada para facilitar a espera por condições assíncronas em testes. Em vez de usar Thread.sleep() (o que é ineficiente), o Awaitility permite esperar de forma mais inteligente até que uma condição seja atendida.

✅ Por que usar Awaitility com Kafka?

  • Os consumidores Kafka não recebem mensagens imediatamente (processamento assíncrono).

  • Precisamos esperar até que a mensagem seja publicada no tópico e lida pelo consumidor.

📊 Exemplo Prático: Validando Transações Bancárias

Vamos criar um teste que:

  1. Publica uma mensagem de uma transação bancária em um tópico Kafka.

  2. Consome a mensagem do tópico.

  3. Valida se o conteúdo recebido está correto.

🛠️ O Modelo de Transação

data class TransacaoBancaria @JsonCreator constructor(
@JsonProperty("idTransacao") val idTransacao: String,
@JsonProperty("tipo") val tipo: String,
@JsonProperty("valor") val valor: Double,
@JsonProperty("contaOrigem") val contaOrigem: String,
@JsonProperty("contaDestino") val contaDestino: String,
@JsonProperty("dataHora") val dataHora: String
) {
companion object {
fun criarAleatoria(): TransacaoBancaria {
val tipos = listOf("TRANSFERENCIA", "PAGAMENTO", "DEPOSITO")
return TransacaoBancaria(
idTransacao = UUID.randomUUID().toString(),
tipo = tipos.random(),
valor = (100..10000).random().toDouble(),
contaOrigem = "${(100000..999999).random()}-${(0..9).random()}",
contaDestino = "${(100000..999999).random()}-${(0..9).random()}",
dataHora = java.time.LocalDateTime.now().toString()
)
}
}
}
view raw KafkaApiTest.kt hosted with ❤ by GitHub

📬 O Teste com Kafka e Awaitility

@TestInstance(TestInstance.Lifecycle.PER_CLASS)
class KafkaApiTest {
private lateinit var kafkaProducer: KafkaProducerService
private lateinit var kafkaConsumer: KafkaConsumer<String, String>
private val topic = "transacoes-bancarias"
private val objectMapper = ObjectMapper() // Para serializar e desserializar JSON
@BeforeAll
fun setup() {
kafkaProducer = KafkaProducerService(topic)
// Configuração do consumidor Kafka
val props = Properties().apply {
put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
put(ConsumerConfig.GROUP_ID_CONFIG, "grupo-de-teste")
put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer::class.java.name)
put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer::class.java.name)
put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest") // Lê desde o início do tópico
}
kafkaConsumer = KafkaConsumer<String, String>(props)
kafkaConsumer.subscribe(listOf(topic))
}
@AfterAll
fun tearDown() {
kafkaProducer.close()
kafkaConsumer.close()
}
@Test
fun `deve enviar e validar transacao bancaria no Kafka`() {
val key = UUID.randomUUID().toString()
// Criação da transação bancária
val transacao = TransacaoBancaria.criarAleatoria()
// Serializa a transação em JSON
val message = objectMapper.writeValueAsString(transacao)
// Envia a transação para o Kafka
kafkaProducer.sendMessage(key, message)
// Aguarda e valida se a mensagem foi recebida no Kafka
await().atMost(Duration.ofSeconds(30)).untilAsserted {
val registros = kafkaConsumer.poll(Duration.ofMillis(500))
assertTrue(
registros.any {
it.key() == key && validaMensagem(it.value(), transacao)
},
"A transação não foi encontrada ou está incorreta no Kafka"
)
}
}
// Função auxiliar para validar a mensagem recebida
private fun validaMensagem(receivedMessage: String, expected: TransacaoBancaria): Boolean {
return try {
val transacaoRecebida = objectMapper.readValue(receivedMessage, TransacaoBancaria::class.java)
assertEquals(expected.idTransacao, transacaoRecebida.idTransacao, "ID da transação incorreto")
assertEquals(expected.tipo, transacaoRecebida.tipo, "Tipo de transação incorreto")
assertEquals(expected.valor, transacaoRecebida.valor, "Valor da transação incorreto")
assertEquals(expected.contaOrigem, transacaoRecebida.contaOrigem, "Conta de origem incorreta")
assertEquals(expected.contaDestino, transacaoRecebida.contaDestino, "Conta de destino incorreta")
assertEquals(expected.dataHora, transacaoRecebida.dataHora, "Data e hora incorretas")
true
} catch (e: Exception) {
println("Erro ao validar mensagem: ${e.message}")
false
}
}
}
view raw KafkaApiTest.kt hosted with ❤ by GitHub

🧐 Explicando o Teste

  1. Produzimos uma mensagem Kafka com kafkaProducer.sendMessage().

Tela do IntelliJ com teste executado com sucesso

  1. Consumimos com kafkaConsumer.poll().

  2. Usamos o await().untilAsserted para esperar até a mensagem ser validada.

Se a mensagem não for encontrada ou os dados estiverem incorretos, o teste falha com uma mensagem de erro clara. ✅

Tela do Docker com o tópico da transação feita pelo teste

📢 Conclusão

Testar mensagens Kafka de forma assíncrona é essencial para garantir a integridade do sistema. Usando o Awaitility com KafkaConsumer, conseguimos validar mensagens de forma eficiente.

Se você gostou do conteúdo ou tem dúvidas, deixe um comentário! 🚀

👉 Me siga no dev.to para mais conteúdo de qualidade!

Happy coding! 💻

🔗 Links Referencias e Projeto no GitHub

Hostinger image

Get n8n VPS hosting 3x cheaper than a cloud solution

Get fast, easy, secure n8n VPS hosting from $4.99/mo at Hostinger. Automate any workflow using a pre-installed n8n application and no-code customization.

Start now

Top comments (0)

Qodo Takeover

Introducing Qodo Gen 1.0: Transform Your Workflow with Agentic AI

Rather than just generating snippets, our agents understand your entire project context, can make decisions, use tools, and carry out tasks autonomously.

Read full post

Best practices for optimal infrastructure performance with Magento

Running a Magento store? Struggling with performance bottlenecks? Join us and get actionable insights and real-world strategies to keep your store fast and reliable.

Tune in to the full event

DEV is partnering to bring live events to the community. Join us or dismiss this billboard if you're not interested. ❤️