Spark Machine Learning

Spark version: 2.1.0
Scala version: 2.12.1
Hadoop version: 2.7.3

Still using the same setup, this time we will dive into Spark ML. We will use logistic regression to train a “model” over a set of news articles, and then use the model to check other articles whether they are of a particular type.

Sample data are from here: https://kdd.ics.uci.edu/databases/20newsgroups/. You can use wget to download 20_newsgroups.tar.gz and mini_newsgroups.tar.gz to the hadoop cluster. mini_newsgroups.tar.gz has 100 files in each of 20 news groups. 20_newsgroups.tar.gz has 1000 files in each of 20 news groups.

Summary of this exercise:

We will tokenize words in about 2,000 articles and distill features out of those tokens, and from there, we build a Machine Learning model that can tell if an article is mainly about the subject of Science. We fine tune the model to achieve a high accuracy >95%.

Next, we will apply the model to 20,000 articles and run the model on my 6-node cluster in parallel. This is to verify the Spark ML performance and of course further validate the accuracy of the model.

Step 1: Get the sample files and upload to HDFS

On nnode1, while login as Hadoop, run following to get sample files.


[hadoop@nnode1 ~]$ mkdir spark
[hadoop@nnode1 ~]$ cd spark
[hadoop@nnode1 spark]$ mkdir 20newsgroup
[hadoop@nnode1 spark]$ cd 20newsgroup
[hadoop@nnode1 20newsgroup]$ wget https://kdd.ics.uci.edu/databases/20newsgroups/mini_newsgroups.tar.gz
[hadoop@nnode1 20newsgroup]$ wget https://kdd.ics.uci.edu/databases/20newsgroups/20_newsgroups.tar.gz
[hadoop@nnode1 20newsgroup]$ tar xvfz mini_newsgroups.tar.gz
[hadoop@nnode1 20newsgroup]$ tar xvfz 20_newsgroups.tar.gz
[hadoop@nnode1 20newsgroup]$ hadoop fs -mkdir newsgroupml
[hadoop@nnode1 20newsgroup]$ hadoop fs -put ./mini_newsgroups/* newsgroupml
[hadoop@nnode1 20newsgroup]$ hadoop fs -mkdir sparkmltextclass
[hadoop@nnode1 20newsgroup]$ hadoop fs -put ./20_newsgroups/* sparkmltextclass

Step 2: Build a Spark Machine Learning model


import spark.implicits._
import org.apache.spark.ml.{Pipeline, PipelineModel}
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator
import org.apache.spark.ml.feature.{HashingTF, Tokenizer, StopWordsRemover, IDF}
import org.apache.spark.ml.tuning.{ParamGridBuilder, CrossValidator}
import org.apache.spark.ml.linalg.Vector
import org.apache.spark.sql.Row

// Create a RDD from list of files, wholeTextFiles reader will scoop all sub-folders
val path = "newsgroupml/*"
val newsgroupsRawData = sc.wholeTextFiles(path)

// These are for data analysis. You can show contents to get familiar with the data structure
val filepath = newsgroupsRawData.map{case(filepath, text) => (filepath)}
val text = newsgroupsRawData.map{case(filepath, text) => (text)}
val id = filepath.map(filepath => (filepath.split("/").takeRight(1))(0))
val topic = filepath.map(filepath => (filepath.split("/").takeRight(2))(0))

// Create a data frame from the RDD, which has the list of files (file path, text)
case class newsgroupsCaseClass(id: String, text: String, topic: String)
val newsgroups = newsgroupsRawData.map{case (filepath, text) =>
val id = filepath.split("/").takeRight(1)(0)
val topic = filepath.split("/").takeRight(2)(0)
newsgroupsCaseClass(id, text, topic)}.toDF()

// Peek into the data frame
newsgroups.cache()
newsgroups.printSchema()
newsgroups.sample(false, 0.005, 10L).show(5)
newsgroups.groupBy("topic").count().show()

// Create the column “label” from the column “topic”
// We know when the topic starts with “sci”, then the article is about science
// Therefore, the label should be one. This is the information we use to train and just the model.

val labelednewsgroups = newsgroups.withColumn("label", newsgroups("topic").like("sci%").cast("double"))

// We break the data set into 2, one set for training the model, the other set for testing
val Array(training, test) = labelednewsgroups.randomSplit(Array(0.9, 0.1), seed = 12345)

// We create a pipeline of transformers and estimators
// These are the core components of the model
// At this moment, the model only get some default parameters
// These parameters are to be fine tuned in later part of the program

val tokenizer = new Tokenizer().setInputCol("text").setOutputCol("words")
val remover = new StopWordsRemover().setInputCol("words").setOutputCol("filtered").setCaseSensitive(false)
val hashingTF = new HashingTF().setNumFeatures(1000).setInputCol("filtered").setOutputCol("rawFeatures")
val idf = new IDF().setInputCol("rawFeatures").setOutputCol("features").setMinDocFreq(0)
val lr = new LogisticRegression().setRegParam(0.01).setThreshold(0.5).setMaxIter(10)
val pipeline = new Pipeline().setStages(Array(tokenizer, remover, hashingTF, idf, lr))

// Peek into the default parameters
println(tokenizer.explainParams())
println(remover.explainParams())
remover.getStopWords.foreach(println)

// We “fit” the model using the training data. “fit” is the word used when we train the model
// By training, the model will infer key correlations among features

val model = pipeline.fit(training)

// Once the model learned how to derived the subject of the articles by analyzing the words in each article
// We ask the model to give it a try using the test data
// Here the model will not use the actual label in the testing data
// Instead, it will use the information it learned to predict a label for each article
// If the prediction is 1 then the article is guessed to be a science article

val predications = model.transform(test)

// Pick into the results, see how accurate it is from the first pass
// You shall see some times it predicts correctly, other times it predicts incorrectly

predications.select("id","topic","probability","prediction","label").
sample(false,0.01,10L).show(5)
predications.select("id","topic","probability","prediction","label").
filter(predications("topic").like("comp%")).
sample(false,0.1,10L).show(5)
predications.sample(false,0.01,10L).show(5)

// The best way to just the model’s performance is to check the true position vs false positive (odds).
// Any accuracy close to or less than 0.5 means it is no good.
// 0.5 accuracy is like flipping coins – a pure guess

val evaluator = new BinaryClassificationEvaluator()
println("Area under the ROC curve = " + evaluator.evaluate(predications))

Note the accuracy is only 84.7%
Area under the ROC curve = 0.8476355247981538

Step 3: Tune the Spark Machine Learning model

Next, we will tune the parameters. Note that we have 2000 articles. There are many more than 1000 different words in them. So if we limit the number of features to 1000, that for sure will cause skew. We will try to create 1K, 10K, and 100K buckets, and see if that will increase the accuracy.

val paramGrid = new ParamGridBuilder().
addGrid(hashingTF.numFeatures, Array(1000,10000,100000)).
addGrid(idf.minDocFreq, Array(0, 10, 100)).
build()
val cv = new CrossValidator().
setEstimator(pipeline).
setEvaluator(evaluator).
setEstimatorParamMaps(paramGrid).
setNumFolds(2)
val cvModel = cv.fit(training)
println("Area under the ROC curve for best fitted model = " + evaluator.evaluate(cvModel.transform(test)))
cvModel.transform(test).sample(false, 0.01, 0L).show(5)

After tuning the parameters, the model predication accuracy improved to 96.5%. We can also sample records see if prediction is the same as actual label.

Step 4: Test Spark ML performance

Next, we will use the same model to process 20K documents. Since the data volume is much higher, we will break the data set into 10 partitions. And we will run the learning from the cluster, using all 4 nodes.

This is not a great performance test as we are running the cluster on a shoe string. But the idea is the check how well Spark parallel executions works.

To do that, we will put all code into a scala file. And we will submit the job to Yarn to run it in parallel. Note the code is somewhat changed. That is because the scala in interactive mode is different. Here we run scala from command line, and it will call a slightly newer version scala.

Source code TextClassificationWithSparkML.scala

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.ml.{Pipeline, PipelineModel}
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator
import org.apache.spark.ml.feature.{HashingTF, Tokenizer, StopWordsRemover, IDF}
import org.apache.spark.ml.tuning.{ParamGridBuilder, CrossValidator}
import org.apache.spark.ml.linalg.Vector

object TextClassificationWithSparkML {
case class newsgroupsCaseClass(id: String, text: String, topic: String)

def main(args: Array[String]) {
val spark = SparkSession.builder.appName("News Groups Text Classification Application").getOrCreate()
import spark.implicits._

val path = "/user/hadoop/sparkmltextclass/*"
val newsgroupsRawData = spark.sparkContext.wholeTextFiles(path, 10)
val newsgroups = newsgroupsRawData.map{case(filepath,text) =>
newsgroupsCaseClass(filepath.split("/").takeRight(1)(0),
text,
filepath.split("/").takeRight(2)(0))}
.toDF("id", "text", "topic")

newsgroups.cache()
println("Total files read: " + newsgroups.count())

val labelednewsgroups = newsgroups.withColumn("label", newsgroups("topic").like("sci%").cast("double"))
val Array(training, test) = labelednewsgroups.randomSplit(Array(0.85, 0.15), seed = 999)

val tokenizer = new Tokenizer().setInputCol("text").setOutputCol("words")
val remover = new StopWordsRemover().setInputCol("words").setOutputCol("filtered").setCaseSensitive(false)
val hashingTF = new HashingTF().setNumFeatures(1000).setInputCol("filtered").setOutputCol("tokenHash")
val idf = new IDF().setInputCol("tokenHash").setOutputCol("features").setMinDocFreq(0)
val lr = new LogisticRegression().setRegParam(0.01).setThreshold(0.5).setMaxIter(10)
val pipeline = new Pipeline().setStages(Array(tokenizer, remover, hashingTF, idf, lr))

println("Tokenizer parameters:\n" + tokenizer.explainParams() + "\n")
println("Stop-word remover parameters:\n" + remover.explainParams() + "\n")
println("Term Frequency transformer parameters:\n" + hashingTF.explainParams() + "\n")
println("Inverse Document Frequency transformer parameters:\n" + idf.explainParams() + "\n")
println("Logistic Regression estimator parameters:\n" + lr.explainParams() + "\n")

println("Stop words: \n")
remover.getStopWords.foreach(println)

val model = pipeline.fit(training)
val predications = model.transform(test)
predications
.select("id","topic","probability","prediction","label")
.sample(false,0.01,10L)
.show(5)
predications
.select("id","topic","probability","prediction","label")
.filter(predications("topic").like("sci%"))
.sample(false,0.1,10L)
.show(5)
predications.sample(false,0.01,10L).show(5)

val evaluator = new BinaryClassificationEvaluator()
println("Area under the ROC curve = " + evaluator.evaluate(predications))

val paramGrid = new ParamGridBuilder()
.addGrid(hashingTF.numFeatures, Array(1000,10000,100000))
.addGrid(idf.minDocFreq, Array(0, 10, 100))
.build()
val cv = new CrossValidator()
.setEstimator(pipeline)
.setEvaluator(evaluator)
.setEstimatorParamMaps(paramGrid)
.setNumFolds(2)
val cvModel = cv.fit(training)
println("Area under the ROC curve for best fitted model = " + evaluator.evaluate(cvModel.transform(test)))
cvModel.transform(test).sample(false, 0.01, 0L).show(5)

// Stop the Spark Context
spark.stop()
}

After compiling the code, we submit the job through following command.

spark-submit --class "TextClassificationWithSparkML" --master yarn --deploy-mode cluster --num-executors 10 target/scala-2.11/practice_2.11-1.0.jar

Once the job started running, we monitor it from Yarn. And we can see how much resource the application is consuming. And we also see there are 10 tasks running across 4 nodes.

The whole job takes about 16 minutes to finish, which is not too bad, given our data size and the cluster configuration.

Next we drill down in Yarn to the application master’s log files, and there we see the accuracy before tuning is 91%. And after tuning it is about 99%. Looks the sample size helped a lot. The tuned accuracy improvement from 96% to 99% is a big deal.

Area under the ROC curve = 0.9130221762216543
Area under the ROC curve for best fitted model = 0.9944581561528715

Additional configuration

Note in this practiced, we used Spark ML in a Scala program. To use Spark ML in a python program, there is an additional dependency on NumPy, which is a scientific computation package for Python that supports sophisticated multiple dimensional array computation. This dependency is documented here. NumPy is also available for download from Source Forge. Without NumPy, Spark ML will not run in Python programs.

Spark ML for Scala and Java also has a dependency. The dependency is on netlib-java. However, JDK has a simulation. Therefore, the Spark ML function will not break if it is used with Scala or Java. See Spark Dependencies for more information.

On Oracle Linux 6 or 7, if you have already setup Oracle public yum repository properly, then NumPy can be installed by running:
sudo yum install numpy