Importar funções no notebook do Databricks sempre foi um pouco complicado, se olharmos para a forma tradicional de desenvolvimento usando o python, é natural criarmos módulos para abstrair ao máximo o código e depois importamos para nosso arquivo principal(main). Olhando por esse lado é simples, porém olhando pela ótica de transformação de dados é um pouco complicado, atualmente para importar uma biblioteca no notebook, precisamos configurar o pacote no cluster, que seja via: pypi, whell etc ... e caso tu precises adicionar 1 novo pacote tem que parar o cluster configurar e reiniciar para que tenha efeito(cansativo).
Mas nem tudo é tristeza, hoje me deparei com algo super interessante de fazer, porém ainda não conclui a fundo o estudo que fiz, quero dizer que temos uma possibilidade, criar todo o código e armazenar no Azure Repos e depois linkar ele ao repos do Databricks e por fim fazer o import package
.
Para exemplicar esse ensaio, vamos fazer o seguinte, vou criar umas funções de trasnformação de dados.
Nossa estrutura de projeto vai ficar assim.
Coisa básica, vamos a implementação da função:
loads.py
from json import loads
from pyspark.sql.types import StructType as struct
def schema(line: str):
schema = f"schemas/{line}"
with open(schema, "r") as file:
get = file.read()
estrutura = struct.fromJson(loads(get))
return estrutura
def read(arquivo: str, schema: str, formato: str, separador: str, engine: object):
dataframe = (
engine.read.format(formato)
.schema(schema)
.option("header", "true")
.option("delimiter", f"{separador}")
.option("inferSchema", "false")
.option("path", f"{arquivo}")
.load()
)
return dataframe
Função que carrega o schema, e outra função que vai ler o arquivo para ingestão
Agora vamos criar a implementação do módulo:
processing.py
def view(dataframe: object, descricao: str):
return dataframe.createOrReplaceTempView(descricao)
def query(dataframe: object, describe: str, file: str, engine: object):
view(dataframe, describe)
with open(f"{file}") as load:
query = load.read()
queryObject = engine.sql(query)
return queryObject
função que cria uma view no sparksql a partir de DataFrame e outra função que executa um arquivo sql fazendo uma transformação específica.
E por fim a implementação do código principal
# Databricks notebook source
# DBTITLE 1,Adiciona Caminho de Módulos Python
import sys
import os
sys.path.append('/Workspace/Repos/falano@outlook.com.br/delta-live-tables/functions/ingestion')
sys.path.append('/Workspace/Repos/falano@outlook.com.br/delta-live-tables/functions/transformer')
# COMMAND ----------
# DBTITLE 1,Importação de Módulos
from loads import schema
from loads import read
from processing import view
from processing import query
# COMMAND ----------
Data = "/FileStore/tables/licitacoes_2022.csv"
FileSQL = "/Workspace/Repos/fulano@outlook.com.br/delta-live-tables/sql/agrupado-por-tipo.sql"
# COMMAND ----------
# DBTITLE 1,Carrega schema de arquivo
SchLicitacao = schema("licitacoes.json")
# COMMAND ----------
# DBTITLE 1,Carrega dados
Licitacao = read(Data, SchLicitacao, "csv", ";", spark)
# COMMAND ----------
display(Licitacao)
# COMMAND ----------
# DBTITLE 1,Query que faz agrupamento pelo Tipo
ValorPorTipo = query(Licitacao,"TabelaLicitacao",FileSQL, spark)
display(ValorPorTipo)
Lembrando que esse código possui informaçãoes do notebook do Databricks, agora vamos subir nossas modificações para o Azure DevOps via git e temos isso
E agora por dentro do Databricks vamos puxar[pull] essas informações.
Agora vou explicar célula por célula do Databricks
O método sys.path.append() adiciona os arquivos no mesmo local onde esta o nosso arquivo principal em tempo de execução, tornando simples a importação dos módulos, vamos ver se funciona?
Prontinho! o metodo type() mostra que o objeto schema é uma função
E por fim vamos ingerir 1 arquivo e fazer umas trasnformações
Agora vamos fazer uma transformação a partir de um arquivo. sql, veja como é o nosso arquivo
/Workspace/Repos/fulano.com.br/delta-live-tables/sql/agrupado-por-tipo.sql
SELECT
Tipo,
SUM(ValorContratado) as ValorContratado
FROM TabelaLicitacao
GROUP BY Tipo
ORDER BY ValorContratado DESC
Pronto! simples demais, com essa possibilidade de ter as funçoes e arquivos de transformações no Azure Repos, o céu é o limite. Espero ter te ajudado com isso, o código está aqui.
https://github.com/romermor/databricks-using-functions-repos
Top comments (0)