Max

Azure Synapse PySpark Toolbox 001: Input/Output

Contents of Toolbox

  • Get data from REST API
  • Save Dictionary to Azure Data Lake in JSON format


Environment Specification

  • Azure Synapse Runtime for Apache Spark 3.4
  • Azure Data Lake Storage
  • Azure Key Vault

Import dependencies

import requests
from pyspark.sql import types as T, functions as F
import json
import datetime
import logging
# mssparkutils is pre-loaded in Synapse notebooks; the explicit import keeps this snippet self-contained
from notebookutils import mssparkutils

Get data from REST API

Call a REST API endpoint, authenticating with an API key retrieved from Azure Key Vault, and return the response as a Python dictionary (or None if the call fails).

def get_api_as_dict(azure_key_vault_resource_name, secret_name, key_vault_link_name, base_url, api_endpoint, api_para):

    # Retrieve the API key from Azure Key Vault via the Synapse linked service
    full_apikey = mssparkutils.credentials.getSecret(azure_key_vault_resource_name, secret_name, key_vault_link_name)
    full_url = f'{base_url}{api_endpoint}'
    call_headers = {
        'apikey': full_apikey
        }
    call_url = f'{full_url}?{api_para}'
    response_dict = None

    try:
        response = requests.get(call_url, headers=call_headers)
        if response.status_code == 200:
            response_dict = response.json()
        else:
            logging.warning(f'No data ingested from {api_endpoint} (HTTP {response.status_code})')
    except Exception:
        logging.exception(f'{api_endpoint} ingestion failed')

    return response_dict
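A minimal call sketch; every value below is a hypothetical placeholder (your own Key Vault, linked service, secret name and API):

# All values are placeholders for illustration -- swap in your own resources
api_dict = get_api_as_dict(
    azure_key_vault_resource_name='my-key-vault',
    secret_name='my-api-key',
    key_vault_link_name='kv_linked_service',
    base_url='https://api.example.com/',
    api_endpoint='v1/observations',
    api_para='station=ABC&format=json'
)

if api_dict is not None:
    print(list(api_dict.keys()))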

Back to Top


Save Dictionary to Azure Data Lake in JSON format

Save a Python dictionary object to an Azure Data Lake storage container in JSON format, under the path:
{mx_lake_path}{data_root_path}/{raw_data_object_name}/{year}/{month}/{day}/
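For example, with hypothetical values storage_container='raw', storage_account='mystorageaccount', mx_lake_path='/lake', data_root_path='/ingest' and raw_data_object_name='observations', a file written on 5 January 2025 would land at abfss://raw@mystorageaccount.dfs.core.windows.net/lake/ingest/observations/2025/1/5/observations_20250105_093000_123456.json (the month and day folders are not zero-padded, because they are written as integers).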

def dict_save_to_json(data_dict, storage_container, storage_account, mx_lake_path, data_root_path, raw_data_object_name, log_config):

    try:
        # Partition the output path by the UTC date of ingestion
        utc_today = datetime.datetime.utcnow()
        path_year = utc_today.year
        path_month = utc_today.month
        path_day = utc_today.day
        path_time = utc_today.strftime('%Y%m%d_%H%M%S_%f')
        full_output_path = f'abfss://{storage_container}@{storage_account}.dfs.core.windows.net{mx_lake_path}{data_root_path}/{raw_data_object_name}/{path_year}/{path_month}/{path_day}/{raw_data_object_name}_{path_time}.json'
        # Third argument overwrites the file if it already exists
        mssparkutils.fs.put(full_output_path, json.dumps(data_dict), True)
    except Exception:
        logging.exception(f'{raw_data_object_name} saving to {storage_container} failed')
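Continuing the hypothetical sketch above, the dictionary returned by get_api_as_dict can be landed directly as raw JSON (storage names are placeholders):

# Land the api_dict from the earlier sketch as a raw JSON file
if api_dict is not None:
    dict_save_to_json(
        data_dict=api_dict,
        storage_container='raw',
        storage_account='mystorageaccount',
        mx_lake_path='/lake',
        data_root_path='/ingest',
        raw_data_object_name='observations',
        log_config=None
    )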

Back to Top
