How to install and use Apache Spark on Windows

We already know that Hadoop can store very large volumes of structured and unstructured data, too large to keep on an ordinary drive. But what do we do with that data afterwards? That is where Apache Spark comes in. According to its official site, Apache Spark™ is a multi-language engine for executing data engineering, data science, and machine learning on single-node machines or clusters. In other words, Spark lets you process and analyze the large datasets you have gathered.

Installing and Setting up Apache Spark

First, go to https://spark.apache.org/downloads.html and download the Spark release that suits you, together with its matching Hadoop package type.

Note the Hadoop version of the package you choose, because you will need it later. The download link takes you to an official Apache page with a generated link from which you can download the archive.

Extract the downloaded archive to your C: drive, so that the path looks like C:\<your extracted Spark folder>.

Next, download winutils.exe from this link, choosing the build that matches the Hadoop version of your package. For example, if you downloaded a package for Hadoop 3.2, download winutils.exe for version 3.2.0 using the download button on that page.
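If you are unsure which Hadoop version your download targets, it is part of the archive name. The sketch below assumes the spark-<version>-bin-hadoop<version> naming seen in the folder name later in this article. parse_spark_package is a hypothetical helper, not part of Spark.

```python
import re

# Hypothetical helper: extract the Spark and Hadoop versions from a Spark
# binary package name such as "spark-3.1.2-bin-hadoop3.2.tgz".
# Assumes the "spark-<version>-bin-hadoop<version>" naming pattern.
_SPARK_NAME = re.compile(r"spark-(\d+\.\d+\.\d+)-bin-hadoop(\d+(?:\.\d+)*)")


def parse_spark_package(name):
    """Return (spark_version, hadoop_version) or raise ValueError."""
    match = _SPARK_NAME.search(name)
    if match is None:
        raise ValueError(f"not a Spark binary package name: {name!r}")
    return match.group(1), match.group(2)


spark_version, hadoop_version = parse_spark_package("spark-3.1.2-bin-hadoop3.2.tgz")
print(spark_version, hadoop_version)  # 3.1.2 3.2
```

Here the Hadoop version is 3.2, so you would pick the winutils.exe build for 3.2.0.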

Now create a new folder named Hadoop, create a folder named bin inside it, and place winutils.exe there, so that the path looks like C:\Users\<username>\Desktop\Hadoop\bin\winutils.exe.

Next, set up environment variables for the two components we have placed (Apache Spark and winutils). Type "environment" in the Windows search bar and select "Edit the system environment variables".

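Click "Environment Variables", then click "New" and create a variable whose value is the directory where the component is found. Create one variable for each of the following:

For winutils: a variable (conventionally named HADOOP_HOME) pointing to the Hadoop folder that contains bin, for example C:\Users\<username>\Desktop\Hadoop.
For Spark: a variable (conventionally named SPARK_HOME) pointing to your extracted Spark folder (your folder name may differ).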
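Before launching Spark, you can sanity-check these variables from Python. This sketch assumes the variables are named HADOOP_HOME and SPARK_HOME. check_spark_env is a hypothetical helper, not part of Spark.

```python
import os


# Hypothetical helper: verify that HADOOP_HOME contains bin\winutils.exe and
# that SPARK_HOME contains a bin folder. The variable names are assumptions;
# adjust them if you chose different names.
def check_spark_env(env=None):
    """Return a list of human-readable problems (empty if everything looks fine)."""
    env = os.environ if env is None else env
    problems = []

    hadoop_home = env.get("HADOOP_HOME")
    if not hadoop_home:
        problems.append("HADOOP_HOME is not set")
    elif not os.path.isfile(os.path.join(hadoop_home, "bin", "winutils.exe")):
        problems.append("winutils.exe not found in %HADOOP_HOME%\\bin")

    spark_home = env.get("SPARK_HOME")
    if not spark_home:
        problems.append("SPARK_HOME is not set")
    elif not os.path.isdir(os.path.join(spark_home, "bin")):
        problems.append("no bin folder in %SPARK_HOME%")

    return problems


# Check the variables visible to the current process
for problem in check_spark_env():
    print("Problem:", problem)
```

Run it from a newly opened terminal. Processes that were already running do not see variables created in the dialog.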

With that done, Spark is ready to run on Windows in local mode. Open a new Command Prompt so that it picks up the new variables, change to the bin folder inside your Spark directory, and run spark-shell. If everything went right, the Spark shell starts and prints a welcome banner.

The banner states the Spark version you installed. This was 2.4.3 in the author's setup, and yours will likely differ.

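Once Spark is installed, we can also use it from Python. The examples below were run in a Google Colab environment with Spark 3.1.2. findspark (installed with pip install findspark) locates the Spark installation through SPARK_HOME and makes pyspark importable, so the same code should also work against a local installation.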

import findspark
findspark.init()  # make the Spark installation's pyspark importable

from pyspark.sql import Row, SparkSession

# Start a local session that uses all available cores
spark = SparkSession.builder.master("local[*]").getOrCreate()

# Test Spark with a DataFrame of 1000 identical rows
# (Row objects avoid the "inferring schema from dict is deprecated" warning)
df = spark.createDataFrame([Row(hello="world") for _ in range(1000)])
df.show(3, False)

+-----+
|hello|
+-----+
|world|
|world|
|world|
+-----+
only showing top 3 rows

# Check the installed PySpark version
import pyspark
print(pyspark.__version__)

3.1.2

Classification algorithms with Spark

Now that Spark is working, let's try a machine learning example: training supervised classification algorithms on the Iris dataset. Run the following code to download it from GitHub.

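# Optional: keep a local copy of the Iris data (the ! prefix runs a shell command in Colab/Jupyter)
!wget -q "https://raw.githubusercontent.com/amjadraza/blogs-data/master/spark_ml/iris.csv"

import pandas as pd

# The file has no header row, so pandas numbers the columns 0 to 4
df = pd.read_csv("https://raw.githubusercontent.com/amjadraza/blogs-data/master/spark_ml/iris.csv", header=None)
df.head()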

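Running df.head() displays the first five rows. Because the file has no header row, pandas labels the columns 0 to 4: four numeric measurements followed by the species name.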

Next, we convert the pandas DataFrame into a Spark DataFrame and give its columns names:

columns = ["c_0", "c_1", "c_2", "c_3", "c_4"]
spark.createDataFrame(df, columns)

DataFrame[c_0: double, c_1: double, c_2: double, c_3: double, c_4: string]

Next, we preprocess the data into the shape that Spark ML classifiers expect. We rename the columns, assemble the four measurements into a single feature vector, and encode the species names as numeric labels.

import pandas as pd
from pyspark.sql.functions import col
from pyspark.ml.feature import StringIndexer, VectorAssembler

# Read the iris data; without a header row, Spark names the columns "0" to "4"
df_iris = pd.read_csv("https://raw.githubusercontent.com/amjadraza/blogs-data/master/spark_ml/iris.csv", header=None)
iris_df = spark.createDataFrame(df_iris)

# Rename the columns
iris_df = iris_df.select(col("0").alias("sepal_length"),
                         col("1").alias("sepal_width"),
                         col("2").alias("petal_length"),
                         col("3").alias("petal_width"),
                         col("4").alias("species"))
iris_df.show(5, False)

+------------+-----------+------------+-----------+-----------+
|sepal_length|sepal_width|petal_length|petal_width|species    |
+------------+-----------+------------+-----------+-----------+
|5.1         |3.5        |1.4         |0.2        |Iris-setosa|
|4.9         |3.0        |1.4         |0.2        |Iris-setosa|
|4.7         |3.2        |1.3         |0.2        |Iris-setosa|
|4.6         |3.1        |1.5         |0.2        |Iris-setosa|
|5.0         |3.6        |1.4         |0.2        |Iris-setosa|
+------------+-----------+------------+-----------+-----------+
only showing top 5 rows

# Combine the four measurement columns into a single "features" vector column
vectorAssembler = VectorAssembler(inputCols=["sepal_length", "sepal_width", "petal_length", "petal_width"],
                                  outputCol="features")
viris_df = vectorAssembler.transform(iris_df)
viris_df.show(5, False)

+------------+-----------+------------+-----------+-----------+-----------------+
|sepal_length|sepal_width|petal_length|petal_width|species    |features         |
+------------+-----------+------------+-----------+-----------+-----------------+
|5.1         |3.5        |1.4         |0.2        |Iris-setosa|[5.1,3.5,1.4,0.2]|
|4.9         |3.0        |1.4         |0.2        |Iris-setosa|[4.9,3.0,1.4,0.2]|
|4.7         |3.2        |1.3         |0.2        |Iris-setosa|[4.7,3.2,1.3,0.2]|
|4.6         |3.1        |1.5         |0.2        |Iris-setosa|[4.6,3.1,1.5,0.2]|
|5.0         |3.6        |1.4         |0.2        |Iris-setosa|[5.0,3.6,1.4,0.2]|
+------------+-----------+------------+-----------+-----------+-----------------+
only showing top 5 rows

# Encode the species names as numeric labels
indexer = StringIndexer(inputCol="species", outputCol="label")
iviris_df = indexer.fit(viris_df).transform(viris_df)
iviris_df.show(2, False)

+------------+-----------+------------+-----------+-----------+-----------------+-----+
|sepal_length|sepal_width|petal_length|petal_width|species    |features         |label|
+------------+-----------+------------+-----------+-----------+-----------------+-----+
|5.1         |3.5        |1.4         |0.2        |Iris-setosa|[5.1,3.5,1.4,0.2]|0.0  |
|4.9         |3.0        |1.4         |0.2        |Iris-setosa|[4.9,3.0,1.4,0.2]|0.0  |
+------------+-----------+------------+-----------+-----------+-----------------+-----+
only showing top 2 rows
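StringIndexer's default ordering (stringOrderType="frequencyDesc") gives index 0.0 to the most frequent string. The Spark 3.x documentation states that ties are broken alphabetically. Assuming the standard Iris data with 50 rows per species, this explains why Iris-setosa received label 0.0 above. The plain-Python sketch below illustrates that rule. string_index is a hypothetical helper, not Spark code.

```python
from collections import Counter


# Sketch of StringIndexer's default "frequencyDesc" ordering:
# the most frequent string gets 0.0, and ties are broken alphabetically.
def string_index(values):
    counts = Counter(values)
    ordered = sorted(counts, key=lambda s: (-counts[s], s))
    return {s: float(i) for i, s in enumerate(ordered)}


# Three species with equal counts, as assumed for the Iris data
species = ["Iris-virginica", "Iris-setosa", "Iris-versicolor"] * 50
mapping = string_index(species)
print(mapping)
# {'Iris-setosa': 0.0, 'Iris-versicolor': 1.0, 'Iris-virginica': 2.0}
```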

Once the data is prepared, we can apply a classification algorithm, starting with the Naive Bayes classifier.

from pyspark.ml.classification import NaiveBayes
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Split the data 60/40 into training and test sets (seed=1 for reproducibility)
train_df, test_df = iviris_df.randomSplit([0.6, 0.4], seed=1)

# Fit a multinomial Naive Bayes classifier (it reads the "features" and "label" columns by default)
nb = NaiveBayes(modelType="multinomial")
nbmodel = nb.fit(train_df)
predictions_df = nbmodel.transform(test_df)
predictions_df.show(1, False)
+------------+-----------+------------+-----------+-----------+-----------------+-----+------------------------------------------------------------+------------------------------------------------------------+----------+
|sepal_length|sepal_width|petal_length|petal_width|species    |features         |label|rawPrediction                                               |probability                                                 |prediction|
+------------+-----------+------------+-----------+-----------+-----------------+-----+------------------------------------------------------------+------------------------------------------------------------+----------+
|4.3         |3.0        |1.1         |0.1        |Iris-setosa|[4.3,3.0,1.1,0.1]|0.0  |[-9.966434726497221,-11.294595492758821,-11.956012812323921]|[0.7134106367667451,0.18902823898426235,0.09756112424899269]|0.0       |
+------------+-----------+------------+-----------+-----------+-----------------+-----+------------------------------------------------------------+------------------------------------------------------------+----------+
only showing top 1 row
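For a multinomial Naive Bayes model, rawPrediction holds log-space class scores, and probability is their normalized exponential (a softmax). The predicted class is the one with the highest score. The sketch below re-derives the probability column of the row above from its rawPrediction values in plain Python. softmax is our own helper, not a Spark API.

```python
import math

# rawPrediction copied from the Naive Bayes row shown above
raw = [-9.966434726497221, -11.294595492758821, -11.956012812323921]


def softmax(scores):
    # Subtract the largest score first so that exp() cannot overflow
    top = max(scores)
    exps = [math.exp(s - top) for s in scores]
    total = sum(exps)
    return [e / total for e in exps]


probs = softmax(raw)
print([round(p, 4) for p in probs])  # [0.7134, 0.189, 0.0976]
```

The largest probability is at index 0, which matches prediction 0.0 (Iris-setosa).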

Another common classification algorithm is the decision tree.

from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Define and fit the decision tree classifier on the same training split
dt = DecisionTreeClassifier(labelCol="label", featuresCol="features")
dt_model = dt.fit(train_df)
dt_predictions = dt_model.transform(test_df)

# Evaluate the decision tree's accuracy on the test split
dt_evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")
dt_accuracy = dt_evaluator.evaluate(dt_predictions)
dt_accuracy

0.9827586206896551
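With metricName="accuracy", MulticlassClassificationEvaluator reports the fraction of test rows whose prediction equals the label. The same evaluator can also score the Naive Bayes results, for example with dt_evaluator.evaluate(predictions_df). The plain-Python sketch below shows the metric. accuracy is a hypothetical helper, and the labels are made up.

```python
def accuracy(labels, predictions):
    """Fraction of rows whose predicted label equals the true label."""
    if len(labels) != len(predictions) or not labels:
        raise ValueError("need two non-empty sequences of equal length")
    correct = sum(1 for y, p in zip(labels, predictions) if y == p)
    return correct / len(labels)


# Toy example (made-up labels, not the article's test set): 3 of 4 correct
print(accuracy([0.0, 1.0, 2.0, 2.0], [0.0, 1.0, 2.0, 1.0]))  # 0.75
```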

Using Regression with Spark

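Next, we try a regression model on the Combined Cycle Power Plant (CCPP) dataset. It has four input variables (AT: ambient temperature, V: exhaust vacuum, AP: ambient pressure, RH: relative humidity) and a target, PE, which is the plant's net hourly electrical energy output.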

Load the dataset from GitHub by running the code below.

import pandas as pd
from pyspark.ml.regression import LinearRegression
from pyspark.ml.feature import VectorAssembler

# Read the dataset (this file has a header row)
df_ccpp = pd.read_csv("https://raw.githubusercontent.com/amjadraza/blogs-data/master/spark_ml/ccpp.csv")
pp_df = spark.createDataFrame(df_ccpp)
pp_df.show(2, False)

+-----+-----+-------+-----+------+
|AT   |V    |AP     |RH   |PE    |
+-----+-----+-------+-----+------+
|14.96|41.76|1024.07|73.17|463.26|
|25.18|62.96|1020.04|59.08|444.37|
+-----+-----+-------+-----+------+
only showing top 2 rows

# Combine the four input columns into a "features" vector with VectorAssembler
vectorAssembler = VectorAssembler(inputCols=["AT", "V", "AP", "RH"], outputCol="features")
vpp_df = vectorAssembler.transform(pp_df)
vpp_df.show(2, False)

+-----+-----+-------+-----+------+---------------------------+
|AT   |V    |AP     |RH   |PE    |features                   |
+-----+-----+-------+-----+------+---------------------------+
|14.96|41.76|1024.07|73.17|463.26|[14.96,41.76,1024.07,73.17]|
|25.18|62.96|1020.04|59.08|444.37|[25.18,62.96,1020.04,59.08]|
+-----+-----+-------+-----+------+---------------------------+
only showing top 2 rows

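Now we can fit a simple linear regression model that predicts PE from the features. The model is fit on the whole dataset, so the RMSE reported below is a training error rather than an estimate of performance on unseen data.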

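# Define and fit the linear regression model
lr = LinearRegression(featuresCol="features", labelCol="PE")
lr_model = lr.fit(vpp_df)

# Inspect the fitted model; in a notebook only the last expression is displayed,
# so wrap each line in print() to see all three values
lr_model.coefficients
lr_model.intercept
lr_model.summary.rootMeanSquaredError

4.557126016749486

# Optionally save the model ("lr_model_path" is a placeholder directory)
# lr_model.save("lr_model_path")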
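A fitted linear regression model predicts intercept + coefficients · features for each row. summary.rootMeanSquaredError is the square root of the mean squared difference between PE and those predictions over the training data. The plain-Python sketch below shows both calculations. predict and rmse are hypothetical helpers, and the coefficients and rows are made-up placeholders, not values from the CCPP fit.

```python
import math


def predict(intercept, coefficients, features):
    # Linear model: intercept plus the dot product of coefficients and features
    return intercept + sum(c * x for c, x in zip(coefficients, features))


def rmse(actual, predicted):
    # Root mean squared error: sqrt of the average squared residual
    errors = [(a - p) ** 2 for a, p in zip(actual, predicted)]
    return math.sqrt(sum(errors) / len(errors))


intercept = 10.0            # hypothetical value
coefficients = [2.0, -1.0]  # hypothetical, one per feature
rows = [([1.0, 2.0], 11.0), ([3.0, 1.0], 14.0)]  # toy (features, label) pairs

preds = [predict(intercept, coefficients, f) for f, _ in rows]
print(preds)                              # [10.0, 15.0]
print(rmse([y for _, y in rows], preds))  # 1.0
```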

Conclusion

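We have seen how to install Spark on Windows and use it with a couple of machine learning algorithms. Spark outperforms MapReduce largely because it can keep intermediate data in memory across processing steps instead of writing it to disk between every map and reduce stage. It also distributes that processing across the cores of one machine or the nodes of a cluster. This is a large part of why Spark is so widely used to analyze big data.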
