Basic DataFrame transformation
Create aggregation columns from child DataFrame
Environment Specification
- Azure Synapse Runtime for Apache Spark 3.4
- Azure Data Lake Storage
- Azure Key Vault
Import dependencies
import requests
from pyspark.sql import types as T, functions as F
import json
import datetime
import logging
Basic DataFrame transformation
Basic DataFrame transformation: add new columns, rename columns, fill null values, filter rows, select columns, drop duplicates, and optionally add a timestamp column. Returns a PySpark DataFrame.
def _is_unset(value):
    """Return True when an optional step argument means "skip this step".

    Only ``None`` and empty strings/containers count as unset. Falsy but
    meaningful values such as ``0`` or ``False`` (valid ``na.fill`` values) are
    kept. Other objects, such as a PySpark ``Column``, are never passed to
    ``bool()``, because Spark raises ValueError when a Column is used as a
    truth value.
    """
    if value is None:
        return True
    if isinstance(value, (str, bytes, dict, list, tuple, set, frozenset)):
        return len(value) == 0
    return False


def df_transform_basic(
    df_input,  # DataFrame
    step1_new_cols,  # List of Dictionaries [{'new_col_name': 'xxx', 'new_col_expr': 'expression string for F.expr() function'}]
    step2_col_name_mapping,  # Dictionary of old-new names mapping
    step3_na_values,  # Fill value or {column: value} dictionary for df.na.fill()
    step4_filter_expr,  # String (or Column) condition for df.filter()
    step5_result_col_list,  # List of column names
    step6_drop_dup,  # True/False
    timestamp_name,  # Name of a current-timestamp column to add (optional)
):
    """Apply a fixed sequence of optional transformations to a DataFrame.

    The steps run in this order. Each step is skipped when its argument is
    None or an empty string/container:

      1. Add columns computed from SQL expression strings (``F.expr``).
      2. Rename columns (``withColumnsRenamed``, Spark 3.4+).
      3. Fill null values (``na.fill``).
      4. Filter rows.
      5. Select columns.
      6. Drop duplicate rows (only when ``step6_drop_dup`` is truthy).

    Finally, if ``timestamp_name`` is given, a column of that name holding
    ``current_timestamp()`` is added.

    Returns:
        The resulting DataFrame.
    """
    df_output = df_input

    for val in step1_new_cols or []:
        df_output = df_output.withColumn(val['new_col_name'], F.expr(val['new_col_expr']))

    if not _is_unset(step2_col_name_mapping):
        df_output = df_output.withColumnsRenamed(step2_col_name_mapping)

    # 0 and False are real fill values, so test for "unset" rather than truthiness.
    if not _is_unset(step3_na_values):
        df_output = df_output.na.fill(step3_na_values)

    # Avoids bool() on a Column condition, which Spark rejects.
    if not _is_unset(step4_filter_expr):
        df_output = df_output.filter(step4_filter_expr)

    if not _is_unset(step5_result_col_list):
        df_output = df_output.select(*step5_result_col_list)

    if step6_drop_dup:
        df_output = df_output.dropDuplicates()

    if not _is_unset(timestamp_name):
        # current_timestamp() is already a Column; no F.lit wrapper needed.
        df_output = df_output.withColumn(timestamp_name, F.current_timestamp())

    return df_output
Create aggregation columns from child DataFrame
Create aggregation columns from the child DataFrame and return the updated parent DataFrame. Each aggregation is configured by one dictionary in the summary_config list.
def df_summary(
    parent_df,  # DataFrame
    child_df,  # DataFrame
    summary_config,  # List of dictionaries, one per aggregation column
):
    """Add aggregated child-DataFrame columns to the parent DataFrame.

    For every entry in ``summary_config``, the child DataFrame is grouped by the
    entry's ``join_on`` columns and aggregated. The result is then left-joined
    back onto the parent on those same columns. Parent rows without matching
    child rows are kept with a null aggregate, which can be replaced via
    ``na_fill_value``.

    Args:
        parent_df: DataFrame that receives the new aggregate columns.
        child_df: DataFrame whose rows are aggregated.
        summary_config: list of dicts, one per aggregate column, with keys:
            'agg': dict ``{column_name: aggregate_function_name}`` in the form
                accepted by ``GroupedData.agg``. Only its first entry is used.
            'agg_result_col_name': name of the resulting column on the parent.
            'join_on': column name, or list of column names, present in both
                DataFrames. Used both as the group-by keys and as the join keys.
            'na_fill_value' (optional): value written into the result column
                where the left join produced null. Omitted or None skips the fill.

    Example::

        summary_config = [
            {'agg': {'RevisionNum': 'count'}, 'agg_result_col_name': 'CountOfRevisions',
             'na_fill_value': 0, 'join_on': ['Contract', 'RevisionNum']},
            {'agg': {'RevisionNum': 'max'}, 'agg_result_col_name': 'RecentRevNum',
             'na_fill_value': 0, 'join_on': ['Contract', 'RevisionNum']},
        ]

    NOTE(review): in this example, the aggregated column is also a grouping key.
    As a result, 'max' just returns the key itself. Per-contract aggregates
    presumably want join_on=['Contract'] — confirm.

    Returns:
        parent_df with one extra column per summary_config entry.
    """
    if not summary_config:
        return parent_df

    for summary_item in summary_config:
        join_on = summary_item['join_on']
        if isinstance(join_on, str):
            # A bare column name must not be split into characters.
            join_on = [join_on]
        else:
            join_on = list(join_on)

        result_col = summary_item['agg_result_col_name']
        agg_spec = summary_item['agg']
        agg_col = list(agg_spec)[0]
        agg_fun = agg_spec[agg_col]

        child_df_grouped = child_df.groupBy(*join_on).agg({agg_col: agg_fun})
        # agg() appends exactly one aggregate column after the grouping keys.
        # Rename it by position: Spark's generated name is not always
        # f"{agg_fun}({agg_col})" (e.g. 'mean' is reported as avg(...)), and
        # withColumnRenamed silently does nothing when the name is not found.
        child_df_grouped = child_df_grouped.withColumnRenamed(
            child_df_grouped.columns[-1], result_col
        )

        parent_df = parent_df.join(child_df_grouped, join_on, 'left')

        na_fill_value = summary_item.get('na_fill_value')
        if na_fill_value is not None:
            # Parent rows with no matching child rows get null from the left join.
            parent_df = parent_df.na.fill({result_col: na_fill_value})

    return parent_df
Top comments (0)