DEV Community

Cover image for Importando Funções Python do Repos para o Notebook do Databricks
romerito
romerito

Posted on

Importando Funções Python do Repos para o Notebook do Databricks

Importar funções no notebook do Databricks sempre foi um pouco complicado, se olharmos para a forma tradicional de desenvolvimento usando o python, é natural criarmos módulos para abstrair ao máximo o código e depois importamos para nosso arquivo principal(main). Olhando por esse lado é simples, porém olhando pela ótica de transformação de dados é um pouco complicado, atualmente para importar uma biblioteca no notebook, precisamos configurar o pacote no cluster, que seja via: pypi, whell etc ... e caso tu precises adicionar 1 novo pacote tem que parar o cluster configurar e reiniciar para que tenha efeito(cansativo).
Mas nem tudo é tristeza, hoje me deparei com algo super interessante de fazer, porém ainda não conclui a fundo o estudo que fiz, quero dizer que temos uma possibilidade, criar todo o código e armazenar no Azure Repos e depois linkar ele ao repos do Databricks e por fim fazer o import package.
Para exemplicar esse ensaio, vamos fazer o seguinte, vou criar umas funções de trasnformação de dados.
Nossa estrutura de projeto vai ficar assim.

Image description

Coisa básica, vamos a implementação da função:
loads.py

from json import loads
from pyspark.sql.types import StructType as struct

def schema(line: str):
    schema = f"schemas/{line}"
    with open(schema, "r") as file:
        get = file.read()
        estrutura = struct.fromJson(loads(get))
    return estrutura


def read(arquivo: str, schema: str, formato: str, separador: str, engine: object):
    dataframe = (
        engine.read.format(formato)
        .schema(schema)
        .option("header", "true")
        .option("delimiter", f"{separador}")
        .option("inferSchema", "false")
        .option("path", f"{arquivo}")
        .load()
    )
    return dataframe

Enter fullscreen mode Exit fullscreen mode

Função que carrega o schema, e outra função que vai ler o arquivo para ingestão

Agora vamos criar a implementação do módulo:
processing.py

def view(dataframe: object, descricao: str):
    return dataframe.createOrReplaceTempView(descricao)


def query(dataframe: object, describe: str, file: str, engine: object):
    view(dataframe, describe)
    with open(f"{file}") as load:
        query = load.read()
        queryObject = engine.sql(query)
    return queryObject

Enter fullscreen mode Exit fullscreen mode

função que cria uma view no sparksql a partir de DataFrame e outra função que executa um arquivo sql fazendo uma transformação específica.

E por fim a implementação do código principal

# Databricks notebook source
# DBTITLE 1,Adiciona Caminho de Módulos Python 
import sys
import os
sys.path.append('/Workspace/Repos/falano@outlook.com.br/delta-live-tables/functions/ingestion')
sys.path.append('/Workspace/Repos/falano@outlook.com.br/delta-live-tables/functions/transformer')

# COMMAND ----------

# DBTITLE 1,Importação de Módulos
from loads import schema
from loads import read
from processing import view
from processing import query

# COMMAND ----------

Data = "/FileStore/tables/licitacoes_2022.csv"
FileSQL = "/Workspace/Repos/fulano@outlook.com.br/delta-live-tables/sql/agrupado-por-tipo.sql"

# COMMAND ----------

# DBTITLE 1,Carrega schema de arquivo
SchLicitacao = schema("licitacoes.json")

# COMMAND ----------

# DBTITLE 1,Carrega dados
Licitacao = read(Data, SchLicitacao, "csv", ";", spark)

# COMMAND ----------

display(Licitacao)

# COMMAND ----------

# DBTITLE 1,Query que faz agrupamento pelo Tipo
ValorPorTipo = query(Licitacao,"TabelaLicitacao",FileSQL, spark)
display(ValorPorTipo)

Enter fullscreen mode Exit fullscreen mode

Lembrando que esse código possui informaçãoes do notebook do Databricks, agora vamos subir nossas modificações para o Azure DevOps via git e temos isso

Image description

E agora por dentro do Databricks vamos puxar[pull] essas informações.

Image description

Agora vou explicar célula por célula do Databricks

Image description

O método sys.path.append() adiciona os arquivos no mesmo local onde esta o nosso arquivo principal em tempo de execução, tornando simples a importação dos módulos, vamos ver se funciona?

Image description

Prontinho! o metodo type() mostra que o objeto schema é uma função

E por fim vamos ingerir 1 arquivo e fazer umas trasnformações

Image description

Agora vamos fazer uma transformação a partir de um arquivo. sql, veja como é o nosso arquivo
/Workspace/Repos/fulano.com.br/delta-live-tables/sql/agrupado-por-tipo.sql

SELECT
Tipo,
SUM(ValorContratado) as ValorContratado
FROM TabelaLicitacao
GROUP BY Tipo
ORDER BY ValorContratado DESC
Enter fullscreen mode Exit fullscreen mode

Image description

Pronto! simples demais, com essa possibilidade de ter as funçoes e arquivos de transformações no Azure Repos, o céu é o limite. Espero ter te ajudado com isso, o código está aqui.
https://github.com/romermor/databricks-using-functions-repos

Top comments (0)