DEV Community

Cover image for Utilizando Kafka em suas aplicações AspNet Core
Fernando Mendes
Fernando Mendes

Posted on

5 3

Utilizando Kafka em suas aplicações AspNet Core

Olá pessoal, tudo bem com vocês?!

Depois de quase um ano sem escrever, por vários motivos profissionais e pessoais, estou aqui escrevendo novamente. Depois também de várias pessoas me cobrarem por retornar a escrever.

Mas vamos lá, talvez seja um assunto que alguns de vocês podem já conhecer, e já usam, mas mesmo assim resolvi escrever sobre esse tema, e fazer um exemplo da utilização do Apache Kafka com AspNet Core.

Primeiramente, preciso explicar para aqueles que ainda não conhecem o Apache Kafka, para que ele serve.

O Apache Kafka, foi desenvolvido na linguagem Scala e Java, e para quem já usa o RabbitMQ para processamento de filas e troca de mensagens, o conceito é o mesmo, mas com algumas diferenças, principalmente performance, e diferente conteúdo de mensagem, como por exemplo streaming, sim isso mesmo que você leu, streaming..

O Apache Kafka, é muito mais performático que o RabbitMQ, ele lida com alta taxa de transferência, bilhões de mensagens. No seu design, foi dada uma atenção especial para o manuseio eficiente de vários consumidores do mesmo fluxo que leem em velocidades diferentes (streamimg)

Vamos fazer um exemplo, de como usar o Apache Kafka.

Primeiramente, vamos fazer a instalação do serviço do Apache Kafka em algum servidor, irei instalar o serviço no Docker. Procurei uma imagem oficial, mas não encontrei, existem algumas por ai, mas irei usar essa que deu certo wurstmeister/kafka

Segue o compose a ser executado em seu servidor com Docker.

version: '2'
services:
zookeeper:
image: wurstmeister/zookeeper
ports:
- "2181:2181"
kafka:
image: wurstmeister/kafka:0.10.2.0
ports:
- "9092:9092"
environment:
KAFKA_ADVERTISED_HOST_NAME: 127.0.0.1
KAFKA_CREATE_TOPICS: "fila_pedido:1:1"
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
volumes:
- /var/run/docker.sock:/var/run/docker.sock

Depois de executar o compose, pode perceber que você terá dois serviços rodando, o Apache Kafka e o Apache ZooKeeper.

Dai agora você se pergunta: Fernando, o que é esse tal de Apache ZooKeeper?
O Apache Kafka, usa o Apache ZooKeeper, para manter e coordenar os brokers. Ele é responsável por controlar o status dos nós do cluster e permite que vários clientes executem leituras e gravações simultâneas.

Com o Apache Kafka funcionando em seu Docker junto com o Apache ZooKeeper, vamos criar uma API(Producer) e um Hosted Service (Consumer) em Asp Net Core.

Alt Text
Estrutura da solução

Nos dois projetos, vamos adicionar referência:
PM> Install-Package Confluent.Kafka -Version 1.1.0

Essa referência, irá fazer nos comunicar com o serviço do Apache Kafka.
No projeto Kafka.Producer.API, vamos criar uma nova controller de nome Producer, e adicionar esse código:

using System;
using Confluent.Kafka;
using Microsoft.AspNetCore.Mvc;
namespace Kafka.Producer.API.Controllers
{
[Route("api/[controller]")]
[ApiController]
public class ProducerController : ControllerBase
{
[HttpPost]
[ProducesResponseType(typeof(string), 201)]
[ProducesResponseType(400)]
[ProducesResponseType(500)]
public IActionResult Post([FromQuery] string msg)
{
return Created("", SendMessageByKafka(msg));
}
private string SendMessageByKafka(string message)
{
var config = new ProducerConfig { BootstrapServers = "localhost:9092" };
using (var producer = new ProducerBuilder<Null, string>(config).Build())
{
try
{
var sendResult = producer
.ProduceAsync("fila_pedido", new Message<Null, string> { Value = message })
.GetAwaiter()
.GetResult();
return $"Mensagem '{sendResult.Value}' de '{sendResult.TopicPartitionOffset}'";
}
catch (ProduceException<Null, string> e)
{
Console.WriteLine($"Delivery failed: {e.Error.Reason}");
}
}
return string.Empty;
}
}
}

Veja que na linha 29, colocamos o nome da “fila”, de “fila_pedido”. Isso foi configurado na criação do container. No Apache Kafka, nos chamamos de Tópico.
Quer dizer, para qual canal iremos mandar a mensagem,e por qual canal iremos obter a mensagem.

Neste caso, fizemos quem irá mandar a mensagem. Agora, a seguir vamos codificar, quem irá receber a mensagem.

No projeto Kafka.Consumer.Handler, vamos criar uma classe de nome MessageHandler.cs, para ser o consumidor da mensagem.

Alt Text

Iremos adicionar esse código nessa classe.

public class MessageHandler : IHostedService
{
private readonly ILogger _logger;
public MessageHandler(ILogger<MessageHandler> logger)
{
_logger = logger;
}
public Task StartAsync(CancellationToken cancellationToken)
{
var conf = new ConsumerConfig
{
GroupId = "test-consumer-group",
BootstrapServers = "localhost:9092",
AutoOffsetReset = AutoOffsetReset.Earliest
};
using (var c = new ConsumerBuilder<Ignore, string>(conf).Build())
{
c.Subscribe("fila_pedido");
var cts = new CancellationTokenSource();
try
{
while (true)
{
var message = c.Consume(cts.Token);
_logger.LogInformation($"Mensagem: {message.Value} recebida de {message.TopicPartitionOffset}");
}
}
catch (OperationCanceledException)
{
c.Close();
}
}
return Task.CompletedTask;
}
public Task StopAsync(CancellationToken cancellationToken)
{
return Task.CompletedTask;
}
}

Veja que na linha 20, eu adiciono de qual canal iremos ler as mensagens enviadas.
Caso tenha a mensagem, iremos adicionar no Log a mensagem recebida.
Vamos rodar a aplicação, e ir nesse endereço: http://localhost:55080/swagger/index.html

Alt Text

Swagger
Já adicionei o Swagger na aplicação, e vou colocar uma mensagem a ser enviada.
Alt Text

Quando a mensagem é enviada, o consumer recebe a mensagem:
Alt Text

Mensagem de Log
Alt Text

Bom gente, está um exemplo simples de como usar o Apache Kafka em suas
aplicações, microserviços em Asp Net Core utilizando Docker para hospedar o serviço do Apache Kafka.

Para suas aplicações que terá um grande volume de mensagens, e precisa de muita performance, ai está o Apache Kafka.

Espero terem gostado, segue o fonte em meu Github
Marraia-Kafka

Um grande abraço e até a próxima!

Top comments (0)