Spark MLLib

MLLib Pipeline

Generally, use of MLLIb for supervised and unsupervised learning follow some or all of the stages in the following template:

  • Get data
  • Pre-process the data
  • Convert data to a form that MLLib functions require (*)
  • Build a model
  • Optimize and fit the model to the data
  • Post-processing and model evaluation

This is often assembled as a pipeline for convenience and reproducibility. This is very similar to what you would do with sklearn, except that MLLib allows you to handle massive datasets by distributing the analysis to multiple computers.

Set up Spark and Spark SQL contexts

In [1]:
%%spark
Starting Spark application
IDYARN Application IDKindStateSpark UIDriver logCurrent session?
151application_1522938745830_0108pysparkidleLinkLink
SparkSession available as 'spark'.

Spark MLLib imports

The older mllib package works on RDDs. The newer ml package works on DataFrames. We will show examples using both, but it is more convenient to use the ml package.

In [2]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import StandardScaler
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import PCA
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.clustering import GaussianMixture

Unsupervised Learning

We saw this machine learning problem previously with sklearn, where the task is to distinguish rocks from mines using 60 sonar numerical features. We will illustrate some of the mechanics of how to work with MLLib - this is not intended to be a serious attempt at modeling the data.

Obtain data

NAME: Sonar, Mines vs. Rocks

SUMMARY: This is the data set used by Gorman and Sejnowski in their study of the classification of sonar signals using a neural network [1]. The task is to train a network to discriminate between sonar signals bounced off a metal cylinder and those bounced off a roughly cylindrical rock.

SOURCE: The data set was contributed to the benchmark collection by Terry Sejnowski, now at the Salk Institute and the University of California at San Deigo. The data set was developed in collaboration with R. Paul Gorman of Allied-Signal Aerospace Technology Center.

See description

In [3]:
import pandas as pd
In [4]:
url = 'https://astro.temple.edu/~alan/sonar_all-data.txt'
data = pd.read_csv(url, header=None)
In [5]:
data.shape
(208, 61)
In [6]:
print(data.iloc[:3, :5])
        0       1       2       3       4
0  0.0200  0.0371  0.0428  0.0207  0.0954
1  0.0453  0.0523  0.0843  0.0689  0.1183
2  0.0262  0.0582  0.1099  0.1083  0.0974
In [7]:
print(data.iloc[:3, -5:])
       56      57      58      59 60
0  0.0180  0.0084  0.0090  0.0032  R
1  0.0140  0.0049  0.0052  0.0044  R
2  0.0316  0.0164  0.0095  0.0078  R
In [8]:
cols = ['C%02d' % i for i in range(60)] + ['raw_label']
df = spark.createDataFrame(data, cols)
In [9]:
df.printSchema()
root
 |-- C00: double (nullable = true)
 |-- C01: double (nullable = true)
 |-- C02: double (nullable = true)
 |-- C03: double (nullable = true)
 |-- C04: double (nullable = true)
 |-- C05: double (nullable = true)
 |-- C06: double (nullable = true)
 |-- C07: double (nullable = true)
 |-- C08: double (nullable = true)
 |-- C09: double (nullable = true)
 |-- C10: double (nullable = true)
 |-- C11: double (nullable = true)
 |-- C12: double (nullable = true)
 |-- C13: double (nullable = true)
 |-- C14: double (nullable = true)
 |-- C15: double (nullable = true)
 |-- C16: double (nullable = true)
 |-- C17: double (nullable = true)
 |-- C18: double (nullable = true)
 |-- C19: double (nullable = true)
 |-- C20: double (nullable = true)
 |-- C21: double (nullable = true)
 |-- C22: double (nullable = true)
 |-- C23: double (nullable = true)
 |-- C24: double (nullable = true)
 |-- C25: double (nullable = true)
 |-- C26: double (nullable = true)
 |-- C27: double (nullable = true)
 |-- C28: double (nullable = true)
 |-- C29: double (nullable = true)
 |-- C30: double (nullable = true)
 |-- C31: double (nullable = true)
 |-- C32: double (nullable = true)
 |-- C33: double (nullable = true)
 |-- C34: double (nullable = true)
 |-- C35: double (nullable = true)
 |-- C36: double (nullable = true)
 |-- C37: double (nullable = true)
 |-- C38: double (nullable = true)
 |-- C39: double (nullable = true)
 |-- C40: double (nullable = true)
 |-- C41: double (nullable = true)
 |-- C42: double (nullable = true)
 |-- C43: double (nullable = true)
 |-- C44: double (nullable = true)
 |-- C45: double (nullable = true)
 |-- C46: double (nullable = true)
 |-- C47: double (nullable = true)
 |-- C48: double (nullable = true)
 |-- C49: double (nullable = true)
 |-- C50: double (nullable = true)
 |-- C51: double (nullable = true)
 |-- C52: double (nullable = true)
 |-- C53: double (nullable = true)
 |-- C54: double (nullable = true)
 |-- C55: double (nullable = true)
 |-- C56: double (nullable = true)
 |-- C57: double (nullable = true)
 |-- C58: double (nullable = true)
 |-- C59: double (nullable = true)
 |-- raw_label: string (nullable = true)

