Since its release, Apache Spark (an open-source framework for processing big data) has become one of the most widely used technologies for processing large amounts of data in parallel across multiple machines. It prides itself on efficiency and speed compared to similar software that existed before it.
Working with this technology in Python is possible through PySpark, a Python API that lets you tap into Apache Spark’s potential using the Python programming language.
In this article, you will get started with PySpark by building a machine-learning model using the Logistic Regression algorithm.
Note: Prior knowledge of Python, an IDE like VSCode, the command prompt/terminal, and basic Machine Learning concepts is essential for understanding the concepts contained in this article.
By going through this article, you should be able to:
- Understand what Apache Spark is.
- Learn about PySpark and how to use it for Machine Learning.
What’s PySpark all about?
According to the official Apache Spark website, PySpark lets you combine the strengths of Apache Spark (simplicity, speed, scalability, versatility) and Python (rich ecosystem, mature libraries, simplicity) for “data engineering, data science, and machine learning on single-node machines or clusters.”
PySpark is the Python API for Apache Spark, which means it serves as an interface that lets code written in Python communicate with the Apache Spark engine, which is written in Scala. This way, professionals already familiar with the Python ecosystem can start using Apache Spark quickly. It also ensures that existing Python libraries remain relevant.
Detailed Guide on how to use PySpark for Machine Learning
In the following steps, we will build a machine-learning model using the Logistic Regression algorithm:
- Install project dependencies: I’m assuming that you already have Python installed on your machine. If not, install it before moving to the next step. Open your terminal or command prompt and enter the command below to install the PySpark library.
pip install pyspark
You can install these additional Python libraries if you do not have them.
pip install pandas numpy
- Create a file and import the necessary libraries: Open VSCode, and in your chosen project directory, create a file for your project, e.g. pyspark_model.py. Open the file and import the necessary libraries for the project.
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator
import pandas as pd
- Create a Spark session: Start a Spark session for the project by entering this code under the imports.
spark = SparkSession.builder.appName("LogisticRegressionExample").getOrCreate()
- Read the CSV file (the dataset you will be working with): If you already have your dataset named data.csv in your project directory/folder, load it using the code below.
data = spark.read.csv("data.csv", header=True, inferSchema=True)
- Exploratory data analysis: This step helps you understand the dataset you are working with. Check for null values and decide on the cleansing approach to use.
# Functions needed for the null-value count below
from pyspark.sql.functions import count, isnull, when
# Display the schema
data.printSchema()
# Show the first ten rows
data.show(10)
# Count null values in each column
missing_values = data.select(
    [count(when(isnull(c), c)).alias(c) for c in data.columns]
)
# Show the result
missing_values.show()
Optionally, if you are working with a small dataset, you can convert it to a pandas DataFrame and use pandas directly to check for missing values.
pandas_df = data.toPandas()
# Use Pandas to check missing values
print(pandas_df.isna().sum())
- Data preprocessing: This step involves converting the columns/features in the dataset into a format that PySpark’s machine-learning library can work with.
Use VectorAssembler to combine all features into a single vector column.
# Combine feature columns into a single vector column
feature_columns = [col for col in data.columns if col != "label"]
assembler = VectorAssembler(inputCols=feature_columns, outputCol="features")
# Transform the data
data = assembler.transform(data)
# Select only the 'features' and 'label' columns for training
final_data = data.select("features", "label")
# Show the transformed data
final_data.show(5)
- Split the dataset: Split the dataset in a proportion that is convenient for you. Here, we are using 70% to 30%: 70% for training and 30% for testing the model.
train_data, test_data = final_data.randomSplit([0.7, 0.3], seed=42)
- Train your model: We are using the Logistic Regression algorithm for training our model.
Create an instance of the LogisticRegression class and fit the model.
lr = LogisticRegression(featuresCol="features", labelCol="label")
# Train the model
lr_model = lr.fit(train_data)
- Make predictions with your trained model: Use the model trained in the previous step to make predictions on the test data.
predictions = lr_model.transform(test_data)
# Show predictions
predictions.select("features", "label", "prediction", "probability").show(5)
- Model Evaluation: Here, we evaluate the model to determine its predictive performance, using a suitable evaluation metric.
Evaluate the model using the AUC (area under the ROC curve) metric:
evaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction", labelCol="label", metricName="areaUnderROC")
# Compute the AUC
auc = evaluator.evaluate(predictions)
print(f"Area Under ROC: {auc}")
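To get a feel for what the AUC number means, here is a minimal pure-Python sketch of one common interpretation: the fraction of (positive, negative) pairs in which the positive example receives the higher score. The function name area_under_roc and the toy labels and scores are made up for illustration. This is not how Spark computes the metric internally (Spark integrates the ROC curve), so values can differ slightly on real data.

```python
from itertools import product


def area_under_roc(labels, scores):
    """Fraction of (positive, negative) pairs ranked correctly; ties count as half."""
    positives = [s for y, s in zip(labels, scores) if y == 1]
    negatives = [s for y, s in zip(labels, scores) if y == 0]
    if not positives or not negatives:
        raise ValueError("AUC needs at least one positive and one negative example")

    wins = 0.0
    for pos_score, neg_score in product(positives, negatives):
        if pos_score > neg_score:
            wins += 1.0   # positive ranked above negative
        elif pos_score == neg_score:
            wins += 0.5   # a tie counts as half a win
    return wins / (len(positives) * len(negatives))


# Hypothetical toy data: true labels and the model's scores for class 1
labels = [1, 0, 1, 0, 1]
scores = [0.9, 0.2, 0.6, 0.7, 0.8]
print(f"Area Under ROC: {area_under_roc(labels, scores)}")
```

A value of 1.0 means every positive example outranks every negative one, while a value around 0.5 means the model ranks no better than chance.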
The end-to-end code used for this article is shown below:
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator
# Start Spark session
spark = SparkSession.builder.appName("LogisticRegressionExample").getOrCreate()
# Load and preprocess data
data = spark.read.csv("data.csv", header=True, inferSchema=True)
assembler = VectorAssembler(inputCols=[col for col in data.columns if col != "label"], outputCol="features")
data = assembler.transform(data).select("features", "label")
# Split the data
train_data, test_data = data.randomSplit([0.7, 0.3], seed=42)
# Train the model
lr = LogisticRegression(featuresCol="features", labelCol="label")
lr_model = lr.fit(train_data)
# Make predictions
predictions = lr_model.transform(test_data)
predictions.select("features", "label", "prediction", "probability").show(5)
# Evaluate the model
evaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction", labelCol="label", metricName="areaUnderROC")
auc = evaluator.evaluate(predictions)
print(f"Area Under ROC: {auc}")
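The probability and prediction columns printed by the script come from the fitted model's coefficients and intercept. As a rough illustration of what a binary logistic regression model does with them, the pure-Python sketch below applies the sigmoid function to a weighted sum of the features and thresholds the result at 0.5 (Spark's default threshold). The function names and the coefficient values are hypothetical, not taken from a trained model.

```python
import math


def predict_probability(features, coefficients, intercept):
    """Probability of class 1: sigmoid of the intercept plus the weighted feature sum."""
    margin = intercept + sum(w * x for w, x in zip(coefficients, features))
    return 1.0 / (1.0 + math.exp(-margin))


def predict_label(features, coefficients, intercept, threshold=0.5):
    """Predict 1.0 when the class-1 probability exceeds the threshold, else 0.0."""
    probability = predict_probability(features, coefficients, intercept)
    return 1.0 if probability > threshold else 0.0


# Hypothetical values standing in for lr_model.coefficients and lr_model.intercept
coefficients = [1.5, -0.5]
intercept = -1.0

row = [2.0, 1.0]
print(predict_probability(row, coefficients, intercept))
print(predict_label(row, coefficients, intercept))
```

In PySpark, the probability column holds a vector with one entry per class, so the value computed here corresponds to its second entry (class 1).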
Next steps 🤔
We have reached the end of this article. By following the steps above, you have built your machine-learning model using PySpark.
Always ensure that your dataset is clean and free of null values before preprocessing and training. Lastly, make sure your features all contain numerical values before going ahead to train your model.