DEV Community

Cover image for Migrando para Brighter V10 com Kafka
Rafael Andrade
Rafael Andrade

Posted on

Migrando para Brighter V10 com Kafka

Em artigos anteriores, abordei a integração do Brighter com Kafka e o Brighter V10 RC1. Este guia foca na migração para o Brighter V10, enfatizando mudanças na configuração do Kafka e atualizações significativas.

Requisitos

Recapitulando o Brighter

Antes de continuar sobre a configuração do Kafka, vamos recapitular o que já sabemos sobre o Brighter.

Solicitação (Comando/Evento)

Defina mensagens usando IRequest:

[PublicationTopic("greeting.topic")]
public class Greeting() : Event(Guid.NewGuid())
{
    public string Name { get; set; } = string.Empty;
}
Enter fullscreen mode Exit fullscreen mode
  • Comandos: Operações para um único destinatário (ex.: SendEmail).
  • Eventos: Notificações broadcastadas (ex.: OrderShipped).

Mapeador de Mensagem (Opcional)

Traduz entre mensagens do Brighter e objetos do aplicativo. Para fluxos assíncronos, mapeadores agora exigem IAmAMessageMapperAsync.

Manipulador de Solicitações

Processa mensagens recebidas:

public class GreetingHandler(ILogger<GreetingHandler> logger) : RequestHandler<Greeting>
{
    public override Greeting Handle(Greeting command)
    {
        logger.LogInformation("Olá {Name}", command.Name);
        return base.Handle(command);
    }
}
Enter fullscreen mode Exit fullscreen mode

Configurando o Brighter com Kafka

1. Configuração da Conexão

Defina detalhes da conexão com o Kafka:

var connection = new KafkaMessagingGatewayConfiguration
{
    Name = "sample", // Nome do aplicativo
    BootStrapServers = ["localhost:9092"], // Endereço do broker
    SecurityProtocol = SecurityProtocol.Plaintext, // Use SSL em produção
    SaslMechanisms = SaslMechanism.Plain,
};
Enter fullscreen mode Exit fullscreen mode

2. Assinatura do Kafka

Assine um tópico:

.AddServiceActivator(opt =>
{
    opt.Subscriptions =
    [
        new KafkaSubscription<Greeting>(
            new SubscriptionName("kafka.greeting.subscription"), // Nome da assinatura (interno)
            new ChannelName("greeting.topic"), // Nome do tópico
            new RoutingKey("greeting.topic"),  // Nome do tópico
            groupId: "some-consumer-group", // ID do grupo Kafka
            makeChannels: OnMissingChannel.Create, // Criar tópico se não existir
            numOfPartitions: 2, // Número de partições (usado ao criar tópico via código)
            noOfPerformers: 2, // Nº de assinaturas paralelas (não deve exceder o nº de partições)
            messagePumpType: MessagePumpType.Proactor
        ),
    ];

    opt.DefaultChannelFactory = new ChannelFactory(new KafkaMessageConsumerFactory(connection));
})
Enter fullscreen mode Exit fullscreen mode

3. Configuração do Produtor Kafka

Publique eventos em um tópico:

.UseExternalBus(opt =>
{
    opt.ProducerRegistry = new KafkaProducerRegistryFactory(connection,
    [
        new KafkaPublication
        {
            MakeChannels = OnMissingChannel.Create,
            Source = new Uri("test-app", UriKind.RelativeOrAbsolute), // Propriedades CloudEvents
            Topic = new RoutingKey("greeting.topic")
        }
    ]).Create();
})
Enter fullscreen mode Exit fullscreen mode

Novas Funcionalidades

Suporte a CloudEvents

O Brighter agora tem suporte nativo ao CloudEvents, permitindo configurar Source, Subject, Type e outras propriedades via Publication.

Por padrão, o Brighter usa o modo binário. Para JSON, registre CloudEventJsonMessageMapper<T> ou mude o mapeador padrão:

// Usar CloudEvent JSON como padrão  
.MapperRegistry(registry => registry.SetCloudEventJsonAsDefaultMessageMapper())

// Ou registrar por tipo
.MapperRegistry(registry => registry.Register<Greeting, CloudEventJsonMessageMapper<Greeting>>())
Enter fullscreen mode Exit fullscreen mode

Atributo PublicationTopic

Com o suporte a mapeadores padrão, o Brighter precisa identificar qual publicação usar. Isso pode ser feito via atributo ou configuração explícita:

[PublicationTopic("greeting.topic")]
public class Greeting() : Event(Guid.NewGuid()) { ... }

// Alternativa: Publicação explícita 
new KafkaPublication<Greeting> { ... }
Enter fullscreen mode Exit fullscreen mode

Mudanças Significativas no Brighter V10

Valores Padrão Alterados

  • KafkaPublication.BatchNumberMessages de 10 para 10000
  • KafkaPublication.QueueBufferingMaxMessages de 10 para 100000

Revisão dos Mapeadores de Mensagem

  • Serialização JSON Padrão: Agora é integrada, exceto para lógica personalizada.

Assinatura

  • Tipos de Bomba de Mensagem Explícitos: Substituído isAsync por messagePumpType (Reactor, Proactor).
  • Renomeação de Propriedades: ChannelFactoryDefaultChannelFactory.

Publicação

Use ExternalBusConfiguration para produtores e padrão outbox:

// V10
.UseExternalBus(opt => { ... })

// V9
.UseExternalBus(new RmqProducerRegistryFactory(...))
Enter fullscreen mode Exit fullscreen mode

Dicas para Kafka

Prefira Post em vez de PostAsync para produtores Kafka, evitando overhead desnecessário (melhora a taxa de transferência em 100x+).

Conclusão

O Brighter V10 simplifica a integração com Kafka, oferecendo suporte a CloudEvents, padrões aprimorados e configuração simplificada. As mudanças significativas buscam clareza, exigindo atualizações em assinaturas, produtores e lógica de serialização. Para detalhes, consulte o repositório GitHub do Brighter e a especificação CloudEvents.

Referência

Código completo no GitHub - https://github.com/lillo42/brighter-sample/commits/connect-to-kafka/

Top comments (0)