Nesse post irei apresentar como utilizar o Apache Airflow, uma das mais conhecidas ferramenta para gerenciamente de fluxos, para automatizar um fluxo de Extração, Transformação e Carregamento do inglês Extract, Transform, Load (ETL). Para abordar melhor os tópicos envolvidos nesse projeto, o mesmo foi dividido em duas partes que irão originar duas publicações.
Nessa primeira parte mostrarei como realizar o desenvolvimento do início de um processo ETL utilizando Airflow. Constuíremos um Web Scraper utilizando BeautifulSoup para extrair informações de avaliação de filmes do site AdoroCinema, com os dados obtidos nós iremos escrever um arquivo .csv e enviar para o AWS S3. Na segunda parte desse projeto, iremos realizar a integração do Airflow com o Apache Spark para realizar um pequeno processamento em cima dos arquivos enviados para a AWS S3 e então armazenaremos o resultado do processamento na Amazon Redshift.
O código do projeto está disponível aqui.
Airflow
Foi um projeto desenvolvido internamente pela equipe de desenvolvimento da empresa Aibnb, surgiu como uma ferramenta para solucionar o problema que o setor encontrava com a crescente complexidade no ambiente de dados da empresa. O Airflow surgiu no final de 2014 e foi apresentado no blog da Airbnb em 2015, e desde então se tornou popular no meio da comunidade de engenharia de dados. Por fim, depois de algum tempo o projeto foi cedida para o Apache que hoje é o responsável por mantér o projeto, hoje o projeto se chama Apache Airflow.
Configuração inicial do projeto
Para esta etapa do desenvolvimento será necessário ter disponível:
- docker e docker-compose em sua máquina.
- Uma conta AWS.
- aws cli instalado em sua máquina.
- Bucket S3 configurado.
A imagem do docker utilizada foi a puckel/docker-airflow onde acrescentei o BeautifulSoup como dependência para criação da imagem em minha máquina.
As configurações de credenciais da AWS utilizadas pelo Airflow dentro do container Docker são obtidas do arquivo local .aws gerado no momento que você executa o seguinte comando no bash:
$ aws configure
Fluxo a ser implementado
O projeto consistirá em duas etapas, nessa primeira publicação implementaremos o fluxo da parte 1.
Web Scraping com BeautifulSoup
Web Scraping é o processo de obter informações que estão disponíveis na internet de forma automática. Para realizar esse processo, antes precisamos conhecer melhor sobre a página web que iremos explorar, averiguar o HTML da página é um passo muito importante para poder entender quais atributos você irá buscar durante a construção do seu Web Scraper. Para isso você pode utilizar a funcionalidade Developer Tools presente no seu navegador e explorar a estrutura do HTML da informação que você gostaria de obter.
Para a minha coleta escolhi selecionar informações referente ao título, gênero, url da foto do poster, sinopse resumida, data de lançamento, sinope completa, as avaliações disponíveis e o link para a publicação do adorocinema sobre o dado filme. A sinopse completa foi obtida a partir do link da publicação, visto que nessa página de listagem não é mostrada a sinopse completa. As vezes navegar por links para montar uma resposta é necessário pois a informação pode não está sendo exibida de forma completa na página que você iniciou a busca.
Além disso outro tópico que levei em consideração foi as avaliações, eu queria obter todas as notas disponíveis e é importante ressaltar que existem 3 classificações de avaliação: AdoroCinema, Imprensa e Leitores e que para alguns filmes uma classificação ou mais pode não possuir nota. Para isso foi necessário fazer um tratamento no momento de selecionar o componente HTML correto, para o avaliador que não possuísse classificação eu completei com uma informação nula pois futuramente posso aplicar algumas técnicas de Data Cleaning.
from bs4 import BeautifulSoup
from urllib.request import urlopen, Request
def scraping_avaliacao_adoro_cinema(page = 1):
URL = f"http://www.adorocinema.com/filmes/todos-filmes/notas-espectadores/?page={page}"
html_doc = urlopen(URL).read()
soup = BeautifulSoup(html_doc, "html.parser")
data = []
for dataBox in soup.find_all("div", class_="data_box"):
titleObj = dataBox.find("a", class_="no_underline")
imgObj = dataBox.find(class_="img_side_content").find_all(class_="acLnk")[0]
sinopseObj = dataBox.find("div", class_="content").find_all("p")[0]
dateObj = dataBox.find("div", class_="content").find("div", class_="oflow_a")
movieLinkObj = dataBox.find(class_="img_side_content").find_all("a")[0]
generoObj = dataBox.find("div", class_="content").find_all('li')[3].find('div',class_="oflow_a")
detailsLink = 'http://www.adorocinema.com' + movieLinkObj.attrs['href']
avaliacoesMeios = dataBox.find("div", class_="margin_10v").find_all('span', class_="acLnk")
avaliacoesNotas = dataBox.find("div", class_="margin_10v").find_all('span', class_="note")
# tratar a lista de avaliações
avaliadores = [ elem.text.strip() for elem in avaliacoesMeios if elem.text.strip() != "" ]
# vincular com notas
avaliacoesValores = {"AdoroCinema" : None, "Leitores" : None, "Imprensa" : None}
for index, avaliador in enumerate(avaliadores):
avaliacoesValores[avaliador] = avaliacoesNotas[index].text.strip()
#Carregar a sinopse completa
htmldocMovieDetail = urlopen(detailsLink).read()
soupMovieDetail = BeautifulSoup(htmldocMovieDetail, "html.parser")
fullSinopse = soupMovieDetail.find(class_="content-txt")
fullImgObj = soupMovieDetail.find("meta", property="og:image")
data.append({'titulo': titleObj.text.strip(),
'genero': generoObj.text.replace('\n','').strip(),
'poster' : fullImgObj["content"],
'sinopse' : sinopseObj.text.strip(),
'data' : dateObj.text[0:11].strip(),
'link' : detailsLink,
'avaliacoes' : avaliacoesValores,
'sinopseFull': fullSinopse.text})
return data
Aqui você pode conferir um artigo muito completo explicando a utilização do BeautifulSoup.
DAGs
O Airflow utiliza uma estrutura chamada DAG - Directed Acyclic Graph, que de modo geral é um grafo acíclico onde todas as tarefas a serem executadas são estruturadas refletindo suas dependências e relacionamentos. Existem muitas configuração que podem ser feitas, como aplicação de condicionais de uso como executar um nó apenas se o anterior tiver resultado em sucesso e coisas assim, é possível configurar timeout, tempo de reiniciação e etc.
Por default o Aifrlow identifica as DAGs do projeto como todo arquivo python armazenado na pasta dags, mas essa é uma configuração que pode ser sobrescrita caso você deseje mudar a localização das suas DAGs.
Criando uma DAG
Para criar uma DAG precisamos definir alguns atributos iniciais, como a dag_id, schedule_interval e alguns args.
- dag_id: é o identificador único do fluxo, aqui chamamos de 'etl_movie_review'.
- schedule_interval: indica a periodicidade da execução das tasks.
- default_args: são informações que serão refletidos em todos os nós do DAGs.
from airflow import DAG
from datetime import datetime, timedelta
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator
from etl_scripts.load_to_s3 import local_to_s3
from etl_scripts.web_scraping import collect_adoro_cinema_data
from etl_scripts.remove_local_file import remove_local_file
default_args = {
"owner": "airflow",
"depends_on_past": False,
"start_date": datetime(2021, 1, 1),
"retries": 0,
}
with DAG(
"etl_movie_review",
schedule_interval=timedelta(minutes=1),
catchup=False,
default_args=default_args
) as dag:
(...)
Criando as tasks da DAG
Para construir nossos nós o Airflow utiliza os Operators, eles são classes que encapsulam a lógica para realizar o trabalho para determinados tipos de conexão. No nosso projeto utilizaremos por enquanto apenas o PythonOperator e o DummyOperator, para realizar chamadas de execução de código python e para sinalizar inicio e fim de fluxos, respectivamente. Além desses existem alguns outros, você pode encontrar a lista de Operators aqui.
(...)
# Inicio do Pipeline
start_of_data_pipeline = DummyOperator(task_id='start_of_data_pipeline', dag=dag)
# Definindo a tarefa para realização de web scrapping
movie_review_web_scraping_stage = PythonOperator(
task_id='movie_review_web_scraping_stage',
python_callable=collect_adoro_cinema_data,
op_kwargs={
'quantidade_paginas': 1,
},
)
# definindo a tarefa para enviar o arquivo csv para S3.
movie_review_to_s3_stage = PythonOperator(
task_id='movie_review_to_s3_stage',
python_callable=local_to_s3,
op_kwargs={
'bucket_name': '<nome-do-seu-bucket>'
},
)
movie_review_remove_local = PythonOperator(
task_id='movie_review_remove_local',
python_callable=remove_local_file,
)
# Fim da Pipeline
end_of_data_pipeline = DummyOperator(task_id='end_of_data_pipeline', dag=dag)
Assim, nós temos nossa DAG pronta, ela possui 2 tarefas Dummy, ou seja apenas para representação de início e fim do fluxo, e 3 tarefas que executam funcionalidades em código Python. Cada tarefa criado também possui um ID único para identificação, é importante esses IDs também serem únicos pois de outro modo o Airflow vai retornar um erro informando que foi identificado a criação de ciclos.
Para finalizar, nós precisamos criar a ligação entre as tarefas e definir a ordem de execução, o Airflow permite algumas formas de declaração dessa ordem, para esse tutorial vamos seguir o formato mais simples.
start_of_data_pipeline >> movie_review_web_scraping_stage >> movie_review_to_s3_stage >> movie_review_remove_local >> end_of_data_pipeline
Nas nossas tasks PythonOperator, nós realizamos a chamada de 3 funções python: collect_adoro_cinema_data, local_to_s3 e remove_local_file. A primeira encapsula o Web Scraper e a escrita dos dados coletados em um arquivo .csv numa pasta temporária, já a segunda e a terceira são as funções responsáveis por integrar com o S3 e enviar os arquivos, e então remover o arquivo .csv do disco.
import os
import glob
from airflow.hooks.S3_hook import S3Hook
def local_to_s3(bucket_name, filepath='./dags/data/*.csv'):
s3 = S3Hook()
for f in glob.glob(filepath):
key = 'movie_review/'+f.split('/')[-1]
s3.load_file(filename=f, bucket_name=bucket_name,
replace=True, key=key)
def remove_local_file(filepath='./dags/data/*.csv'):
files = glob.glob(filepath)
for f in files:
os.remove(f)
Executando a DAG pelo Painel
Na tela inicial do Airflow nós iremos ver a listagem de todas as DAGs identificadas na aplicação, para executá-las basta arrastar o botão OFF para ON.
Nós podemos entrar no modo de visualização Grafo e verificar as nossas tasks e suas relações, elas vão alterando as cores de acordo com o status de execução, ao terminar a execução com sucesso suas bordas ficam verdes. No painel você também consegue identificar os tipos de Operators usados no fluxo.
O Airflow disponibiliza muitas formas de visualizar o fluxo executando, você pode acompanhar os graus de sucesso e falha por execução no modo Tree View ou pode acompanhar outros tipos de gráficos mostrando informações da execução. Você também tem acesso ao log de execução de cada tarefas, e acesso ao código da DAG, é um painel bastante completo para realizar monitoramento das tafegas criadas.
Por fim, ao final da execução das tarefas temos um arquivo .csv gerado com as informações de filmes obtidas no AdoroCinema e enviadas para o bucket S3 configurado.
Conclusão
Por fim, temos a primeira etapa do nosso projeto concluída, agora temos o S3 informações a respeito de filmes coletados na internet a partir do site AdoroCinema. Para a próxima etapa, nós iremos ler esse arquivo do S3 e realizar um pequeno processamento desses dados usando Apache Spark, a partir daí poderemos disponibilizar o resultado desses processamento em um Data Warehouse na Amazon Redshift.
Espero que tenham gostado, qualquer dúvida pode deixar nos comentários. Até a próxima!
Top comments (1)
Parabéns !!! Excelente tutorial.