DEV Community

guilhermegarcia86
guilhermegarcia86

Posted on • Originally published at programadev.com.br

1 1

Kafka Sink

Introdução

No último artigo foi mostrado como é possível a partir de uma fonte de dados externa, alimentar um tópico do Kafka através do Kafka Source. Agora será visto como podemos fazer a operação inversa onde as informações de um tópico alimentarão uma fonte de dados externa através do Kafka Sink, nesse exemplo será usado o banco de dados não relacional MongoDB.

Kafka Sink

Como foi visto no artigo passado o Kafka Source é uma ferramenta open source criado pela Apache, essa ferramenta facilita a comunicação em sistemas externos ao Kafka, com isso conseguimos trazer dados de fontes externas e o oposto também pode ocorrer graças ao Kafka Sink, que realiza essa operação de sink (escoar) para sistemas externos.

Data Mountaineer

O Data Mountaineer é uma empresa com foco em BigData e streaming de dados que se juntou com a Landoop e fornece o connector que se liga ao Kafka Connect e realiza a operação de Sink que nesse caso será com o MongoDB.

Projeto

O projeto consistirá na configuração de um connector que irá receber mensagens de um tópico do Kafka e irá salvar essas informações no MongoDB, também será criado um projeto Java que estará conectado ao banco de dados e irá expor via API REST os dados para consulta.

Focando na parte que será apresentada temos um desenho do fluxo:

Imagem Fluxo De Dados MySQL para Mongo

Onde podemos ver de forma resumida o fluxo de dados que começa com o banco de dados MySQL, onde as alterações são capturadas pelo Debezium e através do Kafka Connect são enviadas ao broker do Kafka e também via Kafka Connect os dados são consumidos de um tópico e enviados ao Data Mountaineer e serão inseridos no MongoDB.

Adicionando o Mongo ao Docker Compose

É necessário adicionar o MongoDB ao docker-compose como abaixo:

  mongo:
    image: mongo
    container_name: mongo
    depends_on: 
    - kafka-cluster    
    ports:
      - 27017:27017
    networks: 
      - kafka-network
Enter fullscreen mode Exit fullscreen mode

Acessando o MongoDB

Vamos acessar o MongoDB via terminal:

docker exec -it mongo mongo
Enter fullscreen mode Exit fullscreen mode

E vamos acessar a collection taxpayer:

use taxpayer
Enter fullscreen mode Exit fullscreen mode

Configurando Kafka Sink

Para configurar o Kafka Sink basta acessar novamente o dashboard, através da imagem landoop/kafka-lenses-dev e clicar em Connectors no lado esquerdo e depois no botão New Connector.

Imagem Dashboard Novo Connector

Será exibida a tela com os conectores disponíveis e entre eles o conector do MongoDB basta clicar nele que será aberta a tela para inserir as configurações.

Imagem Dashboard Mongo Connector

Nessa tela iremos inserir as seguintes configurações:

connector.class=com.datamountaineer.streamreactor.connect.mongodb.sink.MongoSinkConnector
connect.mongo.kcql=UPSERT INTO taxpayer SELECT after FROM irs-conn.decider.complaint_taxpayer PK after.document;UPSERT INTO taxpayer SELECT after FROM irs-conn.decider.defaulted_taxpayer PK after.document
connect.mongo.connection=mongodb://mongo:27017
tasks.max=2
topics=irs-conn.decider.complaint_taxpayer,irs-conn.decider.defaulted_taxpayer
name=mongo
connect.mongo.db=taxpayer
Enter fullscreen mode Exit fullscreen mode

Vamos bater ponto a ponto essa configuração:

  • name: Nome da conexão.
  • connector.class: É a classe que é carregada para fazer a conexão com o MongoDB.
  • connect.mongo.connection: URI de conexão com o banco de dados.
  • connect.mongo.db: Nome da collection no Mongo.
  • tasks.max Máximo de tasks para essa conexão.
  • topics: Aqui deve ser informado quais são os tópicos que serão "escutados" pelo Kafka Connect para delegar ao Data Mountaineer
  • connect.mongo.kcql: KCQL é como diz documentação, SQL for Kafka Connect, não é algo incomum a própria Confluent possui algo similar que é o kSQL que nada mais é do que uma forma de simplificar a manipulação de dados e streams através de queries SQL. Como no nosso exemplo iremos capturar dados de dois tópicos precisamos criar duas queries para fazer UPSERT no MongoDB.

