Get data from a REST API
Save Dictionary to Azure Data Lake in JSON format
Environment Specification
- Azure Synapse Runtime for Apache Spark 3.4
- Azure Data Lake Storage
- Azure Key Vault
Import dependencies
import requests
from pyspark.sql import types as T, functions as F
import json
import datetime
import logging
Get data from a REST API
Get data from a REST API endpoint using an Azure Key Vault secret, returning a Python dictionary object
def get_api_as_dict(azure_key_vault_resource_name, secret_name, key_vault_link_name, base_url, api_endpoint, api_para, timeout=30):
    """Call a REST API endpoint and return the parsed JSON body as a dict.

    The API key is fetched from Azure Key Vault via the Synapse-provided
    ``mssparkutils`` utilities (available at runtime in Synapse notebooks;
    not imported here) and sent in an ``apikey`` request header.

    Args:
        azure_key_vault_resource_name: Key Vault resource name.
        secret_name: Name of the secret holding the API key.
        key_vault_link_name: Synapse linked-service name for the Key Vault.
        base_url: Base URL of the API, e.g. ``https://host/``.
        api_endpoint: Endpoint path appended to ``base_url``.
        api_para: Pre-encoded query string appended after ``?``.
        timeout: Seconds to wait for the HTTP response before aborting
            (new, defaults to 30 so existing callers are unaffected).

    Returns:
        The decoded JSON payload as a dict, or ``None`` when the request
        fails or returns a non-200 status (failures are logged, not raised).
    """
    full_apikey = mssparkutils.credentials.getSecret(azure_key_vault_resource_name, secret_name, key_vault_link_name)
    full_url = f'{base_url}{api_endpoint}'
    call_headers = {
        'apikey': full_apikey
    }
    call_url = f'{full_url}?{api_para}'
    response_dict = None
    try:
        # timeout prevents a stalled endpoint from hanging the job forever
        response = requests.get(call_url, headers=call_headers, timeout=timeout)
        if response.status_code == 200:
            response_dict = response.json()
        else:
            logging.warning(f'No data ingested from {api_endpoint}')
    except Exception:
        # best-effort ingestion: log with traceback and return None
        logging.exception(f'{api_endpoint} ingestion failed')
    return response_dict
Save Dictionary to Azure Data Lake in JSON format
Save a Python dictionary object to an Azure Data Lake storage container in JSON format, under the path:
{data_root_path}/{raw_data_object_name}/{year}/{month}/{day}/
def dict_save_to_json(dict, storage_container, storage_account, mx_lake_path, data_root_path, raw_data_object_name, log_config):
    """Serialize a dict to JSON and write it to Azure Data Lake Storage.

    The file lands under a date-partitioned path:
    ``{mx_lake_path}{data_root_path}/{raw_data_object_name}/{year}/{month}/{day}/``
    with a timestamped filename, written via the Synapse-provided
    ``mssparkutils.fs.put`` (overwrite enabled).

    Args:
        dict: The dictionary to save. (Name shadows the builtin ``dict``;
            kept as-is for backward compatibility with keyword callers.)
        storage_container: ADLS container name.
        storage_account: ADLS storage account name.
        mx_lake_path: Root path prefix inside the container.
        data_root_path: Data-layer root folder.
        raw_data_object_name: Object name used for both folder and filename.
        log_config: Currently unused; retained for interface compatibility.

    Returns:
        None. Failures are logged with traceback, not raised.
    """
    try:
        # timezone-aware replacement for the deprecated utcnow();
        # produces the same UTC wall-clock components
        utc_today = datetime.datetime.now(datetime.timezone.utc)
        path_year = utc_today.year
        path_month = utc_today.month
        path_day = utc_today.day
        path_time = utc_today.strftime('%Y%m%d_%H%M%S_%f')
        full_output_path = f'abfss://{storage_container}@{storage_account}.dfs.core.windows.net{mx_lake_path}{data_root_path}/{raw_data_object_name}/{path_year}/{path_month}/{path_day}/{raw_data_object_name}_{path_time}.json'
        # True = overwrite an existing file at the same path
        mssparkutils.fs.put(full_output_path, json.dumps(dict), True)
    except Exception:
        # fixed: original line had an unterminated f-string and referenced
        # the undefined name `raw_data_id`
        logging.exception(f'{raw_data_object_name} saving to {storage_container} failed')
Top comments (0)