Olá pessoal, tudo bem com vocês?!
Depois de quase um ano sem escrever, por vários motivos profissionais e pessoais, estou aqui escrevendo novamente. Depois também de várias pessoas me cobrarem por retornar a escrever.
Mas vamos lá, talvez seja um assunto que alguns de vocês podem já conhecer, e já usam, mas mesmo assim resolvi escrever sobre esse tema, e fazer um exemplo da utilização do Apache Kafka com AspNet Core.
Primeiramente, preciso explicar para aqueles que ainda não conhecem o Apache Kafka, para que ele serve.
O Apache Kafka, foi desenvolvido na linguagem Scala e Java, e para quem já usa o RabbitMQ para processamento de filas e troca de mensagens, o conceito é o mesmo, mas com algumas diferenças, principalmente performance, e diferente conteúdo de mensagem, como por exemplo streaming, sim isso mesmo que você leu, streaming..
O Apache Kafka, é muito mais performático que o RabbitMQ, ele lida com alta taxa de transferência, bilhões de mensagens. No seu design, foi dada uma atenção especial para o manuseio eficiente de vários consumidores do mesmo fluxo que leem em velocidades diferentes (streamimg)
Vamos fazer um exemplo, de como usar o Apache Kafka.
Primeiramente, vamos fazer a instalação do serviço do Apache Kafka em algum servidor, irei instalar o serviço no Docker. Procurei uma imagem oficial, mas não encontrei, existem algumas por ai, mas irei usar essa que deu certo wurstmeister/kafka
Segue o compose a ser executado em seu servidor com Docker.
version: '2' | |
services: | |
zookeeper: | |
image: wurstmeister/zookeeper | |
ports: | |
- "2181:2181" | |
kafka: | |
image: wurstmeister/kafka:0.10.2.0 | |
ports: | |
- "9092:9092" | |
environment: | |
KAFKA_ADVERTISED_HOST_NAME: 127.0.0.1 | |
KAFKA_CREATE_TOPICS: "fila_pedido:1:1" | |
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 | |
volumes: | |
- /var/run/docker.sock:/var/run/docker.sock |
Depois de executar o compose, pode perceber que você terá dois serviços rodando, o Apache Kafka e o Apache ZooKeeper.
Dai agora você se pergunta: Fernando, o que é esse tal de Apache ZooKeeper?
O Apache Kafka, usa o Apache ZooKeeper, para manter e coordenar os brokers. Ele é responsável por controlar o status dos nós do cluster e permite que vários clientes executem leituras e gravações simultâneas.
Com o Apache Kafka funcionando em seu Docker junto com o Apache ZooKeeper, vamos criar uma API(Producer) e um Hosted Service (Consumer) em Asp Net Core.
Nos dois projetos, vamos adicionar referência:
PM> Install-Package Confluent.Kafka -Version 1.1.0
Essa referência, irá fazer nos comunicar com o serviço do Apache Kafka.
No projeto Kafka.Producer.API, vamos criar uma nova controller de nome Producer, e adicionar esse código:
using System; | |
using Confluent.Kafka; | |
using Microsoft.AspNetCore.Mvc; | |
namespace Kafka.Producer.API.Controllers | |
{ | |
[Route("api/[controller]")] | |
[ApiController] | |
public class ProducerController : ControllerBase | |
{ | |
[HttpPost] | |
[ProducesResponseType(typeof(string), 201)] | |
[ProducesResponseType(400)] | |
[ProducesResponseType(500)] | |
public IActionResult Post([FromQuery] string msg) | |
{ | |
return Created("", SendMessageByKafka(msg)); | |
} | |
private string SendMessageByKafka(string message) | |
{ | |
var config = new ProducerConfig { BootstrapServers = "localhost:9092" }; | |
using (var producer = new ProducerBuilder<Null, string>(config).Build()) | |
{ | |
try | |
{ | |
var sendResult = producer | |
.ProduceAsync("fila_pedido", new Message<Null, string> { Value = message }) | |
.GetAwaiter() | |
.GetResult(); | |
return $"Mensagem '{sendResult.Value}' de '{sendResult.TopicPartitionOffset}'"; | |
} | |
catch (ProduceException<Null, string> e) | |
{ | |
Console.WriteLine($"Delivery failed: {e.Error.Reason}"); | |
} | |
} | |
return string.Empty; | |
} | |
} | |
} |
Veja que na linha 29, colocamos o nome da “fila”, de “fila_pedido”. Isso foi configurado na criação do container. No Apache Kafka, nos chamamos de Tópico.
Quer dizer, para qual canal iremos mandar a mensagem,e por qual canal iremos obter a mensagem.
Neste caso, fizemos quem irá mandar a mensagem. Agora, a seguir vamos codificar, quem irá receber a mensagem.
No projeto Kafka.Consumer.Handler, vamos criar uma classe de nome MessageHandler.cs, para ser o consumidor da mensagem.
Iremos adicionar esse código nessa classe.
public class MessageHandler : IHostedService | |
{ | |
private readonly ILogger _logger; | |
public MessageHandler(ILogger<MessageHandler> logger) | |
{ | |
_logger = logger; | |
} | |
public Task StartAsync(CancellationToken cancellationToken) | |
{ | |
var conf = new ConsumerConfig | |
{ | |
GroupId = "test-consumer-group", | |
BootstrapServers = "localhost:9092", | |
AutoOffsetReset = AutoOffsetReset.Earliest | |
}; | |
using (var c = new ConsumerBuilder<Ignore, string>(conf).Build()) | |
{ | |
c.Subscribe("fila_pedido"); | |
var cts = new CancellationTokenSource(); | |
try | |
{ | |
while (true) | |
{ | |
var message = c.Consume(cts.Token); | |
_logger.LogInformation($"Mensagem: {message.Value} recebida de {message.TopicPartitionOffset}"); | |
} | |
} | |
catch (OperationCanceledException) | |
{ | |
c.Close(); | |
} | |
} | |
return Task.CompletedTask; | |
} | |
public Task StopAsync(CancellationToken cancellationToken) | |
{ | |
return Task.CompletedTask; | |
} | |
} |
Veja que na linha 20, eu adiciono de qual canal iremos ler as mensagens enviadas.
Caso tenha a mensagem, iremos adicionar no Log a mensagem recebida.
Vamos rodar a aplicação, e ir nesse endereço: http://localhost:55080/swagger/index.html
Swagger
Já adicionei o Swagger na aplicação, e vou colocar uma mensagem a ser enviada.
Quando a mensagem é enviada, o consumer recebe a mensagem:
Bom gente, está um exemplo simples de como usar o Apache Kafka em suas
aplicações, microserviços em Asp Net Core utilizando Docker para hospedar o serviço do Apache Kafka.
Para suas aplicações que terá um grande volume de mensagens, e precisa de muita performance, ai está o Apache Kafka.
Espero terem gostado, segue o fonte em meu Github
Marraia-Kafka
Um grande abraço e até a próxima!
Top comments (0)