Reduzindo Custos com Automação de Processos no Databricks
Tive uma necessidade em um cliente de reduzir o custo de processos que rodavam no Databricks. Uma das features que o Databricks era responsável era de coletar os arquivos de vários SFTP, descompactá-los e colocá-los no Data Lake.
A automação de fluxos de trabalho de dados é um componente crucial na engenharia de dados moderna. Neste artigo, exploraremos como criar uma função AWS Lambda usando GitLab CI/CD e Terraform, que permite a uma aplicação em Go conectar-se a um servidor SFTP, coletar arquivos, armazená-los no Amazon S3 e, por fim, acionar um job no Databricks. Este processo end-to-end é essencial para sistemas que dependem de integração e automação de dados eficientes.
O que Você Vai Precisar para Este Artigo
- Conta no GitLab com um repositório para o projeto.
- Conta na AWS com permissões para criar recursos Lambda, S3 e IAM.
- Conta no Databricks com permissões para criar e executar jobs.
- Conhecimento básico em Go, Terraform e GitLab CI/CD.
Passo 1: Preparando a Aplicação em Go
Comece criando uma aplicação em Go que se conectará ao servidor SFTP para coletar arquivos. Utilize pacotes como github.com/pkg/sftp
para estabelecer a conexão SFTP e github.com/aws/aws-sdk-go
para interagir com o serviço S3 da AWS.
package main
import (
"fmt"
"log"
"os"
"path/filepath"
"github.com/pkg/sftp"
"golang.org/x/crypto/ssh"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/s3/s3manager"
)
func main() {
// Configuração do cliente SFTP
user := "seu_usuario_sftp"
pass := "sua_senha_sftp"
host := "endereco_sftp:22"
config := &ssh.ClientConfig{
User: user,
Auth: []ssh.AuthMethod{
ssh.Password(pass),
},
HostKeyCallback: ssh.InsecureIgnoreHostKey(),
}
// Conectar ao servidor SFTP
conn, err := ssh.Dial("tcp", host, config)
if err != nil {
log.Fatal(err)
}
client, err := sftp.NewClient(conn)
if err != nil {
log.Fatal(err)
}
defer client.Close()
// Baixar arquivos do SFTP
remoteFilePath := "/path/to/remote/file"
localDir := "/path/to/local/dir"
localFilePath := filepath.Join(localDir, filepath.Base(remoteFilePath))
dstFile, err := os.Create(localFilePath)
if err != nil {
log.Fatal(err)
}
defer dstFile.Close()
srcFile, err := client.Open(remoteFilePath)
if err != nil {
log.Fatal(err)
}
defer srcFile.Close()
if _, err := srcFile.WriteTo(dstFile); err != nil {
log.Fatal(err)
}
fmt.Println("Arquivo baixado com sucesso:", localFilePath)
// Configuração do cliente S3
sess := session.Must(session.NewSession(&aws.Config{
Region: aws.String("us-west-2"),
}))
uploader := s3manager.NewUploader(sess)
// Carregar arquivo para o S3
file, err := os.Open(localFilePath)
if err != nil {
log.Fatal(err)
}
defer file.Close()
_, err = uploader.Upload(&s3manager.UploadInput{
Bucket: aws.String("seu-bucket-s3"),
Key: aws.String(filepath.Base(localFilePath)),
Body: file,
})
if err != nil {
log.Fatal("Falha ao carregar arquivo para o S3:", err)
}
fmt.Println("Arquivo carregado com sucesso no S3")
}
Passo 2: Configurando o Terraform
O Terraform será usado para provisionar a função Lambda e os recursos necessários na AWS. Crie um arquivo main.tf
com a configuração necessária para criar a função Lambda, as políticas de IAM e os buckets do S3.
provider "aws" {
region = "us-east-1"
}
resource "aws_iam_role" "lambda_execution_role" {
name = "lambda_execution_role"
assume_role_policy = jsonencode({
Version = "2012-10-17",
Statement = [
{
Action = "sts:AssumeRole",
Effect = "Allow",
Principal = {
Service = "lambda.amazonaws.com"
},
},
]
})
}
resource "aws_iam_policy" "lambda_policy" {
name = "lambda_policy"
description = "A policy that allows a lambda function to access S3 and SFTP resources"
policy = jsonencode({
Version = "2012-10-17",
Statement = [
{
Action = [
"s3:ListBucket",
"s3:GetObject",
"s3:PutObject",
],
Effect = "Allow",
Resource = [
"arn:aws:s3:::seu-bucket-s3",
"arn:aws:s3:::seu-bucket-s3/*",
],
},
]
})
}
resource "aws_iam_role_policy_attachment" "lambda_policy_attachment" {
role = aws_iam_role.lambda_execution_role.name
policy_arn = aws_iam_policy.lambda_policy.arn
}
resource "aws_lambda_function" "sftp_lambda" {
function_name = "sftp_lambda_function"
s3_bucket = "seu-bucket-s3-com-codigo-lambda"
s3_key = "sftp-lambda.zip"
handler = "main"
runtime = "go1.x"
role = aws_iam_role.lambda_execution_role.arn
environment {
variables = {
SFTP_HOST = "endereco_sftp",
SFTP_USER = "seu_usuario_sftp",
SFTP_PASSWORD = "sua_senha_sftp",
S3_BUCKET = "seu-bucket-s3",
}
}
}
resource "aws_s3_bucket" "s3_bucket" {
bucket = "seu-bucket-s3"
acl = "private"
}
Passo 3: Configurando o GitLab CI/CD
No GitLab, defina o pipeline CI/CD no arquivo .gitlab-ci.yml
. Este pipeline deve incluir etapas para testar a aplicação Go, executar o Terraform para provisionar a infraestrutura e uma etapa para limpeza, se necessário.
stages:
- test
- build
- deploy
variables:
S3_BUCKET: "seu-bucket-s3"
AWS_DEFAULT_REGION: "us-east-1"
TF_VERSION: "1.0.0"
before_script:
- 'which ssh-agent || ( apt-get update -y && apt-get install openssh-client -y )'
- eval $(ssh-agent -s)
- echo "$PRIVATE_KEY" | tr -d '\r' | ssh-add -
- mkdir -p ~/.ssh
- chmod 700 ~/.ssh
- ssh-keyscan -H 'endereco_sftp' >> ~/.ssh/known_hosts
test:
stage: test
image: golang:1.18
script:
- go test -v ./...
build:
stage: build
image: golang:1.18
script:
- go build -o myapp
- zip -r sftp-lambda.zip myapp
artifacts:
paths:
- sftp-lambda.zip
only:
- master
deploy:
stage: deploy
image: hashicorp/terraform:$TF_VERSION
script:
- terraform init
- terraform apply -auto-approve
only:
- master
environment:
name: production
Passo 4: Integrando com o Databricks
Após o upload dos arquivos para o S3, a função Lambda deve acionar um job no Databricks. Isso pode ser feito utilizando a API do Databricks para iniciar jobs existentes.
package main
import (
"bytes"
"encoding/json"
"fmt"
"net/http"
)
// Estrutura para a requisição de iniciar um job no Databricks
type DatabricksJobRequest struct {
JobID int `json:"job_id"`
}
// Função para acionar um job no Databricks
func triggerDatabricksJob(databricksInstance string, token string, jobID int) error {
url := fmt.Sprintf("https://%s/api/2.0/jobs/run-now", databricksInstance)
requestBody, _ := json.Marshal(DatabricksJobRequest{JobID: jobID})
req, err := http.NewRequest("POST", url, bytes.NewBuffer(requestBody))
if err != nil {
return err
}
req.Header.Set("Content-Type", "application/json")
req.Header.Set("Authorization", fmt.Sprintf("Bearer %s", token))
client := &http.Client{}
resp, err := client.Do(req)
if err != nil {
return err
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
return fmt.Errorf("Failed to trigger Databricks job, status code: %d", resp.StatusCode)
}
return nil
}
func main() {
// ... (código existente para conectar ao SFTP e carregar no S3)
// Substitua pelos seus valores reais
databricksInstance := "your-databricks-instance"
databricksToken := "your-databricks-token"
databricksJobID := 123 // ID do job que você deseja acionar
// Acionar o job no Databricks após o upload para o S3
err := triggerDatabricksJob(databricksInstance, databricksToken, databricksJobID)
if err != nil {
log.Fatal("Erro ao acionar o job do Databricks:", err)
}
fmt.Println("Job do Databricks acionado com sucesso")
}
Passo 5: Executando o Pipeline
Faça o push do código para o repositório GitLab para que o pipeline seja executado. Verifique se todos os passos são concluídos com sucesso e se a função Lambda está operacional e interagindo corretamente com o S3 e o Databricks.
Uma vez que você tenha o código completo e o arquivo .gitlab-ci.yml
configurado, você pode executar o pipeline seguindo estes passos:
- Faça o push do seu código para o repositório GitLab:
git add .
git commit -m "Adiciona função Lambda para integração SFTP, S3 e Databricks"
git push origin master
- O GitLab CI/CD detectará o novo commit e iniciará o pipeline automaticamente.
- Acompanhe a execução do pipeline no GitLab acessando a seção CI/CD do seu repositório.
- Se todos os estágios forem bem-sucedidos, sua função Lambda será implantada e pronta para ser usada.
Lembre-se de que você precisará configurar as variáveis de ambiente no GitLab CI/CD para armazenar informações sensíveis, como tokens de acesso e chaves privadas. Isso pode ser feito na seção ‘Settings’ > ‘CI / CD’ > ‘Variables’ do seu projeto GitLab.
Além disso, certifique-se de que o token do Databricks tenha as permissões necessárias para acionar jobs e que o job exista com o ID fornecido.
Conclusão
A automação de tarefas de engenharia de dados pode ser significativamente simplificada com o uso de ferramentas como GitLab CI/CD, Terraform e AWS Lambda. Ao seguir os passos descritos neste artigo, você pode criar um sistema robusto que automatiza a coleta e integração de dados entre SFTP, S3 e Databricks, tudo isso com a eficiência e a simplicidade da linguagem Go. Com essa abordagem, você estará bem equipado para lidar com os desafios de integração de dados em escala.
Meus contatos:
aws #lambda #terraform #gitlab #ci_cd #go #databricks #dataengineering #automation
Aumentando a Autonomia em Pipelines de Dados com Databricks
Com o aumento da demanda por maior autonomia em pipelines de dados, a plataforma de análise de dados Databricks tornou-se uma solução popular para empresas que desejam desenvolver pipelines escaláveis e eficientes. Durante um projeto freelancer no qual me foi solicitado uma maior autonomia sobre o core da empresa, que basicamente é definido por um pipeline de dados que importa arquivos de vários SFTP diferentes, faz a ingestão para o S3, uma série de tratamentos de dados para seguir um layout mandatório e, por fim, o envio para um sistema de fila onde os demais sistemas poderiam usufruir dos dados refinados. Pensei em como poderia atender aos seguintes requisitos com pouco ou quase nenhum desenvolvimento além do que já existe de recurso no Databricks. Por isso, falaremos neste artigo como você pode utilizar a API do Databricks como um serviço interno e dar autonomia para:
- Reprocessar um arquivo desde sua origem (SFTP).
- Gerar apenas o JSON que é enviado para o SQS.
- Processar um arquivo em ambiente de homologação sem enviar para o SQS.
Estas são algumas autonomias que foram exigidas, mas o que será demonstrado aqui vale para qualquer regra de negócio.
dbutils — notebook.run e widgets.get
Antes de algumas evoluções do Databricks em relação a “steps” dentro de um job, eu sempre utilizei uma função muito interessante, que é o dbutils.notebook.run
. Com ele, eu conseguia montar uma espécie de orquestração de notebooks que gostaria de executar tanto em sequência como em paralelo, bem como atribuir retry e tempo (parâmetros que você pode passar nessa função). Desta forma, eu montei um único notebook mais ou menos como o abaixo, que fazia a minha orquestração de extração, transformação e envio dos dados de centenas de arquivos:
Exemplo do Notebook de Orquestração
class PropriedadesNoteBook:
def __init__(self, path, timeout, retry=1):
self.path = path
self.timeout = timeout
self.retry = retry
nbmovefiles = PropriedadesNoteBook("0-move-ftp-to-s3", 500, 3)
nblimpadados = PropriedadesNoteBook("1-notebook_limpa_dados", 1000, 2)
nbimport = PropriedadesNoteBook("2-notebook_importa_arquivos", 1000, 2)
nbidentificadores = PropriedadesNoteBook("3-notebook_processa_identificadores", 1000, 2)
nbprocessjson = PropriedadesNoteBook("4-notebook_processa_json", 2000, 2)
nbexcluidados = PropriedadesNoteBook("6-notebook_envia_json_sqs", 1000, 1)
if dbutils.notebook.run(nbmovefiles.path, nbmovefiles.timeout) != "error":
send_slack_message("", "Notebook " + nbmovefiles.path + " executado com sucesso!")
else:
print("Ocorreu um erro na execução do notebook: " + nbmovefiles.path)
send_slack_message("", f"Ocorreu uma falha na execução do notebook: {nbmovefiles.path}")
raise
# Repita a lógica para os outros notebooks...
No exemplo acima, eu faço toda a minha etapa de “ETL” de forma sequencial, visto que, em meu caso, os arquivos eram disponibilizados no SFTP de 1 a 3 vezes ao dia. Legal, você tem seu job, sua orquestração, seus notebooks que executam tarefas e seu job é executado em uma janela definida de acordo com o que foi definido com o time de negócios. Mas, depois de desenvolvido, testado e validado, esse processo foi crescendo ao longo do tempo, principalmente em três aspectos:
- Muitos setups de novos clientes, que o time de produtos precisava validar o JSON processado pelo Databricks a nível de homologação ou produção.
- Recorrência de clientes que, por um erro do lado deles, o arquivo foi disponibilizado fora do schedule do seu job.
- Necessidade de enviar o fluxo completo, mas para um sistema de mensageria diferente.
Bom, chegamos onde será demonstrado como utilizar e a efetividade das funções abaixo:
-
notebook_params
(parâmetro da API do Databricks). -
dbutils.widgets.getArgument
. -
dbutils.widgets.text
.
Passando e Recebendo Dados Através dos Widgets Entre Notebooks
Conforme havia comentado anteriormente, eu senti a necessidade de trazer mais liberdade para usuários e sistemas e tirar um pouco a preocupação do engenheiro de dados de modificar, executar e monitorar o fluxo de importação, processamento e envio dos dados para o AWS SQS (sistema de fila de mensagens). Desta forma, comecei a elaborar um notebook responsável totalmente pelo reprocessamento das necessidades acima.
Continuando, além do notebook que desenvolvi, que denominei de reprocessamento
, criei um segundo notebook chamado orquestrador_reprocessamento
e, por último, criei um bucket dedicado a esse tipo de necessidade de reprocessamento. Ótimo, agora como o usuário ou aplicação vai chamar esse notebook e especificar o que ele quer exatamente diante dessas três novas funcionalidades? Para isso, utilizei a própria API do Databricks, composta pelo ID do seu cluster seguido de .cloud.databricks.com/api/2.1/jobs/run-now
.
Além disso, conforme mencionei anteriormente, criei um bucket focado em reprocessamento de dados, no qual dentro desse bucket criei as seguintes pastas:
hmle/json
hmle/sqs
json_gerados
prod/json
prod/sqs
Utilizando a API do Databricks para Reprocessamento Completo
Você pode estar se perguntando: por que introduzi uma lógica do usuário disponibilizar o arquivo no bucket nas pastas específicas de acordo com sua necessidade, mas o reprocessamento completo (SFTP -> ingestão S3 -> limpeza dados -> processamento layout -> envio SQS) não segue esse padrão e sim via API Databricks? Este caso de reprocessamento completo envolve uma etapa na qual o usuário ou API interna envia o arquivo já obtido pelo usuário para um dos buckets, que é o acesso ao SFTP de cada cliente (+40 clientes). Além de ser uma informação sensível, eu preciso garantir que, quando manipulo o SFTP do cliente, o arquivo vai chegar até a etapa final, que é o AWS SQS.
Portanto, no meu notebook de orquestração, valido se dbutils.widgets.get("ftp")
existe, se está preenchido como true
ou false
e, caso seja true
, chamo o meu notebook de comunicação e importação com os SFTP dos clientes.
Conclusão
Neste artigo, exploramos as vantagens de utilizar a API do Databricks e seus recursos em um projeto que envolve a importação, processamento e envio de dados para um sistema de fila. Demonstramos como a plataforma Databricks permite a criação de pipelines de dados escaláveis e eficientes, proporcionando maior autonomia aos usuários e sistemas. Utilizando a função dbutils.notebook.run
, foi possível orquestrar notebooks e executar etapas de ETL de forma sequencial e paralela, além de atribuir retry e time. Com a implementação dos widgets e a API do Databricks, conseguimos adaptar o fluxo de trabalho de acordo com as necessidades específicas dos usuários, permitindo o reprocessamento de arquivos, geração de JSONs e processamento em ambiente de homologação ou produção.
Meus contatos:
Top comments (1)
This article is really interesting! I'm impressed by how you've used Databricks' API and widgets to build such a flexible and adaptable data processing pipeline. I can see how this approach would be extremely helpful for managing different data sources and processing scenarios.