Após clicar em Create Connector a nova conexão deve ser listada como mostrado abaixo:

Imagem Dashboard Connector Criado

Monitorando o MongoDB

Agora podemos realizar alterações no MySQL e elas serão enviadas para os tópicos correspondentes e serão capturadas pelo Kafka Sink e enviadas ao MongoDB:

> db.taxpayer.find().pretty()
{
        "_id" : "890.955.180-19",
        "after" : {
                "document" : "890.955.180-19",
                "email" : "teste@teste.com",
                "name" : "Sandra Nair Sueli Ribeiro",
                "status" : "DEFAULTED"
        }
}
{
        "_id" : "869.097.474-10",
        "after" : {
                "document" : "869.097.474-10",
                "email" : "fake@fake.com.br",
                "name" : "Joao Alface",
                "status" : "COMPLAINT"
        }
}
Enter fullscreen mode Exit fullscreen mode

Projeto Java

Para iniciar o projeto foi utilizado o Spring Initializr onde foi adicionado as dependências de Web, Lombok e Spring Data Mongo.

Iniciando pelo domínio dessa aplicação temos a Taxpayer:

@Data
@Builder
public class Taxpayer {

    private String document;
    private String email;
    private String name;
    private String status;

}
Enter fullscreen mode Exit fullscreen mode

E nos casos de uso dessa aplicação queremos que seja possível buscar os Taxpayers pelo número de documento, email e listar todos. Para isso foi criado a classe SearchTaxpayer:

@Component
public class SearchTaxpayer {

    private final TaxpayerRepository taxpayerRepository;

    @Autowired
    public SearchTaxpayer(TaxpayerRepository taxpayerRepository){
        this.taxpayerRepository = taxpayerRepository;
    }

    public Taxpayer searchByEmail(String email){
        final Optional<Taxpayer> optionalTaxpayer = taxpayerRepository.findByEmail(email);

        if(!optionalTaxpayer.isPresent()){
            throw new TaxpayerNotFoundException("Taxpayer not found");
        }

        return optionalTaxpayer.get();
    }

    public Taxpayer searchByDocument(String document){
        final Optional<Taxpayer> optionalTaxpayer = taxpayerRepository.findByDocument(document);

        if(!optionalTaxpayer.isPresent()){
            throw new TaxpayerNotFoundException("Taxpayer not found");
        }

        return optionalTaxpayer.get();
    }

    public List<Taxpayer> findAll(){
        return taxpayerRepository.findAll().get();
    }
}
Enter fullscreen mode Exit fullscreen mode

Para ela funcionar é necessário criar a classe TaxpayerRepository que nada mais é que a interface que servirá como port no conceito de ports and adapters:

public interface TaxpayerRepository {

    Optional<Taxpayer> findByEmail(String email);

    Optional<Taxpayer> findByDocument(String document);

    Optional<List<Taxpayer>> findAll();
}
Enter fullscreen mode Exit fullscreen mode

E agora criamos o nosso Adapter para ele, mas primeiro faremos as configurações que o MongoDB necessita, primeiro criando a nossa entidade de banco de dados:

@Data
@Document(collection = "taxpayer")
public class TaxpayerEntity {

    @Id
    private String id;

    private After after;
}
Enter fullscreen mode Exit fullscreen mode
@Data
public class After {

    private String document;
    private String email;
    private String name;
    private String status;

}
Enter fullscreen mode Exit fullscreen mode

Como estamos usando Spring Data o trabalho é muito simplificado bastando criar uma interface que estenda de MongoRepository:

public interface MongoTaxpayerRepository extends MongoRepository<TaxpayerEntity, String> {

    Optional<TaxpayerEntity> findByAfter_Email(String email);

    Optional<TaxpayerEntity> findByAfter_Document(String document);
}
Enter fullscreen mode Exit fullscreen mode

Agora criamos o Adapter implementando a TaxpayerRepository:

@Component
public class MongoTaxpayer implements TaxpayerRepository {

    private final MongoTaxpayerRepository mongoTaxpayerRepository;

