Introdução
Olar pessoal! Tive a oportunidade de conhecer o Apache Airflow e como achei muito bacana resolvi aprofundar um pouco nos conceitos dessa ferramenta. Este artigo vai ser algo bem introdutório, vou expor aqui alguns conceitos básicos sobre seu funcionamento, instalação e a criação de um fluxo de trabalho.
Airflow é uma plataforma criada pelo Airbnb, escrita em Python, que se tornou open-source em 2015 e logo depois cedida para o Apache Foundation.
O Airflow é um orquestrador de fluxos de tabalho. Com ele podemos programar, agendar e monitorar consultas de diversas fontes de dados, fazer tratamentos de forma simples. Veja alguns exemplos de casos de uso:
- Criar um schedule, de forma periódica ou não, de migração de dados de uma tabela para outra;
- Importar dados a partir de várias fontes e unificar em uma base centralizada.
Instalando
Primeiramente você deve ter um ambiente funcional com Python 3 antes de prosseguir ou se preferir rodar através do docker compose disponivel no site do Apache Airflow.
Optei pela instalação local para ficar mais didático. Iremos fazer a instalação do Airflow em sua versão estável (v2.0.2). Conforme sugerida em sua documentação oficial devemos fazer sua instalação usando o comando pip
, ou seja, poetry
e pip-tools
não são recomendados.
Crie um diretório, onde ficará o projeto.
$ mkdir apache-airflow
$ cd apache-airflow
$ python3 -m venv .venv
$ source .venv/bin/activate
Agora vamos instalar o airflow, execute o comando abaixo:
$ pip install apache-airflow
Após a instalação, iremos definir uma variável de ambiente que indica onde o airflow está instalado.
$ export AIRFLOW_HOME=$PWD
Agora vamos inicializar o ambiente.
$ airflow db init
Após executar esse passo, notamos que ele cria o arquivo de configuração, um banco de dados sqlite onde será armazenados os metadados dos workflows, o arquivo de configuração do Airflow webserver e a um diretório de logs.
AIRFLOW_HOME
├── airflow.cfg
├── airflow.db
├── logs
└── webserver_config.cfg
Antes de subir o servidor do airflow, primeiramente vamos criar um usuário via cli do airflow, conforme o exemplo abaixo e definir sua senha.
$ airflow users create \
--username admin \
--firstname Jean \
--lastname Cabral \
--role Admin \
--email seumelhor@email.com
Definindo o acesso, vamos agora subir o nosso servidor e acessar o Dashboard, executando o comando abaixo.
$ airflow webserver -p 8084
No comando acima estou executando o servidor na porta 8084, caso o parametro port nao seja passado ele será executado na porta padrão 8080. Para ver mais parametros execute airflow webserver --help
.
Depois de efetuar o login, podemos ver a lista todos os workflows que vem por 'default' na instalação da ferramenta como exemplos, que servem de base para construirmos nossas pipelines. É possível ocultar esses exemplos alterando a propriedade load_examples
no arquivo airflow.cfg
.
O Dashboard do Airflow é bem intuitivo, através dele podemos ter controle das execuções e o histórico de cada um deles, log de execução, gráficos e etc.
Antes de criarmos o nosso primeiro Workflow, vamos ver uns conceitos importantes:
ETL: (Extract, Transform, Load): Procedimento geral para copiar de uma ou mais fontes de dados para um determinado destino;
DAG: (Directed Acyclic Graph): Coleção de todas as tarefas a serem executadas, organizadas de forma que reflete a relação e dependências entre elas. Em termos simples, a DAG é uma coleção de todas as pequenas tarefas que se unem para realizar uma grande tarefa.
-
Operator: Enquanto a DAG define como o fluxo vai ser executado, o Operator define o que será feito;
- BashOperator: Executa comandos no bash
- PythonOperator: Usado para chamar python function na DAG
- EmailOperator: Envio de email
- SimpleHttpOperator: Fazer requisições HTTP
Task: Instância de um Operator em execução;
Worker: Unidade de Trabalho (processo, container ou serviço) que realiza o processamento de uma tarefa de cada vez;
Scheduler: Unidade de agendamento requerido para orquestrar a execução de trabalhos agendados;
Criando nosso workflow
O fluxo de trabalho pode ser um cálculo simples, uma consulta no banco de dados, comando bash, script python, consultas Postgres, consultas BigQuery, etc. O fluxo de trabalho é dividido em uma ou mais tarefas que se relacionam entre si e formam um DAG.
Vamos criar um diretório chamado dags
e dentro dele iremos criar um script python chamado print_date_file.py
, que fará o seguinte: criar um arquivo texto num dado diretório e escrever a data atual.
Vamos ao código
import airflow
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import timedelta
Essas são as libs que iremos utilizar em nossa DAG. Como iremos usar o BashOperator precisamos importar ela da biblioteca do airflow.
default_args = {
'owner': 'airflow',
'start_date': airflow.utils.dates.days_ago(2)
}
Em default_args
podemos definir um dicionário de parametros padrão que serão passados para o construtor de cada tarefa.
dag = DAG(
dag_id='save_date_in_file_txt',
default_args=default_args,
schedule_interval=timedelta(days=1),
dagrun_timeout=timedelta(minutes=60),
tags=['print_date_file']
)
# Imprime a data na saída padrão.
t1 = BashOperator(
task_id='print_date',
bash_command='date',
dag=dag
)
# Cria a pasta tmp caso ela não exista.
t2 = BashOperator(
task_id="make_directory",
bash_command="mkdir -p /home/jean/Code/PlayGround/apache-airflow/tmp",
dag=dag
)
# Faz uma sleep de 5 segundos.
t3 = BashOperator(
task_id='sleep',
bash_command='sleep 5',
retries=3,
dag=dag
)
# Salve a data em um arquivo texto.
t4 = BashOperator(
task_id='save_date',
bash_command='date > /home/jean/Code/PlayGround/apache-airflow/tmp/date_output.txt',
retries=3,
dag=dag
)
Veja que nas tasks (t1, t2, t3 e t4) são valores que estão chamando a classe BashOperator
e recebe os argumentos necessários para execução.
Cada tarefa tem seu task_id
que define de forma única uma tarefa e argumentos necessários com base no operador que estamos utilizando.
Em nosso DAG, estamos executando quatro tasks diferentes, a primeira está imprime a data na stdout, a segunda cria o diretório tmp; a t3 faz uma pausa de 5 segundos e a última cria um arquivo texto, escreve a data, e é armazenado no diretório que criamos na t2.
t1 >> t2 >> t3 >> t4 #Fluxo de execução das tasks
Portanto, seu t1 óbvio deve ser executado antes de t2, portanto, definimos um fluxo de execução das tarefas.
Por fim, precisamos colocar nosso arquivo print_date_file.py na pasta DAG e, em seguida, ele será carregado no servidor automaticamente.
Em Graph View podemos ver a visualização do gráfico de tarefas:
Veja que a 'seta' indica a relação de dependência. Na task make_directory
depende de print_date
.
Bem, este foi um exemplo muito simples de como criamos tarefas e executamos o fluxo de trabalho. O intuíto foi trazer uma visão geral do Airflow, e o potencial que ele tem para auxiliar a criação de fluxos de trabalho em geral.
Até mais!
Top comments (0)