DEV Community

Cover image for Primeiros passos com o Apache Airflow
Jean Cabral
Jean Cabral

Posted on • Updated on

Primeiros passos com o Apache Airflow

Introdução

Olar pessoal! Tive a oportunidade de conhecer o Apache Airflow e como achei muito bacana resolvi aprofundar um pouco nos conceitos dessa ferramenta. Este artigo vai ser algo bem introdutório, vou expor aqui alguns conceitos básicos sobre seu funcionamento, instalação e a criação de um fluxo de trabalho.

Airflow é uma plataforma criada pelo Airbnb, escrita em Python, que se tornou open-source em 2015 e logo depois cedida para o Apache Foundation.

O Airflow é um orquestrador de fluxos de tabalho. Com ele podemos programar, agendar e monitorar consultas de diversas fontes de dados, fazer tratamentos de forma simples. Veja alguns exemplos de casos de uso:

  • Criar um schedule, de forma periódica ou não, de migração de dados de uma tabela para outra;
  • Importar dados a partir de várias fontes e unificar em uma base centralizada.

Instalando

Primeiramente você deve ter um ambiente funcional com Python 3 antes de prosseguir ou se preferir rodar através do docker compose disponivel no site do Apache Airflow.

Optei pela instalação local para ficar mais didático. Iremos fazer a instalação do Airflow em sua versão estável (v2.0.2). Conforme sugerida em sua documentação oficial devemos fazer sua instalação usando o comando pip, ou seja, poetry e pip-tools não são recomendados.

Crie um diretório, onde ficará o projeto.

$ mkdir apache-airflow
$ cd apache-airflow
$ python3 -m venv .venv
$ source .venv/bin/activate
Enter fullscreen mode Exit fullscreen mode

Agora vamos instalar o airflow, execute o comando abaixo:

$ pip install apache-airflow
Enter fullscreen mode Exit fullscreen mode

Após a instalação, iremos definir uma variável de ambiente que indica onde o airflow está instalado.

$ export AIRFLOW_HOME=$PWD
Enter fullscreen mode Exit fullscreen mode

Agora vamos inicializar o ambiente.

$ airflow db init
Enter fullscreen mode Exit fullscreen mode

Após executar esse passo, notamos que ele cria o arquivo de configuração, um banco de dados sqlite onde será armazenados os metadados dos workflows, o arquivo de configuração do Airflow webserver e a um diretório de logs.

AIRFLOW_HOME

├── airflow.cfg

├── airflow.db

├── logs

└── webserver_config.cfg
Enter fullscreen mode Exit fullscreen mode

Antes de subir o servidor do airflow, primeiramente vamos criar um usuário via cli do airflow, conforme o exemplo abaixo e definir sua senha.

$ airflow users create \
    --username admin \
    --firstname Jean \
    --lastname Cabral \
    --role Admin \
    --email seumelhor@email.com
Enter fullscreen mode Exit fullscreen mode

Definindo o acesso, vamos agora subir o nosso servidor e acessar o Dashboard, executando o comando abaixo.

$ airflow webserver -p 8084
Enter fullscreen mode Exit fullscreen mode

No comando acima estou executando o servidor na porta 8084, caso o parametro port nao seja passado ele será executado na porta padrão 8080. Para ver mais parametros execute airflow webserver --help.

Depois de efetuar o login, podemos ver a lista todos os workflows que vem por 'default' na instalação da ferramenta como exemplos, que servem de base para construirmos nossas pipelines. É possível ocultar esses exemplos alterando a propriedade load_examples no arquivo airflow.cfg.

O Dashboard do Airflow é bem intuitivo, através dele podemos ter controle das execuções e o histórico de cada um deles, log de execução, gráficos e etc.

