Ferramentas necessárias:
Crie uma aplicação com o nome de sua escolha no spring starter com as seguintes dependências necessárias

Dentro do arquivo pom.xml adicione a dependencia abaixo, pois vamos utilizar o Gson para serializar nossa mensagem recebida em String para um objeto de uma classe Java.
<!-- https://mvnrepository.com/artifact/com.google.code.gson/gson -->
        <dependency>
            <groupId>com.google.code.gson</groupId>
            <artifactId>gson</artifactId>
            <version>2.8.6</version>
        </dependency>
Na nossa segunda aplicação, que eu chamei de API Orquestradora, vamos configurar o elasticsearch e a fila Jms que utilizaremos para realizar o proposito da aplicação.
Criamos a classe ElasticsearchClientConfig onde configuramos o acesso a nosso elasticsearch rodando no docker.
ElasticsearchClientConfig
import org.elasticsearch.client.RestHighLevelClient;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.elasticsearch.client.ClientConfiguration;
import org.springframework.data.elasticsearch.client.RestClients;
import org.springframework.data.elasticsearch.config.AbstractElasticsearchConfiguration;
import org.springframework.data.elasticsearch.repository.config.EnableElasticsearchRepositories;
@Configuration
@EnableElasticsearchRepositories(basePackages
        = "br.com.orquestrador.infrastructure.repository.elasticsearch")
@ComponentScan(basePackages = { "br.com.orquestrador" })
public class ElasticsearchClientConfig extends
        AbstractElasticsearchConfiguration {
    @Override
    @Bean
    public RestHighLevelClient elasticsearchClient() {
        final ClientConfiguration clientConfiguration =
                ClientConfiguration
                        .builder()
                        .connectedTo("localhost:9200")
                        .build();
        return RestClients.create(clientConfiguration).rest();
    }
}
E também uma classe Repository com o nome UserRepository para utilizarmos o método de save para salvar usuário no elasticsearch e o metodo de buscar por nome.
UserRepository
import br.com.orquestrador.user.User;
import org.springframework.data.elasticsearch.repository.ElasticsearchRepository;
import org.springframework.stereotype.Repository;
import java.util.Optional;
@Repository
    public interface UserRepository extends ElasticsearchRepository<User, String> {
        Optional<User> findByName(String name);
    }
Agora vamos configurar a parte de fila para acessar o ActiveMq rodando em nosso docker, para isto criamos a classe JmsConfig e adicionamos algumas configs da filas utilizadas no application.properties.

JmsConfig
import org.apache.activemq.ActiveMQConnectionFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.jms.DefaultJmsListenerContainerFactoryConfigurer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.annotation.EnableJms;
import org.springframework.jms.config.DefaultJmsListenerContainerFactory;
import org.springframework.jms.config.JmsListenerContainerFactory;
import org.springframework.jms.core.JmsTemplate;
import javax.jms.ConnectionFactory;
@Configuration
@EnableJms
public class JmsConfig {
    @Value("${spring.activemq.broker-url}")
    private String brokerUrl;
    @Value("${spring.activemq.user}")
    private String user;
    @Value("${spring.activemq.password}")
    private String password;
    @Bean
    public ActiveMQConnectionFactory connectionFactory() {
        if ( "".equals(user) ) {
            return new ActiveMQConnectionFactory(brokerUrl);
        }
        return new ActiveMQConnectionFactory(user, password, brokerUrl);
    }
    @Bean
    public JmsListenerContainerFactory jmsFactoryTopic(ConnectionFactory connectionFactory,
                                                       DefaultJmsListenerContainerFactoryConfigurer configurer) {
        DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
        configurer.configure(factory, connectionFactory);
        factory.setPubSubDomain(true);
        return factory;
    }
    @Bean
    public JmsTemplate jmsTemplate() {
        return new JmsTemplate(connectionFactory());
    }
    @Bean
    public JmsTemplate jmsTemplateTopic() {
        JmsTemplate jmsTemplate = new JmsTemplate(connectionFactory());
        jmsTemplate.setPubSubDomain( true );
        return jmsTemplate;
    }
}
Vamos configurar um Listener Kafka para ouvir nosso tópico que esta produzindo as mensagens na aplicação anterior.
Para isso criamos uma classe KafkaConsumer que ira ficar ouvindo nosso tópico e a cada mensagem produzida realizar a logica que definimos. 
- Passo 1 - Serializar a mensagem recebida em um objeto Java
  
- Passo 2 - Enviar este objeto para uma fila do Activemq
  
- Passo 3 - Salvar os dados deste usuario no elasticsearch
 KafkaConsumer KafkaConsumer
import br.com.orquestrador.application.ListenerKafka.dto.UserDto;
import br.com.orquestrador.user.User;
import br.com.orquestrador.user.UserService;
import com.google.gson.Gson;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;
@Component
public class KafkaConsumer {
    private static final Logger logger = LoggerFactory.getLogger(KafkaConsumer.class);
    @Autowired
    private Gson serializer;
    @Autowired
    private UserService userService;
    @Autowired
    private JmsTemplate jmsTemplate;
    @Autowired
    public KafkaConsumer(Gson serializer) {
        this.serializer = serializer;
    }
    @KafkaListener(topics = "${user.topic}", groupId = "${spring.kafka.consumer.group-id}")
    public void receive( @Payload String message)  {
        logger.info("message received: {}", message);
        UserDto usuarioDto = serializer.fromJson(message, UserDto.class);
        jmsTemplate.convertAndSend("queue.sample", message);
        User usuario = usuarioDto.converte();
        logger.info(usuario.toString());
        String messageFinal = userService.save(usuario);
        logger.info(messageFinal);
    }
}
Abaixo segue o link do github onde a aplicação esta armazenada para que possam conferir como a mesma ficou.
 

 
    
Top comments (0)