If data is the new oil, then Spark is the new engine. In recent years we have witnessed exponential growth in data, driven by the increasing use of smart devices and sensors that collect data in real time. Businesses that master such data will outdo their competitors in making intelligent decisions, and to make intelligent decisions from data we need to understand and process it. Spark comes in handy when we want to process huge datasets because it distributes the work across parallel clusters. In this tutorial I am going to cover some basics of Spark. Spark supports several languages; here I will be using Python. Before proceeding, set up your environment and install PySpark on your machine. I am using this dataset:
(https://drive.google.com/file/d/1b2oL92aRU5_xLkGBoxLwBfWY5MYv9xpG/view?usp=sharing)
To work with Spark data structures such as resilient distributed datasets (RDDs) and data frames, we start by creating a SparkSession. A SparkSession is an object that a programmer creates using the SparkSession builder pattern, and it is the first piece of code in a Spark program. The session exposes the APIs that were previously accessed through separate contexts (a short sketch follows the list below). These APIs include:
- SparkContext
- StreamingContext
- SQLContext
- HiveContext
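As a minimal sketch of that point (the local master address and the app name here are my own assumptions, and the tutorial's actual session is created just below), the older entry points are reachable directly from a session rather than being constructed separately:
from pyspark.sql import SparkSession

# A throwaway local session just for this illustration.
demo = SparkSession.builder.master('local[*]').appName('contexts-demo').getOrCreate()

print(demo.sparkContext)              # the underlying SparkContext
demo.sql('SELECT 1 AS one').show()    # SQL functionality formerly reached through SQLContext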
The following lines of code create our session for this tutorial and load the dataset.
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('Erick').getOrCreate()
df_spark = spark.read.csv('salary.csv')
df_spark.show()
SparkSession.builder returns a builder object with methods such as master(), appName(), and getOrCreate(). After creating the session, the next lines load our dataset into a data frame named df_spark. When we execute df_spark.show(), we get the following output.
By default, Spark displays the first 20 rows. However, as seen in the output, the header row of the dataset has been read as data. To load it correctly, we have to set the header option to true.
df_spark = spark.read.option('header', 'true').csv('salary.csv')
df_spark.show()
The output should now be as follows:
The headers are now shown correctly. We can preview the first few rows by running
df_spark.head(5)
In this case, I have specified the number of rows to be 5, but you can specify any number. When you execute this, your output should look like this:
[Row(MMM-YY='1/1/2016', Emp_ID=1, Age=28, Gender='Male', City='C23', Education_Level='Master', Salary=57387),
Row(MMM-YY='2/1/2016', Emp_ID=1, Age=28, Gender='Male', City='C23', Education_Level='Master', Salary=57387),
Row(MMM-YY=None, Emp_ID=None, Age=None, Gender=None, City=None, Education_Level=None, Salary=None),
Row(MMM-YY='11/1/2017', Emp_ID=2, Age=31, Gender='Male', City='C7', Education_Level='Master', Salary=67016),
Row(MMM-YY='12/1/2017', Emp_ID=2, Age=31, Gender='Male', City='C7', Education_Level='Master', Salary=67016)]
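Note that head() only previews rows. If what you actually want is the number of rows in the data frame, count() returns it; a one-line aside:
df_spark.count()    # total number of rows in the data frame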
PySpark lets us see the datatype of each column by executing the printSchema() method. Before this, we should set inferSchema to True when loading the dataset. This is done as follows:
df_spark = spark.read.option('header', 'true').csv('salary.csv', inferSchema=True)
Then we can inspect the schema with
df_spark.printSchema()
When we run this, we should be able to see the datatypes in the following format.
If you call printSchema() before setting inferSchema=True, your output will be as follows.
This is because Spark, by default, interprets every column as a string.
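If you prefer not to re-read the file, an alternative (sketched here with a hypothetical df_casted name, assuming the column names of this dataset) is to cast individual columns explicitly:
from pyspark.sql.types import IntegerType

# Cast the string columns produced by the default read into integers.
df_casted = df_spark.withColumn('Age', df_spark['Age'].cast(IntegerType())) \
                    .withColumn('Salary', df_spark['Salary'].cast(IntegerType()))
df_casted.printSchema()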
This tutorial will cover the select and drop functions, renaming columns, filling missing values, filtering, and the groupBy function in Spark.
Select function.
The select() function is used to select columns from the dataset or data frame. To select Age, Gender, City, Education_Level, and Salary, we can use the following syntax.
df_spark.select(['Age','gender','City','Education_Level','Salary']).show()
When we select these columns, our output will be as follows:
Sometimes we might need to add a column to the dataset. For example, we can add a column for age after 5 years.
df_spark.withColumn('age after 5 years',df_spark['age']+5).show()
The output of this line is as follows.
At times we have columns that are irrelevant to our analysis. For example, we might not need Emp_ID, so we can drop it as follows.
df_spark.drop('Emp_ID').show()
As seen above, the Emp_ID column is no longer included.
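drop() also accepts several column names in one call; the particular combination below is just an illustration:
df_spark.drop('Emp_ID', 'City').show()    # drop more than one column at once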
We may also need to rename specific columns, which can be done as follows.
df_spark.withColumnRenamed('MMM-YY','date_employed').show()
This will rename MMM-YY to date_employed.
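withColumnRenamed() renames one column per call, so renaming several columns is usually done by chaining; the second new name here is only an example:
df_spark.withColumnRenamed('MMM-YY', 'date_employed') \
        .withColumnRenamed('Emp_ID', 'employee_id').show()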
We also need a means of handling missing data in the dataset: we can either drop the rows with missing values or fill them in. To drop all rows with missing values, we do the following.
df_spark.na.drop().show()
Looking at the output, we notice that all rows with any missing values have been dropped. The output will be as shown below.
We can also set a threshold. The thresh argument keeps only rows that have at least that many non-null values (and it overrides the how parameter); for example, thresh=2 drops any row with fewer than two non-null values. This can be done as follows.
df_spark.na.drop(how='any',thresh=2).show()
The output of this is as shown below. Rows with at least two non-null values are kept, even if they contain some missing values.
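For completeness, the how parameter itself distinguishes between dropping a row when any value is missing and only when every value is missing; a brief sketch:
df_spark.na.drop(how='any').show()    # drop a row if any column is null (the default)
df_spark.na.drop(how='all').show()    # drop a row only if every column is null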
We can also drop rows based on missing values in specific columns. This is done as follows.
df_spark.na.drop(how="any",subset=['city']).show()
Here we drop all rows where the City column has a missing value. The output is as follows.
We can also decide to fill in the missing values rather than deleting them. This can be done as follows
df_spark.na.fill('missing value',['age','salary']).show()
Here all the missing values in the Age and Salary columns will be filled with the text 'missing value'. The output is as shown below. Note that na.fill() only applies a value to columns of a matching type: a string fill value affects only string-typed columns, and numeric columns are simply ignored.
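So, if Age and Salary were read as numbers (for example with inferSchema=True), you would pass a numeric fill value instead; a small sketch with 0 as an arbitrary placeholder:
df_spark.na.fill(0, ['Age', 'Salary']).show()    # numeric fill value for numeric columns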
PySpark has a class, pyspark.ml.feature.Imputer, that completes missing values using the mean, median, or mode of the column that contains them. The input columns of this class must be numeric, because the class does not currently support categorical features. It is used as shown below.
from pyspark.ml.feature import Imputer

imputer = Imputer(
    inputCols=['Age', 'Salary'],
    outputCols=["{}_imputed".format(a) for a in ['Age', 'Salary']]
).setStrategy("median")

imputer.fit(df_spark).transform(df_spark).show()
The output of this will add Age_imputed and Salary_imputed columns, as shown below.
We may also need to filter our results. For example, we can filter for ages of 30 years or below as follows.
df_spark.filter("Age <= 30").show()
As seen in the output, only ages of 30 years and below are displayed.
We can also select specific columns from the filtered result as follows.
df_spark.filter("Age <= 30").select(['Gender','Education_Level','salary']).show()
The output becomes
We might also need to combine more than one condition. For example, we can filter on both salary and age as follows.
df_spark.filter((df_spark['Age']<=30)&
(df_spark['Salary']>= 170000)).select(['Gender','Education_Level','salary']).show()
The output becomes
We can also negate a condition with the ~ (NOT) operator as follows.
df_spark.filter(~((df_spark['Age'] <= 30) &
                  (df_spark['Salary'] >= 170000))) \
        .select(['Gender', 'Education_Level', 'Salary']).show()
The output becomes
We may also need to group our data by one or more columns. For example, we can find the maximum salary for each combination of Education_Level and Gender as follows.
df_spark.groupBy('Education_Level','Gender').max('salary').show()
The output is as shown below.
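Grouped data also supports other aggregations, and agg() lets you compute several at once; a short sketch (the alias names are just for readability):
from pyspark.sql import functions as F

df_spark.groupBy('Education_Level', 'Gender').agg(
    F.avg('Salary').alias('avg_salary'),      # mean salary per group
    F.count('Salary').alias('employees')      # number of rows per group
).show()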
In summary, PySpark is the Python API for Apache Spark that helps developers carry out processing tasks on large datasets using a distributed computing framework. The select() function selects specific columns from a data frame; it takes one or more column names as arguments and returns a new data frame containing only those columns. The drop() function removes one or more columns; it takes the columns to be dropped as arguments and returns a new data frame without them. Another function we used in this tutorial is withColumnRenamed(), which renames a specific column; it takes two arguments, the old column name followed by the new one, and returns a data frame with the column renamed. Finally, we looked at the groupBy() function, which groups the data frame by one or more columns and returns a grouped object on which we can perform operations such as sum, count, or mean.
Good news: you can find the full notebook here (https://drive.google.com/file/d/1Iy69g13tzCCksbl8DLuQevnq2kWJns3d/view?usp=sharing)
You can follow me on Twitter (https://twitter.com/ErickNdurumo) or LinkedIn (http://www.linkedin.com/in/erick-muriungi-1500a6122).
Happy coding!!!