DEV Community

guilhermegarcia86
guilhermegarcia86

Posted on • Originally published at programadev.com.br on

1

Consumindo mensagens do Kafka sem dor de cabeça

No artigo Produzindo mensagens com Kafka vimos como podemos usar o Kafka para enviar mensagens e agora vamos ver como podemos receber essas mensagens de forma simples, sem dor de cabeça e com a vantagem de termos a segurança que o Schema Registry trás com a validação de contratos.

Projeto

Esse projeto consistirá em receber as mensagens no formato Avro do Kafka e irá enviar um email para o usuário. Para iniciar o projeto foi utilizado o Spring Boot mas sem a dependência do spring kafka. Nesse projeto utilizaremos o Spring para subir a aplicação mas queremos ter mais controle sobre os processos do Kafka.

Criando projeto

Segue o pom.xml do projeto com os plugins e dependências utilizadas:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.4.1</version>
        <relativePath /> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.irs</groupId>
    <artifactId>sender</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>sender email</name>
    <description>Send email</description>

    <properties>
        <java.version>11</java.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-mail</artifactId>
        </dependency>

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.apache.avro/avro -->
        <dependency>
            <groupId>org.apache.avro</groupId>
            <artifactId>avro</artifactId>
            <version>1.10.1</version>
        </dependency>

        <!--dependencies needed for the kafka part -->
        <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
        </dependency>

        <!-- https://mvnrepository.com/artifact/io.confluent/kafka-avro-serializer -->
        <dependency>
            <groupId>io.confluent</groupId>
            <artifactId>kafka-avro-serializer</artifactId>
            <version>5.3.0</version>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-mail</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-configuration-processor</artifactId>
            <optional>true</optional>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>

            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
            <!--for specific record -->
            <plugin>
                <groupId>org.apache.avro</groupId>
                <artifactId>avro-maven-plugin</artifactId>
                <version>1.10.1</version>
                <executions>
                    <execution>
                        <phase>generate-sources</phase>
                        <goals>
                            <goal>schema</goal>
                            <goal>protocol</goal>
                            <goal>idl-protocol</goal>
                        </goals>
                        <configuration>
                            <sourceDirectory>${project.basedir}/src/main/resources/avro</sourceDirectory>
                            <stringType>String</stringType>
                            <createSetters>false</createSetters>
                            <enableDecimalLogicalType>true</enableDecimalLogicalType>
                            <fieldVisibility>private</fieldVisibility>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
            <!--force discovery of generated classes -->
            <plugin>
                <groupId>org.codehaus.mojo</groupId>
                <artifactId>build-helper-maven-plugin</artifactId>
                <version>3.0.0</version>
                <executions>
                    <execution>
                        <id>add-source</id>
                        <phase>generate-sources</phase>
                        <goals>
                            <goal>add-source</goal>
                        </goals>
                        <configuration>
                            <sources>
                                <source>target/generated-sources/avro</source>
                            </sources>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

    <repositories>
        <repository>
            <id>confluent</id>
            <url>https://packages.confluent.io/maven/</url>
        </repository>
    </repositories>

</project>
Enter fullscreen mode Exit fullscreen mode

Atualizando Schema

Antes de iniciar o projeto precisamos atualizar o nosso Schema pois na primeira versão não havia a informação de email e agora precisamos disso para que o serviço funcione corretamente, o novo Schema irá manter os campos que já possuía e será adicionado o campo de email:

{
     "type": "record",
     "namespace": "com.irs.register.avro.taxpayer",
     "name": "TaxPayer",
     "version": "2",
     "fields": [
       { "name": "name", "type": "string", "doc": "Name of TaxPayer" },
       { "name": "document", "type": "string", "doc": "Document of TaxPayer" },
       { "name": "email", "type": "string", "doc": "Email of TaxPayer" },
       { "name": "situation", "type": "boolean", "default": false, "doc": "Legal situation of TaxPayer" }
     ]
}
Enter fullscreen mode Exit fullscreen mode

Configurando consumidor

Vamos começar as configurações de propriedades para quando a aplicação for se conectar ao Kafka :

@Configuration
public class KafkaConfiguration implements MessageConfiguration<TaxPayer> {

    @Autowired
    private KafkaProperties kafkaProperties;

    @Bean(name = "taxpayerConsumer")
    @Override
    public KafkaConsumer<String, TaxPayer> configureConsumer() {

        Properties props = new Properties();

        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getBootstrapServers());
        props.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaProperties.getGroupId());
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, kafkaProperties.getAutoCommit());
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, kafkaProperties.getOffsetReset());

        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, kafkaProperties.getKeyDesserializer());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, kafkaProperties.getValueDesserializer());

        props.put(KafkaAvroDeserializerConfig.SCHEMA_REGISTRY_URL_CONFIG, kafkaProperties.getSchemaRegistryUrl());
        props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, kafkaProperties.isSpecificAvroReader());

        return new KafkaConsumer<String, TaxPayer>(props);
    }

}
Enter fullscreen mode Exit fullscreen mode

Basicamente configuramos as urls de conexão com o Kafka e Schema Registry , definimos o nosso group id para identificação do nosso consumidor no Consumer Group , também configuramos o Auto Commit do Offset juntamente com o Auto Offset Reset , que indica se queremos sempre buscar as mensagens desde o inicio, do último offset commitado e etc. e por fim definimos que queremos usar um leitor específico do nosso Avro com a configuração SPECIFIC_AVRO_READER_CONFIG.

