DEV Community

Max
Max

Posted on

Azure Synapse PySpark Toolbox 002: DataFrame Transformation

Contents of Toolbox


Basic DataFrame transformation
Create aggregation columns from child DataFrame


Environment Specification

  • Azure Synapse Runtime for Apache Spark 3.4
  • Azure Data Lake Storage
  • Azure Key Vault

Import dependencies

import requests
from pyspark.sql import types as T, functions as F
import json
import datetime
import logging
Enter fullscreen mode Exit fullscreen mode

Basic DataFrame transformation

Basic DataFrame transformation (adding new columns, renaming, filling NA values, filtering rows, selecting columns, dropping duplicates), returning a PySpark DataFrame object.

def df_transform_basic(
    df_input,               # DataFrame to transform
    step1_new_cols,         # list of dicts: [{'new_col_name': 'xxx', 'new_col_expr': 'expression string for F.expr()'}]
    step2_col_name_mapping, # dict of old -> new column names
    step3_na_values,        # value or dict passed to DataFrame.na.fill()
    step4_filter_expr,      # SQL expression string for DataFrame.filter()
    step5_result_col_list,  # list of column names to keep
    step6_drop_dup,         # True/False: drop exact duplicate rows
    timestamp_name          # name of audit timestamp column to append (falsy to skip)
):
    """Apply a configurable pipeline of basic transformations to a DataFrame.

    Each step is skipped when its parameter is falsy (None, empty list/dict,
    empty string, False), so callers only configure the steps they need.
    Returns the transformed PySpark DataFrame.
    """
    df_output = df_input

    # Step 1: derive new columns from SQL expression strings.
    if step1_new_cols:
        for col_cfg in step1_new_cols:
            df_output = df_output.withColumn(
                col_cfg['new_col_name'], F.expr(col_cfg['new_col_expr'])
            )

    # Step 2: bulk-rename columns (withColumnsRenamed requires Spark 3.4+,
    # which matches the stated Synapse runtime).
    if step2_col_name_mapping:
        df_output = df_output.withColumnsRenamed(step2_col_name_mapping)
    # Step 3: replace nulls with the configured fill value(s).
    if step3_na_values:
        df_output = df_output.na.fill(step3_na_values)
    # Step 4: keep only rows matching the filter expression.
    if step4_filter_expr:
        df_output = df_output.filter(step4_filter_expr)
    # Step 5: project down to the requested columns.
    if step5_result_col_list:
        df_output = df_output.select(*step5_result_col_list)
    # Step 6: remove exact duplicate rows.
    if step6_drop_dup:
        df_output = df_output.dropDuplicates()
    # Optional audit column. current_timestamp() already returns a Column,
    # so the original F.lit(...) wrapper was redundant and is removed.
    if timestamp_name:
        df_output = df_output.withColumn(timestamp_name, F.current_timestamp())

    return df_output
Enter fullscreen mode Exit fullscreen mode

Back to Top


Create aggregation columns from child DataFrame

Create aggregation columns based on data from the child DataFrame, returning the updated Parent DataFrame. Each aggregation's configuration is defined in the summary_config dictionary object.

def df_summary(
    parent_df,      # parent DataFrame to enrich with aggregation columns
    child_df,       # child DataFrame to aggregate
    summary_config  # list of dicts, one per aggregation (see docstring example)
):
    """Left-join aggregations of child_df onto parent_df.

    Example summary_config (note ':' separators — the original inline
    example used invalid "'join_on' = [...]" syntax):

    summary_config = [
        {'agg': {'RevisionNum': 'count'},
         'agg_result_col_name': 'CountOfRevisions',
         'na_fill_value': 0,
         'join_on': ['Contract', 'RevisionNum']},
        {'agg': {'RevisionNum': 'max'},
         'agg_result_col_name': 'RecentRevNum',
         'na_fill_value': 0,
         'join_on': ['Contract', 'RevisionNum']},
    ]

    Returns the updated parent DataFrame.
    """
    if summary_config:
        for summary_item in summary_config:
            # 'agg' holds a single {column: function} pair.
            agg_col = list(summary_item['agg'])[0]
            agg_fun = summary_item['agg'][agg_col]
            # Bug fix: the original referenced an undefined name `join_on`
            # (NameError at runtime) and joined on summary_item['agg']['join_on'],
            # a key that does not exist. Both now read summary_item['join_on'].
            join_on = summary_item['join_on']
            # Spark names the aggregate column "fun(col)"; rename it to the
            # configured result name.
            child_df_grouped = (
                child_df.groupBy(*join_on)
                .agg(summary_item['agg'])
                .withColumnRenamed(f"{agg_fun}({agg_col})",
                                   summary_item['agg_result_col_name'])
            )
            parent_df = parent_df.join(child_df_grouped, join_on, 'left')
            # Parent rows with no child match get the configured fill value.
            parent_df = parent_df.na.fill(
                {summary_item['agg_result_col_name']: summary_item['na_fill_value']}
            )

    return parent_df
Enter fullscreen mode Exit fullscreen mode

Back to Top

Top comments (0)