Clima natalino chegando, na verdade, mais perto do que em janeiro. Vamos apenas pensar em resolver problemas que um dia foram complicados para muitos, mas por meio do artigo certo, conseguimos resolver nossos empecilhos facilmente.
Já passou pelo problema de tentar se conectar com um banco oracle, para conseguir captar todos os eventos de insert ou update de uma tabela específica a partir do kafka-connect?
Caso tenha ou já passou por problemas semelhantes a esses, venho aqui lhe trazer boas novas sobre formas de conexão a partir do conector jdbc.
O connector jdbc, é Open source, presente na documentação oficial da confluent, com todas as descrições bem claras dos campos que são utilizados, e seus níveis de prioridades.
JDBC
Documentação do Connector
Vamos deixar de enrolação e partir logo pro abraço, porque estamos no fim de ano, e quanto mais rápido aprendermos essa funcionalidade, mais cedo comemoraremos a ceia de natal.
Antes de tudo, certifique-se que seu kafka-connect possui o seguinte connector instalado na sua máquina:
confluent-hub install confluentinc/kafka-connect-jdbc:latest
Confirmando a instalação, vamos para alguns casos em que o connector pode ser utilizado:
1 - Fazer Dump completo do banco de dados, ou de tabelas específicas;
2 - Buscar apenas dados que foram inseridos na tabela;
3 - Receber apenas os dados que sofreram updates;
4 - Receber mensagens de itens que foram inseridos ou atualizados na tabela;
Esses 4 cenários conseguimos tratar com o kafka-connect, cada um com sua forma de configurar um connector.
1 - Dump completo do banco, ou de tabelas específicas
Esse processo requer que configuremos o nosso connector da seguinte forma:
Obs: Estarei especificando o significado de cada campo do objeto
{
"name": "nome do connector",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"connection.url": "jdbc:oracle:thin:@<host do banco>:<porta>/<nome do banco de dados>",
"connection.user": "<username do banco>",
"connection.password": "<senha do banco>",
"topic.prefix": "<prefixo do tópico kafka que será produzido>",
"table.whitelist": "<tabelas que serão lidas>",
"poll.interval.ms": 5000,
"value.converter":"org.apache.kafka.connect.json.JsonConverter",
"mode": "bulk"
}
}
Para a criação de um connector jdbc, definimos dois campos primários (name e config). "name" se baseia no nome do nosso connector e "config" são as definições do comportamento desse connector.
Config
- connector.class => Classe que tem as funções do nosso connector, dentro do pacote que instalamos anteriormente
- connection.url => URL de conexão com o banco dados, devendo seguir a seguinte sintaxe do comando jdbc -> jdbc:oracle:thin:@:/
- connection.user => usuário do banco de dados
- connection.password => senha para acessar ao banco
- topic.prefix => Inicio do nome do nosso tópico. Geralmente ao criarmos um connector, o nosso tópico vem escrito com:
- table.whitelist => Tabelas que serão mapeadas no processo. Caso o número de tabelas seja muito grande, podemos definir quais tabelas não serão lidas, a partir do parâmetro "table.blacklist". Não obstante, deve existir apenas um desses campos, ou utilizamos a "whitelist" ou "blacklist"
- poll.interval.ms => Intervalo em milisegundos que o nosso connector fará o dump das tabelas
- value.converter => Definindo para o formato que os nossos dados serão convertidos. Com essa informação no campo: org.apache.kafka.connect.json.JsonConverter. Sempre iremos transformar nossas mensagens em formato json;
- mode => Modo que será feito o mapeamento. O modo bulk se refere a produção dos dados completos da(s) tabela(s), partir de x tempo (definido pelo poll.interval.ms)
2 - Buscar apenas dados que foram inseridos na tabela
Configuração do connector, para que apenas eventos de inserts sejam capturados pelo connector
Obs: Os campos repetidos, não serão replicados na descrição
{
"name": "nome do connector",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"connection.url": "jdbc:oracle:thin:@<host do banco>:<porta>/<nome do banco de dados>",
"connection.user": "<username do banco>",
"connection.password": "<senha do banco>",
"topic.prefix": "<prefixo do tópico kafka que será produzido>",
"value.converter":"org.apache.kafka.connect.json.JsonConverter",
"mode":"incrementing",
"query": "<QUERY SELECT PARA BUSCAR OS DADOS ESPECIFICOS DA TABELA>",
"numeric.mapping":"best_fit",
"incrementing.column.name": "<NOME DO CAMPO DA TABELA, QUE OBEDEÇA VALOR DE CHAVE PRIMARIA>",
"validate.non.null": true
}
}
Config
- mode => Incrementing, tem o significado que apenas será executado, a partir do momento que a chave primária (Obrigatoriamente sendo de um tipo número), ao se produzir um novo dado, será usada como parâmetro para verificar se é um novo dado.
- query => Select que será executado, quando essa nova mensagem for gerada. Nesse momento, esse select será replicado em forma de mensagem, então o select contendo os dados corretos e úteis que o seu consumidor fará a leitura, estará presente nesse comando
- incrementing.column.name => Campo que o connector terá como parâmetro, para validar se a informação é nova na tabela;
- numeric.mapping => Validação se o campo definido referência para o connector é do tipo primitivo correto. O valor "best_fit", significa que será levado em conta vários tipos como parâmetro, desde INT até DECIMAL.
- validate.non.null => Parâmetro booleano, que exigirá ou não que o campo utilizado como referência seja não nulo
3 - Buscar apenas dados que foram atualizados na tabela
Tem um corpo semelhante ao item anterior, apenas modificando alguns campos
{
"name": "nome do connector",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"connection.url": "jdbc:oracle:thin:@<host do banco>:<porta>/<nome do banco de dados>",
"connection.user": "<username do banco>",
"connection.password": "<senha do banco>",
"topic.prefix": "<prefixo do tópico kafka que será produzido>",
"value.converter":"org.apache.kafka.connect.json.JsonConverter",
"mode":"timestamp",
"query": "<QUERY SELECT PARA BUSCAR OS DADOS ESPECIFICOS DA TABELA>",
"timestamp.column.name": "<NOME DO CAMPO DA TABELA, QUE OBEDEÇA VALOR DE TIMESTAMP>",
"validate.non.null": false
}
}
Config
- mode: Timestamp, para apenas captar itens de atualização
- timestamp.column.name => Coluna da tabela, que será utilizada como referência pelo connector. Esse campo deve ser do tipo datetime para que o connector funcione
Os demais campos se repetem na lógica
4 - Receber mensagens de itens que foram inseridos ou atualizados na tabela
Se baseia na junção dos dois itens anteriores. Tendo o seguinte corpo do objeto
{
"name": "nome do connector",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"connection.url": "jdbc:oracle:thin:@<host do banco>:<porta>/<nome do banco de dados>",
"connection.user": "<username do banco>",
"connection.password": "<senha do banco>",
"topic.prefix": "<prefixo do tópico kafka que será produzido>",
"value.converter":"org.apache.kafka.connect.json.JsonConverter",
"mode":"timestamp+incrementing",
"query": "SELECT CAST(p.PERSON_ID AS NUMERIC(8,0)) AS PERSON_ID, p.CREATED_AT, p.UPDATED_AT FROM PERSONS p",
"numeric.mapping":"best_fit",
"incrementing.column.name": "PERSON_ID",
"timestamp.column.name": "UPDATED_AT",
"validate.non.null": false
}
}
Config
- mode => timestamp+incrementing Deve seguir esse nome para que funcione no connector.
Agradecimentos
Espero que esse artigo seja útil para você, facilitando seu trabalho, evitando horas de pesquisa para um problema que já esteve no dia-a-dia de outros.
Top comments (0)