Em artigos anteriores, abordei a integração do Brighter com Kafka e o Brighter V10 RC1. Este guia foca na migração para o Brighter V10, enfatizando mudanças na configuração do Kafka e atualizações significativas.
Requisitos
- .NET 8 ou superior
- Um projeto .NET com os seguintes pacotes NuGet:
- Paramore.Brighter.MessagingGateway.Kafka: Habilita integração com Kafka.
- Paramore.Brighter.ServiceActivator.Extensions.DependencyInjection: Permite registrar o Brighter com o DI do Microsoft.
- Paramore.Brighter.ServiceActivator.Extensions.Hosting: Hospeda o Brighter como serviço em segundo plano.
- Serilog.AspNetCore: Para logging estruturado (opcional, mas recomendado).
Recapitulando o Brighter
Antes de continuar sobre a configuração do Kafka, vamos recapitular o que já sabemos sobre o Brighter.
Solicitação (Comando/Evento)
Defina mensagens usando IRequest
:
[PublicationTopic("greeting.topic")]
public class Greeting() : Event(Guid.NewGuid())
{
public string Name { get; set; } = string.Empty;
}
-
Comandos: Operações para um único destinatário (ex.:
SendEmail
). -
Eventos: Notificações broadcastadas (ex.:
OrderShipped
).
Mapeador de Mensagem (Opcional)
Traduz entre mensagens do Brighter e objetos do aplicativo. Para fluxos assíncronos, mapeadores agora exigem IAmAMessageMapperAsync
.
Manipulador de Solicitações
Processa mensagens recebidas:
public class GreetingHandler(ILogger<GreetingHandler> logger) : RequestHandler<Greeting>
{
public override Greeting Handle(Greeting command)
{
logger.LogInformation("Olá {Name}", command.Name);
return base.Handle(command);
}
}
Configurando o Brighter com Kafka
1. Configuração da Conexão
Defina detalhes da conexão com o Kafka:
var connection = new KafkaMessagingGatewayConfiguration
{
Name = "sample", // Nome do aplicativo
BootStrapServers = ["localhost:9092"], // Endereço do broker
SecurityProtocol = SecurityProtocol.Plaintext, // Use SSL em produção
SaslMechanisms = SaslMechanism.Plain,
};
2. Assinatura do Kafka
Assine um tópico:
.AddServiceActivator(opt =>
{
opt.Subscriptions =
[
new KafkaSubscription<Greeting>(
new SubscriptionName("kafka.greeting.subscription"), // Nome da assinatura (interno)
new ChannelName("greeting.topic"), // Nome do tópico
new RoutingKey("greeting.topic"), // Nome do tópico
groupId: "some-consumer-group", // ID do grupo Kafka
makeChannels: OnMissingChannel.Create, // Criar tópico se não existir
numOfPartitions: 2, // Número de partições (usado ao criar tópico via código)
noOfPerformers: 2, // Nº de assinaturas paralelas (não deve exceder o nº de partições)
messagePumpType: MessagePumpType.Proactor
),
];
opt.DefaultChannelFactory = new ChannelFactory(new KafkaMessageConsumerFactory(connection));
})
3. Configuração do Produtor Kafka
Publique eventos em um tópico:
.UseExternalBus(opt =>
{
opt.ProducerRegistry = new KafkaProducerRegistryFactory(connection,
[
new KafkaPublication
{
MakeChannels = OnMissingChannel.Create,
Source = new Uri("test-app", UriKind.RelativeOrAbsolute), // Propriedades CloudEvents
Topic = new RoutingKey("greeting.topic")
}
]).Create();
})
Novas Funcionalidades
Suporte a CloudEvents
O Brighter agora tem suporte nativo ao CloudEvents, permitindo configurar Source
, Subject
, Type
e outras propriedades via Publication
.
Por padrão, o Brighter usa o modo binário. Para JSON, registre CloudEventJsonMessageMapper<T>
ou mude o mapeador padrão:
// Usar CloudEvent JSON como padrão
.MapperRegistry(registry => registry.SetCloudEventJsonAsDefaultMessageMapper())
// Ou registrar por tipo
.MapperRegistry(registry => registry.Register<Greeting, CloudEventJsonMessageMapper<Greeting>>())
Atributo PublicationTopic
Com o suporte a mapeadores padrão, o Brighter precisa identificar qual publicação usar. Isso pode ser feito via atributo ou configuração explícita:
[PublicationTopic("greeting.topic")]
public class Greeting() : Event(Guid.NewGuid()) { ... }
// Alternativa: Publicação explícita
new KafkaPublication<Greeting> { ... }
Mudanças Significativas no Brighter V10
Valores Padrão Alterados
-
KafkaPublication.BatchNumberMessages
de10
para10000
-
KafkaPublication.QueueBufferingMaxMessages
de10
para100000
Revisão dos Mapeadores de Mensagem
- Serialização JSON Padrão: Agora é integrada, exceto para lógica personalizada.
Assinatura
-
Tipos de Bomba de Mensagem Explícitos: Substituído
isAsync
pormessagePumpType
(Reactor
,Proactor
). -
Renomeação de Propriedades:
ChannelFactory
→DefaultChannelFactory
.
Publicação
Use ExternalBusConfiguration
para produtores e padrão outbox:
// V10
.UseExternalBus(opt => { ... })
// V9
.UseExternalBus(new RmqProducerRegistryFactory(...))
Dicas para Kafka
Prefira Post
em vez de PostAsync
para produtores Kafka, evitando overhead desnecessário (melhora a taxa de transferência em 100x+).
Conclusão
O Brighter V10 simplifica a integração com Kafka, oferecendo suporte a CloudEvents, padrões aprimorados e configuração simplificada. As mudanças significativas buscam clareza, exigindo atualizações em assinaturas, produtores e lógica de serialização. Para detalhes, consulte o repositório GitHub do Brighter e a especificação CloudEvents.
Referência
Código completo no GitHub - https://github.com/lillo42/brighter-sample/commits/connect-to-kafka/
Top comments (0)