- Read data `from pyspark.sql import SparkSession from pyspark.sql.types import StructType, StructField, StringType, IntegerType, FloatType
def read_data(spark, customSchema):
"""
Reads the loan data from the specified S3 location into a Spark DataFrame.
Parameters:
spark (SparkSession): The Spark session object.
customSchema (StructType): The schema for the CSV data.
Returns:
DataFrame: The loaded DataFrame.
"""
# Step 1: Define the S3 bucket and file location
bucket_name = "loan-data" # Replace with the unique bucket name provided
s3_input_path = f"s3://{bucket_name}/Inputfile/loan_data.csv"
# Step 2: Read the CSV file into a DataFrame
df = spark.read.csv(s3_input_path, header=True, schema=customSchema)
# Step 3: Return the DataFrame
return df
`
-
Clean Data
` def clean_data(input_df):
"""
Cleans the loan data by performing the following operations:- Drops rows with null values.
- Removes duplicate rows.
- Drops rows where the 'purpose' column contains the string 'null'.
Parameters:
input_df (DataFrame): The input DataFrame from the read_data function.Returns:
DataFrame: The cleaned DataFrame.
"""Step 1: Drop rows with any null values
df_no_nulls = input_df.dropna()
Step 2: Remove duplicate rows
df_no_duplicates = df_no_nulls.dropDuplicates()
Step 3: Drop rows where the 'purpose' column contains the string 'null'
df_cleaned = df_no_duplicates.filter(df_no_duplicates["purpose"] != "null")
Return the cleaned DataFrame
return df_cleaned
` -
S3_load_data
`def s3_load_data(data, file_name):
"""
Saves the final DataFrame to an S3 bucket as a single CSV file with a header.Parameters:
data (DataFrame): The output data of the result_1 and result_2 functions.
file_name (str): The name of the output file to be stored inside the S3 bucket.
"""Step 1: Mention the bucket name
bucket_name = "loan-data" # Replace with the unique bucket name if applicable
Step 2: Define the output path
output_path = "s3://" + bucket_name + "/output/" + file_name
Step 3: Check if the DataFrame has the expected row count
if data.count() != 0:
print("Loading the data to:", output_path)# Write the DataFrame to S3 as a single partition CSV file data.coalesce(1).write.csv(output_path, mode="overwrite", header=True)
else:
print("Empty DataFrame, hence cannot save the data to:", output_path)
` Result 1
`from pyspark.sql.functions import when, col
def result_1(input_df):
"""
Performs the following operations:
1. Filters rows where 'purpose' is either 'educational' or 'small business'.
2. Creates a new column 'income_to_installment_ratio' as the ratio of 'log annual inc' to 'installment'.
3. Creates a new column 'int_rate_category' based on 'int_rate' categorization:
- "low" if int_rate < 0.1
- "medium" if 0.1 <= int_rate < 0.15
- "high" if int_rate >= 0.15
4. Creates a new column 'high_risk_borrower' with value "1" if:
- dti > 20
- fico < 700
- revol_util > 80
Otherwise, sets it to "0".
Parameters:
input_df (DataFrame): The cleaned data DataFrame.
Returns:
DataFrame: The transformed DataFrame with the new columns.
"""
# Step 1: Filter rows where 'purpose' is 'educational' or 'small business'
filtered_df = input_df.filter((col("purpose") == "educational") | (col("purpose") == "small business"))
# Step 2: Add 'income_to_installment_ratio' column
with_income_ratio = filtered_df.withColumn(
"income_to_installment_ratio", col("log annual inc") / col("installment")
)
# Step 3: Add 'int_rate_category' column
with_int_rate_category = with_income_ratio.withColumn(
"int_rate_category",
when(col("int_rate") < 0.1, "low")
.when((col("int_rate") >= 0.1) & (col("int_rate") < 0.15), "medium")
.otherwise("high")
)
# Step 4: Add 'high_risk_borrower' column
final_df = with_int_rate_category.withColumn(
"high_risk_borrower",
when(
(col("dti") > 20) & (col("fico") < 700) & (col("revol_util") > 80),
1
).otherwise(0)
)
# Return the final DataFrame
return final_df
`
- Result 2
`from pyspark.sql import *
def result_2(input_df):
"""
Calculates the default rate for each purpose. The default rate is defined as:
- The count of loans that are not fully paid (not_fully_paid == 1) divided by the total count of loans for each purpose.
Parameters:
input_df (DataFrame): The cleaned data DataFrame.
Returns:
DataFrame: The DataFrame with purpose and default_rate columns.
"""
# Step 1: Calculate the total number of loans and number of not fully paid loans for each purpose
total_loans = input_df.groupBy("purpose").count().withColumnRenamed("count", "total_loans")
not_fully_paid_loans = input_df.filter(col("not_fully_paid") == 1).groupBy("purpose").count().withColumnRenamed("count", "not_fully_paid")
# Step 2: Join the two DataFrames to get total loans and not fully paid loans in the same table
result_df = total_loans.join(not_fully_paid_loans, on="purpose", how="left_outer").fillna(0)
# Step 3: Calculate the default rate as the ratio of not fully paid loans to total loans
result_df = result_df.withColumn(
"default_rate",
round(result_df["not_fully_paid"] / result_df["total_loans"], 2)
)
# Step 4: Select the desired columns and return the DataFrame
final_df = result_df.select("purpose", "default_rate")
return final_df
`
- Redshift load data
`def redshift_load_data(data):
"""
Loads the final DataFrame to the Redshift table.
Parameters:
data (DataFrame): The output of the result_2 function.
"""
# Step 1: Define the Redshift connection parameters
jdbcUrl = "jdbc:redshift://<your-cluster-endpoint>:5439/<your-database-name>"
username = "<your-username>"
password = "<your-password>"
table_name = "result_2_table" # Specify the Redshift table where data will be loaded
# Step 2: Load data into Redshift
data.write \
.format("jdbc") \
.option("url", jdbcUrl) \
.option("dbtable", table_name) \
.option("user", username) \
.option("password", password) \
.mode("overwrite") \
.save()
print(f"Data successfully loaded into the {table_name} table in Redshift.")
`
Top comments (0)