In this article, we will build a multilayer perceptron using Spark. The dataset we will use for this exercise contains close to 75k records of sample customer journey data from a retail website. There are 16 input features to predict whether the visitor is likely to convert, and the target class is balanced. We will use MultilayerPerceptronClassifier from Spark's ML library. We start by importing a few important dependencies.
import os
import numpy as np
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.types import *

spark = SparkSession.builder.appName('deep_learning').getOrCreate()
Now we load the dataset into Spark for feature engineering and model training. As mentioned, there are 16 input features and 1 output column (Orders_Normalized).
[In]: data = spark.read.csv('dl_data.csv', header=True, inferSchema=True)
[In]: data.printSchema()
[Out]: root
|-- Visit_Number_Bucket: string (nullable = true)
|-- Page_Views_Normalized: double (nullable = true)
|-- Orders_Normalized: integer (nullable = true)
|-- Internal_Search_Successful_Normalized: double (nullable = true)
|-- Internal_Search_Null_Normalized: double (nullable = true)
|-- Email_Signup_Normalized: double (nullable = true)
|-- Total_Seconds_Spent_Normalized: double (nullable = true)
|-- Store_Locator_Search_Normalized: double (nullable = true)
|-- Mapped_Last_Touch_Channel: string (nullable = true)
|-- Mapped_Mobile_Device_Type: string (nullable = true)
|-- Mapped_Browser_Type: string (nullable = true)
|-- Mapped_Entry_Pages: string (nullable = true)
|-- Mapped_Site_Section: string (nullable = true)
|-- Mapped_Promo_Code: string (nullable = true)
|-- Maped_Product_Name: string (nullable = true)
|-- Mapped_Search_Term: string (nullable = true)
|-- Mapped_Product_Collection: string (nullable = true)
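As a quick sanity check on the size of the data (the exact count depends on your copy of dl_data.csv), we can verify that we have roughly 75k rows and 17 columns (16 features plus the label):
print((data.count(), len(data.columns)))  # expect roughly (75000, 17)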
We rename the Orders_Normalized column to label, so that it matches the default label column name expected by Spark's classifiers and evaluators.
[In]: data = data.withColumnRenamed('Orders_Normalized', 'label')
[In]: data.printSchema()
[Out]: root
|-- Visit_Number_Bucket: string (nullable = true)
|-- Page_Views_Normalized: double (nullable = true)
|-- label: integer (nullable = true)
|-- Internal_Search_Successful_Normalized: double (nullable = true)
|-- Internal_Search_Null_Normalized: double (nullable = true)
|-- Email_Signup_Normalized: double (nullable = true)
|-- Total_Seconds_Spent_Normalized: double (nullable = true)
|-- Store_Locator_Search_Normalized: double (nullable = true)
|-- Mapped_Last_Touch_Channel: string (nullable = true)
|-- Mapped_Mobile_Device_Type: string (nullable = true)
|-- Mapped_Browser_Type: string (nullable = true)
|-- Mapped_Entry_Pages: string (nullable = true)
|-- Mapped_Site_Section: string (nullable = true)
|-- Mapped_Promo_Code: string (nullable = true)
|-- Maped_Product_Name: string (nullable = true)
|-- Mapped_Search_Term: string (nullable = true)
|-- Mapped_Product_Collection: string (nullable = true)
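Since the exercise assumes a balanced target class, it is worth verifying this before training; the exact counts depend on your copy of the data:
data.groupBy('label').count().show()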
Because we are dealing with both numerical and categorical columns, we must write a pipeline that combines both into a single feature vector for model training. Therefore, we import Pipeline, StringIndexer, OneHotEncoder, and VectorAssembler, to create feature vectors. We also import MulticlassClassificationEvaluator and MultilayerPerceptronClassifier, to build the model and check its performance.
from pyspark.ml.feature import OneHotEncoder, VectorAssembler, StringIndexer
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.classification import MultilayerPerceptronClassifier
Split into Train, Validation, and Test Sets
We now split the data into train, validation, and test sets for model training and evaluation.
train, validation, test = data.randomSplit([0.7, 0.2, 0.1], 1234)
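Note that randomSplit produces approximate proportions, so the exact row counts will vary slightly around the 70/20/10 target. A quick sanity check:
print(train.count(), validation.count(), test.count())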
Data Preprocessing
We create separate lists of categorical and numeric columns, based on their datatypes.
categorical_columns = [item[0] for item in data.dtypes if item[1].startswith('string')]
numeric_columns = [item[0] for item in data.dtypes if item[1].startswith('double')]
indexers = [StringIndexer(inputCol=column, outputCol='{0}_index'.format(column)) for column in categorical_columns]
We now create a consolidated feature vector, using VectorAssembler:
featuresCreator = VectorAssembler(
inputCols=[indexer.getOutputCol() for indexer in indexers] + numeric_columns,
outputCol='features')
layers = [len(featuresCreator.getInputCols()), 4, 2, 2]
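The first element of layers must equal the length of the assembled feature vector, which here is 10 indexed categorical columns plus 6 numeric columns, and the last element must equal the number of classes. Given the schema above:
print(layers)  # [16, 4, 2, 2]: 16 inputs, two hidden layers (4 and 2), 2 output classes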
Model Building
The next step is to build the MultilayerPerceptronClassifier model. One can play around with different hyperparameters, such as the number of layers and maxIter, to improve the performance of the model.
classifier = MultilayerPerceptronClassifier(labelCol='label',
featuresCol='features',
maxIter=100,
layers=layers,
blockSize=128,
seed=1234)
Now that we have defined every stage, we add all these steps to the pipeline and run it on the training data.
pipeline = Pipeline(stages=indexers + [featuresCreator, classifier])
model = pipeline.fit(train)
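As an aside, the hyperparameter tuning mentioned above can be automated with Spark's CrossValidator. The sketch below is an optional extension, not part of the original flow, and the candidate values are purely illustrative:
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

# Search over a couple of illustrative settings for maxIter and the layer sizes.
paramGrid = (ParamGridBuilder()
             .addGrid(classifier.maxIter, [50, 100])
             .addGrid(classifier.layers, [[16, 4, 2, 2], [16, 8, 4, 2]])
             .build())

cv = CrossValidator(estimator=pipeline,
                    estimatorParamMaps=paramGrid,
                    evaluator=MulticlassClassificationEvaluator(metricName='accuracy'),
                    numFolds=3,
                    seed=1234)

# Retrains the pipeline for each fold and setting; cv_model.bestModel
# is the best fitted PipelineModel.
cv_model = cv.fit(train)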
We now calculate the predictions of the model on the train, validation, and test datasets.
train_output_df = model.transform(train)
validation_output_df = model.transform(validation)
test_output_df = model.transform(test)
train_predictionAndLabels = train_output_df.select('prediction', 'label')
validation_predictionAndLabels = validation_output_df.select('prediction', 'label')
test_predictionAndLabels = test_output_df.select('prediction', 'label')
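Before computing aggregate metrics, it can help to eyeball a few rows of the predictions (your output will vary):
test_predictionAndLabels.show(5)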
Model Evaluation
We define three different metrics to evaluate the performance of the model.
[In]: metrics = ['weightedPrecision', 'weightedRecall', 'accuracy']
[In]: for metric in metrics:
    evaluator = MulticlassClassificationEvaluator(metricName=metric)
    print('Train ' + metric + ' = ' + str(evaluator.evaluate(train_predictionAndLabels)))
    print('Validation ' + metric + ' = ' + str(evaluator.evaluate(validation_predictionAndLabels)))
    print('Test ' + metric + ' = ' + str(evaluator.evaluate(test_predictionAndLabels)))
[Out]: Train weightedPrecision = 0.9722605697126978
[Out]: Validation weightedPrecision = 0.9734944186485901
[Out]: Test weightedPrecision = 0.9710090865749514
[Out]: Train weightedRecall = 0.9718655625913297
[Out]: Validation weightedRecall = 0.9731379731379731
[Out]: Test weightedRecall = 0.9706199460916443
[Out]: Train accuracy = 0.9718655625913297
[Out]: Validation accuracy = 0.9731379731379731
[Out]: Test accuracy = 0.9706199460916443
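Beyond the weighted metrics, a confusion matrix gives a per-class view of the errors. A minimal sketch using the test predictions (counts will differ from run to run):
test_predictionAndLabels.groupBy('label', 'prediction').count().orderBy('label', 'prediction').show()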
As we can see, the deep learning model performs well on the test data, with roughly 97% accuracy and comparable weighted precision and recall.