O Airflow disponibiliza alguns operadores para trabalhar com as Cloud Functions do Google.
Se quiséssemos invocar alguma Cloud Function já implementada em nosso projeto na GCP, intuitivamente poderíamos tentar utilizar o operador CloudFunctionInvokeFunctionOperator
. Porém esse operador permite um número limitado de “chamadas”, visto que ele deve ser usado somente para realização de testes.
Caso o objetivo seja utilizar em produção uma DAG que invoca uma Cloud Function podemos usar o SimpleHttpOperator
, criado pela comunidade do Airflow. A seguir eu explico o passo a passo de como utilizá-lo.
Primeiramente você deverá criar uma nova conexão para armazenar os dados necessários para acessar a Cloud Function.
Na interface do Airflow, acesse na guia superior Admin > Connections. Será exibida a lista das conexões existentes, caso existam. Em seguida clique no botão + para criar uma nova conexão.
Será aberta uma página para inserção dos dados da nova conexão:
Connection id: nome da conexão que estamos criando e será utilizado na construção da DAG;
Connection Type: tipo de conexão, nesse caso selecionamos HTTP.
Host: URL de onde armazenamos as CloudFunctions do projeto. Para obtê-lo, vá até a listagem das Cloud Functions, clique nos três pontinhos ao final da linha de alguma Function e depois em Copiar Função:
Copie a URL somente até “.net”, deixando de fora a barra e o nome da função, pois precisamos apenas do endereço “geral” onde armazenamos as funções.
Com a conexão criada, partimos para a criação da DAG. Iniciamos com a importação dos módulos:
import datetime
import json
import google.auth.transport.requests
from google.cloud import bigquery
from airflow import DAG
from airflow.providers.google.common.utils import id_token_credentials as id_token_credential_utils
from airflow.providers.http.operators.http import (
SimpleHttpOperator
)
Repare que importamos o módulo id_token_credentials
. Ele será necessário para obter os tokens necessários para invocar a Function.
Criaremos uma função Python que invoca uma Google Cloud Function e retorna as credenciais de acesso, tomando como parâmetro a URL de onde estão armazenadas as Cloud Functions e o nome da Function desejada:
def obter_credenciais_cloud_function(url, nome_funcao):
"""
Invoca uma Cloud Function no Google Cloud e retorna as credenciais do token de identificação.
Args:
url (str): A URL base da Cloud Function.
nome_funcao (str): O nome da função Cloud.
Retorna:
google.auth.transport.requests.IdTokenCredentials: Credenciais do token de identificação.
"""
url_completa = f"{url}/{nome_funcao}"
requisicao = google.auth.transport.requests.Request()
credenciais_token = id_token_credential_utils.get_default_id_token_credentials(url_completa, request=requisicao)
return credenciais_token
Em seguida, definimos as variáveis com os dados necessários como nome da DAG, Nome da conexão Airflow criada anteriormente, HOST com a URL base das Cloud Functions, nome da Function que desejamos invocar e o token de acesso obtido a partir da função de obter as credenciais.
Também instanciamos nossa DAG com os parâmetros desejados.
DAG_ID = "invoca_cloud_function"
HTTP_CONN_ID="http_cloud_function" # Nome da connection do Airflow
HOST = "https://us-central1-nome-projeto-123456.cloudfunctions.net"
FUNCTION_ID = "nome_funcao"
TOKEN = obter_credenciais_cloud_function(url=HOST, nome_funcao=FUNCTION_ID)
dag = DAG(
DAG_ID,
default_args={
"retries": 3, "retry_exponential_backoff": True, "retry_delay": 30
},
start_date=datetime.datetime(2024, 4, 4),
catchup=False,
schedule_interval="0 */12 * * *",
)
Por fim, criamos a nossa task que irá chamar a Cloud Function através do SimpleHttpOperator
, utilizando os parâmetros:
task_id: nome da task que estamos criando
method: método HTTP, neste caso “POST”
http_conn_id: =Nome da conexão no Airflow configurada
endpoint: nome da Cloud Function que desejamos invocar
data: parâmetos utilizados para executar a função, passados em formato JSON
headers: cabeçalho para execução da requisição. Note que utilizamos as credenciais obtidas pela função que criamos através da variável TOKEN.
task_invoca_cloud_function = SimpleHttpOperator(
task_id= "invoca_funcao_01",
method="POST",
http_conn_id=HTTP_CONN_ID,
endpoint=FUNCTION_ID,
data=json.dumps({"chave": "valor"}), # parametros necessarios para a função caso haja
headers={'Authorization': f"bearer {TOKEN}", "Content-Type": "application/json"},
)
task_invoca_cloud_function
Confira o código completo nesse link.
Top comments (0)