DEV Community

Cover image for DAG no Airflow para invocar Google Cloud Function
Maíra Ottoni
Maíra Ottoni

Posted on

DAG no Airflow para invocar Google Cloud Function

O Airflow disponibiliza alguns operadores para trabalhar com as Cloud Functions do Google.

Se quiséssemos invocar alguma Cloud Function já implementada em nosso projeto na GCP, intuitivamente poderíamos tentar utilizar o operador CloudFunctionInvokeFunctionOperator. Porém esse operador permite um número limitado de “chamadas”, visto que ele deve ser usado somente para realização de testes.

Caso o objetivo seja utilizar em produção uma DAG que invoca uma Cloud Function podemos usar o SimpleHttpOperator, criado pela comunidade do Airflow. A seguir eu explico o passo a passo de como utilizá-lo.

Primeiramente você deverá criar uma nova conexão para armazenar os dados necessários para acessar a Cloud Function.
Na interface do Airflow, acesse na guia superior Admin > Connections. Será exibida a lista das conexões existentes, caso existam. Em seguida clique no botão + para criar uma nova conexão.

Image description

Será aberta uma página para inserção dos dados da nova conexão:

Image description

Connection id: nome da conexão que estamos criando e será utilizado na construção da DAG;
Connection Type: tipo de conexão, nesse caso selecionamos HTTP.
Host: URL de onde armazenamos as CloudFunctions do projeto. Para obtê-lo, vá até a listagem das Cloud Functions, clique nos três pontinhos ao final da linha de alguma Function e depois em Copiar Função:

Image description

Copie a URL somente até “.net”, deixando de fora a barra e o nome da função, pois precisamos apenas do endereço “geral” onde armazenamos as funções.

Image description

Com a conexão criada, partimos para a criação da DAG. Iniciamos com a importação dos módulos:

import datetime
import json

import google.auth.transport.requests
from google.cloud import bigquery

from airflow import DAG
from airflow.providers.google.common.utils import id_token_credentials as id_token_credential_utils

from airflow.providers.http.operators.http import (
    SimpleHttpOperator
)
Enter fullscreen mode Exit fullscreen mode

Repare que importamos o módulo id_token_credentials. Ele será necessário para obter os tokens necessários para invocar a Function.
Criaremos uma função Python que invoca uma Google Cloud Function e retorna as credenciais de acesso, tomando como parâmetro a URL de onde estão armazenadas as Cloud Functions e o nome da Function desejada:

def obter_credenciais_cloud_function(url, nome_funcao):
    """
    Invoca uma Cloud Function no Google Cloud e retorna as credenciais do token de identificação.

    Args:
        url (str): A URL base da Cloud Function.
        nome_funcao (str): O nome da função Cloud.

    Retorna:
        google.auth.transport.requests.IdTokenCredentials: Credenciais do token de identificação.
    """
    url_completa = f"{url}/{nome_funcao}"
    requisicao = google.auth.transport.requests.Request()  
    credenciais_token = id_token_credential_utils.get_default_id_token_credentials(url_completa, request=requisicao)

    return credenciais_token
Enter fullscreen mode Exit fullscreen mode

Em seguida, definimos as variáveis com os dados necessários como nome da DAG, Nome da conexão Airflow criada anteriormente, HOST com a URL base das Cloud Functions, nome da Function que desejamos invocar e o token de acesso obtido a partir da função de obter as credenciais.
Também instanciamos nossa DAG com os parâmetros desejados.

DAG_ID = "invoca_cloud_function"
HTTP_CONN_ID="http_cloud_function" # Nome da connection do Airflow
HOST = "https://us-central1-nome-projeto-123456.cloudfunctions.net"
FUNCTION_ID = "nome_funcao"
TOKEN = obter_credenciais_cloud_function(url=HOST, nome_funcao=FUNCTION_ID)


dag = DAG(
    DAG_ID,
    default_args={
        "retries": 3, "retry_exponential_backoff": True, "retry_delay": 30
    },
    start_date=datetime.datetime(2024, 4, 4),
    catchup=False,
    schedule_interval="0 */12 * * *",
)
Enter fullscreen mode Exit fullscreen mode

Por fim, criamos a nossa task que irá chamar a Cloud Function através do SimpleHttpOperator, utilizando os parâmetros:

task_id: nome da task que estamos criando
method: método HTTP, neste caso “POST”
http_conn_id: =Nome da conexão no Airflow configurada
endpoint: nome da Cloud Function que desejamos invocar
data: parâmetos utilizados para executar a função, passados em formato JSON
headers: cabeçalho para execução da requisição. Note que utilizamos as credenciais obtidas pela função que criamos através da variável TOKEN.

task_invoca_cloud_function = SimpleHttpOperator(
            task_id= "invoca_funcao_01",
            method="POST",
            http_conn_id=HTTP_CONN_ID,
            endpoint=FUNCTION_ID,
            data=json.dumps({"chave": "valor"}), # parametros necessarios para a função caso haja
            headers={'Authorization': f"bearer {TOKEN}", "Content-Type": "application/json"},
    )

task_invoca_cloud_function
Enter fullscreen mode Exit fullscreen mode

Confira o código completo nesse link.

Image of Docusign

🛠️ Bring your solution into Docusign. Reach over 1.6M customers.

Docusign is now extensible. Overcome challenges with disconnected products and inaccessible data by bringing your solutions into Docusign and publishing to 1.6M customers in the App Center.

Learn more

Top comments (0)

A Workflow Copilot. Tailored to You.

Pieces.app image

Our desktop app, with its intelligent copilot, streamlines coding by generating snippets, extracting code from screenshots, and accelerating problem-solving.

Read the docs

👋 Kindness is contagious

Please leave a ❤️ or a friendly comment on this post if you found it helpful!

Okay