Configure the Python file in VS Code

  1. Read data
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, FloatType
def read_data(spark, customSchema):
    # Step 1: Define the S3 bucket and file location
    bucket_name = "loan-data"
    s3_input_path = f"s3://{bucket_name}/Inputfile/loan_data.csv"

    # Step 2: Read the CSV file into a DataFrame
    df = spark.read.csv(s3_input_path, header=True, schema=customSchema)

    # Step 3: Return the DataFrame
    return df
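The customSchema argument passed to read_data is never defined in the post. The sketch below is one hypothetical way to build it, assuming the loan-data CSV contains the columns referenced later in the pipeline; adjust the names and types to match your actual file.

# Hypothetical schema sketch for the loan-data CSV -- column names and types are assumptions
customSchema = StructType([
    StructField("purpose", StringType(), True),
    StructField("int_rate", FloatType(), True),
    StructField("installment", FloatType(), True),
    StructField("log annual inc", FloatType(), True),
    StructField("dti", FloatType(), True),
    StructField("fico", IntegerType(), True),
    StructField("revol_util", FloatType(), True),
    StructField("not_fully_paid", IntegerType(), True),
])

spark = SparkSession.builder.appName("loan-data-etl").getOrCreate()
raw_df = read_data(spark, customSchema)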
  2. Clean Data
def clean_data(input_df):
    # Step 1: Drop rows with any null values
    df_no_nulls = input_df.dropna()

    # Step 2: Remove duplicate rows
    df_no_duplicates = df_no_nulls.dropDuplicates()

    # Step 3: Drop rows where the 'purpose' column contains the string 'null'
    df_cleaned = df_no_duplicates.filter(df_no_duplicates["purpose"] != "null")

    # Return the cleaned DataFrame
    return df_cleaned
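As a quick sanity check (not part of the original post), you can compare row counts before and after cleaning, reusing raw_df from the schema sketch above:

# Hypothetical verification of the cleaning step
cleaned_df = clean_data(raw_df)
print("Rows before cleaning:", raw_df.count())
print("Rows after cleaning:", cleaned_df.count())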
  3. S3_load_data
def s3_load_data(data, file_name):
    """
    Saves the final DataFrame to an S3 bucket as a single CSV file with a header.

    Parameters:
        data (DataFrame): The output data of the result_1 and result_2 functions.
        file_name (str): The name of the output file to be stored inside the S3 bucket.
    """
    # Step 1: Mention the bucket name
    bucket_name = "loan-data"  # Replace with your unique bucket name if applicable

    # Step 2: Define the output path
    output_path = "s3://" + bucket_name + "/output/" + file_name

    # Step 3: Check that the DataFrame is not empty before writing
    if data.count() != 0:
        print("Loading the data to:", output_path)

        # Write the DataFrame to S3 as a single-partition CSV file
        data.coalesce(1).write.csv(output_path, mode="overwrite", header=True)
    else:
        print("Empty DataFrame, hence cannot save the data to:", output_path)
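One thing to keep in mind: even with coalesce(1), Spark writes output_path as a directory containing a single part-*.csv file, not a file literally named file_name. A hedged usage sketch, reusing cleaned_df from above and reading the output back to verify it:

# Hypothetical call -- "cleaned_loans" becomes a folder under s3://loan-data/output/
s3_load_data(cleaned_df, "cleaned_loans")

# Read the written CSV back to confirm the load (schema is inferred here for simplicity)
check_df = spark.read.csv("s3://loan-data/output/cleaned_loans", header=True)
print("Rows written:", check_df.count())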
  4. Result 1
from pyspark.sql.functions import when, col

def result_1(input_df):
    """
    Performs the following operations:
    1. Filters rows where 'purpose' is either 'educational' or 'small business'.
    2. Creates a new column 'income_to_installment_ratio' as the ratio of 'log annual inc' to 'installment'.
    3. Creates a new column 'int_rate_category' based on 'int_rate' categorization:
        - "low" if int_rate < 0.1
        - "medium" if 0.1 <= int_rate < 0.15
        - "high" if int_rate >= 0.15
    4. Creates a new column 'high_risk_borrower' with value 1 if:
        - dti > 20
        - fico < 700
        - revol_util > 80
       Otherwise, sets it to 0.

    Parameters:
        input_df (DataFrame): The cleaned data DataFrame.

    Returns:
        DataFrame: The transformed DataFrame with the new columns.
    """
    # Step 1: Filter rows where 'purpose' is 'educational' or 'small business'
    filtered_df = input_df.filter((col("purpose") == "educational") | (col("purpose") == "small business"))

    # Step 2: Add 'income_to_installment_ratio' column
    with_income_ratio = filtered_df.withColumn(
        "income_to_installment_ratio", col("log annual inc") / col("installment")
    )

    # Step 3: Add 'int_rate_category' column
    with_int_rate_category = with_income_ratio.withColumn(
        "int_rate_category",
        when(col("int_rate") < 0.1, "low")
        .when((col("int_rate") >= 0.1) & (col("int_rate") < 0.15), "medium")
        .otherwise("high")
    )

    # Step 4: Add 'high_risk_borrower' column
    final_df = with_int_rate_category.withColumn(
        "high_risk_borrower",
        when(
            (col("dti") > 20) & (col("fico") < 700) & (col("revol_util") > 80),
            1
        ).otherwise(0)
    )

    # Return the final DataFrame
    return final_df

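To see what result_1 produces, a short inspection sketch (the variable names are assumptions from this walkthrough):

# Hypothetical inspection of the transformed data
result_1_df = result_1(cleaned_df)
result_1_df.select("purpose", "income_to_installment_ratio", "int_rate_category", "high_risk_borrower").show(5)
result_1_df.groupBy("int_rate_category").count().show()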
  5. Result 2
from pyspark.sql.functions import col, round

def result_2(input_df):
    """
    Calculates the default rate for each purpose. The default rate is defined as
    the count of loans that are not fully paid (not_fully_paid == 1) divided by
    the total count of loans for each purpose.

    Parameters:
        input_df (DataFrame): The cleaned data DataFrame.

    Returns:
        DataFrame: The DataFrame with purpose and default_rate columns.
    """
    # Step 1: Calculate the total number of loans and the number of not fully paid loans for each purpose
    total_loans = input_df.groupBy("purpose").count().withColumnRenamed("count", "total_loans")

    not_fully_paid_loans = input_df.filter(col("not_fully_paid") == 1).groupBy("purpose").count().withColumnRenamed("count", "not_fully_paid")

    # Step 2: Join the two DataFrames to get total loans and not fully paid loans in the same table
    result_df = total_loans.join(not_fully_paid_loans, on="purpose", how="left_outer").fillna(0)

    # Step 3: Calculate the default rate as the ratio of not fully paid loans to total loans
    result_df = result_df.withColumn(
        "default_rate",
        round(result_df["not_fully_paid"] / result_df["total_loans"], 2)
    )

    # Step 4: Select the desired columns and return the DataFrame
    final_df = result_df.select("purpose", "default_rate")

    return final_df

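Since not_fully_paid is a 0/1 flag, the same default rate can also be computed in a single groupBy/agg pass by averaging the flag per purpose, which avoids the join. This is an alternative sketch, not the post's code:

from pyspark.sql.functions import avg, col, round as spark_round

def result_2_single_pass(input_df):
    # The mean of a 0/1 column per group equals the share of 1s, i.e. the default rate
    return (
        input_df.groupBy("purpose")
        .agg(spark_round(avg(col("not_fully_paid")), 2).alias("default_rate"))
    )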
  6. Redshift load data
def redshift_load_data(data):
    """
    Loads the final DataFrame to the Redshift table.

    Parameters:
        data (DataFrame): The output of the result_2 function.
    """
    # Step 1: Define the Redshift connection parameters
    jdbcUrl = "jdbc:redshift://<your-cluster-endpoint>:5439/<your-database-name>"
    username = "<your-username>"
    password = "<your-password>"
    table_name = "result_2_table"  # Specify the Redshift table where data will be loaded

    # Step 2: Load data into Redshift via JDBC
    data.write.format("jdbc") \
        .option("url", jdbcUrl) \
        .option("dbtable", table_name) \
        .option("user", username) \
        .option("password", password) \
        .mode("overwrite") \
        .save()

    print(f"Data successfully loaded into the {table_name} table in Redshift.")
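Putting the pieces together, here is a minimal driver sketch that consolidates the per-step examples above, under the assumptions used throughout this walkthrough (the app name and output file names are placeholders, and the Redshift JDBC driver must be available to Spark, for example via --jars or --packages):

if __name__ == "__main__":
    spark = SparkSession.builder.appName("loan-data-etl").getOrCreate()

    # Extract and clean
    raw_df = read_data(spark, customSchema)
    cleaned_df = clean_data(raw_df)

    # Transform
    result_1_df = result_1(cleaned_df)
    result_2_df = result_2(cleaned_df)

    # Load to S3 and Redshift
    s3_load_data(result_1_df, "result_1")
    s3_load_data(result_2_df, "result_2")
    redshift_load_data(result_2_df)

    spark.stop()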