Antes de criarmos o nosso primeiro Workflow, vamos ver uns conceitos importantes:

  • ETL: (Extract, Transform, Load): Procedimento geral para copiar de uma ou mais fontes de dados para um determinado destino;

  • DAG: (Directed Acyclic Graph): Coleção de todas as tarefas a serem executadas, organizadas de forma que reflete a relação e dependências entre elas. Em termos simples, a DAG é uma coleção de todas as pequenas tarefas que se unem para realizar uma grande tarefa.

  • Operator: Enquanto a DAG define como o fluxo vai ser executado, o Operator define o que será feito;

    • BashOperator: Executa comandos no bash
    • PythonOperator: Usado para chamar python function na DAG
    • EmailOperator: Envio de email
    • SimpleHttpOperator: Fazer requisições HTTP
  • Task: Instância de um Operator em execução;

  • Worker: Unidade de Trabalho (processo, container ou serviço) que realiza o processamento de uma tarefa de cada vez;

  • Scheduler: Unidade de agendamento requerido para orquestrar a execução de trabalhos agendados;

Criando nosso workflow

O fluxo de trabalho pode ser um cálculo simples, uma consulta no banco de dados, comando bash, script python, consultas Postgres, consultas BigQuery, etc. O fluxo de trabalho é dividido em uma ou mais tarefas que se relacionam entre si e formam um DAG.

Vamos criar um diretório chamado dags e dentro dele iremos criar um script python chamado print_date_file.py, que fará o seguinte: criar um arquivo texto num dado diretório e escrever a data atual.

Vamos ao código

import airflow
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import timedelta
Enter fullscreen mode Exit fullscreen mode

Essas são as libs que iremos utilizar em nossa DAG. Como iremos usar o BashOperator precisamos importar ela da biblioteca do airflow.

default_args = {
    'owner': 'airflow',
    'start_date': airflow.utils.dates.days_ago(2)
}
Enter fullscreen mode Exit fullscreen mode

Em default_args podemos definir um dicionário de parametros padrão que serão passados para o construtor de cada tarefa.

dag = DAG(
    dag_id='save_date_in_file_txt',
    default_args=default_args,
    schedule_interval=timedelta(days=1),
    dagrun_timeout=timedelta(minutes=60),
    tags=['print_date_file']
)
Enter fullscreen mode Exit fullscreen mode
# Imprime a data na saída padrão.
t1 = BashOperator(
    task_id='print_date',
    bash_command='date',
    dag=dag
)

# Cria a pasta tmp caso ela não exista.
t2 = BashOperator(
    task_id="make_directory",
    bash_command="mkdir -p /home/jean/Code/PlayGround/apache-airflow/tmp",
    dag=dag
)

# Faz uma sleep de 5 segundos.
t3 = BashOperator(
    task_id='sleep',
    bash_command='sleep 5',
    retries=3,
    dag=dag
)

# Salve a data em um arquivo texto.
t4 = BashOperator(
    task_id='save_date',
    bash_command='date > /home/jean/Code/PlayGround/apache-airflow/tmp/date_output.txt',
    retries=3,
    dag=dag
)

Enter fullscreen mode Exit fullscreen mode

Veja que nas tasks (t1, t2, t3 e t4) são valores que estão chamando a classe BashOperator e recebe os argumentos necessários para execução.

Cada tarefa tem seu task_id que define de forma única uma tarefa e argumentos necessários com base no operador que estamos utilizando.

Em nosso DAG, estamos executando quatro tasks diferentes, a primeira está imprime a data na stdout, a segunda cria o diretório tmp; a t3 faz uma pausa de 5 segundos e a última cria um arquivo texto, escreve a data, e é armazenado no diretório que criamos na t2.

t1 >> t2 >> t3 >> t4 #Fluxo de execução das tasks
Enter fullscreen mode Exit fullscreen mode

Portanto, seu t1 óbvio deve ser executado antes de t2, portanto, definimos um fluxo de execução das tarefas.

Por fim, precisamos colocar nosso arquivo print_date_file.py na pasta DAG e, em seguida, ele será carregado no servidor automaticamente.

DAG print_date_file

Em Graph View podemos ver a visualização do gráfico de tarefas:

Graph View

Veja que a 'seta' indica a relação de dependência. Na task make_directory depende de print_date.

Bem, este foi um exemplo muito simples de como criamos tarefas e executamos o fluxo de trabalho. O intuíto foi trazer uma visão geral do Airflow, e o potencial que ele tem para auxiliar a criação de fluxos de trabalho em geral.

Até mais!

Top comments (0)