Feito isso basta instanciarmos a classe KafkaConsumer passando as nossas props.

Implementando o consumidor

Com as configurações prontas agora basta que criemos a classe de serviço que irá utilizar o KafkaConsumer e irá buscar as mensagens no Kafka :

@Service
@Slf4j
public class KafkaConsumerService implements Consumer<TaxPayer> {

    @Autowired
    @Qualifier("taxpayerConsumer")
    private KafkaConsumer<String, TaxPayer> kafkaConsumer;

    @Override
    public String topic() {
        return "taxpayer-avro";
    }

    @PostConstruct
    @Override
    public void receive() {

        kafkaConsumer.subscribe(Collections.singleton(this.topic()));

        while (true) {

            try {

                kafkaConsumer.poll(Duration.ofMillis(1000)).forEach(record -> {

                    log.info("Recebendo TaxPayer");

                    TaxPayer taxpayer = record.value();

                    Person person = Person.builder().email(taxpayer.getEmail()).name(taxpayer.getName()).build();

                    log.info(person.toString());

                });

                kafkaConsumer.commitSync();

            } catch (Exception ex) {
                log.error("Erro ao processar mensagem", ex);
            }

        }

    }

}
Enter fullscreen mode Exit fullscreen mode

Na classe KafkaConsumerService temos o método receive e nele usamos o KafkaConsumer para nos subscrevermos no tópico, que é o taxpayer-avro e entramos em um loop while para sempre ficarmos buscando as mensagens no Kafka e após isso commitamos o offset para as partições em caso de sucesso ou lançamos uma Exception em caso de falha.

Serviço de Email

Para configurar o serviço de envio de mensagem de email vamos usar a classe JavaMailSender que o Spring fornece:

@Configuration
public class MailConfigurer {

    @Bean
    public JavaMailSender getJavaMailSender() {
        JavaMailSenderImpl mailSender = new JavaMailSenderImpl();
        mailSender.setHost("smtp.gmail.com");
        mailSender.setPort(587);

        mailSender.setUsername("fake@gmail.com");
        mailSender.setPassword("senha secreta");

        Properties props = mailSender.getJavaMailProperties();
        props.put("mail.transport.protocol", "smtp");
        props.put("mail.smtp.auth", "true");
        props.put("mail.smtp.starttls.enable", "true");
        props.put("mail.debug", "true");

        return mailSender;
    }

}
Enter fullscreen mode Exit fullscreen mode

Após isso podemos criar uma classe de serviço especializada em enviar os emails:

@Service
@Slf4j
public class EmailService implements Email{

    @Autowired
    private JavaMailSender emailSender;

    @Override
    public void sendMessage(Person person) {

        try {

            log.info("Try to send email for : " + person.getEmail());
            SimpleMailMessage message = new SimpleMailMessage();
            message.setFrom("noreply@irs.com");
            message.setTo(person.getEmail());
            message.setSubject("Confirmação de recebimento");
            message.setText(person.getName() + " seus dados foram recebidos com sucesso");

            emailSender.send(message);

        } catch (MailException ex) {
            log.error("Error to send email", ex);
        }

    }

}
Enter fullscreen mode Exit fullscreen mode

O método sendMessage recebe um objeto do tipo Person e envia o email com a nossa mensagem.

Agora podemos injetar a classe de email no consumidor para enviar o email a cada mensagem recebida:

@Service
@Slf4j
public class KafkaConsumerService implements Consumer<TaxPayer> {

    @Autowired
    @Qualifier("taxpayerConsumer")
    private KafkaConsumer<String, TaxPayer> kafkaConsumer;

    @Autowired
    private Email email;

    @Override
    public String topic() {
        return "taxpayer-avro";
    }

    @PostConstruct
    @Override
    public void receive() {

        kafkaConsumer.subscribe(Collections.singleton(this.topic()));

        while (true) {

            try {

                kafkaConsumer.poll(Duration.ofMillis(1000)).forEach(record -> {

                    log.info("Recebendo TaxPayer");

                    TaxPayer taxpayer = record.value();

                    Person person = Person.builder().email(taxpayer.getEmail()).name(taxpayer.getName()).build();

                    email.sendMessage(person);

                });

                kafkaConsumer.commitSync();

            } catch (Exception ex) {
                log.error("Erro ao processar mensagem", ex);
            }

        }

    }

}
Enter fullscreen mode Exit fullscreen mode

Agora a cada mensagem que for recebida ele irá enviar um email.

Código fonte

O código desse projeto se encontra no GitHub

Alternativa com Spring Boot

No exemplo desse artigo foi mostrado como consumir as mensagens do Kafka sem utilizar as libs do Spring Bot , caso tenha interesse em ver como seria a configuração de um consumidor utilizando a implementação do Spring Boot esses dois artigos mostram como fazer isso, a primeira usando annotations e outra sem usar annotations:

AWS GenAI LIVE image

Real challenges. Real solutions. Real talk.

From technical discussions to philosophical debates, AWS and AWS Partners examine the impact and evolution of gen AI.

Learn more

Top comments (0)

A Workflow Copilot. Tailored to You.

Pieces.app image

Our desktop app, with its intelligent copilot, streamlines coding by generating snippets, extracting code from screenshots, and accelerating problem-solving.

Read the docs

👋 Kindness is contagious

Please leave a ❤️ or a friendly comment on this post if you found it helpful!

Okay