DEV Community

Cover image for DAG no Airflow para invocar Google Cloud Function
Maíra Ottoni
Maíra Ottoni

Posted on

DAG no Airflow para invocar Google Cloud Function

O Airflow disponibiliza alguns operadores para trabalhar com as Cloud Functions do Google.

Se quiséssemos invocar alguma Cloud Function já implementada em nosso projeto na GCP, intuitivamente poderíamos tentar utilizar o operador CloudFunctionInvokeFunctionOperator. Porém esse operador permite um número limitado de “chamadas”, visto que ele deve ser usado somente para realização de testes.

Caso o objetivo seja utilizar em produção uma DAG que invoca uma Cloud Function podemos usar o SimpleHttpOperator, criado pela comunidade do Airflow. A seguir eu explico o passo a passo de como utilizá-lo.

Primeiramente você deverá criar uma nova conexão para armazenar os dados necessários para acessar a Cloud Function.
Na interface do Airflow, acesse na guia superior Admin > Connections. Será exibida a lista das conexões existentes, caso existam. Em seguida clique no botão + para criar uma nova conexão.

Image description

Será aberta uma página para inserção dos dados da nova conexão:

Image description

Connection id: nome da conexão que estamos criando e será utilizado na construção da DAG;
Connection Type: tipo de conexão, nesse caso selecionamos HTTP.
Host: URL de onde armazenamos as CloudFunctions do projeto. Para obtê-lo, vá até a listagem das Cloud Functions, clique nos três pontinhos ao final da linha de alguma Function e depois em Copiar Função:

Image description

Copie a URL somente até “.net”, deixando de fora a barra e o nome da função, pois precisamos apenas do endereço “geral” onde armazenamos as funções.

Image description

Com a conexão criada, partimos para a criação da DAG. Iniciamos com a importação dos módulos:

import datetime
import json

import google.auth.transport.requests
from google.cloud import bigquery

from airflow import DAG
from airflow.providers.google.common.utils import id_token_credentials as id_token_credential_utils

from airflow.providers.http.operators.http import (
    SimpleHttpOperator
)
Enter fullscreen mode Exit fullscreen mode

Repare que importamos o módulo id_token_credentials. Ele será necessário para obter os tokens necessários para invocar a Function.
Criaremos uma função Python que invoca uma Google Cloud Function e retorna as credenciais de acesso, tomando como parâmetro a URL de onde estão armazenadas as Cloud Functions e o nome da Function desejada:

def obter_credenciais_cloud_function(url, nome_funcao):
    """
    Invoca uma Cloud Function no Google Cloud e retorna as credenciais do token de identificação.

    Args:
        url (str): A URL base da Cloud Function.
        nome_funcao (str): O nome da função Cloud.

    Retorna:
        google.auth.transport.requests.IdTokenCredentials: Credenciais do token de identificação.
    """
    url_completa = f"{url}/{nome_funcao}"
    requisicao = google.auth.transport.requests.Request()  
    credenciais_token = id_token_credential_utils.get_default_id_token_credentials(url_completa, request=requisicao)

    return credenciais_token
Enter fullscreen mode Exit fullscreen mode

Em seguida, definimos as variáveis com os dados necessários como nome da DAG, Nome da conexão Airflow criada anteriormente, HOST com a URL base das Cloud Functions, nome da Function que desejamos invocar e o token de acesso obtido a partir da função de obter as credenciais.
Também instanciamos nossa DAG com os parâmetros desejados.

DAG_ID = "invoca_cloud_function"
HTTP_CONN_ID="http_cloud_function" # Nome da connection do Airflow
HOST = "https://us-central1-nome-projeto-123456.cloudfunctions.net"
FUNCTION_ID = "nome_funcao"
TOKEN = obter_credenciais_cloud_function(url=HOST, nome_funcao=FUNCTION_ID)


dag = DAG(
    DAG_ID,
    default_args={
        "retries": 3, "retry_exponential_backoff": True, "retry_delay": 30
    },
    start_date=datetime.datetime(2024, 4, 4),
    catchup=False,
    schedule_interval="0 */12 * * *",
)
Enter fullscreen mode Exit fullscreen mode

Por fim, criamos a nossa task que irá chamar a Cloud Function através do SimpleHttpOperator, utilizando os parâmetros:

task_id: nome da task que estamos criando
method: método HTTP, neste caso “POST”
http_conn_id: =Nome da conexão no Airflow configurada
endpoint: nome da Cloud Function que desejamos invocar
data: parâmetos utilizados para executar a função, passados em formato JSON
headers: cabeçalho para execução da requisição. Note que utilizamos as credenciais obtidas pela função que criamos através da variável TOKEN.

task_invoca_cloud_function = SimpleHttpOperator(
            task_id= "invoca_funcao_01",
            method="POST",
            http_conn_id=HTTP_CONN_ID,
            endpoint=FUNCTION_ID,
            data=json.dumps({"chave": "valor"}), # parametros necessarios para a função caso haja
            headers={'Authorization': f"bearer {TOKEN}", "Content-Type": "application/json"},
    )

task_invoca_cloud_function
Enter fullscreen mode Exit fullscreen mode

Confira o código completo nesse link.

Top comments (0)