Pre-process the data

Transform 60 features into MMlib vectors

In [10]:
assembler = VectorAssembler(
    inputCols=['C%02d' % i for i in range(60)],
    outputCol="raw_features")
output = assembler.transform(df)
In [11]:
output.show(3)
+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+---------+--------------------+
|   C00|   C01|   C02|   C03|   C04|   C05|   C06|   C07|   C08|   C09|   C10|   C11|   C12|   C13|   C14|   C15|   C16|   C17|   C18|   C19|   C20|   C21|   C22|   C23|   C24|   C25|   C26|   C27|   C28|   C29|   C30|   C31|   C32|   C33|   C34|   C35|   C36|   C37|   C38|   C39|   C40|   C41|   C42|   C43|   C44|   C45|   C46|   C47|   C48|   C49|   C50|   C51|   C52|   C53|   C54|   C55|   C56|   C57|   C58|   C59|raw_label|        raw_features|
+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+---------+--------------------+
|  0.02|0.0371|0.0428|0.0207|0.0954|0.0986|0.1539|0.1601|0.3109|0.2111|0.1609|0.1582|0.2238|0.0645| 0.066|0.2273|  0.31|0.2999|0.5078|0.4797|0.5783|0.5071|0.4328| 0.555|0.6711|0.6415|0.7104| 0.808|0.6791|0.3857|0.1307|0.2604|0.5121|0.7547|0.8537|0.8507|0.6692|0.6097|0.4943|0.2744| 0.051|0.2834|0.2825|0.4256|0.2641|0.1386|0.1051|0.1343|0.0383|0.0324|0.0232|0.0027|0.0065|0.0159|0.0072|0.0167| 0.018|0.0084| 0.009|0.0032|        R|[0.02,0.0371,0.04...|
|0.0453|0.0523|0.0843|0.0689|0.1183|0.2583|0.2156|0.3481|0.3337|0.2872|0.4918|0.6552|0.6919|0.7797|0.7464|0.9444|   1.0|0.8874|0.8024|0.7818|0.5212|0.4052|0.3957|0.3914| 0.325|  0.32|0.3271|0.2767|0.4423|0.2028|0.3788|0.2947|0.1984|0.2341|0.1306|0.4182|0.3835|0.1057| 0.184| 0.197|0.1674|0.0583|0.1401|0.1628|0.0621|0.0203| 0.053|0.0742|0.0409|0.0061|0.0125|0.0084|0.0089|0.0048|0.0094|0.0191| 0.014|0.0049|0.0052|0.0044|        R|[0.0453,0.0523,0....|
|0.0262|0.0582|0.1099|0.1083|0.0974| 0.228|0.2431|0.3771|0.5598|0.6194|0.6333| 0.706|0.5544| 0.532|0.6479|0.6931|0.6759|0.7551|0.8929|0.8619|0.7974|0.6737|0.4293|0.3648|0.5331|0.2413| 0.507|0.8533|0.6036|0.8514|0.8512|0.5045|0.1862|0.2709|0.4232|0.3043|0.6116|0.6756|0.5375|0.4719|0.4647|0.2587|0.2129|0.2222|0.2111|0.0176|0.1348|0.0744| 0.013|0.0106|0.0033|0.0232|0.0166|0.0095| 0.018|0.0244|0.0316|0.0164|0.0095|0.0078|        R|[0.0262,0.0582,0....|
+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+---------+--------------------+
only showing top 3 rows

Scale features to have zero mean and unit standard deviation

In [12]:
standardizer = StandardScaler(withMean=True, withStd=True,
                              inputCol='raw_features',
                              outputCol='features')
model = standardizer.fit(output)
output = model.transform(output)

Convert label to numeric index

In [13]:
indexer = StringIndexer(inputCol="raw_label", outputCol="label")
indexed = indexer.fit(output).transform(output)

Extract only columns of interest

In [14]:
sonar = indexed.select(['features', 'label'])
In [15]:
sonar.show(n=3)
+--------------------+-----+
|            features|label|
+--------------------+-----+
|[-0.3985897356694...|  1.0|
|[0.70184498705605...|  1.0|
|[-0.1289179854363...|  1.0|
+--------------------+-----+
only showing top 3 rows

Data conversion

We will first fit a Gaussian Mixture Model with 2 components to the first 2 principal components of the data as an example of unsupervised learning.

In [16]:
import numpy as np
In [17]:
pca = PCA(k=2, inputCol="features", outputCol="pca")
model = pca.fit(sonar)
transformed = model.transform(sonar)
In [18]:
transformed.show(3)
+--------------------+-----+--------------------+
|            features|label|                 pca|
+--------------------+-----+--------------------+
|[-0.3985897356694...|  1.0|[-1.9165444107164...|
|[0.70184498705605...|  1.0|[0.47896904316843...|
|[-0.1289179854363...|  1.0|[-3.8499400285258...|
+--------------------+-----+--------------------+
only showing top 3 rows
In [19]:
features = transformed.select('pca')
In [20]:
features = transformed.select('pca').rdd.map(lambda x: np.array(x))
In [21]:
features.take(3)
[array([[-1.91654441,  1.36759373]]), array([[ 0.47896904, -7.56812953]]), array([[-3.84994003, -6.42436107]])]

Build Model

In [22]:
gmm = GaussianMixture(k=2, seed=123, featuresCol='pca')
In [23]:
model = gmm.fit(transformed)
In [24]:
transformed2 = model.transform(transformed)
In [25]:
transformed2.show(4)
+--------------------+-----+--------------------+----------+--------------------+
|            features|label|                 pca|prediction|         probability|
+--------------------+-----+--------------------+----------+--------------------+
|[-0.3985897356694...|  1.0|[-1.9165444107164...|         0|[0.62298055982439...|
|[0.70184498705605...|  1.0|[0.47896904316843...|         0|[0.99997018397790...|
|[-0.1289179854363...|  1.0|[-3.8499400285258...|         0|[0.83185871352854...|
|[-0.8335441715294...|  1.0|[-4.5863546250792...|         1|[0.00590393467266...|
+--------------------+-----+--------------------+----------+--------------------+
only showing top 4 rows

Get fitted Guassian parameters as DataFrame

In [26]:
model.gaussiansDF.show(truncate=False)
+---------------------------------------+---------------------------------------------------------------------------------+
|mean                                   |cov                                                                              |
+---------------------------------------+---------------------------------------------------------------------------------+
|[1.45630530064182,-0.37037164792437044]|3.615993244446481   1.5178686905624637
1.5178686905624637  10.969615585723204  |
|[-5.357055209406797,1.362421303455668] |7.039461019406999   3.6991498391254107
3.6991498391254107  10.078828243534241  |
+---------------------------------------+---------------------------------------------------------------------------------+

Supervised Learning

We will fit a logistic regression model to the data as an example of supervised learning.

In [27]:
sonar.show(n=3)
+--------------------+-----+
|            features|label|
+--------------------+-----+
|[-0.3985897356694...|  1.0|
|[0.70184498705605...|  1.0|
|[-0.1289179854363...|  1.0|
+--------------------+-----+
only showing top 3 rows

Using ml for logistic regression

Convert to format expected by regression functions in mllib

In [28]:
sonar.printSchema()
root
 |-- features: vector (nullable = true)
 |-- label: double (nullable = true)
In [29]:
lr = LogisticRegression(labelCol="label", featuresCol="features", maxIter=10)

Split into test and train sets

In [30]:
train, test = sonar.randomSplit([0.7, 0.3])

Fit model to training data

In [31]:
train.show(4)
+--------------------+-----+
|            features|label|
+--------------------+-----+
|[-1.0988663774039...|  1.0|
|[-0.9727295910045...|  1.0|
|[-0.9248846030599...|  1.0|
|[-0.8770396151153...|  1.0|
+--------------------+-----+
only showing top 4 rows
In [32]:
model = lr.fit(train)

Evaluate on test data

In [33]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator
In [34]:
predictions = model.transform(test)
In [35]:
predictions.show(3)
+--------------------+-----+--------------------+--------------------+----------+
|            features|label|       rawPrediction|         probability|prediction|
+--------------------+-----+--------------------+--------------------+----------+
|[-0.9031368812669...|  1.0|[-3.9291177051083...|[0.01928190985745...|       1.0|
|[-0.8726900707567...|  1.0|[-4.1830133029604...|[0.01502333483134...|       1.0|
|[-0.8335441715294...|  1.0|[0.47133208472102...|[0.61569899448073...|       0.0|
+--------------------+-----+--------------------+--------------------+----------+
only showing top 3 rows
In [36]:
evaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction")
In [37]:
evaluator.getMetricName()
'areaUnderROC'
In [38]:
evaluator.evaluate(predictions)
0.8369905956112853

Using the ml pipeline

We build a pipeline to preoprcess and fit a logistic regression model to the original DataFrame. The pipeline stages consist of

  • Convert featrue columns in DataFrame into a vector of features
  • Scele features to have zero mean and unit standard deviation
  • Convert string labels into numeric labels
  • Reduce dimensionality using PCA with the first 5 PCs
  • Run logistic regression to predict the labels from the feature vector of 5 Principal Components
In [39]:
df.show(1)
+----+------+------+------+------+------+------+------+------+------+------+------+------+------+-----+------+----+------+------+------+------+------+------+-----+------+------+------+-----+------+------+------+------+------+------+------+------+------+------+------+------+-----+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+-----+------+-----+------+---------+
| C00|   C01|   C02|   C03|   C04|   C05|   C06|   C07|   C08|   C09|   C10|   C11|   C12|   C13|  C14|   C15| C16|   C17|   C18|   C19|   C20|   C21|   C22|  C23|   C24|   C25|   C26|  C27|   C28|   C29|   C30|   C31|   C32|   C33|   C34|   C35|   C36|   C37|   C38|   C39|  C40|   C41|   C42|   C43|   C44|   C45|   C46|   C47|   C48|   C49|   C50|   C51|   C52|   C53|   C54|   C55|  C56|   C57|  C58|   C59|raw_label|
+----+------+------+------+------+------+------+------+------+------+------+------+------+------+-----+------+----+------+------+------+------+------+------+-----+------+------+------+-----+------+------+------+------+------+------+------+------+------+------+------+------+-----+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+-----+------+-----+------+---------+
|0.02|0.0371|0.0428|0.0207|0.0954|0.0986|0.1539|0.1601|0.3109|0.2111|0.1609|0.1582|0.2238|0.0645|0.066|0.2273|0.31|0.2999|0.5078|0.4797|0.5783|0.5071|0.4328|0.555|0.6711|0.6415|0.7104|0.808|0.6791|0.3857|0.1307|0.2604|0.5121|0.7547|0.8537|0.8507|0.6692|0.6097|0.4943|0.2744|0.051|0.2834|0.2825|0.4256|0.2641|0.1386|0.1051|0.1343|0.0383|0.0324|0.0232|0.0027|0.0065|0.0159|0.0072|0.0167|0.018|0.0084|0.009|0.0032|        R|
+----+------+------+------+------+------+------+------+------+------+------+------+------+------+-----+------+----+------+------+------+------+------+------+-----+------+------+------+-----+------+------+------+------+------+------+------+------+------+------+------+------+-----+------+------+------+------+------+------+------+------+------+------+------+------+------+------+------+-----+------+-----+------+---------+
only showing top 1 row
In [40]:
transformer = VectorAssembler(
    inputCols=['C%02d' % i for i in range(60)],
    outputCol="raw_features"
)
standardizer = StandardScaler(
    withMean=True,
    withStd=True,
    inputCol='raw_features',
    outputCol='features'
)
indexer = StringIndexer(
    inputCol="raw_label",
    outputCol="label"
)
pca = PCA(
    k=5,
    inputCol="features",
    outputCol="pca"
)
lr = LogisticRegression(
    featuresCol='features',
    labelCol='label'
)

pipeline = Pipeline(stages=[transformer, standardizer, indexer, pca, lr])
In [41]:
train, test = df.randomSplit([0.7, 0.3])
In [42]:
model = pipeline.fit(train)
In [43]:
import warnings

with warnings.catch_warnings():
    warnings.simplefilter('ignore')
    prediction = model.transform(test)
In [44]:
score = prediction.select(['label', 'prediction'])
score.show(n=score.count())
+-----+----------+
|label|prediction|
+-----+----------+
|  1.0|       1.0|
|  1.0|       0.0|
|  1.0|       1.0|
|  1.0|       1.0|
|  1.0|       1.0|
|  1.0|       1.0|
|  1.0|       1.0|
|  1.0|       1.0|
|  1.0|       1.0|
|  1.0|       1.0|
|  1.0|       0.0|
|  1.0|       1.0|
|  1.0|       1.0|
|  1.0|       1.0|
|  1.0|       1.0|
|  1.0|       1.0|
|  1.0|       1.0|
|  0.0|       1.0|
|  1.0|       0.0|
|  1.0|       1.0|
|  1.0|       1.0|
|  1.0|       1.0|
|  1.0|       0.0|
|  1.0|       1.0|
|  0.0|       0.0|
|  1.0|       1.0|
|  0.0|       1.0|
|  0.0|       0.0|
|  0.0|       0.0|
|  0.0|       1.0|
|  0.0|       0.0|
|  0.0|       1.0|
|  0.0|       0.0|
|  0.0|       1.0|
|  0.0|       0.0|
|  0.0|       1.0|
|  0.0|       0.0|
|  0.0|       1.0|
|  0.0|       0.0|
|  0.0|       0.0|
|  0.0|       0.0|
|  0.0|       0.0|
|  0.0|       0.0|
|  0.0|       1.0|
|  0.0|       0.0|
|  0.0|       0.0|
|  0.0|       0.0|
|  0.0|       1.0|
|  0.0|       0.0|
|  0.0|       0.0|
|  0.0|       0.0|
|  0.0|       0.0|
|  0.0|       0.0|
|  0.0|       0.0|
|  0.0|       0.0|
|  0.0|       0.0|
|  0.0|       0.0|
|  0.0|       0.0|
|  0.0|       0.0|
+-----+----------+

Evaluate accuracy

In [45]:
acc = score.rdd.map(lambda x: x[0] == x[1]).sum() / float(score.count())
acc
0.7796610169491526
In [46]:
spark.stop()