    @Autowired
    public MongoTaxpayer(MongoTaxpayerRepository mongoTaxpayerRepository){
        this.mongoTaxpayerRepository = mongoTaxpayerRepository;
    }

    @Override
    public Optional<Taxpayer> findByEmail(String email) {

        final Optional<TaxpayerEntity> optionalTaxpayerEntity = mongoTaxpayerRepository.findByAfter_Email(email);

        if(!optionalTaxpayerEntity.isPresent()){
            return Optional.empty();
        }

        final After after = optionalTaxpayerEntity.get().getAfter();

        return Optional.of(Taxpayer.builder().name(after.getName()).document(after.getDocument()).email(after.getEmail()).status(after.getStatus()).build());
    }

    @Override
    public Optional<Taxpayer> findByDocument(String document) {

        final Optional<TaxpayerEntity> optionalTaxpayerEntity = mongoTaxpayerRepository.findByAfter_Document(document);

        if(!optionalTaxpayerEntity.isPresent()){
            return Optional.empty();
        }

        final After after = optionalTaxpayerEntity.get().getAfter();

        return Optional.of(Taxpayer.builder().name(after.getName()).document(after.getDocument()).email(after.getEmail()).status(after.getStatus()).build());
    }

    @Override
    public Optional<List<Taxpayer>> findAll() {

        final List<TaxpayerEntity> taxpayerEntityList = mongoTaxpayerRepository.findAll();

        if(null == taxpayerEntityList || taxpayerEntityList.isEmpty()){
            return Optional.empty();
        }

        final List<Taxpayer> taxpayerList = taxpayerEntityList.stream().map(taxpayerEntity -> {
            final After after = taxpayerEntity.getAfter();

            return Taxpayer.builder().name(after.getName()).document(after.getDocument()).email(after.getEmail()).status(after.getStatus()).build();
        }).collect(Collectors.toList());

        return Optional.of(taxpayerList);
    }
}
Enter fullscreen mode Exit fullscreen mode

Com o adaptador criado vamos criar o controller que irá receber as requisições REST e delegar para a classe SearchTaxpayer:

@RestController
@RequestMapping("/taxpayer")
public class TaxpayerController {

    @Autowired
    private SearchTaxpayer searchTaxpayer;

    @GetMapping
    public ResponseEntity<List<Taxpayer>> getAllTaxpayers(){
        return ResponseEntity.ok(searchTaxpayer.findAll());
    }

    @GetMapping("document")
    public ResponseEntity<Taxpayer> getTaxpayerByDocument(@RequestParam("document") String document){
        return ResponseEntity.ok(searchTaxpayer.searchByDocument(document));
    }

    @GetMapping("email")
    public ResponseEntity<Taxpayer> getTaxpayerByEmail(@RequestParam("email") String email){
        return ResponseEntity.ok(searchTaxpayer.searchByEmail(email));
    }

}
Enter fullscreen mode Exit fullscreen mode

Após subir a aplicação e fazendo uma requisição para url


 teremos como resultado:



```json
[
    {
        "document": "890.955.180-19",
        "email": "teste@teste.com",
        "name": "Sandra Nair Sueli Ribeiro",
        "status": "DEFAULTED"
    },
    {
        "document": "1234567890",
        "email": "fake@fake.com.br",
        "name": "Joao Alface",
        "status": "COMPLAINT"
    }
]
Enter fullscreen mode Exit fullscreen mode

Conclusão

Aqui vimos a outra ponta do Kafka Connect, o Kafka Sink e foi apresentado um exemplo prático de como podemos ter bases ou até mesmo aplicações que podem receber dados através de streams com as ferramentas que o Kafka disponibiliza.

Código fonte

O código do projeto está no GitHub

AWS GenAI LIVE image

How is generative AI increasing efficiency?

Join AWS GenAI LIVE! to find out how gen AI is reshaping productivity, streamlining processes, and driving innovation.

Learn more

Top comments (0)

Billboard image

Create up to 10 Postgres Databases on Neon's free plan.

If you're starting a new project, Neon has got your databases covered. No credit cards. No trials. No getting in your way.

Try Neon for Free →

👋 Kindness is contagious

Please leave a ❤️ or a friendly comment on this post if you found it helpful!

Okay